This commit is contained in:
cxh 2025-10-15 08:50:01 +08:00
parent 92dc5087bd
commit a12d902fc0
3 changed files with 155 additions and 161 deletions

View File

@ -2,42 +2,59 @@
#pragma once #pragma once
#include <gst/gst.h> #include <gst/gst.h>
#include "app_config.hpp"
#include <unordered_map> #include <unordered_map>
#include <string> #include <string>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <functional>
#include "app_config.hpp"
class RTMPManager class RTMPManager
{ {
public: public:
enum class StreamResult
{
OK,
PIPELINE_ERROR,
CONNECTION_FAIL,
EOS_RECEIVED,
UNKNOWN
};
struct StreamStatus
{
bool running;
StreamResult last_result;
std::string last_error;
};
using StreamCallback = std::function<void(const std::string &key, StreamStatus status)>;
static void init(); static void init();
static void start_camera(const Camera &cam, StreamType type); static void start_camera(const Camera &cam, StreamType type);
static void stop_camera(const std::string &cam_name, StreamType type); static void stop_camera(const std::string &cam_name, StreamType type);
static void stop_all(); static void stop_all();
static bool is_streaming(const std::string &cam_name, StreamType type); static bool is_streaming(const std::string &cam_name, StreamType type);
static bool is_any_streaming(); static bool is_any_streaming();
static std::string get_stream_url(const std::string &cam_name, StreamType type); static std::string get_stream_url(const std::string &cam_name, StreamType type);
static void set_status_callback(StreamCallback cb);
private: private:
struct StreamContext struct StreamContext
{ {
std::atomic<bool> running; std::atomic<bool> running{false};
std::thread thread; std::thread thread;
StreamStatus status;
StreamContext() : running(false) {}
StreamContext(StreamContext &&) = default; // 允许移动
StreamContext &operator=(StreamContext &&) = default;
StreamContext(const StreamContext &) = delete;
StreamContext &operator=(const StreamContext &) = delete;
}; };
static std::unordered_map<std::string, StreamContext> streams; static std::unordered_map<std::string, StreamContext> streams;
static std::mutex streams_mutex; static std::mutex streams_mutex;
static StreamCallback status_callback;
static void stream_loop(Camera cam, StreamType type); static void stream_loop(Camera cam, StreamType type);
static GstElement *create_pipeline(const Camera &cam, StreamType type); static GstElement *create_pipeline(const Camera &cam, StreamType type);
static std::string make_stream_key(const std::string &cam_name, StreamType type); static std::string make_stream_key(const std::string &cam_name, StreamType type);
static void update_status(const std::string &key, const StreamStatus &status);
}; };

View File

