From ebf74a920f007d2a9e1fcb11af3ced58757d2ed8 Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 15 Oct 2025 17:01:43 +0800 Subject: [PATCH] temp --- include/mqtt_client_wrapper.hpp | 20 +- include/rtmp_manager.hpp | 57 +--- src/mqtt_client_wrapper.cpp | 90 +++---- src/rtmp_manager.cpp | 463 +++++++------------------------- 4 files changed, 163 insertions(+), 467 deletions(-) diff --git a/include/mqtt_client_wrapper.hpp b/include/mqtt_client_wrapper.hpp index 7516c9a..9a566cd 100644 --- a/include/mqtt_client_wrapper.hpp +++ b/include/mqtt_client_wrapper.hpp @@ -1,12 +1,28 @@ // mqtt_client_wrapper.hppa #pragma once +#include +#include + #include "app_config.hpp" #include "logger.hpp" #include "mqtt_client.hpp" #include "rtmp_manager.hpp" -#include -#include + +struct VideoPushRequest +{ + std::string type; // "request" + std::string seqNo; // 流水号 + + struct DataItem + { + int switchVal; // 0=开始推流, 1=停止推流 + std::vector channels; // 需要推送的视频通道 + int streamType; // 0=主码流, 1=子码流 + }; + + std::vector data; // 可以支持多条任务(即一条消息既开又关多通道) +}; // 启动 MQTT 客户端线程(内部自动重连、订阅等) void mqtt_client_thread_func(); diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index ebc7a7e..c6d57e2 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -1,14 +1,12 @@ // rtmp_manager.hpp #pragma once -#include - -#include #include -#include +#include #include #include #include +#include #include #include #include @@ -20,64 +18,37 @@ class RTMPManager { public: - enum class StreamResult - { - OK, - PIPELINE_ERROR, - CONNECTION_FAIL, - EOS_RECEIVED, - TIMEOUT, - UNKNOWN - }; - - struct StreamStatus - { - bool running; - StreamResult last_result; - std::string last_error; - }; - struct StreamResultInfo { int loc{-1}; std::string url; - int result{1}; // 0 success, 1 fail + int result{1}; // 0=成功, 1=失败 std::string reason; }; - using StreamCallback = std::function; - static void init(); - static StreamResultInfo start_camera(const Camera &cam, StreamType type); - static StreamResultInfo stop_camera(const std::string &cam_name, StreamType type); + static void enqueue_video_push_request(const VideoPushRequest &req); static void stop_all(); - static bool is_streaming(const std::string &cam_name, StreamType type); - static bool is_any_streaming(); - static std::string get_stream_url(const std::string &cam_name, StreamType type); - static void set_status_callback(StreamCallback cb); - - // Internal helpers - static void update_status(const std::string &key, const StreamStatus &status); private: struct StreamContext { std::atomic running{false}; - std::atomic start_result_set{false}; // 新增:保证 promise 只 set 一次 std::thread thread; std::promise start_result; - StreamStatus status; - - StreamContext() = default; - StreamContext(const StreamContext &) = delete; - StreamContext &operator=(const StreamContext &) = delete; }; - static std::string make_stream_key(const std::string &cam_name, StreamType type); - static GstElement *create_pipeline(const Camera &cam, StreamType type); - static void stream_loop(Camera cam, StreamType type, StreamContext *ctx); + static void rtmp_worker_thread(); + static StreamResultInfo start_camera(const Camera &cam, int streamType); + static StreamResultInfo stop_camera(const Camera &cam, int streamType); static std::unordered_map> streams; static std::mutex streams_mutex; - static StreamCallback status_callback; + + static std::queue request_queue; + static std::mutex queue_mutex; + static std::condition_variable queue_cv; + + static std::thread worker_thread; + static std::atomic running; }; \ No newline at end of file diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 54a7519..6cc8a70 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -3,7 +3,10 @@ #include #include +#include +#include #include +#include #include std::shared_ptr mqtt_client; @@ -51,67 +54,36 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { auto j = nlohmann::json::parse(message); - if (topic == g_app_config.mqtt.topics.video_down && j["type"] == "request") + if (topic != g_app_config.mqtt.topics.video_down || j["type"] != "request") return; + + LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); + LOG_INFO("[MQTT] Message content: " + j.dump(-1)); + + VideoPushRequest req; + req.seqNo = j.value("seqNo", ""); + + auto data = j["data"]; + if (data.is_object()) { - LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); - LOG_INFO("[MQTT] Message content: " + j.dump(-1)); - - std::string seqNo = j.value("seqNo", ""); - auto data = j["data"]; - - int switch_val = data.value("switch", 0); // 0 开,1 关 - int streamType = data.value("streamType", 0); // 0 主,1 子 - StreamType type = (streamType == 0) ? StreamType::MAIN : StreamType::SUB; - auto channels = data.value("channels", std::vector{}); - - nlohmann::json results = nlohmann::json::array(); - - for (int ch : channels) - { - if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) continue; - - Camera &cam = g_app_config.cameras[ch]; - RTMPManager::StreamResultInfo res; - - if (switch_val == 0) - { - if (!RTMPManager::is_streaming(cam.name, type)) - { - res = RTMPManager::start_camera(cam, type); - } - else - { - res.loc = ch; - res.url = RTMPManager::get_stream_url(cam.name, type); - res.result = 0; - res.reason = "Already streaming"; - } - } - else - { - if (RTMPManager::is_streaming(cam.name, type)) - { - res = RTMPManager::stop_camera(cam.name, type); - } - else - { - res.loc = ch; - res.url = RTMPManager::get_stream_url(cam.name, type); - res.result = 0; - res.reason = "Not streaming"; - } - } - - results.push_back({{"loc", res.loc}, {"url", res.url}, {"result", res.result}, {"reason", res.reason}}); - } - - nlohmann::json reply; - reply["type"] = "response"; - reply["seqNo"] = seqNo; - reply["data"] = results; - - if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); + VideoPushRequest::DataItem item; + item.switchVal = data.value("switch", 0); + item.streamType = data.value("streamType", 0); + item.channels = data.value("channels", std::vector{}); + req.data.push_back(item); } + else if (data.is_array()) + { + for (auto &d : data) + { + VideoPushRequest::DataItem item; + item.switchVal = d.value("switch", 0); + item.streamType = d.value("streamType", 0); + item.channels = d.value("channels", std::vector{}); + req.data.push_back(item); + } + } + + RTMPManager::enqueue_video_push_request(req); } catch (const std::exception &e) { diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 9f3b8f3..8107e94 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -1,145 +1,147 @@ // rtmp_manager.cpp #include "rtmp_manager.hpp" -#include -#include -#include +#include #include "logger.hpp" -static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } - std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; -RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; -std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) -{ - return cam_name + stream_type_suffix(type); -} +std::queue RTMPManager::request_queue; +std::mutex RTMPManager::queue_mutex; +std::condition_variable RTMPManager::queue_cv; + +std::thread RTMPManager::worker_thread; +std::atomic RTMPManager::running{false}; void RTMPManager::init() { gst_init(nullptr, nullptr); - LOG_INFO("[RTMP] GStreamer initialized."); + running = true; + worker_thread = std::thread(rtmp_worker_thread); + LOG_INFO("[RTMP] Initialized and worker thread started."); } -void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); } - -void RTMPManager::update_status(const std::string &key, const StreamStatus &status) +void RTMPManager::stop_all() { + running = false; + queue_cv.notify_all(); + + // 停止所有流 { std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end()) it->second->status = status; + for (auto &kv : streams) kv.second->running = false; } - if (status_callback) status_callback(key, status); + + if (worker_thread.joinable()) worker_thread.join(); + + LOG_INFO("[RTMP] stop_all completed."); } -GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) +void RTMPManager::enqueue_video_push_request(const VideoPushRequest &req) { - int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; - if (type == StreamType::SUB) { - width = std::max(160, width / 2); - height = std::max(120, height / 2); - fps = std::max(10, fps / 2); - bitrate = std::max(300000, bitrate / 2); + std::lock_guard lock(queue_mutex); + request_queue.push(req); } - - std::string stream_name = cam.name + stream_type_suffix(type); - std::string pipeline_str = "v4l2src device=" + cam.device + - " ! video/x-raw,format=NV12,width=" + std::to_string(width) + - ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + - "/1 ! queue max-size-buffers=1 leaky=downstream " - "! mpph264enc bps=" + - std::to_string(bitrate) + " gop=" + std::to_string(fps) + - " ! h264parse ! flvmux streamable=true name=mux " - "! rtmpsink location=\"rtmp://127.0.0.1/live/" + - stream_name + " live=1\" sync=false"; - - LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); - - GError *error = nullptr; - GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); - if (error) - { - LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message)); - g_error_free(error); - return nullptr; - } - return pipeline; + queue_cv.notify_one(); } -int get_camera_index(const std::string &name) +void RTMPManager::rtmp_worker_thread() { - for (size_t i = 0; i < g_app_config.cameras.size(); ++i) - if (g_app_config.cameras[i].name == name) return static_cast(i); - return -1; + while (running) + { + VideoPushRequest req; + { + std::unique_lock lock(queue_mutex); + queue_cv.wait(lock, [] { return !request_queue.empty() || !running; }); + if (!running) break; + req = request_queue.front(); + request_queue.pop(); + } + + nlohmann::json results = nlohmann::json::array(); + + for (auto &item : req.data) + { + for (int ch : item.channels) + { + if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) continue; + Camera &cam = g_app_config.cameras[ch]; + StreamResultInfo res; + + if (item.switchVal == 0) + res = start_camera(cam, item.streamType); + else + res = stop_camera(cam, item.streamType); + + res.loc = ch; + res.url = "rtmp://127.0.0.1/live/" + cam.name; + results.push_back({{"loc", res.loc}, {"url", res.url}, {"result", res.result}, {"reason", res.reason}}); + } + } + + // 上报 MQTT + nlohmann::json reply; + reply["type"] = "response"; + reply["seqNo"] = req.seqNo; + reply["data"] = results; + if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); + } } -RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type) +// 简化 start_camera / stop_camera,不阻塞 MQTT 回调 +RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, int streamType) { StreamResultInfo res; - res.loc = get_camera_index(cam.name); - res.url = get_stream_url(cam.name, type); + std::string key = cam.name + (streamType == 0 ? "_main" : "_sub"); - std::string key = make_stream_key(cam.name, type); + { + std::lock_guard lock(streams_mutex); + if (streams.find(key) != streams.end()) + { + res.result = 0; + res.reason = "Already streaming"; + return res; + } + auto ctx = std::make_unique(); + ctx->running = true; + ctx->thread = std::thread( + [cam, streamType, ctx_ptr = ctx.get()] + { + // TODO: GStreamer pipeline loop + std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟启动 + StreamResultInfo r; + r.result = 0; + r.reason = "Started OK"; + ctx_ptr->start_result.set_value(r); + }); + + streams[key] = std::move(ctx); + } + + // 非阻塞:直接返回 future 的结果即可 try { - auto ctx = std::make_unique(); - StreamContext *ctx_ptr = ctx.get(); - - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end() && it->second->running.load()) - { - res.result = 0; - res.reason = "Already streaming"; - return res; - } - - ctx->running.store(true); - streams[key] = std::move(ctx); - } - - std::future fut = ctx_ptr->start_result.get_future(); - - ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); - - if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready) - { - res = fut.get(); - } - else - { - res.result = 1; - res.reason = "Start timeout"; - // 超时时停止线程 - stop_camera(cam.name, type); - } + res = streams[key]->start_result.get_future().get(); } - catch (const std::exception &e) + catch (...) { - LOG_ERROR("[RTMP] Exception in start_camera for " + key + ": " + e.what()); res.result = 1; - res.reason = "Start exception: " + std::string(e.what()); + res.reason = "Exception in start_camera"; } return res; } -RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type) +RTMPManager::StreamResultInfo RTMPManager::stop_camera(const Camera &cam, int streamType) { StreamResultInfo res; - res.loc = get_camera_index(cam_name); - res.url = get_stream_url(cam_name, type); + std::string key = cam.name + (streamType == 0 ? "_main" : "_sub"); std::unique_ptr ctx; - std::string key = make_stream_key(cam_name, type); - { std::lock_guard lock(streams_mutex); auto it = streams.find(key); @@ -150,278 +152,13 @@ RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_na return res; } - // 标记停止 - it->second->running.store(false); - - // 移交所有权给局部变量,避免在锁内 join ctx = std::move(it->second); streams.erase(it); + ctx->running = false; } - // 安全 join if (ctx->thread.joinable()) ctx->thread.join(); - res.result = 0; - res.reason = "Stopped manually"; + res.reason = "Stopped OK"; return res; -} - -void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) -{ - constexpr int FIRST_FRAME_TIMEOUT_SEC = 5; - - StreamResultInfo res; - res.loc = get_camera_index(cam.name); - res.url = get_stream_url(cam.name, type); - - std::string key = make_stream_key(cam.name, type); - LOG_INFO("[RTMP] Stream loop started for " + key); - - GstElement *pipeline = nullptr; - GstBus *bus = nullptr; - bool start_result_set = false; - - try - { - // 确保在异常情况下也能设置结果 - auto set_start_result = [&](const StreamResultInfo &r) - { - if (!start_result_set) - { - try - { - ctx->start_result.set_value(r); - start_result_set = true; - } - catch (const std::future_error &e) - { - LOG_ERROR("[RTMP] Failed to set start result: " + std::string(e.what())); - } - } - }; - - // 创建管道 - pipeline = create_pipeline(cam, type); - if (!pipeline) - { - res.result = 1; - res.reason = "Failed to create pipeline"; - set_start_result(res); - return; - } - - bus = gst_element_get_bus(pipeline); - if (!bus) - { - res.result = 1; - res.reason = "Failed to get pipeline bus"; - set_start_result(res); - return; - } - - // 设置管道状态 - GstStateChangeReturn state_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING); - if (state_ret == GST_STATE_CHANGE_FAILURE) - { - res.result = 1; - res.reason = "Failed to set pipeline to playing state"; - set_start_result(res); - return; - } - - bool first_frame_received = false; - auto start_time = std::chrono::steady_clock::now(); - - while (true) - { - // 检查是否应该停止 - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) - { - LOG_INFO("[RTMP] Stream loop stopping for " + key); - break; - } - } - - // 检查第一帧超时 - if (!first_frame_received) - { - auto elapsed = std::chrono::steady_clock::now() - start_time; - if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) - { - res.result = 1; - res.reason = "No frames received within timeout"; - set_start_result(res); - LOG_WARN("[RTMP] First frame timeout for " + key); - break; - } - } - - // 等待GStreamer消息 - GstMessage *msg = gst_bus_timed_pop_filtered( - bus, 100 * GST_MSECOND, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED | - GST_MESSAGE_ELEMENT)); - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) - { - case GST_MESSAGE_ERROR: - { - GError *err = nullptr; - gchar *debug = nullptr; - gst_message_parse_error(msg, &err, &debug); - std::string err_msg = err ? err->message : "Unknown GStreamer error"; - res.result = 1; - res.reason = "Pipeline error: " + err_msg; - set_start_result(res); - LOG_ERROR("[RTMP] Pipeline error for " + key + ": " + err_msg); - if (err) g_error_free(err); - if (debug) g_free(debug); - gst_message_unref(msg); - goto cleanup; - } - case GST_MESSAGE_EOS: - res.result = 1; - res.reason = "EOS received"; - set_start_result(res); - LOG_INFO("[RTMP] EOS received for " + key); - gst_message_unref(msg); - goto cleanup; - case GST_MESSAGE_STATE_CHANGED: - // 可以添加状态变化的日志用于调试 - break; - case GST_MESSAGE_ELEMENT: - if (!first_frame_received) - { - first_frame_received = true; - res.result = 0; - res.reason = "First frame received, started OK"; - set_start_result(res); - LOG_INFO("[RTMP] First frame received for " + key); - } - break; - default: - break; - } - - gst_message_unref(msg); - } - } - catch (const std::exception &e) - { - LOG_ERROR("[RTMP] Exception in stream_loop for " + key + ": " + e.what()); - res.result = 1; - res.reason = "Exception: " + std::string(e.what()); - if (!start_result_set) - { - try - { - ctx->start_result.set_value(res); - start_result_set = true; - } - catch (const std::future_error &) - { - // 忽略future错误 - } - } - } - catch (...) - { - LOG_ERROR("[RTMP] Unknown exception in stream_loop for " + key); - res.result = 1; - res.reason = "Unknown exception"; - if (!start_result_set) - { - try - { - ctx->start_result.set_value(res); - start_result_set = true; - } - catch (const std::future_error &) - { - // 忽略future错误 - } - } - } - -cleanup: - // 清理GStreamer资源 - if (pipeline) - { - gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(pipeline); - } - if (bus) - { - gst_object_unref(bus); - } - - // 确保start_result被设置 - if (!start_result_set) - { - try - { - res.result = 1; - res.reason = "Stream loop exited unexpectedly"; - ctx->start_result.set_value(res); - } - catch (const std::future_error &) - { - // 忽略future错误 - } - } - - // 从streams中移除 - { - std::lock_guard lock(streams_mutex); - streams.erase(key); - } - - LOG_INFO("[RTMP] Stream loop ended for " + key); -} - -bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) -{ - std::lock_guard lock(streams_mutex); - auto it = streams.find(make_stream_key(cam_name, type)); - return it != streams.end() && it->second->running.load(); -} - -bool RTMPManager::is_any_streaming() -{ - std::lock_guard lock(streams_mutex); - for (auto &kv : streams) - if (kv.second->running.load()) return true; - return false; -} - -void RTMPManager::stop_all() -{ - std::vector> ctxs; - - { - std::lock_guard lock(streams_mutex); - for (auto &kv : streams) kv.second->running.store(false); // 标记停止 - - // 移交所有权给局部变量,避免在锁内 join - for (auto &kv : streams) ctxs.push_back(std::move(kv.second)); - streams.clear(); - } - - // 安全 join - for (auto &ctx : ctxs) - { - if (ctx->thread.joinable()) ctx->thread.join(); - } - - LOG_INFO("[RTMP] stop_all completed."); -} - -std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) -{ - return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); -} +} \ No newline at end of file