// rtmp_manager.cpp #include "rtmp_manager.hpp" #include #include #include #include "logger.hpp" static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) { return cam_name + stream_type_suffix(type); } void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); } void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); } void RTMPManager::update_status(const std::string &key, const StreamStatus &status) { { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end()) it->second->status = status; } if (status_callback) status_callback(key, status); } GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; if (type == StreamType::SUB) { width = std::max(160, width / 2); height = std::max(120, height / 2); fps = std::max(10, fps / 2); bitrate = std::max(300000, bitrate / 2); } std::string stream_name = cam.name + stream_type_suffix(type); std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + "/1 ! queue max-size-buffers=1 leaky=downstream " "! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + " ! h264parse ! flvmux streamable=true name=mux " "! rtmpsink location=\"rtmp://127.0.0.1/live/" + stream_name + " live=1\" sync=false"; LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message)); g_error_free(error); return nullptr; } return pipeline; } int get_camera_index(const std::string &name) { for (size_t i = 0; i < g_app_config.cameras.size(); ++i) if (g_app_config.cameras[i].name == name) return static_cast(i); return -1; } RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type) { StreamResultInfo res; res.loc = get_camera_index(cam.name); res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); try { auto ctx = std::make_unique(); StreamContext *ctx_ptr = ctx.get(); { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end() && it->second->running.load()) { res.result = 0; res.reason = "Already streaming"; return res; } ctx->running.store(true); streams[key] = std::move(ctx); } std::future fut = ctx_ptr->start_result.get_future(); ctx_ptr->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); if (fut.wait_for(std::chrono::seconds(10)) == std::future_status::ready) { res = fut.get(); } else { res.result = 1; res.reason = "Start timeout"; // 超时时停止线程 stop_camera(cam.name, type); } } catch (const std::exception &e) { LOG_ERROR("[RTMP] Exception in start_camera for " + key + ": " + e.what()); res.result = 1; res.reason = "Start exception: " + std::string(e.what()); } return res; } RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { StreamResultInfo res; res.loc = get_camera_index(cam_name); res.url = get_stream_url(cam_name, type); std::unique_ptr ctx; std::string key = make_stream_key(cam_name, type); { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end()) { res.result = 0; res.reason = "Not streaming"; return res; } // 标记停止 it->second->running.store(false); // 移交所有权给局部变量,避免在锁内 join ctx = std::move(it->second); streams.erase(it); } // 安全 join if (ctx->thread.joinable()) ctx->thread.join(); res.result = 0; res.reason = "Stopped manually"; return res; } void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { constexpr int FIRST_FRAME_TIMEOUT_SEC = 5; StreamResultInfo res; res.loc = get_camera_index(cam.name); res.url = get_stream_url(cam.name, type); std::string key = make_stream_key(cam.name, type); LOG_INFO("[RTMP] Stream loop started for " + key); GstElement *pipeline = nullptr; GstBus *bus = nullptr; bool start_result_set = false; try { // 确保在异常情况下也能设置结果 auto set_start_result = [&](const StreamResultInfo &r) { if (!start_result_set) { try { ctx->start_result.set_value(r); start_result_set = true; } catch (const std::future_error &e) { LOG_ERROR("[RTMP] Failed to set start result: " + std::string(e.what())); } } }; // 创建管道 pipeline = create_pipeline(cam, type); if (!pipeline) { res.result = 1; res.reason = "Failed to create pipeline"; set_start_result(res); return; } bus = gst_element_get_bus(pipeline); if (!bus) { res.result = 1; res.reason = "Failed to get pipeline bus"; set_start_result(res); return; } // 设置管道状态 GstStateChangeReturn state_ret = gst_element_set_state(pipeline, GST_STATE_PLAYING); if (state_ret == GST_STATE_CHANGE_FAILURE) { res.result = 1; res.reason = "Failed to set pipeline to playing state"; set_start_result(res); return; } bool first_frame_received = false; auto start_time = std::chrono::steady_clock::now(); while (true) { // 检查是否应该停止 { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end() || !it->second->running.load()) { LOG_INFO("[RTMP] Stream loop stopping for " + key); break; } } // 检查第一帧超时 if (!first_frame_received) { auto elapsed = std::chrono::steady_clock::now() - start_time; if (std::chrono::duration_cast(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC) { res.result = 1; res.reason = "No frames received within timeout"; set_start_result(res); LOG_WARN("[RTMP] First frame timeout for " + key); break; } } // 等待GStreamer消息 GstMessage *msg = gst_bus_timed_pop_filtered( bus, 100 * GST_MSECOND, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ELEMENT)); if (!msg) continue; switch (GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_ERROR: { GError *err = nullptr; gchar *debug = nullptr; gst_message_parse_error(msg, &err, &debug); std::string err_msg = err ? err->message : "Unknown GStreamer error"; res.result = 1; res.reason = "Pipeline error: " + err_msg; set_start_result(res); LOG_ERROR("[RTMP] Pipeline error for " + key + ": " + err_msg); if (err) g_error_free(err); if (debug) g_free(debug); gst_message_unref(msg); goto cleanup; } case GST_MESSAGE_EOS: res.result = 1; res.reason = "EOS received"; set_start_result(res); LOG_INFO("[RTMP] EOS received for " + key); gst_message_unref(msg); goto cleanup; case GST_MESSAGE_STATE_CHANGED: // 可以添加状态变化的日志用于调试 break; case GST_MESSAGE_ELEMENT: if (!first_frame_received) { first_frame_received = true; res.result = 0; res.reason = "First frame received, started OK"; set_start_result(res); LOG_INFO("[RTMP] First frame received for " + key); } break; default: break; } gst_message_unref(msg); } } catch (const std::exception &e) { LOG_ERROR("[RTMP] Exception in stream_loop for " + key + ": " + e.what()); res.result = 1; res.reason = "Exception: " + std::string(e.what()); if (!start_result_set) { try { ctx->start_result.set_value(res); start_result_set = true; } catch (const std::future_error &) { // 忽略future错误 } } } catch (...) { LOG_ERROR("[RTMP] Unknown exception in stream_loop for " + key); res.result = 1; res.reason = "Unknown exception"; if (!start_result_set) { try { ctx->start_result.set_value(res); start_result_set = true; } catch (const std::future_error &) { // 忽略future错误 } } } cleanup: // 清理GStreamer资源 if (pipeline) { gst_element_set_state(pipeline, GST_STATE_NULL); gst_object_unref(pipeline); } if (bus) { gst_object_unref(bus); } // 确保start_result被设置 if (!start_result_set) { try { res.result = 1; res.reason = "Stream loop exited unexpectedly"; ctx->start_result.set_value(res); } catch (const std::future_error &) { // 忽略future错误 } } // 从streams中移除 { std::lock_guard lock(streams_mutex); streams.erase(key); } LOG_INFO("[RTMP] Stream loop ended for " + key); } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_stream_key(cam_name, type)); return it != streams.end() && it->second->running.load(); } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) if (kv.second->running.load()) return true; return false; } void RTMPManager::stop_all() { std::vector> ctxs; { std::lock_guard lock(streams_mutex); for (auto &kv : streams) kv.second->running.store(false); // 标记停止 // 移交所有权给局部变量,避免在锁内 join for (auto &kv : streams) ctxs.push_back(std::move(kv.second)); streams.clear(); } // 安全 join for (auto &ctx : ctxs) { if (ctx->thread.joinable()) ctx->thread.join(); } LOG_INFO("[RTMP] stop_all completed."); } std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) { return "rtmp://127.0.0.1/live/" + make_stream_key(cam_name, type); }