diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp index 7bb466a..96a4d8d 100644 --- a/include/rtmp_manager.hpp +++ b/include/rtmp_manager.hpp @@ -67,7 +67,6 @@ class RTMPManager std::atomic running{false}; std::thread thread; - // 新增:把启动阶段的 promise 放到上下文里,避免悬垂引用 std::promise start_promise; std::atomic start_promise_set{false}; @@ -76,6 +75,22 @@ class RTMPManager StreamContext() = default; StreamContext(const StreamContext &) = delete; StreamContext &operator=(const StreamContext &) = delete; + + ~StreamContext() + { + try + { + running.store(false); + if (thread.joinable()) + { + thread.join(); // 防止 std::terminate() + } + } + catch (...) + { + // 安全兜底,不抛异常 + } + } }; static std::string make_stream_key(const std::string &cam_name, StreamType type); diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index dcb6d64..e19ed48 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -79,31 +79,41 @@ static void on_mqtt_message_received(const std::string &topic, const std::string } } + // 异步执行推流任务 std::thread( [req]() { - // 调用 RTMP 模块执行批量推流启停 - auto results = RTMPManager::process_push_request(req); - - // 组装 MQTT 回复 - nlohmann::json reply; - reply["type"] = "response"; - reply["seqNo"] = req.seqNo; - reply["data"] = nlohmann::json::array(); - - for (const auto &r : results) + try { - nlohmann::json item; - item["loc"] = r.loc; - item["url"] = r.url; - item["result"] = r.result; - item["reason"] = r.reason; - reply["data"].push_back(item); - } + auto results = RTMPManager::process_push_request(req); - // 发送回复到 MQTT - mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(), 1); - LOG_INFO("[MQTT] Sent RTMP response: " + reply.dump()); + // 组装 MQTT 回复 + nlohmann::json reply; + reply["type"] = "response"; + reply["seqNo"] = req.seqNo; + reply["data"] = nlohmann::json::array(); + + for (const auto &r : results) + { + nlohmann::json item; + item["loc"] = r.loc; + item["url"] = r.url; + item["result"] = r.result; + item["reason"] = r.reason; + reply["data"].push_back(item); + } + + mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(), 1); + LOG_INFO("[MQTT] Sent RTMP response: " + reply.dump()); + } + catch (const std::exception &e) + { + LOG_ERROR(std::string("[MQTT] Exception in RTMP thread: ") + e.what()); + } + catch (...) + { + LOG_ERROR("[MQTT] Unknown exception in RTMP thread"); + } }) .detach(); } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 6b07379..71cd5e9 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -162,11 +162,17 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea { std::lock_guard lock(streams_mutex); auto it = streams.find(key); - if (it != streams.end() && it->second->running.load()) + + // 若旧线程仍在运行,先安全停止 + if (it != streams.end()) { - res.result = 1; // 修改为 1:表示重复启动,不是真成功 - res.reason = "Already streaming"; - return res; + if (it->second->running.load()) + { + LOG_WARN("[RTMP] Restarting existing stream: " + key); + it->second->running.store(false); + if (it->second->thread.joinable()) it->second->thread.join(); + } + streams.erase(it); } ctx = std::make_unique(); @@ -174,7 +180,22 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea status_future = ctx->start_promise.get_future(); StreamContext *ctx_ptr = ctx.get(); - ctx->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); + ctx->thread = std::thread( + [cam, type, ctx_ptr]() + { + try + { + RTMPManager::stream_loop(cam, type, ctx_ptr); + } + catch (const std::exception &e) + { + LOG_ERROR("[RTMP] Exception in stream thread: " + std::string(e.what())); + } + catch (...) + { + LOG_ERROR("[RTMP] Unknown exception in stream thread"); + } + }); streams.emplace(key, std::move(ctx)); }