@ -3,123 +3,82 @@
#include "rtmp_manager.hpp" #include "rtmp_manager.hpp"
#include "logger.hpp" #include "logger.hpp"
#include "mqtt_client_wrapper.hpp" #include "mqtt_client_wrapper.hpp"
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <csignal> #include <csignal>
#include <unistd.h> // write, STDOUT_FILENO #include <unistd.h>
#include <chrono> #include <chrono>
// 可通过这些开关快速启用/禁用线程进行调试 static std::atomic<bool> g_running(true);
constexpr bool ENABLE_RTMP_THREAD = true; // 设置为 false 禁用 RTSP 线程
constexpr bool ENABLE_MQTT_THREAD = true; // 设置为 false 禁用 MQTT 线程
std::atomic<bool> g_running(true); static void signal_handler(int signum)
static void minimal_signal_handler(int signum)
{ {
// 只做非常有限且 async-signal-safe 的操作
g_running.store(false, std::memory_order_relaxed); g_running.store(false, std::memory_order_relaxed);
const char msg[] = "[MAIN] Signal received, initiating shutdown...\n"; const char msg[] = "[MAIN] Signal received, shutting down...\n";
write(STDERR_FILENO, msg, sizeof(msg) - 1); write(STDERR_FILENO, msg, sizeof(msg) - 1);
// 不要调用 LOG_*、malloc、std::string、mutex、exit 等
} }
int main() int main()
{ {
// 安装信号处理SIGINT 和 SIGTERM // 安装信号处理
struct sigaction sigAct{}; struct sigaction sa{};
sigAct.sa_handler = minimal_signal_handler; sa.sa_handler = signal_handler;
sigemptyset(&sigAct.sa_mask); sigemptyset(&sa.sa_mask);
sigAct.sa_flags = 0; sa.sa_flags = 0;
sigaction(SIGINT, &sigAct, nullptr); sigaction(SIGINT, &sa, nullptr);
sigaction(SIGTERM, &sigAct, nullptr); sigaction(SIGTERM, &sa, nullptr);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
// 初始化日志文件 // 初始化日志
Logger::set_log_to_file(get_executable_dir_file_path("app.log")); Logger::set_log_to_file(get_executable_dir_file_path("app.log"));
LOG_INFO("[MAIN] Application starting...");
try try
{ {
// 从可执行文件所在目录读取配置文件 // 加载配置
g_app_config = AppConfig::load_from_file(get_executable_dir_file_path("config.json")); g_app_config = AppConfig::load_from_file(get_executable_dir_file_path("config.json"));
LOG_INFO("[MAIN] Loaded config from config.json");
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
LOG_ERROR(std::string("Failed to load config: ") + e.what()); LOG_ERROR(std::string("[MAIN] Failed to load config: ") + e.what());
return -1; return -1;
} }
// 线程退出标志,用于超时等待 // 初始化 GStreamer
std::atomic<bool> rtsp_thread_exited(false);
std::atomic<bool> mqtt_thread_exited(false);
// 先在主线程初始化 GStreamer
RTMPManager::init(); RTMPManager::init();
std::thread rtsp_thread; // 启动 MQTT 线程
std::thread mqtt_thread; std::thread mqtt_thread([]
// 启动 MQTT 线程(如果启用)
if (ENABLE_MQTT_THREAD)
{ {
mqtt_thread = std::thread([&]() try
{ {
mqtt_client_thread_func(); LOG_INFO("[MAIN] MQTT thread started.");
mqtt_thread_exited.store(true, std::memory_order_relaxed); }); mqtt_client_thread_func(); // 在回调里执行推流控制
LOG_INFO("[MAIN] MQTT thread started");
} }
else catch (const std::exception &e)
{ {
LOG_INFO("[MAIN] MQTT thread disabled by build-time toggle"); LOG_ERROR(std::string("[MAIN] MQTT thread crashed: ") + e.what());
} }
LOG_INFO("[MAIN] MQTT thread exiting..."); });
// 等待退出信号 // 主循环,仅等待退出信号
while (g_running.load(std::memory_order_relaxed)) while (g_running.load(std::memory_order_relaxed))
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
LOG_INFO("[MAIN] Shutdown requested, stopping services..."); LOG_INFO("[MAIN] Shutdown requested. Cleaning up...");
// 等线程优雅退出:总等待时间 (可调整) // 停止所有 RTMP 流
const auto max_wait = std::chrono::seconds(5); RTMPManager::stop_all();
const auto poll_interval = std::chrono::milliseconds(100);
auto deadline = std::chrono::steady_clock::now() + max_wait;
// 重置 MQTT 线程等待的截止时间
deadline = std::chrono::steady_clock::now() + max_wait;
if (ENABLE_MQTT_THREAD)
{
while (!mqtt_thread_exited.load(std::memory_order_relaxed) &&
std::chrono::steady_clock::now() < deadline)
{
std::this_thread::sleep_for(poll_interval);
}
// 等待 MQTT 线程退出
if (mqtt_thread.joinable()) if (mqtt_thread.joinable())
{
if (mqtt_thread_exited.load(std::memory_order_relaxed))
{ {
mqtt_thread.join(); mqtt_thread.join();
LOG_INFO("[MAIN] MQTT thread finished and joined."); LOG_INFO("[MAIN] MQTT thread joined.");
}
else
{
LOG_WARN("[MAIN] MQTT thread did not exit within timeout.");
}
}
} }
// 如果有线程仍未退出,则强制终止 LOG_INFO("[MAIN] Application exited cleanly.");
bool any_failed = false;
if (ENABLE_MQTT_THREAD && mqtt_thread.joinable() && !mqtt_thread_exited.load(std::memory_order_relaxed))
any_failed = true;
if (any_failed)
{
LOG_ERROR("[MAIN] Threads did not exit in time. Forcing immediate termination.");
_exit(1);
}
LOG_INFO("[MAIN] Program exited cleanly.");
return 0; return 0;
} }

View File

