diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index feadf47..8ff3fa2 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -109,7 +109,9 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) while (ctx->running) { - if (!device_exists(cam.device)) + // 检查设备是否存在 + struct stat st; + if (stat(cam.device.c_str(), &st) != 0) { ctx->status.running = false; ctx->status.last_error = "Device not found: " + cam.device; @@ -118,101 +120,76 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) continue; } + // 创建 pipeline GstElement *pipeline = create_pipeline(cam); if (!pipeline) { ctx->status.running = false; - ctx->status.last_error = "Failed to create pipeline"; + ctx->status.last_error = "Pipeline creation failed"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); + LOG_INFO("[RTMP] Starting stream: " + key); - // 等 pipeline 真正进入 PLAYING 状态 - GstStateChangeReturn ret = gst_element_get_state(pipeline, nullptr, nullptr, 3 * GST_SECOND); - if (ret != GST_STATE_CHANGE_SUCCESS) - { - ctx->status.running = false; - ctx->status.last_error = "Pipeline failed to reach PLAYING"; - LOG_ERROR("[RTMP] " + key + " failed to start (maybe no signal)"); - gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); - gst_object_unref(pipeline); - std::this_thread::sleep_for(std::chrono::seconds(3)); - continue; - } - - // 等首帧 - bool first_frame = false; - auto start_time = std::chrono::steady_clock::now(); - while (!first_frame && std::chrono::steady_clock::now() - start_time < std::chrono::seconds(3)) - { - GstMessage *msg = gst_bus_timed_pop_filtered(bus, 300 * GST_MSECOND, - (GstMessageType)(GST_MESSAGE_ELEMENT | GST_MESSAGE_ERROR)); - if (!msg) continue; - - if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ELEMENT) - { - const GstStructure *s = gst_message_get_structure(msg); - if (s && g_str_has_prefix(gst_structure_get_name(s), "fpsdisplaysink")) - { - first_frame = true; - break; - } - } - else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) - { - first_frame = false; - break; - } - gst_message_unref(msg); - } - - if (!first_frame) - { - ctx->status.running = false; - ctx->status.last_error = "No frames detected"; - LOG_ERROR("[RTMP] " + key + " - No frames detected (no video signal)"); - gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); - gst_object_unref(pipeline); - std::this_thread::sleep_for(std::chrono::seconds(3)); - continue; - } - - // 进入正常推流状态 ctx->status.running = true; ctx->status.last_error.clear(); - LOG_INFO("[RTMP] Started stream for " + key); + bool got_frame = false; bool need_restart = false; + auto start_time = std::chrono::steady_clock::now(); + while (ctx->running) { - GstMessage *msg = gst_bus_timed_pop_filtered(bus, 500 * GST_MSECOND, - (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + GstMessage *msg = gst_bus_timed_pop_filtered( + bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_ELEMENT)); + + // 检查帧超时:3 秒内没有检测到帧 + auto elapsed = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count(); + if (!got_frame && elapsed > 3) + { + ctx->status.running = false; + ctx->status.last_error = "No frames detected (no video signal)"; + LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error); + need_restart = true; + break; + } if (!msg) continue; switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_ELEMENT: + // 检测 fpsdisplaysink 输出帧 + if (gst_message_has_name(msg, "fpsprobe")) + { + got_frame = true; + } + break; + case GST_MESSAGE_ERROR: { GError *err = nullptr; gst_message_parse_error(msg, &err, nullptr); ctx->status.running = false; - ctx->status.last_error = err ? err->message : "Unknown GStreamer error"; + ctx->status.last_error = err ? std::string(err->message) : "GStreamer error"; LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error); if (err) g_error_free(err); need_restart = true; break; } + case GST_MESSAGE_EOS: ctx->status.running = false; - ctx->status.last_error = "End of stream"; + ctx->status.last_error = "End of stream (EOS)"; + LOG_WARN("[RTMP] " + key + " reached EOS"); need_restart = true; break; + default: break; } @@ -221,8 +198,9 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) if (need_restart) break; } + // 清理资源 gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(bus); + if (bus) gst_object_unref(bus); gst_object_unref(pipeline); if (ctx->running) @@ -233,6 +211,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) } ctx->status.running = false; + LOG_INFO("[RTMP] Stream thread exited for " + key); } // ========== 启停与状态 ========== @@ -251,20 +230,34 @@ void RTMPManager::start_all() } } -void RTMPManager::stop_all() +void RTMPManager::start_all() { + LOG_INFO("[RTMP] Starting all record streams..."); std::lock_guard lock(streams_mutex); - for (auto &kv : streams) kv.second->running.store(false); - for (auto &kv : streams) - if (kv.second->thread.joinable()) kv.second->thread.join(); - streams.clear(); -} -bool RTMPManager::is_streaming(const std::string &cam_name) -{ - std::lock_guard lock(streams_mutex); - auto it = streams.find(make_key(cam_name)); - return (it != streams.end() && it->second->status.running); + int delay_ms = 0; + for (auto &cam : g_app_config.cameras) + { + auto key = make_key(cam.name); + if (streams.find(key) != streams.end()) + { + LOG_INFO("[RTMP] Stream already running: " + key); + continue; + } + + auto ctx = std::make_unique(); + ctx->running.store(true); + + ctx->thread = std::thread( + [cam, ptr = ctx.get(), delay_ms]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + stream_loop(cam, ptr); + }); + + streams.emplace(key, std::move(ctx)); + delay_ms += 200; // 每路错开 200ms + } } std::string RTMPManager::get_stream_url(const std::string &cam_name)