From 1e7557d2e313820bf6fa203970748746f6587313 Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 15 Oct 2025 13:09:10 +0800 Subject: [PATCH] temp --- include/rtmp_manager.hpp | 6 ++-- src/mqtt_client_wrapper.cpp | 4 +-- src/rtmp_manager.cpp | 69 +++++++++++++++++++++++-------------- 3 files changed, 49 insertions(+), 30 deletions(-) diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 572fc93..747eb38 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -35,8 +35,8 @@ public: using StreamCallback = std::function; static void init(); - static void start_camera(const Camera &cam, StreamType type); - static void stop_camera(const std::string &cam_name, StreamType type); + static void start_camera(const Camera &cam, StreamType type, const std::string &seqNo); + static void stop_camera(const std::string &cam_name, StreamType type, const std::string &seqNo); static void stop_all(); static bool is_streaming(const std::string &cam_name, StreamType type); static bool is_any_streaming(); @@ -58,7 +58,7 @@ private: static std::mutex streams_mutex; static StreamCallback status_callback; - static void stream_loop(Camera cam, StreamType type); + static void stream_loop(Camera cam, StreamType type, const std::string &seqNo); static GstElement *create_pipeline(const Camera &cam, StreamType type); static std::string make_stream_key(const std::string &cam_name, StreamType type); static void update_status(const std::string &key, const StreamStatus &status); diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 7b4cff4..38dca02 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -81,7 +81,7 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { if (!RTMPManager::is_streaming(cam.name, type)) { - RTMPManager::start_camera(cam, type); + RTMPManager::start_camera(cam, type, seqNo); op_result = true; } } @@ -89,7 +89,7 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { if (RTMPManager::is_streaming(cam.name, type)) { - RTMPManager::stop_camera(cam.name, type); + RTMPManager::stop_camera(cam.name, type, seqNo); op_result = true; } } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index ca28ad3..e90309f 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -77,11 +77,22 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) return pipeline; } -void RTMPManager::publish_stream_status(const Camera &cam, StreamType type, const std::string &seqNo, bool ok, const std::string &reason) +void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, const std::string &reason) { + // 获取摄像头 index + int cam_index = -1; + for (size_t i = 0; i < g_app_config.cameras.size(); ++i) + { + if (g_app_config.cameras[i].name == cam_name) + { + cam_index = static_cast(i); + break; + } + } + nlohmann::json ch_resp; - ch_resp["loc"] = cam.index; - ch_resp["url"] = ok ? get_stream_url(cam.name, type) : ""; + ch_resp["loc"] = cam_index; + ch_resp["url"] = ok ? get_stream_url(cam_name, type) : ""; ch_resp["result"] = ok ? 0 : 1; ch_resp["reason"] = reason; @@ -93,7 +104,8 @@ void RTMPManager::publish_stream_status(const Camera &cam, StreamType type, cons mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); } -void RTMPManager::stream_loop(Camera cam, StreamType type) +// 修改 stream_loop,新增 seqNo 参数 +void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &seqNo) { std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream loop started for " + key); @@ -103,7 +115,6 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) while (true) { - // 检查是否被 stop { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -116,7 +127,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) { std::string reason = "Failed to create pipeline (device unavailable or resolution unsupported)"; update_status(key, {false, StreamResult::PIPELINE_ERROR, reason}); - publish_stream_status(key, false, reason); // MQTT 上报 + publish_stream_status(cam.name, type, seqNo, false, reason); if (++retry_count > MAX_RETRIES) break; std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -129,7 +140,6 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) bool first_frame_received = false; bool stop_flag = false; - // 等待消息,同时监控是否有第一帧 while (!stop_flag) { GstMessage *msg = gst_bus_timed_pop_filtered( @@ -159,7 +169,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) std::string err_msg = err ? err->message : "Unknown GStreamer error"; LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); - publish_stream_status(key, false, err_msg); // MQTT 上报 + publish_stream_status(cam.name, type, seqNo, false, err_msg); if (err) g_error_free(err); if (debug) @@ -170,7 +180,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) case GST_MESSAGE_EOS: LOG_WARN("[RTMP] EOS on " + key); update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"}); - publish_stream_status(key, false, "EOS received"); // MQTT 上报 + publish_stream_status(cam.name, type, seqNo, false, "EOS received"); stop_flag = true; break; case GST_MESSAGE_STATE_CHANGED: @@ -182,7 +192,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) first_frame_received = true; LOG_INFO("[RTMP] First frame received for " + key); update_status(key, {true, StreamResult::OK, ""}); - publish_stream_status(key, true, "Streaming OK"); // MQTT 上报 + publish_stream_status(cam.name, type, seqNo, true, "Streaming OK"); } break; } @@ -212,30 +222,33 @@ void RTMPManager::stream_loop(Camera cam, StreamType type) LOG_INFO("[RTMP] Stream loop ended for " + key); } -void RTMPManager::start_camera(const Camera &cam, StreamType type) +// 修改 start_camera,新增 seqNo 参数 +void RTMPManager::start_camera(const Camera &cam, StreamType type, const std::string &seqNo) { std::string key = make_stream_key(cam.name, type); - auto ctx = std::make_unique(); - ctx->running.store(true); - ctx->status = {true, StreamResult::OK, ""}; - ctx->thread = std::thread([cam, type]() - { stream_loop(cam, type); }); + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end() && it->second->running.load()) { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end() && it->second->running.load()) - { - LOG_WARN("[RTMP] " + key + " already streaming."); - return; - } - streams.emplace(key, std::move(ctx)); + LOG_WARN("[RTMP] " + key + " already streaming."); + // 上报已经在请求状态 + publish_stream_status(cam.name, type, seqNo, false, "Already streaming"); + return; } + auto ctx = std::make_unique(); + ctx->running.store(true); + ctx->thread = std::thread([cam, type, seqNo]() + { + stream_loop(cam, type, seqNo); // stream_loop 内部处理 MQTT 上报 + }); + + streams[key] = std::move(ctx); LOG_INFO("[RTMP] start_camera requested for " + key); } -void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) +void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, const std::string &seqNo) { std::string key = make_stream_key(cam_name, type); std::unique_ptr ctx; @@ -246,8 +259,11 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) if (it == streams.end()) { LOG_WARN("[RTMP] " + key + " not found."); + // 上报停止失败(未找到) + publish_stream_status(cam_name, type, seqNo, false, "Stream not found"); return; } + it->second->running.store(false); ctx = std::move(it->second); streams.erase(it); @@ -258,6 +274,9 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) update_status(key, {false, StreamResult::OK, "Stopped manually"}); LOG_INFO("[RTMP] stop_camera completed: " + key); + + // MQTT 上报手动停止成功 + publish_stream_status(cam_name, type, seqNo, true, "Stopped manually"); } void RTMPManager::stop_all()