@ -1,14 +1,14 @@
// rtsp_manager.cpp // rtsp_manager.cpp
#include "rtmp_manager.hpp" #include "rtmp_manager.hpp"
#include "logger.hpp" #include "logger.hpp"
#include <iostream>
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include <gst/gst.h> #include <iostream>
#include <algorithm> #include <algorithm>
std::unordered_map<std::string, RTMPManager::StreamContext> RTMPManager::streams; std::unordered_map<std::string, RTMPManager::StreamContext> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex; std::mutex RTMPManager::streams_mutex;
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
static inline std::string stream_type_suffix(StreamType type) static inline std::string stream_type_suffix(StreamType type)
{ {
@ -26,6 +26,21 @@ void RTMPManager::init()
LOG_INFO("[RTMP] GStreamer initialized."); LOG_INFO("[RTMP] GStreamer initialized.");
} }
void RTMPManager::set_status_callback(StreamCallback cb)
{
status_callback = std::move(cb);
}
void RTMPManager::update_status(const std::string &key, const StreamStatus &status)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end())
it->second.status = status;
if (status_callback)
status_callback(key, status);
}
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
{ {
int width = cam.width; int width = cam.width;
@ -42,30 +57,26 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
} }
std::string stream_name = cam.name + stream_type_suffix(type); std::string stream_name = cam.name + stream_type_suffix(type);
std::string pipeline_str = std::string pipeline_str =
"v4l2src device=" + cam.device + "v4l2src device=" + cam.device +
" ! video/x-raw,format=NV12,width=" + std::to_string(width) + " ! video/x-raw,format=NV12,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",height=" + std::to_string(height) +
",framerate=" + std::to_string(fps) + "/1" ",framerate=" + std::to_string(fps) +
" ! queue max-size-buffers=1 leaky=downstream " "/1 ! queue max-size-buffers=1 leaky=downstream "
"! mpph264enc bps=" + "! mpph264enc bps=" +
std::to_string(bitrate) + std::to_string(bitrate) +
" gop=" + std::to_string(fps) + " gop=" + std::to_string(fps) +
" ! h264parse " " ! h264parse ! flvmux streamable=true name=mux "
" ! flvmux streamable=true name=mux "
"! rtmpsink location=\"rtmp://127.0.0.1/live/" + "! rtmpsink location=\"rtmp://127.0.0.1/live/" +
stream_name + stream_name + " live=1\" sync=false";
" live=1\" sync=false";
LOG_INFO("[RTMP] Creating pipeline for '" + cam.name + "' (" + LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str);
(type == StreamType::MAIN ? "MAIN" : "SUB") + ") -> " + pipeline_str);
GError *error = nullptr; GError *error = nullptr;
GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
if (error) if (error)
{ {
LOG_ERROR("[RTMP] Failed to parse pipeline: " + std::string(error->message)); LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message));
g_error_free(error); g_error_free(error);
return nullptr; return nullptr;
} }
@ -75,43 +86,51 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
void RTMPManager::stream_loop(Camera cam, StreamType type) void RTMPManager::stream_loop(Camera cam, StreamType type)
{ {
const std::string key = make_stream_key(cam.name, type); const std::string key = make_stream_key(cam.name, type);
LOG_INFO("[RTMP] Stream thread started for '" + key + "'"); LOG_INFO("[RTMP] Stream loop started for " + key);
const int MAX_RETRIES = 5;
int retry_count = 0;
while (true) while (true)
{ {
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running.load()) if (!streams[key].running)
break; break;
} }
GstElement *pipeline = create_pipeline(cam, type); GstElement *pipeline = create_pipeline(cam, type);
if (!pipeline) if (!pipeline)
{ {
LOG_ERROR("[RTMP] Failed to create pipeline for '" + key + "', retrying in 3s..."); update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"});
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(3));
if (++retry_count > MAX_RETRIES)
break;
continue; continue;
} }
GstBus *bus = gst_element_get_bus(pipeline); GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING); gst_element_set_state(pipeline, GST_STATE_PLAYING);
LOG_INFO("[RTMP] Camera '" + key + "' streaming..."); update_status(key, {true, StreamResult::OK, ""});
LOG_INFO("[RTMP] " + key + " is streaming.");
bool error_occurred = false; bool stop_flag = false;
int consecutive_failures = 0; GstMessage *msg = nullptr;
const int MAX_RETRIES = 5;
while (true) while (true)
{ {
GstMessage *msg = gst_bus_timed_pop_filtered( msg = gst_bus_timed_pop_filtered(
bus, GST_CLOCK_TIME_NONE, bus, GST_CLOCK_TIME_NONE,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running.load()) if (!streams[key].running)
{
stop_flag = true;
break; break;
} }
}
if (!msg) if (!msg)
continue; continue;
@ -121,21 +140,22 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
GError *err = nullptr; GError *err = nullptr;
gchar *debug = nullptr; gchar *debug = nullptr;
gst_message_parse_error(msg, &err, &debug); gst_message_parse_error(msg, &err, &debug);
LOG_ERROR("[RTMP] Error on '" + key + "': " + std::string(err->message)); std::string err_msg = err ? err->message : "Unknown GStreamer error";
LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg);
update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg});
g_error_free(err); g_error_free(err);
g_free(debug); g_free(debug);
error_occurred = true; stop_flag = true;
consecutive_failures++;
} }
else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS)
{ {
LOG_WARN("[RTMP] EOS received on '" + key + "'"); LOG_WARN("[RTMP] EOS on " + key);
error_occurred = true; update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"});
consecutive_failures++; stop_flag = true;
} }
gst_message_unref(msg); gst_message_unref(msg);
if (error_occurred) if (stop_flag)
break; break;
} }
@ -143,100 +163,98 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
gst_object_unref(bus); gst_object_unref(bus);
gst_object_unref(pipeline); gst_object_unref(pipeline);
if (!error_occurred) if (!stop_flag)
break; break;
if (consecutive_failures >= MAX_RETRIES) if (++retry_count > MAX_RETRIES)
{ {
LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts."); LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up.");
std::lock_guard<std::mutex> lock(streams_mutex);
streams[key].running.store(false);
break; break;
} }
LOG_WARN("[RTMP] Reconnecting '" + key + "' in 3s..."); LOG_WARN("[RTMP] Reconnecting " + key + " in 3s...");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(3));
} }
LOG_INFO("[RTMP] Stream thread exited for '" + key + "'"); update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"});
LOG_INFO("[RTMP] Stream loop ended for " + key);
} }
void RTMPManager::start_camera(const Camera &cam, StreamType type) void RTMPManager::start_camera(const Camera &cam, StreamType type)
{ {
std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam.name, type); std::string key = make_stream_key(cam.name, type);
std::lock_guard<std::mutex> lock(streams_mutex);
if (streams.count(key) && streams[key].running.load()) auto it = streams.find(key);
if (it != streams.end() && it->second.running)
{ {
LOG_WARN("[RTMP] Camera '" + key + "' already streaming."); LOG_WARN("[RTMP] " + key + " already streaming.");
return; return;
} }
StreamContext ctx; StreamContext ctx;
ctx.running.store(true); ctx.running = true;
ctx.thread = std::thread([cam, type]() ctx.thread = std::thread([cam, type]()
{ RTMPManager::stream_loop(cam, type); }); { stream_loop(cam, type); });
streams[key] = std::move(ctx);
// 修复 emplace 构造失败问题
streams.emplace(std::move(key), std::move(ctx));
} }
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{ {
std::unique_lock<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam_name, type); std::string key = make_stream_key(cam_name, type);
std::unique_lock<std::mutex> lock(streams_mutex);
auto it = streams.find(key); auto it = streams.find(key);
if (it == streams.end()) if (it == streams.end())
{ {
LOG_WARN("[RTMP] Camera '" + key + "' not found."); LOG_WARN("[RTMP] " + key + " not found.");
return; return;
} }
it->second.running.store(false); it->second.running = false;
auto th = std::move(it->second.thread); auto th = std::move(it->second.thread);
streams.erase(it); streams.erase(it);
lock.unlock(); lock.unlock();
if (th.joinable()) if (th.joinable())
th.join(); th.join();
update_status(key, {false, StreamResult::OK, "Stopped manually"});
}
void RTMPManager::stop_all()
{
std::vector<std::string> keys;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
keys.push_back(kv.first);
}
for (auto &key : keys)
{
if (key.find("_main") != std::string::npos)
stop_camera(key.substr(0, key.size() - 5), StreamType::MAIN);
else
stop_camera(key.substr(0, key.size() - 4), StreamType::SUB);
}
} }
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam_name, type); auto key = make_stream_key(cam_name, type);
auto it = streams.find(key); auto it = streams.find(key);
return it != streams.end() && it->second.running.load(); return it != streams.end() && it->second.running;
} }
bool RTMPManager::is_any_streaming() bool RTMPManager::is_any_streaming()
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) for (auto &kv : streams)
if (kv.second.running.load()) if (kv.second.running)
return true; return true;
return false; return false;
} }
void RTMPManager::stop_all()
{
std::vector<std::pair<std::string, StreamType>> targets;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
{
if (kv.first.size() > 4 && kv.first.find("_sub") == kv.first.size() - 4)
targets.emplace_back(kv.first.substr(0, kv.first.size() - 4), StreamType::SUB);
else
targets.emplace_back(kv.first.substr(0, kv.first.size() - 5), StreamType::MAIN);
}
}
for (auto &[name, type] : targets)
stop_camera(name, type);
}
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)
{ {
return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type);