From 81877ced1b97ae3ca8808b9328bd375ed538bf41 Mon Sep 17 00:00:00 2001 From: cxh Date: Mon, 24 Nov 2025 14:49:37 +0800 Subject: [PATCH] 1 --- src/main.cpp | 48 ++++---- src/mqtt_client_wrapper.cpp | 234 +++++++++--------------------------- src/rtsp_manager.cpp | 1 - 3 files changed, 77 insertions(+), 206 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index fa0bff8..ab21729 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,27 +6,23 @@ #include #include #include -#include // write, STDOUT_FILENO +#include #include -// 可通过这些开关快速启用/禁用线程进行调试 -constexpr bool ENABLE_RTSP_THREAD = true; // 设置为 false 禁用 RTSP 线程 -constexpr bool ENABLE_MQTT_THREAD = true; // 设置为 false 禁用 MQTT 线程 +constexpr bool ENABLE_RTSP_THREAD = true; +constexpr bool ENABLE_MQTT_THREAD = true; std::atomic g_running(true); static void minimal_signal_handler(int signum) { - // 只做非常有限且 async-signal-safe 的操作 g_running.store(false, std::memory_order_relaxed); const char msg[] = "[MAIN] Signal received, initiating shutdown...\n"; write(STDERR_FILENO, msg, sizeof(msg) - 1); - // 不要调用 LOG_*、malloc、std::string、mutex、exit 等 } int main() { - // 安装信号处理(SIGINT 和 SIGTERM) struct sigaction sigAct{}; sigAct.sa_handler = minimal_signal_handler; sigemptyset(&sigAct.sa_mask); @@ -35,12 +31,10 @@ int main() sigaction(SIGTERM, &sigAct, nullptr); signal(SIGPIPE, SIG_IGN); - // 初始化日志文件 Logger::set_log_to_file(get_executable_dir_file_path("app.log")); try { - // 从可执行文件所在目录读取配置文件 g_app_config = AppConfig::load_from_file(get_executable_dir_file_path("config.json")); } catch (const std::exception &e) @@ -49,58 +43,64 @@ int main() return -1; } - // 线程退出标志,用于超时等待 std::atomic rtsp_thread_exited(false); std::atomic mqtt_thread_exited(false); - // 先在主线程初始化 GStreamer + // 初始化 GStreamer RTSPManager::init(); + // ⭐ 程序启动 → 强制 mount 所有摄像头(RTSP 永远在线) + for (const auto &cam : g_app_config.cameras) + { + if (cam.enabled) + { + RTSPManager::mount_camera(cam); + LOG_INFO("[INIT] Mounted camera: " + cam.name); + } + } + std::thread rtsp_thread; std::thread mqtt_thread; - // 启动 RTSP 线程(如果启用) if (ENABLE_RTSP_THREAD) { rtsp_thread = std::thread([&]() { RTSPManager::start(g_app_config.cameras); rtsp_thread_exited.store(true, std::memory_order_relaxed); }); + LOG_INFO("[MAIN] RTSP thread started"); } else { - LOG_INFO("[MAIN] RTSP thread disabled by build-time toggle"); + LOG_INFO("[MAIN] RTSP thread disabled by toggle"); } - // 启动 MQTT 线程(如果启用) if (ENABLE_MQTT_THREAD) { mqtt_thread = std::thread([&]() { mqtt_client_thread_func(); mqtt_thread_exited.store(true, std::memory_order_relaxed); }); + LOG_INFO("[MAIN] MQTT thread started"); } else { - LOG_INFO("[MAIN] MQTT thread disabled by build-time toggle"); + LOG_INFO("[MAIN] MQTT thread disabled by toggle"); } - // 等待退出信号 while (g_running.load(std::memory_order_relaxed)) std::this_thread::sleep_for(std::chrono::milliseconds(200)); LOG_INFO("[MAIN] Shutdown requested, stopping services..."); - // 等线程优雅退出:总等待时间 (可调整) const auto max_wait = std::chrono::seconds(5); const auto poll_interval = std::chrono::milliseconds(100); - auto deadline = std::chrono::steady_clock::now() + max_wait; if (ENABLE_RTSP_THREAD) { - RTSPManager::stop(); // 只负责让 g_main_loop_run 跳出 + RTSPManager::stop(); if (rtsp_thread.joinable()) rtsp_thread.join(); @@ -108,8 +108,7 @@ int main() LOG_INFO("[MAIN] RTSP thread finished and joined."); } - // 重置 MQTT 线程等待的截止时间 - deadline = std::chrono::steady_clock::now() + max_wait; + auto deadline = std::chrono::steady_clock::now() + max_wait; if (ENABLE_MQTT_THREAD) { @@ -133,11 +132,10 @@ int main() } } - // 如果有线程仍未退出,则强制终止 bool any_failed = false; - if (ENABLE_RTSP_THREAD && rtsp_thread.joinable() && !rtsp_thread_exited.load(std::memory_order_relaxed)) + if (ENABLE_RTSP_THREAD && rtsp_thread.joinable() && !rtsp_thread_exited.load()) any_failed = true; - if (ENABLE_MQTT_THREAD && mqtt_thread.joinable() && !mqtt_thread_exited.load(std::memory_order_relaxed)) + if (ENABLE_MQTT_THREAD && mqtt_thread.joinable() && !mqtt_thread_exited.load()) any_failed = true; if (any_failed) @@ -148,4 +146,4 @@ int main() LOG_INFO("[MAIN] Program exited cleanly."); return 0; -} \ No newline at end of file +} diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index d4d303f..cdf084a 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -6,11 +6,7 @@ #include std::shared_ptr mqtt_client; -std::atomic mqtt_restart_required{false}; extern std::atomic g_running; -std::atomic g_streaming{false}; -std::string g_dispatch_id; -std::mutex g_dispatch_id_mutex; static void send_heartbeat() { @@ -19,12 +15,8 @@ static void send_heartbeat() nlohmann::json hb_data; hb_data["time"] = Logger::get_current_time_utc8(); - hb_data["status"] = g_streaming ? 0 : 2; - - { - std::lock_guard lock(g_dispatch_id_mutex); - hb_data["dispatchId"] = g_streaming ? g_dispatch_id : ""; - } + hb_data["status"] = 0; // 固定上报正常 + hb_data["dispatchId"] = ""; // 已无任务状态 nlohmann::json msg; msg["data"] = hb_data; @@ -34,29 +26,11 @@ static void send_heartbeat() mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump()); } -// MQTT 回调 +// MQTT 连接成功 static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); - // --- ★关键!MQTT 重连后立即执行全局复位 --- - g_streaming = false; // 清除推流状态 - { - std::lock_guard lock(g_dispatch_id_mutex); - g_dispatch_id.clear(); // 清除旧任务 ID - } - - // 停掉所有流,清理 RTSP 状态 - for (const auto &cam : g_app_config.cameras) - { - if (RTSPManager::is_streaming(cam.name)) - { - RTSPManager::unmount_camera(cam); - LOG_WARN("[MQTT] Reconnected -> force unmount camera: " + cam.name); - } - } - // ------------------------------------------------------- - send_heartbeat(); const auto &topics = g_app_config.mqtt.topics; @@ -65,170 +39,83 @@ static void on_mqtt_connected() mqtt_client->subscribe(topics.reset_down); } +// MQTT 连接断开 static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } +// 处理消息 static void on_mqtt_message_received(const std::string &topic, const std::string &message) { - LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); - LOG_INFO("[MQTT] Message content: " + message); // 打印实际内容 + LOG_INFO("[MQTT] Received message on topic [" + topic + "]"); + LOG_INFO("[MQTT] Content: " + message); try { + // ------- video_down 应答 ------- if (topic == g_app_config.mqtt.topics.video_down) { - // 处理 video_down auto j = nlohmann::json::parse(message); + auto seqNo = j["data"].value("seqNo", ""); - if (!j.contains("data") || !j["data"].contains("status") || !j["data"].contains("seqNo")) - { - LOG_WARN("[MQTT] video_down JSON missing required fields"); - return; - } + LOG_INFO("[MQTT] video_down received, RTSP always running (no action)."); - // 写 dispatchId 并设置 streaming 状态 - { - std::lock_guard lock(g_dispatch_id_mutex); - if (j["data"].contains("dispatchId")) - g_dispatch_id = j["data"]["dispatchId"].get(); - } - - int status = j["data"]["status"].get(); - - std::string seqNo = j["data"]["seqNo"].get(); - - bool success = true; // 标记是否操作成功 - - if (status == 0) - { - g_streaming = true; - - for (const auto &cam : g_app_config.cameras) - { - if (!cam.enabled) - continue; - - // 不再提前卸载 - RTSPManager::mount_camera(cam); - LOG_INFO("[MQTT] Started streaming: " + cam.name); - } - } - else if (status == 1) - { - g_streaming = false; - std::lock_guard lock(g_dispatch_id_mutex); - g_dispatch_id.clear(); // 停止拉流就清空 dispatchId - // 停止推流:卸载本地配置中 enabled 的摄像头 - for (const auto &cam : g_app_config.cameras) - { - if (!cam.enabled) - continue; - - if (RTSPManager::is_streaming(cam.name)) - { - RTSPManager::unmount_camera(cam); - LOG_INFO("[MQTT] Stopped streaming: " + cam.name); - } - } - } - else - { - LOG_WARN("[MQTT] video_down: unknown status value " + std::to_string(status)); - success = false; - } - - // 获取当前时间 yyyyMMddHHmmssSSS (UTC+8) - std::string time_str = Logger::get_current_time_utc8(); - - nlohmann::json reply_data; - reply_data["time"] = time_str; - reply_data["result"] = success ? 0 : 1; - reply_data["seqNo"] = seqNo; - - // 封装外层 - nlohmann::json reply; - reply["data"] = reply_data; - reply["isEnc"] = 0; - reply["type"] = 0; - - // 发送应答 - if (mqtt_client) - { - mqtt_client->publish(g_app_config.mqtt.topics.video_down_ack, reply.dump()); - LOG_INFO("[MQTT] Replied to video_down: " + reply.dump()); - } - } - else if (topic == g_app_config.mqtt.topics.substream_down) - { - // 处理 substream_down - LOG_INFO("[MQTT] substream_down message received (not implemented yet)."); - } - else if (topic == g_app_config.mqtt.topics.reset_down) - { - auto j = nlohmann::json::parse(message); - - // reset/down 的 payload 在 data 里 - auto data = j.contains("data") ? j["data"] : nlohmann::json::object(); - - std::string seqNo = data.value("seqNo", ""); - std::string errCode = data.value("errorCode", ""); - std::string des = data.value("des", ""); - - LOG_WARN("[MQTT] Reset command received, errorCode=" + errCode + ", des=" + des); - - // --- ★★ 新增:重置全局 streaming 状态与 dispatchId --- - g_streaming = false; - { - std::lock_guard lock(g_dispatch_id_mutex); - g_dispatch_id.clear(); - } - // ------------------------------------------------------ - - // 停止所有流,相当于复位 - for (const auto &cam : g_app_config.cameras) - { - if (RTSPManager::is_streaming(cam.name)) - { - RTSPManager::unmount_camera(cam); - LOG_INFO("[RTSP] Camera " + cam.name + " reset/unmounted"); - } - } - - // 组装应答 data nlohmann::json reply_data; reply_data["time"] = Logger::get_current_time_utc8(); - reply_data["result"] = 0; // 0=成功 + reply_data["result"] = 0; reply_data["seqNo"] = seqNo; - // 外层封装 nlohmann::json reply; reply["data"] = reply_data; reply["isEnc"] = 0; reply["type"] = 0; - if (mqtt_client) - { - mqtt_client->publish(g_app_config.mqtt.topics.reset_down_ack, reply.dump()); - LOG_INFO("[MQTT] Replied to reset_down: " + reply.dump()); - } + mqtt_client->publish(g_app_config.mqtt.topics.video_down_ack, reply.dump()); + return; } - else + + // ------- substream_down 应答 ------- + if (topic == g_app_config.mqtt.topics.substream_down) { - LOG_WARN("[MQTT] Unknown topic: " + topic); + LOG_INFO("[MQTT] substream_down received (ignored)."); + return; } + + // ------- reset/down 应答 ------- + if (topic == g_app_config.mqtt.topics.reset_down) + { + auto j = nlohmann::json::parse(message); + auto seqNo = j["data"].value("seqNo", ""); + + LOG_WARN("[MQTT] reset/down received (ignored)."); + + nlohmann::json reply_data; + reply_data["time"] = Logger::get_current_time_utc8(); + reply_data["result"] = 0; + reply_data["seqNo"] = seqNo; + + nlohmann::json reply; + reply["data"] = reply_data; + reply["isEnc"] = 0; + reply["type"] = 0; + + mqtt_client->publish(g_app_config.mqtt.topics.reset_down_ack, reply.dump()); + return; + } + + LOG_WARN("[MQTT] Unknown topic: " + topic); } catch (const std::exception &e) { - LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); + LOG_ERROR(std::string("[MQTT] JSON processing failed: ") + e.what()); } } void mqtt_client_thread_func() { const auto &cfg = g_app_config.mqtt; - auto heartbeat_interval = std::chrono::milliseconds(static_cast(cfg.keep_alive * 0.9)); + auto heartbeat_interval = std::chrono::milliseconds((int)(cfg.keep_alive * 0.9)); while (g_running) { @@ -241,48 +128,35 @@ void mqtt_client_thread_func() while (g_running && !mqtt_client->isConnected()) { mqtt_client->connect(); + for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++) std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - if (!g_running && !mqtt_client->isConnected()) - mqtt_client->force_disconnect(); } - // 主循环:心跳 + // 心跳循环 while (g_running && mqtt_client->isConnected()) { send_heartbeat(); - auto sleep_time = heartbeat_interval; - while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected()) + auto remaining = heartbeat_interval; + while (remaining.count() > 0 && g_running && mqtt_client->isConnected()) { - auto chunk = std::min(sleep_time, std::chrono::milliseconds(50)); - std::this_thread::sleep_for(chunk); - sleep_time -= chunk; + auto step = std::min(remaining, std::chrono::milliseconds(50)); + std::this_thread::sleep_for(step); + remaining -= step; } - - if (!g_running && mqtt_client->isConnected()) - mqtt_client->force_disconnect(); } - // 清理 if (mqtt_client) { - if (g_running) - mqtt_client->disconnect(); - else - mqtt_client->force_disconnect(); + mqtt_client->force_disconnect(); mqtt_client.reset(); } - mqtt_restart_required = false; - - if (!g_running) - break; - - // 短暂等待再重连 + // 重连间隔 for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200)); } + LOG_INFO("[MQTT] Client thread exiting."); } diff --git a/src/rtsp_manager.cpp b/src/rtsp_manager.cpp index c184abd..1a53dd4 100644 --- a/src/rtsp_manager.cpp +++ b/src/rtsp_manager.cpp @@ -48,7 +48,6 @@ GstRTSPMediaFactory *RTSPManager::create_media_factory(const Camera &cam) gst_rtsp_media_factory_set_shared(factory, TRUE); gst_rtsp_media_factory_set_suspend_mode(factory, GST_RTSP_SUSPEND_MODE_RESET); - // ⭐ 修复点:最后一个参数必须用 GConnectFlags 类型 g_signal_connect_data(factory, "media-configure", G_CALLBACK(on_media_created),