diff --git a/config.json b/config.json new file mode 100644 index 0000000..fc5b64a --- /dev/null +++ b/config.json @@ -0,0 +1,86 @@ +{ + "mqtt_server": { + "veh_id": 20004, + "address": "192.168.4.195", + "port": 51883, + "need_username_pwd": true, + "client_id": "20004_vehmedia", + "username": "20004_4A:69:BE:32:59:AE", + "password": "31d6bb29f177d5bf8560756c0f0e63c63fd412e52c5b9ea59476024eab893884a5f34f0637e0fe3ad42b802c16edb6feb37cde613957c3540c060c07b230cb0aa6b4547bb86fcae43d484179d3a11a1969a2f367ec0ceede4c10510757a89927af4c2d0c0484476be3241a9ff9242e7401f3fbcd824b5cfb19674663b7045e32dd2f97b4", + "mqtt_heart_threshold": 2000 + }, + "cameras": [ + { + "device": "/dev/video11", + "name": "AHD1", + "enabled": true, + "resolution": "720p", + "width": 1280, + "height": 720, + "fps": 30 + }, + { + "device": "/dev/video12", + "name": "AHD2", + "enabled": false, + "resolution": "720p", + "width": 1280, + "height": 720, + "fps": 30 + }, + { + "device": "/dev/video13", + "name": "AHD3", + "enabled": false, + "resolution": "720p", + "width": 1280, + "height": 720, + "fps": 30 + }, + { + "device": "/dev/video14", + "name": "AHD4", + "enabled": false, + "resolution": "720p", + "width": 1280, + "height": 720, + "fps": 30 + }, + { + "device": "/dev/video3", + "name": "AHD5", + "enabled": false, + "resolution": "1080p", + "width": 1920, + "height": 1080, + "fps": 30 + }, + { + "device": "/dev/video2", + "name": "AHD6", + "enabled": false, + "resolution": "1080p", + "width": 1920, + "height": 1080, + "fps": 30 + }, + { + "device": "/dev/video1", + "name": "AHD7", + "enabled": false, + "resolution": "1080p", + "width": 1920, + "height": 1080, + "fps": 30 + }, + { + "device": "/dev/video0", + "name": "AHD8", + "enabled": false, + "resolution": "1080p", + "width": 1920, + "height": 1080, + "fps": 30 + } + ] +} \ No newline at end of file diff --git a/include/app_config.hpp b/include/app_config.hpp index 921eabe..88a673e 100644 --- a/include/app_config.hpp +++ b/include/app_config.hpp @@ -15,13 +15,20 @@ using json = nlohmann::json; using ordered_json = nlohmann::ordered_json; // ------------------- 摄像头结构体 ------------------- +enum class StreamType +{ + MAIN, + SUB +}; + struct Camera { std::string device; std::string name; int width, height, fps; - int bitrate; // 新增码率字段 (bps) + int bitrate; bool enabled; + StreamType stream_type; // 新增字段 }; // ------------------- MQTT Topic ------------------- @@ -29,21 +36,11 @@ struct VehicleMQTTTopics { std::string heartbeat_up; std::string video_down; - std::string video_down_ack; - std::string substream_down; - std::string substream_down_ack; - std::string reset_down; - std::string reset_down_ack; void fill_with_veh_id(const std::string &vehId) { - heartbeat_up = "adcpcmcc/v1/vehmedia/" + vehId + "/heartbeat/up"; - video_down = "adcpcmcc/v1/vehmedia/" + vehId + "/video/down"; - video_down_ack = "adcpcmcc/v1/vehmedia/" + vehId + "/video/down/ack"; - substream_down = "adcpcmcc/v1/vehmedia/" + vehId + "/substream/down"; - substream_down_ack = "adcpcmcc/v1/vehmedia/" + vehId + "/substream/down/ack"; - reset_down = "adcpcmcc/v1/vehmedia/" + vehId + "/reset/down"; - reset_down_ack = "adcpcmcc/v1/vehmedia/" + vehId + "/reset/down/ack"; + heartbeat_up = "/kun/vehicle/video/heartbeat/" + vehId; + video_down = "/kun/vehicle/video/push/" + vehId; } }; diff --git a/include/mqtt_client_wrapper.hpp b/include/mqtt_client_wrapper.hpp index 20d10d7..7516c9a 100644 --- a/include/mqtt_client_wrapper.hpp +++ b/include/mqtt_client_wrapper.hpp @@ -4,7 +4,7 @@ #include "app_config.hpp" #include "logger.hpp" #include "mqtt_client.hpp" -#include "rtsp_manager.hpp" +#include "rtmp_manager.hpp" #include #include diff --git a/include/rtmp_manager.hpp b/include/rtmp_manager.hpp new file mode 100644 index 0000000..9a4e646 --- /dev/null +++ b/include/rtmp_manager.hpp @@ -0,0 +1,42 @@ +// rtsp_manager.hpp +#pragma once + +#include +#include "app_config.hpp" +#include +#include +#include +#include +#include + +class RTMPManager +{ +public: + static void init(); + // start/stop 增加 StreamType 参数 + static void start_camera(const Camera &cam, StreamType type); + static void stop_camera(const std::string &cam_name, StreamType type); + static void stop_all(); + static bool is_streaming(const std::string &cam_name); + static bool is_any_streaming(); + + // 获取推流 URL,用于应答 + static std::string get_stream_url(const std::string &cam_name, StreamType type); + +private: + struct StreamContext + { + std::thread thread; + std::atomic running{false}; + }; + + static std::unordered_map streams; + static std::mutex streams_mutex; + + // stream loop 接收 StreamType 作为参数 + static void stream_loop(Camera cam, StreamType type); + static GstElement *create_pipeline(const Camera &cam, StreamType type); + + // 辅助:构建 map key + static std::string make_stream_key(const std::string &cam_name, StreamType type); +}; diff --git a/include/rtsp_manager.hpp b/include/rtsp_manager.hpp deleted file mode 100644 index f35754d..0000000 --- a/include/rtsp_manager.hpp +++ /dev/null @@ -1,48 +0,0 @@ -// rtsp_manager.hpp -#pragma once - -#include -#include -#include "app_config.hpp" -#include -#include -#include -#include - -class RTSPManager -{ -public: - static void init(); - static void start(const std::vector &cameras); - static void stop(); - - static void mount_camera(const Camera &cam); - static void unmount_camera(const Camera &cam); - static bool is_streaming(const std::string &cam_name); - static bool is_any_streaming(); - -private: - static GMainLoop *loop; - static GMainContext *main_context; - static GstRTSPServer *server; - static std::unordered_map streaming_status; - - // 工厂创建函数 - static GstRTSPMediaFactory *create_media_factory(const Camera &cam); - - // 挂载/卸载函数 - static gboolean mount_camera_in_main(gpointer data); - static gboolean unmount_camera_in_main(gpointer data); - - // 静态 mutex 和工厂表 - static std::unordered_map mounted_factories; - static std::mutex mounted_factories_mutex; - - // 媒体对象跟踪 - static std::unordered_map> media_map; - static std::mutex media_map_mutex; - - // 信号处理函数 - static void on_media_created(GstRTSPMediaFactory *factory, GstRTSPMedia *media, gpointer user_data); - static void on_media_unprepared(GstRTSPMedia *media, gpointer user_data); -}; \ No newline at end of file diff --git a/rtsp_release_250910.tar.gz b/rtsp_release_250910.tar.gz deleted file mode 100644 index b9d530d..0000000 Binary files a/rtsp_release_250910.tar.gz and /dev/null differ diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index dac8abe..c611a7c 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -57,145 +57,88 @@ static void on_mqtt_message_received(const std::string &topic, const std::string try { + auto j = nlohmann::json::parse(message); + if (topic == g_app_config.mqtt.topics.video_down) { - // 处理 video_down - auto j = nlohmann::json::parse(message); - - if (!j.contains("data") || !j["data"].contains("status") || !j["data"].contains("seqNo")) + if (topic.find("/kun/vehicle/video/push/") != std::string::npos && j["type"] == "request") { - LOG_WARN("[MQTT] video_down JSON missing required fields"); - return; - } + std::string seqNo = j.value("seqNo", ""); + auto data = j["data"]; - // 写 dispatchId 并设置 streaming 状态 - { - std::lock_guard lock(g_dispatch_id_mutex); - if (j["data"].contains("dispatchId")) - g_dispatch_id = j["data"]["dispatchId"].get(); - } + int switch_val = data.value("switch", 0); + int streamType = data.value("streamType", 0); // 0主 1子 + StreamType type = (streamTypeInt == 0) ? StreamType::MAIN : StreamType::SUB; + auto channels = data.value("channels", std::vector{}); - int status = j["data"]["status"].get(); + nlohmann::json resp_data = nlohmann::json::array(); - std::string seqNo = j["data"]["seqNo"].get(); - - bool success = true; // 标记是否操作成功 - - if (status == 0) - { - g_streaming = true; - // 启动推流:挂载本地配置中 enabled 的摄像头 - for (const auto &cam : g_app_config.cameras) + for (int ch : channels) { - if (!cam.enabled) - continue; + // 根据通道号找到摄像头 + auto it = std::find_if(g_app_config.cameras.begin(), g_app_config.cameras.end(), + [ch](const Camera &c) + { return c.channel == ch; }); - if (!RTSPManager::is_streaming(cam.name)) - { - RTSPManager::mount_camera(cam); - LOG_INFO("[MQTT] Started streaming: " + cam.name); - } - } - } - else if (status == 1) - { - g_streaming = false; - std::lock_guard lock(g_dispatch_id_mutex); - g_dispatch_id.clear(); // 停止拉流就清空 dispatchId - // 停止推流:卸载本地配置中 enabled 的摄像头 - for (const auto &cam : g_app_config.cameras) - { - if (!cam.enabled) - continue; + nlohmann::json ch_resp; + ch_resp["loc"] = ch; - if (RTSPManager::is_streaming(cam.name)) + if (it != g_app_config.cameras.end()) { - RTSPManager::unmount_camera(cam); - LOG_INFO("[MQTT] Stopped streaming: " + cam.name); + Camera &cam = *it; + bool op_result = false; + + if (switch_val == 0) + { + // 启动流 + if (!RTMPManager::is_streaming(cam.name, type)) + { + RTMPManager::start_camera(cam, type); + op_result = true; + } + } + else + { + // 停止流 + if (RTMPManager::is_streaming(cam.name, type)) + { + RTMPManager::stop_camera(cam.name, type); + op_result = true; + } + } + + ch_resp["url"] = op_result ? RTMPManager::get_stream_url(cam.name, type) : ""; + ch_resp["result"] = op_result ? 0 : 1; + ch_resp["reason"] = op_result ? "" : "already in requested state"; } + else + { + ch_resp["url"] = ""; + ch_resp["result"] = 1; + ch_resp["reason"] = "channel not found"; + } + + resp_data.push_back(ch_resp); } + + // 构造应答 + nlohmann::json reply; + reply["type"] = "response"; + reply["seqNo"] = seqNo; + reply["data"] = resp_data; + + mqtt_client->publish(topic, reply.dump()); + LOG_INFO("[MQTT] Replied to video push request: " + reply.dump()); } else { - LOG_WARN("[MQTT] video_down: unknown status value " + std::to_string(status)); - success = false; - } - - // 获取当前时间 yyyyMMddHHmmssSSS (UTC+8) - std::string time_str = Logger::get_current_time_utc8(); - - nlohmann::json reply_data; - reply_data["time"] = time_str; - reply_data["result"] = success ? 0 : 1; - reply_data["seqNo"] = seqNo; - - // 封装外层 - nlohmann::json reply; - reply["data"] = reply_data; - reply["isEnc"] = 0; - reply["type"] = 0; - - // 发送应答 - if (mqtt_client) - { - mqtt_client->publish(g_app_config.mqtt.topics.video_down_ack, reply.dump()); - LOG_INFO("[MQTT] Replied to video_down: " + reply.dump()); + LOG_WARN("[MQTT] Unknown topic or non-request type: " + topic); } } - else if (topic == g_app_config.mqtt.topics.substream_down) + catch (const std::exception &e) { - // 处理 substream_down - LOG_INFO("[MQTT] substream_down message received (not implemented yet)."); + LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); } - else if (topic == g_app_config.mqtt.topics.reset_down) - { - auto j = nlohmann::json::parse(message); - - // reset/down 的 payload 在 data 里 - auto data = j.contains("data") ? j["data"] : nlohmann::json::object(); - - std::string seqNo = data.value("seqNo", ""); - std::string errCode = data.value("errorCode", ""); - std::string des = data.value("des", ""); - - LOG_WARN("[MQTT] Reset command received, errorCode=" + errCode + ", des=" + des); - - // 停止所有流,相当于复位 - for (const auto &cam : g_app_config.cameras) - { - if (RTSPManager::is_streaming(cam.name)) - { - RTSPManager::unmount_camera(cam); - LOG_INFO("[RTSP] Camera " + cam.name + " reset/unmounted"); - } - } - - // 组装应答 data - nlohmann::json reply_data; - reply_data["time"] = Logger::get_current_time_utc8(); - reply_data["result"] = 0; // 0=成功 - reply_data["seqNo"] = seqNo; - - // 外层封装 - nlohmann::json reply; - reply["data"] = reply_data; - reply["isEnc"] = 0; - reply["type"] = 0; - - if (mqtt_client) - { - mqtt_client->publish(g_app_config.mqtt.topics.reset_down_ack, reply.dump()); - LOG_INFO("[MQTT] Replied to reset_down: " + reply.dump()); - } - } - else - { - LOG_WARN("[MQTT] Unknown topic: " + topic); - } - } - catch (const std::exception &e) - { - LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); } } diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp new file mode 100644 index 0000000..a5dbc81 --- /dev/null +++ b/src/rtmp_manager.cpp @@ -0,0 +1,259 @@ +// rtsp_manager.cpp +#include "rtmp_manager.hpp" +#include "logger.hpp" +#include +#include +#include + +std::unordered_map RTMPManager::streams; +std::mutex RTMPManager::streams_mutex; + +static inline std::string stream_type_suffix(StreamType type) +{ + return (type == StreamType::MAIN) ? "_main" : "_sub"; +} + +std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) +{ + return cam_name + stream_type_suffix(type); +} + +void RTMPManager::init() +{ + gst_init(nullptr, nullptr); + LOG_INFO("[RTMP] GStreamer initialized."); +} + +GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) +{ + // 拷贝原始参数 + int width = cam.width; + int height = cam.height; + int fps = cam.fps; + int bitrate = cam.bitrate; + + if (type == StreamType::SUB) + { + // 简单一刀切策略:分辨率 /2,帧率 /2,码率 /2,临界值保护 + width = std::max(160, width / 2); + height = std::max(120, height / 2); + fps = std::max(10, fps / 2); + bitrate = std::max(300000, bitrate / 2); // 最低 300kbps + } + + // 构建不同的流名(stream key) + std::string stream_name = cam.name + (type == StreamType::MAIN ? "_main" : "_sub"); + + std::string pipeline_str = + "v4l2src device=" + cam.device + + " ! video/x-raw,format=NV12,width=" + std::to_string(width) + + ",height=" + std::to_string(height) + + ",framerate=" + std::to_string(fps) + "/1" + " ! queue max-size-buffers=1 leaky=downstream " + " ! mpph264enc bps=" + + std::to_string(bitrate) + + " gop=" + std::to_string(fps) + + " ! h264parse " + " ! flvmux streamable=true name=mux " + " ! rtmpsink location=\"rtmp://127.0.0.1/live/" + + stream_name + + " live=1\" sync=false"; + + LOG_INFO("[RTMP] Creating pipeline for '" + cam.name + "' (" + + (type == StreamType::MAIN ? "MAIN" : "SUB") + ") -> " + pipeline_str); + + GError *error = nullptr; + GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); + if (error) + { + LOG_ERROR("[RTMP] Failed to parse pipeline: " + std::string(error->message)); + g_error_free(error); + return nullptr; + } + return pipeline; +} + +void RTMPManager::stream_loop(Camera cam, StreamType type) +{ + const std::string key = make_stream_key(cam.name, type); + LOG_INFO("[RTMP] Stream thread started for '" + key + "'"); + + while (true) + { + { + std::lock_guard lock(streams_mutex); + if (!streams[key].running) + break; + } + + GstElement *pipeline = create_pipeline(cam, type); + if (!pipeline) + { + LOG_ERROR("[RTMP] Failed to create pipeline for '" + key + "', retrying in 3s..."); + std::this_thread::sleep_for(std::chrono::seconds(3)); + continue; + } + + GstBus *bus = gst_element_get_bus(pipeline); + gst_element_set_state(pipeline, GST_STATE_PLAYING); + LOG_INFO("[RTMP] Camera '" + key + "' streaming..."); + + bool error_occurred = false; + // 简单重连计数(可选):连续失败超过 N 次可以退出 + int consecutive_failures = 0; + const int MAX_RETRIES = 5; + + while (true) + { + GstMessage *msg = gst_bus_timed_pop_filtered( + bus, GST_CLOCK_TIME_NONE, + static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); + + { + std::lock_guard lock(streams_mutex); + if (!streams[key].running) + break; + } + + if (!msg) + continue; + + if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) + { + GError *err = nullptr; + gchar *debug = nullptr; + gst_message_parse_error(msg, &err, &debug); + LOG_ERROR("[RTMP] Error on '" + key + "': " + std::string(err->message)); + if (debug) + LOG_DEBUG(std::string(debug)); + g_error_free(err); + g_free(debug); + error_occurred = true; + consecutive_failures++; + } + else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) + { + LOG_WARN("[RTMP] EOS received on '" + key + "'"); + error_occurred = true; + consecutive_failures++; + } + + gst_message_unref(msg); + if (error_occurred) + break; + } + + gst_element_set_state(pipeline, GST_STATE_NULL); + gst_object_unref(pipeline); + gst_object_unref(bus); + + if (!error_occurred) + break; + + if (consecutive_failures >= MAX_RETRIES) + { + LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts."); + // 可以选择将 running 置为 false,让上层知道失败 + std::lock_guard lock(streams_mutex); + streams[key].running = false; + break; + } + + LOG_WARN("[RTMP] Reconnecting '" + key + "' in 3s..."); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + + LOG_INFO("[RTMP] Stream thread exited for '" + key + "'"); +} + +void RTMPManager::start_camera(const Camera &cam, StreamType type) +{ + std::lock_guard lock(streams_mutex); + std::string key = make_stream_key(cam.name, type); + if (streams.count(key) && streams[key].running) + { + LOG_WARN("[RTMP] Camera '" + key + "' already streaming."); + return; + } + + StreamContext ctx; + ctx.running = true; + ctx.thread = std::thread([cam, type]() + { RTMPManager::stream_loop(cam, type); }); + streams[key] = std::move(ctx); +} + +void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) +{ + std::lock_guard lock(streams_mutex); + std::string key = make_stream_key(cam_name, type); + auto it = streams.find(key); + if (it == streams.end()) + { + LOG_WARN("[RTMP] Camera '" + key + "' not found."); + return; + } + + it->second.running = false; + LOG_INFO("[RTMP] Stopping camera '" + key + "'..."); + + if (it->second.thread.joinable()) + it->second.thread.join(); + + streams.erase(it); +} + +bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) +{ + std::lock_guard lock(streams_mutex); + std::string key = make_stream_key(cam_name, type); + auto it = streams.find(key); + return it != streams.end() && it->second.running; +} + +bool RTMPManager::is_any_streaming() +{ + std::lock_guard lock(streams_mutex); + for (auto &kv : streams) + if (kv.second.running) + return true; + return false; +} + +void RTMPManager::stop_all() +{ + std::vector names; + { + std::lock_guard lock(streams_mutex); + for (auto &kv : streams) + names.push_back(kv.first); + } + + for (auto &name : names) + { + // name 中包含后缀,但是 stop_camera 需要 cam_name + StreamType + // 我们可以解析后缀,或新增一个 stop_by_key。为简单起见解析: + if (name.size() > 5 && name.find("_sub") == name.size() - 4) + { + std::string cam_name = name.substr(0, name.size() - 4); + stop_camera(cam_name, StreamType::SUB); + } + else if (name.size() > 6 && name.find("_main") == name.size() - 5) + { + std::string cam_name = name.substr(0, name.size() - 5); + stop_camera(cam_name, StreamType::MAIN); + } + else + { + // fallback: stop by treating as MAIN + stop_camera(name, StreamType::MAIN); + } + } +} + +std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) +{ + // 根据你本地 RTMP 服务地址组装 URL,和 create_pipeline 的 stream_name 保持一致 + std::string stream_name = cam_name + stream_type_suffix(type); + return std::string("rtmp://127.0.0.1/live/") + stream_name; +} diff --git a/src/rtsp_manager.cpp b/src/rtsp_manager.cpp deleted file mode 100644 index 488d83f..0000000 --- a/src/rtsp_manager.cpp +++ /dev/null @@ -1,279 +0,0 @@ -// rtsp_manager.cpp -#include "rtsp_manager.hpp" -#include "logger.hpp" -#include -#include -#include -#include - -// 静态变量定义 -GMainLoop *RTSPManager::loop = nullptr; -GMainContext *RTSPManager::main_context = nullptr; -GstRTSPServer *RTSPManager::server = nullptr; -std::unordered_map RTSPManager::streaming_status; -std::unordered_map RTSPManager::mounted_factories; -std::mutex RTSPManager::mounted_factories_mutex; -std::unordered_map> RTSPManager::media_map; -std::mutex RTSPManager::media_map_mutex; - -void RTSPManager::init() -{ - gst_init(nullptr, nullptr); - LOG_INFO("[RTSP] GStreamer initialized."); -} - -// 创建 media factory -GstRTSPMediaFactory *RTSPManager::create_media_factory(const Camera &cam) -{ - // 输出分辨率直接用相机原始分辨率 - int out_width = cam.width; - int out_height = cam.height; - - // 构建 pipeline - std::string launch_str = - "( v4l2src device=" + cam.device + - " ! video/x-raw,format=NV12,width=" + std::to_string(out_width) + - ",height=" + std::to_string(out_height) + - ",framerate=" + std::to_string(cam.fps) + "/1" - " ! queue max-size-buffers=1 leaky=downstream" // 最小队列 - " ! mpph264enc bps=" + - std::to_string(cam.bitrate) + - " gop=" + std::to_string(cam.fps) + // GOP 设置为 1 秒 - " ! h264parse" - " ! rtph264pay name=pay0 pt=96 config-interval=1 )"; // 每秒发送 SPS/PPS - - GstRTSPMediaFactory *factory = gst_rtsp_media_factory_new(); - gst_rtsp_media_factory_set_launch(factory, launch_str.c_str()); - gst_rtsp_media_factory_set_shared(factory, TRUE); - - // 使用 media-configure 信号代替 media-created - g_signal_connect_data(factory, "media-configure", G_CALLBACK(on_media_created), - g_strdup(cam.name.c_str()), (GClosureNotify)g_free, static_cast(0)); - - return factory; -} - -// 启动 RTSP server -void RTSPManager::start(const std::vector &cameras) -{ - server = gst_rtsp_server_new(); - gst_rtsp_server_set_service(server, "8554"); - - loop = g_main_loop_new(nullptr, FALSE); - main_context = g_main_loop_get_context(loop); - - gst_rtsp_server_attach(server, nullptr); - - LOG_INFO("[RTSP] Server running on rtsp://localhost:8554"); - g_main_loop_run(loop); - - if (server) - { - g_object_unref(server); - server = nullptr; - } - if (loop) - { - g_main_loop_unref(loop); - loop = nullptr; - } - - LOG_INFO("[RTSP] Server stopped."); -} - -// media-configure 信号处理 -void RTSPManager::on_media_created(GstRTSPMediaFactory *factory, GstRTSPMedia *media, gpointer user_data) -{ - const char *cam_name = static_cast(user_data); - - g_object_ref(media); // 增加引用计数,防止被提前销毁 - { - std::lock_guard lock(media_map_mutex); - media_map[cam_name].push_back(media); - } - - // 连接 unprepared 信号,当 pipeline 被销毁时移除 - g_signal_connect_data(media, "unprepared", G_CALLBACK(on_media_unprepared), - g_strdup(cam_name), (GClosureNotify)g_free, static_cast(0)); -} - -// unprepared 信号处理 -void RTSPManager::on_media_unprepared(GstRTSPMedia *media, gpointer user_data) -{ - const char *cam_name = static_cast(user_data); - { - std::lock_guard lock(media_map_mutex); - auto it = media_map.find(cam_name); - if (it != media_map.end()) - { - auto &media_list = it->second; - media_list.erase(std::remove(media_list.begin(), media_list.end(), media), media_list.end()); - if (media_list.empty()) - { - media_map.erase(it); - } - } - } - g_object_unref(media); // 释放引用 -} - -// 挂载摄像头 -gboolean RTSPManager::mount_camera_in_main(gpointer data) -{ - Camera *cam = static_cast(data); - if (!cam || !server) - { - delete cam; - return G_SOURCE_REMOVE; - } - - GstRTSPMountPoints *mounts = gst_rtsp_server_get_mount_points(server); - if (!mounts) - { - delete cam; - return G_SOURCE_REMOVE; - } - - std::string mount_point = "/" + cam->name; - GstRTSPMediaFactory *factory = create_media_factory(*cam); - if (!factory) - { - g_object_unref(mounts); - delete cam; - return G_SOURCE_REMOVE; - } - - gst_rtsp_mount_points_add_factory(mounts, mount_point.c_str(), factory); - g_object_unref(mounts); - - { - std::lock_guard lock(mounted_factories_mutex); - mounted_factories[cam->name] = factory; - streaming_status[cam->name] = true; - } - - LOG_INFO("[RTSP] Camera '" + cam->name + "' mounted at rtsp://localhost:8554" + mount_point); - delete cam; - return G_SOURCE_REMOVE; -} - -// 卸载摄像头 -gboolean RTSPManager::unmount_camera_in_main(gpointer data) -{ - Camera *cam = static_cast(data); - if (!cam || !server) - { - delete cam; - return G_SOURCE_REMOVE; - } - std::string cam_name = cam->name; - std::string mount_point = "/" + cam_name; - - // 停止所有媒体 - { - std::lock_guard lock(media_map_mutex); - auto it = media_map.find(cam_name); - if (it != media_map.end()) - { - for (GstRTSPMedia *media : it->second) - { - gst_element_set_state(gst_rtsp_media_get_element(media), GST_STATE_NULL); - // g_object_unref(media); - } - it->second.clear(); - media_map.erase(it); - } - } - - // 卸载 factory - GstRTSPMountPoints *mounts = gst_rtsp_server_get_mount_points(server); - if (mounts) - { - gst_rtsp_mount_points_remove_factory(mounts, mount_point.c_str()); - g_object_unref(mounts); - } - - { - std::lock_guard lock(mounted_factories_mutex); - auto it = mounted_factories.find(cam_name); - if (it != mounted_factories.end()) - { - if (it->second && G_IS_OBJECT(it->second)) - g_object_unref(it->second); - mounted_factories.erase(it); - } - streaming_status[cam_name] = false; - } - - LOG_INFO("[RTSP] Camera '" + cam_name + "' unmounted."); - delete cam; - return G_SOURCE_REMOVE; -} - -// 公共挂载/卸载接口 -void RTSPManager::mount_camera(const Camera &cam) -{ - Camera *camCopy = new Camera(cam); - g_main_context_invoke(main_context, [](gpointer data) -> gboolean - { return RTSPManager::mount_camera_in_main(data); }, camCopy); -} - -void RTSPManager::unmount_camera(const Camera &cam) -{ - Camera *camCopy = new Camera(cam); - g_main_context_invoke(main_context, [](gpointer data) -> gboolean - { return RTSPManager::unmount_camera_in_main(data); }, camCopy); -} - -// 是否正在流 -bool RTSPManager::is_streaming(const std::string &cam_name) -{ - std::lock_guard lock(mounted_factories_mutex); - auto it = streaming_status.find(cam_name); - return it != streaming_status.end() ? it->second : false; -} - -// 停止 server -void RTSPManager::stop() -{ - // 先卸载所有挂载摄像头 - std::vector cams_to_unmount; - { - std::lock_guard lock(mounted_factories_mutex); - for (const auto &kv : mounted_factories) - cams_to_unmount.push_back(kv.first); - } - - for (const auto &cam_name : cams_to_unmount) - { - Camera cam; - cam.name = cam_name; - unmount_camera(cam); - } - - // 等待所有流停止(最多 5 秒) - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - while (is_any_streaming() && std::chrono::steady_clock::now() < deadline) - { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } - - // 退出 main loop - if (loop) - { - g_main_context_invoke(main_context, [](gpointer data) -> gboolean - { - g_main_loop_quit(static_cast(data)); - return G_SOURCE_REMOVE; }, loop); - } -} - -// 新增接口:检查是否还有摄像头在流 -bool RTSPManager::is_any_streaming() -{ - std::lock_guard lock(mounted_factories_mutex); - for (const auto &kv : streaming_status) - if (kv.second) - return true; - return false; -}