diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index c0a3298..6f86b72 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -4,9 +4,6 @@ #include #include -#include -#include -#include #include #include #include @@ -14,101 +11,39 @@ #include #include "app_config.hpp" -#include "data_manager.hpp" +#include "logger.hpp" class RTMPManager { public: - enum class StreamResult - { - OK, - PIPELINE_ERROR, - CONNECTION_FAIL, - EOS_RECEIVED, - TIMEOUT, - UNKNOWN - }; - struct StreamStatus { bool running{false}; - StreamResult last_result{StreamResult::UNKNOWN}; std::string last_error; }; - struct StreamResultInfo - { - int loc{-1}; - std::string url; - int result{1}; // 0 success, 1 fail - std::string reason; - }; - - using StreamCallback = std::function; - static void init(); - static void set_status_callback(StreamCallback cb); - - // 单路接口 - static StreamResultInfo start_camera(const Camera &cam, StreamType type); - static StreamResultInfo stop_camera(const std::string &cam_name, StreamType type); - - // 批量接口:处理 VideoPushRequest - static std::vector process_push_request(const VideoPushRequest &req); - + static void start_all(); static void stop_all(); - // 自动启动所有录像流(上电调用一次) - static void start_all_record_streams(); - static bool is_streaming(const std::string &cam_name, StreamType type); - static bool is_any_streaming(); - static std::string get_stream_url(const std::string &cam_name, StreamType type); + static bool is_streaming(const std::string &cam_name); + static std::string get_stream_url(const std::string &cam_name); + static std::vector> get_all_status(); private: struct StreamContext { std::atomic running{false}; std::thread thread; - - // 启动阶段的 promise,用于首次启动结果反馈(7s 内) - std::promise start_promise; - std::atomic start_promise_set{false}; - StreamStatus status; - - StreamContext() = default; - StreamContext(const StreamContext &) = delete; - StreamContext &operator=(const StreamContext &) = delete; - - ~StreamContext() - { - try - { - running.store(false); - if (thread.joinable()) - { - thread.join(); // 防止 std::terminate() - } - } - catch (...) - { - // 安全兜底,不抛异常 - } - } }; - static std::string make_stream_key(const std::string &cam_name, StreamType type); - static GstElement *create_pipeline(const Camera &cam, StreamType type); - static void stream_loop(Camera cam, StreamType type, StreamContext *ctx); - static bool do_stream_once(const Camera &cam, StreamType type, StreamContext *ctx); - - static void update_status(const std::string &key, const StreamStatus &status); + static void stream_loop(Camera cam, StreamContext *ctx); + static GstElement *create_pipeline(const Camera &cam); + static std::string make_key(const std::string &name); static std::unordered_map> streams; static std::mutex streams_mutex; - static StreamCallback status_callback; - // 自动重试参数 - // MAIN(record)为永久重试;SUB(live)使用限次重试 - static constexpr int LIVE_MAX_RETRIES = 5; - static constexpr int RETRY_BASE_DELAY_MS = 2000; // 2s -> 可指数退避 -}; \ No newline at end of file + // 参数 + static constexpr int RETRY_BASE_DELAY_MS = 3000; +}; diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index caf2fed..a5257e3 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -24,23 +24,36 @@ static void send_heartbeat() auto now = std::chrono::system_clock::now(); auto ms = std::chrono::duration_cast(now.time_since_epoch()).count(); - g_streaming.store(RTMPManager::is_any_streaming()); + // 获取当前所有通道状态 + auto status_list = RTMPManager::get_all_status(); + + int total = static_cast(status_list.size()); + int running_count = 0; + nlohmann::json channels = nlohmann::json::array(); + + for (int i = 0; i < total; ++i) + { + const auto &[key, running] = status_list[i]; + if (running) running_count++; + + nlohmann::json item; + item["loc"] = i; // 摄像头位置索引 + item["url"] = RTMPManager::get_stream_url(g_app_config.cameras[i].name); + item["running"] = running; + channels.push_back(item); + } nlohmann::json hb; hb["timestamp"] = ms; - hb["status"] = g_streaming ? 1 : 0; // 0:等待中,1:推流中 + hb["status"] = (running_count == total) ? 1 : (running_count == 0 ? 0 : 2); + hb["channels"] = channels; mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump()); + LOG_INFO("[MQTT] Sent video heartbeat: " + hb.dump()); } // MQTT 回调 -static void on_mqtt_connected() -{ - LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); - - const auto &topics = g_app_config.mqtt.topics; - mqtt_client->subscribe(topics.video_down); -} +static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); } static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } @@ -50,74 +63,8 @@ static void on_mqtt_message_received(const std::string &topic, const std::string { auto j = nlohmann::json::parse(message); - if (topic != g_app_config.mqtt.topics.video_down || j["type"] != "request") return; - LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); LOG_INFO("[MQTT] Message content: " + j.dump(-1)); - - VideoPushRequest req; - req.seqNo = j.value("seqNo", ""); - - auto data = j["data"]; - if (data.is_object()) - { - VideoPushRequest::DataItem item; - item.switchVal = data.value("switch", 0); - item.streamType = data.value("streamType", 1); // ✅ 默认子码流 - if (item.streamType == 0) item.streamType = 1; // ✅ 防止平台误发 main - item.channels = data.value("channels", std::vector{}); - req.data.push_back(item); - } - else if (data.is_array()) - { - for (auto &d : data) - { - VideoPushRequest::DataItem item; - item.switchVal = d.value("switch", 0); - item.streamType = data.value("streamType", 1); // ✅ 默认子码流 - if (item.streamType == 0) item.streamType = 1; // ✅ 防止平台误发 main - item.channels = d.value("channels", std::vector{}); - req.data.push_back(item); - } - } - - // 异步执行推流任务 - std::thread( - [req]() - { - try - { - auto results = RTMPManager::process_push_request(req); - - // 组装 MQTT 回复 - nlohmann::json reply; - reply["type"] = "response"; - reply["seqNo"] = req.seqNo; - reply["data"] = nlohmann::json::array(); - - for (const auto &r : results) - { - nlohmann::json item; - item["loc"] = r.loc; - item["url"] = r.url; - item["result"] = r.result; - item["reason"] = r.reason; - reply["data"].push_back(item); - } - - mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(), 1); - LOG_INFO("[MQTT] Sent RTMP response: " + reply.dump()); - } - catch (const std::exception &e) - { - LOG_ERROR(std::string("[MQTT] Exception in RTMP thread: ") + e.what()); - } - catch (...) - { - LOG_ERROR("[MQTT] Unknown exception in RTMP thread"); - } - }) - .detach(); } catch (const std::exception &e) { diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index f90ccb7..00efd09 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -6,25 +6,12 @@ #include #include #include -#include -#include #include #include -#include -#include #include -#include "logger.hpp" - -// ---------- 小工具 ---------- - -static bool device_exists(const std::string &path) -{ - struct stat st; - return (stat(path.c_str(), &st) == 0); -} - +// 动态获取指定网卡 IPv4 地址 static std::string get_ip_address(const std::string &ifname) { struct ifaddrs *ifaddr, *ifa; @@ -55,19 +42,13 @@ static std::string get_ip_address(const std::string &ifname) return ""; // 没找到 } -static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } - -// ---------- 静态成员 ---------- - std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; -RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; -// ---------- 内部工具 ---------- - -std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) +static bool device_exists(const std::string &path) { - return cam_name + stream_type_suffix(type); + struct stat st; + return (stat(path.c_str(), &st) == 0); } void RTMPManager::init() @@ -76,258 +57,114 @@ void RTMPManager::init() LOG_INFO("[RTMP] GStreamer initialized."); } -void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); } +std::string RTMPManager::make_key(const std::string &name) { return name + "_main"; } -void RTMPManager::update_status(const std::string &key, const StreamStatus &status) +GstElement *RTMPManager::create_pipeline(const Camera &cam) { - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end()) it->second->status = status; - } - if (status_callback) status_callback(key, status); -} + int width = cam.width; + int height = cam.height; + int fps = cam.fps; + int bitrate = cam.bitrate; -// 决定 app:MAIN -> record(录像),SUB -> live(按需) -static inline std::string pick_app(StreamType type) { return (type == StreamType::MAIN) ? "record" : "live"; } + std::string stream_name = cam.name + "_main"; + std::string app = "record"; -GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) -{ - int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; - if (type == StreamType::SUB) - { - bitrate = std::max(300000, bitrate / 2); - } - - const std::string stream_name = cam.name + stream_type_suffix(type); - const std::string app = (type == StreamType::MAIN) ? "record" : "live"; // 🎯 主码流录像,子码流远程推流 - - // 👇 确保明确指定 vhost,避免 “no vhost 127.0.0.1” 错误 - const std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app; + std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app; std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + - "/1 " - "! videoconvert ! video/x-raw,format=NV12 " + "/1 ! videoconvert ! video/x-raw,format=NV12 " "! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + - " " - "! h264parse " - "! flvmux name=mux streamable=true " + " ! h264parse ! flvmux name=mux streamable=true " "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " - "mux. ! rtmpsink name=rtmp_sink location=\"" + + "mux. ! rtmpsink location=\"" + location + "\" sync=false"; - LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "' → app '" + app + "', url=" + location); + LOG_INFO("[RTMP] Pipeline: " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { - LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message)); + LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(error->message)); g_error_free(error); return nullptr; } - return pipeline; } -// 摄像头在配置中的索引(用于 reply.loc) -static int get_camera_index(const std::string &name) +void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) { - for (size_t i = 0; i < g_app_config.cameras.size(); ++i) - if (g_app_config.cameras[i].name == name) return static_cast(i); - return -1; -} + std::string key = make_key(cam.name); -// ---------- 对外接口:启动/停止 ---------- - -RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type) -{ - StreamResultInfo res; - res.loc = get_camera_index(cam.name); - res.url = get_stream_url(cam.name, type); - - if (res.loc < 0 || res.loc >= static_cast(g_app_config.cameras.size())) + while (ctx->running) { - res.result = 1; - res.reason = "Invalid channel index"; - return res; - } - - // 主码流(record)允许设备缺失时也进入“重试守护”,所以这里只在 SUB 时严格检查 - if (type == StreamType::SUB && !device_exists(cam.device)) - { - res.result = 1; - res.reason = "Device not found: " + cam.device; - return res; - } - - const std::string key = make_stream_key(cam.name, type); - - std::unique_ptr ctx; - std::future status_future; - - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - - // 若旧线程仍在运行,先安全停止并移除 - if (it != streams.end()) + if (!device_exists(cam.device)) { - if (it->second->running.load()) - { - LOG_WARN("[RTMP] Restarting existing stream: " + key); - it->second->running.store(false); - if (it->second->thread.joinable()) it->second->thread.join(); - } - streams.erase(it); + ctx->status.running = false; + ctx->status.last_error = "Device not found: " + cam.device; + LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error); + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; } - ctx = std::make_unique(); - ctx->running.store(true); - status_future = ctx->start_promise.get_future(); + GstElement *pipeline = create_pipeline(cam); + if (!pipeline) + { + ctx->status.running = false; + ctx->status.last_error = "Failed to create pipeline"; + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; + } - StreamContext *ctx_ptr = ctx.get(); - ctx->thread = std::thread( - [cam, type, ctx_ptr]() + GstBus *bus = gst_element_get_bus(pipeline); + gst_element_set_state(pipeline, GST_STATE_PLAYING); + ctx->status.running = true; + ctx->status.last_error.clear(); + LOG_INFO("[RTMP] Started stream for " + key); + + bool need_restart = false; + while (ctx->running) + { + GstMessage *msg = gst_bus_timed_pop_filtered(bus, 500 * GST_MSECOND, + (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + + if (!msg) continue; + + switch (GST_MESSAGE_TYPE(msg)) { - try + case GST_MESSAGE_ERROR: { - RTMPManager::stream_loop(cam, type, ctx_ptr); + GError *err = nullptr; + gst_message_parse_error(msg, &err, nullptr); + ctx->status.running = false; + ctx->status.last_error = err ? err->message : "Unknown GStreamer error"; + LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error); + if (err) g_error_free(err); + need_restart = true; + break; } - catch (const std::exception &e) - { - LOG_ERROR("[RTMP] Exception in stream thread: " + std::string(e.what())); - } - catch (...) - { - LOG_ERROR("[RTMP] Unknown exception in stream thread"); - } - }); + case GST_MESSAGE_EOS: + ctx->status.running = false; + ctx->status.last_error = "End of stream"; + need_restart = true; + break; + default: + break; + } + gst_message_unref(msg); - streams.emplace(key, std::move(ctx)); - } - - // 等待首帧或错误,最多 7 秒 - if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) - { - const StreamStatus s = status_future.get(); - res.result = s.running ? 0 : 1; - res.reason = s.running ? "Started OK" : (s.last_error.empty() ? "Failed to start" : s.last_error); - } - else - { - res.result = 1; - res.reason = "Timeout waiting for stream"; - } - - return res; -} - -RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type) -{ - StreamResultInfo res; - res.loc = get_camera_index(cam_name); - res.url = get_stream_url(cam_name, type); - - if (res.loc < 0 || res.loc >= static_cast(g_app_config.cameras.size())) - { - res.result = 1; - res.reason = "Invalid channel index"; - return res; - } - - std::unique_ptr ctx; - std::string key = make_stream_key(cam_name, type); - - { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end()) - { - res.result = 0; - res.reason = "Already stopped (no active stream)"; - return res; + if (need_restart) break; } - ctx = std::move(it->second); - streams.erase(it); - } + gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(bus); + gst_object_unref(pipeline); - bool was_running = ctx->running.load(); - ctx->running.store(false); - if (ctx->thread.joinable()) ctx->thread.join(); - - if (was_running) - { - res.result = 0; - res.reason = "Stopped manually"; - } - else - { - res.result = 1; - res.reason = "Not running (stream never started)"; - } - - return res; -} - -// ---------- 核心:推流循环(带重试策略) ---------- - -void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) -{ - const std::string key = make_stream_key(cam.name, type); - - auto try_set_start = [&](const StreamStatus &st) - { - bool expected = false; - if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true)) + if (ctx->running) { - try - { - ctx->start_promise.set_value(st); - } - catch (...) - { - } - } - }; - - // MAIN(record)= 永久自动重试;SUB(live)= 限次重试 - if (type == StreamType::MAIN) - { - int backoff_ms = RETRY_BASE_DELAY_MS; // 可指数退避 - while (ctx->running.load()) - { - const bool ok = do_stream_once(cam, type, ctx); - if (!ok && ctx->running.load()) - { - LOG_WARN("[RTMP] MAIN(record) stream lost, retry in " + std::to_string(backoff_ms) + "ms: " + cam.name); - std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); - backoff_ms = std::min(backoff_ms * 2, 60000); // up to 60s - } - else - { - backoff_ms = RETRY_BASE_DELAY_MS; // 成功后重置退避 - } - } - } - else // SUB / live - { - int retries = 0; - while (ctx->running.load()) - { - const bool ok = do_stream_once(cam, type, ctx); - if (ok) break; // 正常退出(被 stop 或 EOS) - retries++; - if (retries >= LIVE_MAX_RETRIES || !ctx->running.load()) - { - LOG_ERROR("[RTMP] SUB(live) stream failed after retries, giving up: " + cam.name); - break; - } - LOG_WARN("[RTMP] SUB(live) retry " + std::to_string(retries) + "/" + std::to_string(LIVE_MAX_RETRIES) + - " in " + std::to_string(RETRY_BASE_DELAY_MS) + "ms: " + cam.name); + LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); } } @@ -335,323 +172,49 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) ctx->status.running = false; } -// 单次推流:创建 pipeline -> PLAYING -> loop 直到 ERROR/EOS/stop -bool RTMPManager::do_stream_once(const Camera &cam, StreamType type, StreamContext *ctx) +void RTMPManager::start_all() { - const std::string key = make_stream_key(cam.name, type); + LOG_INFO("[RTMP] Starting all record streams..."); + std::lock_guard lock(streams_mutex); - auto try_set_start = [&](const StreamStatus &st) + for (auto &cam : g_app_config.cameras) { - bool expected = false; - if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true)) - { - try - { - ctx->start_promise.set_value(st); - } - catch (...) - { - } - } - }; - - RTMPManager::StreamStatus status; - status.running = false; - status.last_result = StreamResult::UNKNOWN; - status.last_error.clear(); - - GstElement *pipeline = create_pipeline(cam, type); - if (!pipeline) - { - status.last_result = StreamResult::PIPELINE_ERROR; - status.last_error = "Failed to create pipeline"; - update_status(key, status); - try_set_start(status); - return false; + auto key = make_key(cam.name); + auto ctx = std::make_unique(); + ctx->running.store(true); + ctx->thread = std::thread([cam, ptr = ctx.get()]() { stream_loop(cam, ptr); }); + streams.emplace(key, std::move(ctx)); } - - GstBus *bus = gst_element_get_bus(pipeline); - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - bool started_reported = false; - auto t0 = std::chrono::steady_clock::now(); - const int first_frame_timeout_s = 5; - const int connect_guard_ms = 2000; - - bool need_restart = false; - bool ok = false; - - while (true) - { - if (!ctx->running.load()) - { - status.running = false; - status.last_result = StreamResult::UNKNOWN; - status.last_error = "Stream stopped manually"; - update_status(key, status); - try_set_start(status); - break; - } - - // 首帧/连通性超时保护 - if (!started_reported) - { - auto elapsed_s = - std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count(); - if (elapsed_s > first_frame_timeout_s) - { - status.running = false; - status.last_result = StreamResult::TIMEOUT; - status.last_error = "No frames/connection established within timeout"; - update_status(key, status); - try_set_start(status); - need_restart = true; - break; - } - } - - GstMessage *msg = - gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, - (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | - GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_STREAM_STATUS)); - - if (!msg) - { - // 过了 connect_guard_ms 且 rtmpsink 进入 PLAYING,认为启动成功 - if (!started_reported) - { - auto elapsed_ms = - std::chrono::duration_cast(std::chrono::steady_clock::now() - t0) - .count(); - if (elapsed_ms >= connect_guard_ms) - { - GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "rtmp_sink"); - if (sink) - { - GstState cur, pending; - if (gst_element_get_state(sink, &cur, &pending, 0) == GST_STATE_CHANGE_SUCCESS && - cur == GST_STATE_PLAYING) - { - status.running = true; - ctx->status.running = true; - status.last_result = StreamResult::OK; - status.last_error.clear(); - update_status(key, status); - try_set_start(status); - started_reported = true; - LOG_INFO("[RTMP] Guard-start OK for '" + cam.name + "'"); - } - gst_object_unref(sink); - } - } - } - continue; - } - - switch (GST_MESSAGE_TYPE(msg)) - { - case GST_MESSAGE_STREAM_STATUS: - { - GstStreamStatusType type_ss; - GstElement *owner = nullptr; - gst_message_parse_stream_status(msg, &type_ss, &owner); - if (!started_reported && owner) - { - const gchar *oname = GST_OBJECT_NAME(owner); - if (oname && std::strcmp(oname, "rtmp_sink") == 0) - { - if (type_ss == GST_STREAM_STATUS_TYPE_CREATE || type_ss == GST_STREAM_STATUS_TYPE_ENTER || - type_ss == GST_STREAM_STATUS_TYPE_START) - { - status.running = true; - ctx->status.running = true; - status.last_result = StreamResult::OK; - status.last_error.clear(); - update_status(key, status); - try_set_start(status); - started_reported = true; - LOG_INFO(std::string("[RTMP] Connected (STREAM_STATUS) for '") + cam.name + "'"); - } - } - } - break; - } - - case GST_MESSAGE_STATE_CHANGED: - // 可用于扩展日志;不以 pipeline 的 PLAYING 作为成功判据 - break; - - case GST_MESSAGE_ERROR: - { - GError *err = nullptr; - gchar *dbg = nullptr; - gst_message_parse_error(msg, &err, &dbg); - - status.running = false; - status.last_result = StreamResult::CONNECTION_FAIL; - status.last_error = err ? err->message : "GStreamer error"; - - LOG_ERROR("[RTMP] Stream error from '" + cam.name + "': " + status.last_error); - - ctx->status.running = false; - need_restart = true; - - if (err) g_error_free(err); - if (dbg) g_free(dbg); - update_status(key, status); - try_set_start(status); - break; - } - - case GST_MESSAGE_EOS: - { - status.running = false; - status.last_result = StreamResult::EOS_RECEIVED; - status.last_error = "EOS received"; - update_status(key, status); - try_set_start(status); - need_restart = true; - break; - } - - default: - break; - } - - gst_message_unref(msg); - - if (need_restart) break; - if (started_reported) - { - // 正常运行中,继续监听 - } - } - - // 收尾 - gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); - gst_object_unref(pipeline); - - ok = started_reported && !need_restart; // 若在 stop 前一直运行且未触发重启则认为 ok - return ok; } -// ---------- 其他工具 ---------- - void RTMPManager::stop_all() -{ - std::vector> ctxs; - { - std::lock_guard lock(streams_mutex); - for (auto &kv : streams) kv.second->running.store(false); - for (auto &kv : streams) ctxs.push_back(std::move(kv.second)); - streams.clear(); - } - - for (auto &ctx : ctxs) - if (ctx->thread.joinable()) ctx->thread.join(); - - LOG_INFO("[RTMP] stop_all completed."); -} - -bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) -{ - std::lock_guard lock(streams_mutex); - auto it = streams.find(make_stream_key(cam_name, type)); - return it != streams.end() && it->second->running.load(); -} - -bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); + for (auto &kv : streams) kv.second->running.store(false); for (auto &kv : streams) - if (kv.second->running.load()) return true; - return false; + if (kv.second->thread.joinable()) kv.second->thread.join(); + streams.clear(); } -std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) +bool RTMPManager::is_streaming(const std::string &cam_name) { - // 你可以把这里的网卡名替换成你的默认上行网卡 + std::lock_guard lock(streams_mutex); + auto it = streams.find(make_key(cam_name)); + return (it != streams.end() && it->second->status.running); +} + +std::string RTMPManager::get_stream_url(const std::string &cam_name) +{ + // 优先动态检测网卡 IP,失败则使用固定值 std::string ip = get_ip_address("enP2p33s0"); - if (ip.empty()) ip = "127.0.0.1"; - - const std::string app = pick_app(type); - const std::string stream_name = make_stream_key(cam_name, type); - - // 返回 WebRTC 拉流地址(WHEP) - return "http://" + ip + ":1985/rtc/v1/whep/?app=" + app + "&stream=" + stream_name; + if (ip.empty()) ip = "127.0.0.1"; // 或用动态获取网卡 IP + return "http://" + ip + ":1985/rtc/v1/whep/?app=record&stream=" + cam_name + "_main"; } -// 批量处理 -std::vector RTMPManager::process_push_request(const VideoPushRequest &req) +std::vector> RTMPManager::get_all_status() { - std::vector results; - std::vector> futures; - - for (const auto &item : req.data) - { - StreamType type = (item.streamType == 0) ? StreamType::MAIN : StreamType::SUB; - - for (int ch : item.channels) - { - if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) continue; - - const auto &cam = g_app_config.cameras[ch]; - - if (item.switchVal == 0) - { - futures.emplace_back( - std::async(std::launch::async, [&, cam, type]() { return start_camera(cam, type); })); - } - else - { - futures.emplace_back( - std::async(std::launch::async, [&, cam, type]() { return stop_camera(cam.name, type); })); - } - } - } - - for (auto &f : futures) - { - try - { - results.push_back(f.get()); - } - catch (const std::exception &e) - { - StreamResultInfo err; - err.result = 1; - err.reason = std::string("Exception: ") + e.what(); - results.push_back(err); - } - } - - return results; -} - -void RTMPManager::start_all_record_streams() -{ - LOG_INFO("[RTMP] Auto-starting all record (MAIN) streams..."); - - for (const auto &cam : g_app_config.cameras) - { - // 检查是否已经存在 - if (is_streaming(cam.name, StreamType::MAIN)) - { - LOG_INFO("[RTMP] Record stream already running: " + cam.name); - continue; - } - - auto res = start_camera(cam, StreamType::MAIN); - if (res.result == 0) - { - LOG_INFO("[RTMP] Record stream started for " + cam.name + ": " + res.reason); - } - else - { - LOG_WARN("[RTMP] Failed to start record stream for " + cam.name + ": " + res.reason); - } - } - - LOG_INFO("[RTMP] Auto-start for record streams done."); + std::vector> result; + std::lock_guard lock(streams_mutex); + for (auto &kv : streams) result.emplace_back(kv.first, kv.second->status.running); + return result; }