From e5307260438912d4e5dce115f70004cb641cd9a6 Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 15 Oct 2025 15:01:55 +0800 Subject: [PATCH] temp --- include/rtmp_manager.hpp | 46 +++-- src/mqtt_client_wrapper.cpp | 68 ++++--- src/rtmp_manager.cpp | 358 +++++++++++++++--------------------- 3 files changed, 219 insertions(+), 253 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 1fe4719..172024b 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -1,21 +1,24 @@ // rtsp_manager.hpp -#pragma once +##pragma once #include -#include -#include -#include -#include + +#include #include #include #include -#include +#include +#include +#include +#include +#include + #include "app_config.hpp" #include "mqtt_client_wrapper.hpp" -class RTMPManager + class RTMPManager { -public: + public: enum class StreamResult { OK, @@ -33,18 +36,29 @@ public: std::string last_error; }; + struct StreamResultInfo + { + int loc{-1}; + std::string url; + int result{1}; // 0 success, 1 fail + std::string reason; + }; + using StreamCallback = std::function; static void init(); - static void start_camera(const Camera &cam, StreamType type, const std::string &seqNo); - static void stop_camera(const std::string &cam_name, StreamType type, const std::string &seqNo); + static StreamResultInfo start_camera(const Camera &cam, StreamType type); + static StreamResultInfo stop_camera(const std::string &cam_name, StreamType type); static void stop_all(); static bool is_streaming(const std::string &cam_name, StreamType type); static bool is_any_streaming(); static std::string get_stream_url(const std::string &cam_name, StreamType type); static void set_status_callback(StreamCallback cb); -private: + // Internal helpers + static void update_status(const std::string &key, const StreamStatus &status); + + private: struct StreamContext { std::atomic running{false}; @@ -55,18 +69,10 @@ private: StreamContext &operator=(const StreamContext &) = delete; }; - // helper static std::string make_stream_key(const std::string &cam_name, StreamType type); static GstElement *create_pipeline(const Camera &cam, StreamType type); - static void update_status(const std::string &key, const StreamStatus &status); + static void stream_loop(Camera cam, StreamType type); - // publish a single-camera response on mqtt (uses global mqtt_client) - static void publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, const std::string &reason); - - // actual loop (internal) - note it takes seqNo so it can report back - static void stream_loop(Camera cam, StreamType type, const std::string &seqNo); - - // storage static std::unordered_map> streams; static std::mutex streams_mutex; static StreamCallback status_callback; diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index c76df18..73498f3 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -1,9 +1,10 @@ // mqtt_client_wrapper.cpp #include "mqtt_client_wrapper.hpp" -#include + +#include #include #include -#include +#include std::shared_ptr mqtt_client; std::atomic mqtt_restart_required{false}; @@ -14,8 +15,7 @@ std::mutex g_dispatch_id_mutex; static void send_heartbeat() { - if (!mqtt_client || !mqtt_client->isConnected()) - return; + if (!mqtt_client || !mqtt_client->isConnected()) return; nlohmann::json hb_data; hb_data["time"] = Logger::get_current_time_utc8(); @@ -43,52 +43,74 @@ static void on_mqtt_connected() mqtt_client->subscribe(topics.video_down); } -static void on_mqtt_disconnected() -{ - LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); -} +static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } static void on_mqtt_message_received(const std::string &topic, const std::string &message) { - LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); - LOG_INFO("[MQTT] Message content: " + message); // 打印实际内容 - try { auto j = nlohmann::json::parse(message); if (topic == g_app_config.mqtt.topics.video_down && j["type"] == "request") { + LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); + LOG_INFO("[MQTT] Message content: " + j.dump(0)); + std::string seqNo = j.value("seqNo", ""); auto data = j["data"]; - int switch_val = data.value("switch", 0); - int streamType = data.value("streamType", 0); // 0主 1子 + int switch_val = data.value("switch", 0); // 0 开,1 关 + int streamType = data.value("streamType", 0); // 0 主,1 子 StreamType type = (streamType == 0) ? StreamType::MAIN : StreamType::SUB; auto channels = data.value("channels", std::vector{}); + nlohmann::json results = nlohmann::json::array(); + for (int ch : channels) { - if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) - continue; + if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) continue; Camera &cam = g_app_config.cameras[ch]; + RTMPManager::StreamResultInfo res; if (switch_val == 0) { if (!RTMPManager::is_streaming(cam.name, type)) { - RTMPManager::start_camera(cam, type, seqNo); + res = RTMPManager::start_camera(cam, type); + } + else + { + res.loc = ch; + res.url = RTMPManager::get_stream_url(cam.name, type); + res.result = 0; + res.reason = "Already streaming"; } } else { if (RTMPManager::is_streaming(cam.name, type)) { - RTMPManager::stop_camera(cam.name, type, seqNo); + res = RTMPManager::stop_camera(cam.name, type); + } + else + { + res.loc = ch; + res.url = RTMPManager::get_stream_url(cam.name, type); + res.result = 0; + res.reason = "Not streaming"; } } + + results.push_back({{"loc", res.loc}, {"url", res.url}, {"result", res.result}, {"reason", res.reason}}); } + + nlohmann::json reply; + reply["type"] = "response"; + reply["seqNo"] = seqNo; + reply["data"] = results; + + if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); } } catch (const std::exception &e) @@ -116,8 +138,7 @@ void mqtt_client_thread_func() for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++) std::this_thread::sleep_for(std::chrono::milliseconds(50)); - if (!g_running && !mqtt_client->isConnected()) - mqtt_client->force_disconnect(); + if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect(); } // 主循环:心跳 @@ -133,8 +154,7 @@ void mqtt_client_thread_func() sleep_time -= chunk; } - if (!g_running && mqtt_client->isConnected()) - mqtt_client->force_disconnect(); + if (!g_running && mqtt_client->isConnected()) mqtt_client->force_disconnect(); } // 清理 @@ -149,12 +169,10 @@ void mqtt_client_thread_func() mqtt_restart_required = false; - if (!g_running) - break; + if (!g_running) break; // 短暂等待再重连 - for (int i = 0; i < 5 && g_running; i++) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200)); } LOG_INFO("[MQTT] Client thread exiting."); } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index ca79a43..ddd18ec 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -4,13 +4,11 @@ #include #include #include -#include #include "logger.hpp" static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } -// static members std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; @@ -41,7 +39,6 @@ void RTMPManager::update_status(const std::string &key, const StreamStatus &stat GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; - if (type == StreamType::SUB) { width = std::max(160, width / 2); @@ -74,232 +71,59 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) return pipeline; } -void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, - const std::string &reason) +int get_camera_index(const std::string &name) { - // 找 camera index - int cam_index = -1; - Camera *cam_ptr = nullptr; for (size_t i = 0; i < g_app_config.cameras.size(); ++i) - { - if (g_app_config.cameras[i].name == cam_name) - { - cam_index = static_cast(i); - cam_ptr = &g_app_config.cameras[i]; - break; - } - } - - nlohmann::json ch_resp; - ch_resp["loc"] = cam_index; - ch_resp["url"] = (ok && cam_ptr) ? get_stream_url(cam_ptr->name, type) : ""; - ch_resp["result"] = ok ? 0 : 1; - ch_resp["reason"] = reason; - - nlohmann::json reply; - reply["type"] = "response"; - reply["seqNo"] = seqNo; - reply["data"] = nlohmann::json::array({ch_resp}); - - if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); + if (g_app_config.cameras[i].name == name) return static_cast(i); + return -1; } -void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &seqNo) +// Start camera, return result info (不再 publish) +RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type) { + StreamResultInfo res; + res.loc = get_camera_index(cam.name); + res.url = get_stream_url(cam.name, type); + std::string key = make_stream_key(cam.name, type); - LOG_INFO("[RTMP] Stream loop started for " + key); - - const int MAX_RETRIES = 0; - int retry_count = 0; - - const int FIRST_FRAME_TIMEOUT_SEC = 5; // 等待第一帧最大秒数 - - while (true) - { - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) break; - } - - GstElement *pipeline = create_pipeline(cam, type); - if (!pipeline) - { - std::string reason = "Failed to create pipeline (device unavailable or resolution unsupported)"; - update_status(key, {false, StreamResult::PIPELINE_ERROR, reason}); - publish_stream_status(cam.name, type, seqNo, false, reason); - if (++retry_count > MAX_RETRIES) break; - std::this_thread::sleep_for(std::chrono::seconds(1)); - continue; - } - - GstBus *bus = gst_element_get_bus(pipeline); - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - bool first_frame_received = false; - bool stop_flag = false; - auto first_frame_start = std::chrono::steady_clock::now(); - - while (!stop_flag) - { - GstMessage *msg = gst_bus_timed_pop_filtered( - bus, 100 * GST_MSECOND, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); - - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) - { - stop_flag = true; - break; - } - } - - // check first-frame timeout - if (!first_frame_received) - { - auto elapsed = std::chrono::steady_clock::now() - first_frame_start; - if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) - { - std::string reason = "No frames received within timeout"; - LOG_WARN("[RTMP] " + key + ": " + reason); - update_status(key, {false, StreamResult::TIMEOUT, reason}); - publish_stream_status(cam.name, type, seqNo, false, reason); - stop_flag = true; - } - } - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) - { - case GST_MESSAGE_ERROR: - { - GError *err = nullptr; - gchar *debug = nullptr; - gst_message_parse_error(msg, &err, &debug); - std::string err_msg = err ? err->message : "Unknown GStreamer error"; - LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); - update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); - publish_stream_status(cam.name, type, seqNo, false, err_msg); - if (err) g_error_free(err); - if (debug) g_free(debug); - stop_flag = true; - break; - } - case GST_MESSAGE_EOS: - LOG_WARN("[RTMP] EOS on " + key); - update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); - publish_stream_status(cam.name, type, seqNo, false, "EOS received"); - stop_flag = true; - break; - case GST_MESSAGE_STATE_CHANGED: - { - GstState old_state, new_state; - gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); - if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && - !first_frame_received) - { - first_frame_received = true; - LOG_INFO("[RTMP] First frame received for " + key); - update_status(key, {true, StreamResult::OK, ""}); - publish_stream_status(cam.name, type, seqNo, true, "Streaming OK"); - } - break; - } - default: - break; - } - - gst_message_unref(msg); - } - - gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); - gst_object_unref(pipeline); - - if (!stop_flag) break; - - if (++retry_count > MAX_RETRIES) - { - LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up."); - break; - } - - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - // ensure we remove ourselves from streams map if still present and belong to this thread - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end()) - { - // If thread stored in map matches current thread id, erase it. - // This avoids racing with stop_camera which may already have moved/erased it. - try - { - if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) - { - streams.erase(it); - } - } - catch (...) - { - // in case thread id comparisons throw (shouldn't), just ignore - streams.erase(it); - } - } - } - - update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); - LOG_INFO("[RTMP] Stream loop ended for " + key); -} - -// 修改 start_camera,新增 seqNo 参数 -void RTMPManager::start_camera(const Camera &cam, StreamType type, const std::string &seqNo) -{ - std::string key = make_stream_key(cam.name, type); - { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end() && it->second->running.load()) { - LOG_WARN("[RTMP] " + key + " already streaming."); - // 按协议:如果已经在推,直接回报已在推(成功),并返回 URL - publish_stream_status(cam.name, type, seqNo, true, "Already streaming"); - return; + res.result = 0; + res.reason = "Already streaming"; + return res; } - // create and store context auto ctx = std::make_unique(); ctx->running.store(true); - ctx->status = {true, StreamResult::UNKNOWN, ""}; - // create thread and place it into context. Need to assign thread before inserting to avoid race. - ctx->thread = std::thread([cam, type, seqNo]() { stream_loop(cam, type, seqNo); }); - + ctx->thread = std::thread([cam, type]() { stream_loop(cam, type); }); streams.emplace(key, std::move(ctx)); } - LOG_INFO("[RTMP] start_camera requested for " + key); + res.result = 0; + res.reason = "Started OK"; + return res; } -void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, const std::string &seqNo) +// Stop camera, return result info (不再 publish) +RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { - std::string key = make_stream_key(cam_name, type); - std::unique_ptr ctx; + StreamResultInfo res; + res.loc = get_camera_index(cam_name); + res.url = get_stream_url(cam_name, type); + std::unique_ptr ctx; + std::string key = make_stream_key(cam_name, type); { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end()) { - LOG_WARN("[RTMP] " + key + " not found."); - // 按协议:如果没有在推,则回复“未推”的成功响应(实际不做任何动作) - publish_stream_status(cam_name, type, seqNo, true, "Not streaming"); - return; + res.result = 0; + res.reason = "Not streaming"; + return res; } it->second->running.store(false); @@ -309,17 +133,14 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, cons if (ctx->thread.joinable()) ctx->thread.join(); - update_status(key, {false, StreamResult::OK, "Stopped manually"}); - LOG_INFO("[RTMP] stop_camera completed: " + key); - - // MQTT 上报手动停止成功 - publish_stream_status(cam_name, type, seqNo, true, "Stopped manually"); + res.result = 0; + res.reason = "Stopped manually"; + return res; } void RTMPManager::stop_all() { std::vector> ctxs; - { std::lock_guard lock(streams_mutex); for (auto &kv : streams) kv.second->running.store(false); @@ -352,3 +173,124 @@ std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType { return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); } + +void RTMPManager::stream_loop(Camera cam, StreamType type) +{ + std::string key = make_stream_key(cam.name, type); + LOG_INFO("[RTMP] Stream loop started for " + key); + + const int FIRST_FRAME_TIMEOUT_SEC = 5; + + while (true) + { + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it == streams.end() || !it->second->running.load()) break; + } + + GstElement *pipeline = create_pipeline(cam, type); + if (!pipeline) + { + update_status(key, {false, StreamResult::PIPELINE_ERROR, "Failed to create pipeline"}); + break; + } + + GstBus *bus = gst_element_get_bus(pipeline); + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + bool first_frame_received = false; + bool stop_flag = false; + auto first_frame_start = std::chrono::steady_clock::now(); + + while (!stop_flag) + { + GstMessage *msg = gst_bus_timed_pop_filtered( + bus, 100 * GST_MSECOND, + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED)); + + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it == streams.end() || !it->second->running.load()) + { + stop_flag = true; + break; + } + } + + if (!first_frame_received) + { + auto elapsed = std::chrono::steady_clock::now() - first_frame_start; + if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) + { + update_status(key, {false, StreamResult::TIMEOUT, "No frames received within timeout"}); + stop_flag = true; + } + } + + if (!msg) continue; + + switch (GST_MESSAGE_TYPE(msg)) + { + case GST_MESSAGE_ERROR: + { + GError *err = nullptr; + gchar *debug = nullptr; + gst_message_parse_error(msg, &err, &debug); + std::string err_msg = err ? err->message : "Unknown GStreamer error"; + update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); + if (err) g_error_free(err); + if (debug) g_free(debug); + stop_flag = true; + break; + } + case GST_MESSAGE_EOS: + update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); + stop_flag = true; + break; + case GST_MESSAGE_STATE_CHANGED: + { + GstState old_state, new_state; + gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr); + if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && + !first_frame_received) + { + first_frame_received = true; + update_status(key, {true, StreamResult::OK, ""}); + } + break; + } + default: + break; + } + + gst_message_unref(msg); + } + + gst_element_set_state(pipeline, GST_STATE_NULL); + if (bus) gst_object_unref(bus); + gst_object_unref(pipeline); + + if (!stop_flag) break; + } + + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end()) + { + try + { + if (it->second && it->second->thread.get_id() == std::this_thread::get_id()) streams.erase(it); + } + catch (...) + { + streams.erase(it); + } + } + } + + update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"}); + LOG_INFO("[RTMP] Stream loop ended for " + key); +}