diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 53697f3..9f3b8f3 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -85,35 +85,47 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); - auto ctx = std::make_unique(); - StreamContext *ctx_ptr = ctx.get(); + try { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it != streams.end() && it->second->running.load()) + auto ctx = std::make_unique(); + StreamContext *ctx_ptr = ctx.get(); + { - res.result = 0; - res.reason = "Already streaming"; - return res; + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it != streams.end() && it->second->running.load()) + { + res.result = 0; + res.reason = "Already streaming"; + return res; + } + + ctx->running.store(true); + streams[key] = std::move(ctx); } - ctx->running.store(true); - streams[key] = std::move(ctx); + std::future fut = ctx_ptr->start_result.get_future(); + + ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); + + if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready) + { + res = fut.get(); + } + else + { + res.result = 1; + res.reason = "Start timeout"; + // 超时时停止线程 + stop_camera(cam.name, type); + } } - - std::future fut = ctx_ptr->start_result.get_future(); - - ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); - - if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready) - { - res = fut.get(); - } - else + catch (const std::exception &e) { + LOG_ERROR("[RTMP] Exception in start_camera for " + key + ": " + e.what()); res.result = 1; - res.reason = "Start timeout"; + res.reason = "Start exception: " + std::string(e.what()); } return res; @@ -165,104 +177,205 @@ void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream loop started for " + key); - GstElement *pipeline = create_pipeline(cam, type); - if (!pipeline) - { - res.result = 1; - res.reason = "Failed to create pipeline"; - ctx->start_result.set_value(res); - return; - } - - GstBus *bus = gst_element_get_bus(pipeline); - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - bool first_frame_received = false; + GstElement *pipeline = nullptr; + GstBus *bus = nullptr; bool start_result_set = false; - auto set_start_result = [&](const StreamResultInfo &r) + try { + // 确保在异常情况下也能设置结果 + auto set_start_result = [&](const StreamResultInfo &r) + { + if (!start_result_set) + { + try + { + ctx->start_result.set_value(r); + start_result_set = true; + } + catch (const std::future_error &e) + { + LOG_ERROR("[RTMP] Failed to set start result: " + std::string(e.what())); + } + } + }; + + // 创建管道 + pipeline = create_pipeline(cam, type); + if (!pipeline) + { + res.result = 1; + res.reason = "Failed to create pipeline"; + set_start_result(res); + return; + } + + bus = gst_element_get_bus(pipeline); + if (!bus) + { + res.result = 1; + res.reason = "Failed to get pipeline bus"; + set_start_result(res); + return; + } + + // 设置管道状态 + GstStateChangeReturn state_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING); + if (state_ret == GST_STATE_CHANGE_FAILURE) + { + res.result = 1; + res.reason = "Failed to set pipeline to playing state"; + set_start_result(res); + return; + } + + bool first_frame_received = false; + auto start_time = std::chrono::steady_clock::now(); + + while (true) + { + // 检查是否应该停止 + { + std::lock_guard lock(streams_mutex); + auto it = streams.find(key); + if (it == streams.end() || !it->second->running.load()) + { + LOG_INFO("[RTMP] Stream loop stopping for " + key); + break; + } + } + + // 检查第一帧超时 + if (!first_frame_received) + { + auto elapsed = std::chrono::steady_clock::now() - start_time; + if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) + { + res.result = 1; + res.reason = "No frames received within timeout"; + set_start_result(res); + LOG_WARN("[RTMP] First frame timeout for " + key); + break; + } + } + + // 等待GStreamer消息 + GstMessage *msg = gst_bus_timed_pop_filtered( + bus, 100 * GST_MSECOND, + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED | + GST_MESSAGE_ELEMENT)); + + if (!msg) continue; + + switch (GST_MESSAGE_TYPE(msg)) + { + case GST_MESSAGE_ERROR: + { + GError *err = nullptr; + gchar *debug = nullptr; + gst_message_parse_error(msg, &err, &debug); + std::string err_msg = err ? err->message : "Unknown GStreamer error"; + res.result = 1; + res.reason = "Pipeline error: " + err_msg; + set_start_result(res); + LOG_ERROR("[RTMP] Pipeline error for " + key + ": " + err_msg); + if (err) g_error_free(err); + if (debug) g_free(debug); + gst_message_unref(msg); + goto cleanup; + } + case GST_MESSAGE_EOS: + res.result = 1; + res.reason = "EOS received"; + set_start_result(res); + LOG_INFO("[RTMP] EOS received for " + key); + gst_message_unref(msg); + goto cleanup; + case GST_MESSAGE_STATE_CHANGED: + // 可以添加状态变化的日志用于调试 + break; + case GST_MESSAGE_ELEMENT: + if (!first_frame_received) + { + first_frame_received = true; + res.result = 0; + res.reason = "First frame received, started OK"; + set_start_result(res); + LOG_INFO("[RTMP] First frame received for " + key); + } + break; + default: + break; + } + + gst_message_unref(msg); + } + } + catch (const std::exception &e) + { + LOG_ERROR("[RTMP] Exception in stream_loop for " + key + ": " + e.what()); + res.result = 1; + res.reason = "Exception: " + std::string(e.what()); if (!start_result_set) { - ctx->start_result.set_value(r); - start_result_set = true; + try + { + ctx->start_result.set_value(res); + start_result_set = true; + } + catch (const std::future_error &) + { + // 忽略future错误 + } } - }; - - auto start_time = std::chrono::steady_clock::now(); - - while (true) + } + catch (...) { + LOG_ERROR("[RTMP] Unknown exception in stream_loop for " + key); + res.result = 1; + res.reason = "Unknown exception"; + if (!start_result_set) { - std::lock_guard lock(streams_mutex); - auto it = streams.find(key); - if (it == streams.end() || !it->second->running.load()) break; - } - - GstMessage *msg = - gst_bus_timed_pop_filtered(bus, 100 * GST_MSECOND, - static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | - GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ELEMENT)); - - // 超时未收到第一帧 - if (!first_frame_received) - { - auto elapsed = std::chrono::steady_clock::now() - start_time; - if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) + try { - res.result = 1; - res.reason = "No frames received within timeout"; - set_start_result(res); - break; + ctx->start_result.set_value(res); + start_result_set = true; + } + catch (const std::future_error &) + { + // 忽略future错误 } } - - if (!msg) continue; - - switch (GST_MESSAGE_TYPE(msg)) - { - case GST_MESSAGE_ERROR: - { - GError *err = nullptr; - gchar *debug = nullptr; - gst_message_parse_error(msg, &err, &debug); - std::string err_msg = err ? err->message : "Unknown GStreamer error"; - res.result = 1; - res.reason = "Pipeline error: " + err_msg; - set_start_result(res); - if (err) g_error_free(err); - if (debug) g_free(debug); - break; - } - case GST_MESSAGE_EOS: - res.result = 1; - res.reason = "EOS received"; - set_start_result(res); - break; - case GST_MESSAGE_STATE_CHANGED: - // pipeline 播放状态变化,不算成功 - break; - case GST_MESSAGE_ELEMENT: - if (!first_frame_received) - { - first_frame_received = true; - res.result = 0; - res.reason = "First frame received, started OK"; - set_start_result(res); - } - break; - default: - break; - } - - gst_message_unref(msg); } - gst_element_set_state(pipeline, GST_STATE_NULL); - if (bus) gst_object_unref(bus); - gst_object_unref(pipeline); +cleanup: + // 清理GStreamer资源 + if (pipeline) + { + gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(pipeline); + } + if (bus) + { + gst_object_unref(bus); + } - // 清理 streams + // 确保start_result被设置 + if (!start_result_set) + { + try + { + res.result = 1; + res.reason = "Stream loop exited unexpectedly"; + ctx->start_result.set_value(res); + } + catch (const std::future_error &) + { + // 忽略future错误 + } + } + + // 从streams中移除 { std::lock_guard lock(streams_mutex); streams.erase(key);