更新完成,准备推送到新仓库

This commit is contained in:
cxh 2025-10-14 17:13:08 +08:00
parent 408f248071
commit 6aacff609b
9 changed files with 461 additions and 461 deletions

86
config.json Normal file
View File

@ -0,0 +1,86 @@
{
"mqtt_server": {
"veh_id": 20004,
"address": "192.168.4.195",
"port": 51883,
"need_username_pwd": true,
"client_id": "20004_vehmedia",
"username": "20004_4A:69:BE:32:59:AE",
"password": "31d6bb29f177d5bf8560756c0f0e63c63fd412e52c5b9ea59476024eab893884a5f34f0637e0fe3ad42b802c16edb6feb37cde613957c3540c060c07b230cb0aa6b4547bb86fcae43d484179d3a11a1969a2f367ec0ceede4c10510757a89927af4c2d0c0484476be3241a9ff9242e7401f3fbcd824b5cfb19674663b7045e32dd2f97b4",
"mqtt_heart_threshold": 2000
},
"cameras": [
{
"device": "/dev/video11",
"name": "AHD1",
"enabled": true,
"resolution": "720p",
"width": 1280,
"height": 720,
"fps": 30
},
{
"device": "/dev/video12",
"name": "AHD2",
"enabled": false,
"resolution": "720p",
"width": 1280,
"height": 720,
"fps": 30
},
{
"device": "/dev/video13",
"name": "AHD3",
"enabled": false,
"resolution": "720p",
"width": 1280,
"height": 720,
"fps": 30
},
{
"device": "/dev/video14",
"name": "AHD4",
"enabled": false,
"resolution": "720p",
"width": 1280,
"height": 720,
"fps": 30
},
{
"device": "/dev/video3",
"name": "AHD5",
"enabled": false,
"resolution": "1080p",
"width": 1920,
"height": 1080,
"fps": 30
},
{
"device": "/dev/video2",
"name": "AHD6",
"enabled": false,
"resolution": "1080p",
"width": 1920,
"height": 1080,
"fps": 30
},
{
"device": "/dev/video1",
"name": "AHD7",
"enabled": false,
"resolution": "1080p",
"width": 1920,
"height": 1080,
"fps": 30
},
{
"device": "/dev/video0",
"name": "AHD8",
"enabled": false,
"resolution": "1080p",
"width": 1920,
"height": 1080,
"fps": 30
}
]
}

View File

@ -15,13 +15,20 @@ using json = nlohmann::json;
using ordered_json = nlohmann::ordered_json;
// ------------------- 摄像头结构体 -------------------
enum class StreamType
{
MAIN,
SUB
};
struct Camera
{
std::string device;
std::string name;
int width, height, fps;
int bitrate; // 新增码率字段 (bps)
int bitrate;
bool enabled;
StreamType stream_type; // 新增字段
};
// ------------------- MQTT Topic -------------------
@ -29,21 +36,11 @@ struct VehicleMQTTTopics
{
std::string heartbeat_up;
std::string video_down;
std::string video_down_ack;
std::string substream_down;
std::string substream_down_ack;
std::string reset_down;
std::string reset_down_ack;
void fill_with_veh_id(const std::string &vehId)
{
heartbeat_up = "adcpcmcc/v1/vehmedia/" + vehId + "/heartbeat/up";
video_down = "adcpcmcc/v1/vehmedia/" + vehId + "/video/down";
video_down_ack = "adcpcmcc/v1/vehmedia/" + vehId + "/video/down/ack";
substream_down = "adcpcmcc/v1/vehmedia/" + vehId + "/substream/down";
substream_down_ack = "adcpcmcc/v1/vehmedia/" + vehId + "/substream/down/ack";
reset_down = "adcpcmcc/v1/vehmedia/" + vehId + "/reset/down";
reset_down_ack = "adcpcmcc/v1/vehmedia/" + vehId + "/reset/down/ack";
heartbeat_up = "/kun/vehicle/video/heartbeat/" + vehId;
video_down = "/kun/vehicle/video/push/" + vehId;
}
};

View File

@ -4,7 +4,7 @@
#include "app_config.hpp"
#include "logger.hpp"
#include "mqtt_client.hpp"
#include "rtsp_manager.hpp"
#include "rtmp_manager.hpp"
#include <memory>
#include <atomic>

42
include/rtmp_manager.hpp Normal file
View File

@ -0,0 +1,42 @@
// rtsp_manager.hpp
#pragma once
#include <gst/gst.h>
#include "app_config.hpp"
#include <unordered_map>
#include <string>
#include <mutex>
#include <thread>
#include <atomic>
class RTMPManager
{
public:
static void init();
// start/stop 增加 StreamType 参数
static void start_camera(const Camera &cam, StreamType type);
static void stop_camera(const std::string &cam_name, StreamType type);
static void stop_all();
static bool is_streaming(const std::string &cam_name);
static bool is_any_streaming();
// 获取推流 URL用于应答
static std::string get_stream_url(const std::string &cam_name, StreamType type);
private:
struct StreamContext
{
std::thread thread;
std::atomic<bool> running{false};
};
static std::unordered_map<std::string, StreamContext> streams;
static std::mutex streams_mutex;
// stream loop 接收 StreamType 作为参数
static void stream_loop(Camera cam, StreamType type);
static GstElement *create_pipeline(const Camera &cam, StreamType type);
// 辅助:构建 map key
static std::string make_stream_key(const std::string &cam_name, StreamType type);
};

View File

@ -1,48 +0,0 @@
// rtsp_manager.hpp
#pragma once
#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include "app_config.hpp"
#include <unordered_map>
#include <string>
#include <mutex>
#include <vector>
class RTSPManager
{
public:
static void init();
static void start(const std::vector<Camera> &cameras);
static void stop();
static void mount_camera(const Camera &cam);
static void unmount_camera(const Camera &cam);
static bool is_streaming(const std::string &cam_name);
static bool is_any_streaming();
private:
static GMainLoop *loop;
static GMainContext *main_context;
static GstRTSPServer *server;
static std::unordered_map<std::string, bool> streaming_status;
// 工厂创建函数
static GstRTSPMediaFactory *create_media_factory(const Camera &cam);
// 挂载/卸载函数
static gboolean mount_camera_in_main(gpointer data);
static gboolean unmount_camera_in_main(gpointer data);
// 静态 mutex 和工厂表
static std::unordered_map<std::string, GstRTSPMediaFactory *> mounted_factories;
static std::mutex mounted_factories_mutex;
// 媒体对象跟踪
static std::unordered_map<std::string, std::vector<GstRTSPMedia *>> media_map;
static std::mutex media_map_mutex;
// 信号处理函数
static void on_media_created(GstRTSPMediaFactory *factory, GstRTSPMedia *media, gpointer user_data);
static void on_media_unprepared(GstRTSPMedia *media, gpointer user_data);
};

Binary file not shown.

View File

@ -57,145 +57,88 @@ static void on_mqtt_message_received(const std::string &topic, const std::string
try
{
auto j = nlohmann::json::parse(message);
if (topic == g_app_config.mqtt.topics.video_down)
{
// 处理 video_down
auto j = nlohmann::json::parse(message);
if (!j.contains("data") || !j["data"].contains("status") || !j["data"].contains("seqNo"))
if (topic.find("/kun/vehicle/video/push/") != std::string::npos && j["type"] == "request")
{
LOG_WARN("[MQTT] video_down JSON missing required fields");
return;
}
std::string seqNo = j.value("seqNo", "");
auto data = j["data"];
// 写 dispatchId 并设置 streaming 状态
{
std::lock_guard<std::mutex> lock(g_dispatch_id_mutex);
if (j["data"].contains("dispatchId"))
g_dispatch_id = j["data"]["dispatchId"].get<std::string>();
}
int switch_val = data.value("switch", 0);
int streamType = data.value("streamType", 0); // 0主 1子
StreamType type = (streamTypeInt == 0) ? StreamType::MAIN : StreamType::SUB;
auto channels = data.value("channels", std::vector<int>{});
int status = j["data"]["status"].get<int>();
nlohmann::json resp_data = nlohmann::json::array();
std::string seqNo = j["data"]["seqNo"].get<std::string>();
bool success = true; // 标记是否操作成功
if (status == 0)
{
g_streaming = true;
// 启动推流:挂载本地配置中 enabled 的摄像头
for (const auto &cam : g_app_config.cameras)
for (int ch : channels)
{
if (!cam.enabled)
continue;
// 根据通道号找到摄像头
auto it = std::find_if(g_app_config.cameras.begin(), g_app_config.cameras.end(),
[ch](const Camera &c)
{ return c.channel == ch; });
if (!RTSPManager::is_streaming(cam.name))
{
RTSPManager::mount_camera(cam);
LOG_INFO("[MQTT] Started streaming: " + cam.name);
}
}
}
else if (status == 1)
{
g_streaming = false;
std::lock_guard<std::mutex> lock(g_dispatch_id_mutex);
g_dispatch_id.clear(); // 停止拉流就清空 dispatchId
// 停止推流:卸载本地配置中 enabled 的摄像头
for (const auto &cam : g_app_config.cameras)
{
if (!cam.enabled)
continue;
nlohmann::json ch_resp;
ch_resp["loc"] = ch;
if (RTSPManager::is_streaming(cam.name))
if (it != g_app_config.cameras.end())
{
RTSPManager::unmount_camera(cam);
LOG_INFO("[MQTT] Stopped streaming: " + cam.name);
Camera &cam = *it;
bool op_result = false;
if (switch_val == 0)
{
// 启动流
if (!RTMPManager::is_streaming(cam.name, type))
{
RTMPManager::start_camera(cam, type);
op_result = true;
}
}
else
{
// 停止流
if (RTMPManager::is_streaming(cam.name, type))
{
RTMPManager::stop_camera(cam.name, type);
op_result = true;
}
}
ch_resp["url"] = op_result ? RTMPManager::get_stream_url(cam.name, type) : "";
ch_resp["result"] = op_result ? 0 : 1;
ch_resp["reason"] = op_result ? "" : "already in requested state";
}
else
{
ch_resp["url"] = "";
ch_resp["result"] = 1;
ch_resp["reason"] = "channel not found";
}
resp_data.push_back(ch_resp);
}
// 构造应答
nlohmann::json reply;
reply["type"] = "response";
reply["seqNo"] = seqNo;
reply["data"] = resp_data;
mqtt_client->publish(topic, reply.dump());
LOG_INFO("[MQTT] Replied to video push request: " + reply.dump());
}
else
{
LOG_WARN("[MQTT] video_down: unknown status value " + std::to_string(status));
success = false;
}
// 获取当前时间 yyyyMMddHHmmssSSS (UTC+8)
std::string time_str = Logger::get_current_time_utc8();
nlohmann::json reply_data;
reply_data["time"] = time_str;
reply_data["result"] = success ? 0 : 1;
reply_data["seqNo"] = seqNo;
// 封装外层
nlohmann::json reply;
reply["data"] = reply_data;
reply["isEnc"] = 0;
reply["type"] = 0;
// 发送应答
if (mqtt_client)
{
mqtt_client->publish(g_app_config.mqtt.topics.video_down_ack, reply.dump());
LOG_INFO("[MQTT] Replied to video_down: " + reply.dump());
LOG_WARN("[MQTT] Unknown topic or non-request type: " + topic);
}
}
else if (topic == g_app_config.mqtt.topics.substream_down)
catch (const std::exception &e)
{
// 处理 substream_down
LOG_INFO("[MQTT] substream_down message received (not implemented yet).");
LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
}
else if (topic == g_app_config.mqtt.topics.reset_down)
{
auto j = nlohmann::json::parse(message);
// reset/down 的 payload 在 data 里
auto data = j.contains("data") ? j["data"] : nlohmann::json::object();
std::string seqNo = data.value("seqNo", "");
std::string errCode = data.value("errorCode", "");
std::string des = data.value("des", "");
LOG_WARN("[MQTT] Reset command received, errorCode=" + errCode + ", des=" + des);
// 停止所有流,相当于复位
for (const auto &cam : g_app_config.cameras)
{
if (RTSPManager::is_streaming(cam.name))
{
RTSPManager::unmount_camera(cam);
LOG_INFO("[RTSP] Camera " + cam.name + " reset/unmounted");
}
}
// 组装应答 data
nlohmann::json reply_data;
reply_data["time"] = Logger::get_current_time_utc8();
reply_data["result"] = 0; // 0=成功
reply_data["seqNo"] = seqNo;
// 外层封装
nlohmann::json reply;
reply["data"] = reply_data;
reply["isEnc"] = 0;
reply["type"] = 0;
if (mqtt_client)
{
mqtt_client->publish(g_app_config.mqtt.topics.reset_down_ack, reply.dump());
LOG_INFO("[MQTT] Replied to reset_down: " + reply.dump());
}
}
else
{
LOG_WARN("[MQTT] Unknown topic: " + topic);
}
}
catch (const std::exception &e)
{
LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
}
}

259
src/rtmp_manager.cpp Normal file
View File

@ -0,0 +1,259 @@
// rtsp_manager.cpp
#include "rtmp_manager.hpp"
#include "logger.hpp"
#include <iostream>
#include <chrono>
#include <thread>
std::unordered_map<std::string, RTMPManager::StreamContext> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex;
static inline std::string stream_type_suffix(StreamType type)
{
return (type == StreamType::MAIN) ? "_main" : "_sub";
}
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
{
return cam_name + stream_type_suffix(type);
}
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized.");
}
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
{
// 拷贝原始参数
int width = cam.width;
int height = cam.height;
int fps = cam.fps;
int bitrate = cam.bitrate;
if (type == StreamType::SUB)
{
// 简单一刀切策略:分辨率 /2帧率 /2码率 /2临界值保护
width = std::max(160, width / 2);
height = std::max(120, height / 2);
fps = std::max(10, fps / 2);
bitrate = std::max(300000, bitrate / 2); // 最低 300kbps
}
// 构建不同的流名stream key
std::string stream_name = cam.name + (type == StreamType::MAIN ? "_main" : "_sub");
std::string pipeline_str =
"v4l2src device=" + cam.device +
" ! video/x-raw,format=NV12,width=" + std::to_string(width) +
",height=" + std::to_string(height) +
",framerate=" + std::to_string(fps) + "/1"
" ! queue max-size-buffers=1 leaky=downstream "
" ! mpph264enc bps=" +
std::to_string(bitrate) +
" gop=" + std::to_string(fps) +
" ! h264parse "
" ! flvmux streamable=true name=mux "
" ! rtmpsink location=\"rtmp://127.0.0.1/live/" +
stream_name +
" live=1\" sync=false";
LOG_INFO("[RTMP] Creating pipeline for '" + cam.name + "' (" +
(type == StreamType::MAIN ? "MAIN" : "SUB") + ") -> " + pipeline_str);
GError *error = nullptr;
GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
if (error)
{
LOG_ERROR("[RTMP] Failed to parse pipeline: " + std::string(error->message));
g_error_free(error);
return nullptr;
}
return pipeline;
}
void RTMPManager::stream_loop(Camera cam, StreamType type)
{
const std::string key = make_stream_key(cam.name, type);
LOG_INFO("[RTMP] Stream thread started for '" + key + "'");
while (true)
{
{
std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running)
break;
}
GstElement *pipeline = create_pipeline(cam, type);
if (!pipeline)
{
LOG_ERROR("[RTMP] Failed to create pipeline for '" + key + "', retrying in 3s...");
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
LOG_INFO("[RTMP] Camera '" + key + "' streaming...");
bool error_occurred = false;
// 简单重连计数(可选):连续失败超过 N 次可以退出
int consecutive_failures = 0;
const int MAX_RETRIES = 5;
while (true)
{
GstMessage *msg = gst_bus_timed_pop_filtered(
bus, GST_CLOCK_TIME_NONE,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
{
std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running)
break;
}
if (!msg)
continue;
if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR)
{
GError *err = nullptr;
gchar *debug = nullptr;
gst_message_parse_error(msg, &err, &debug);
LOG_ERROR("[RTMP] Error on '" + key + "': " + std::string(err->message));
if (debug)
LOG_DEBUG(std::string(debug));
g_error_free(err);
g_free(debug);
error_occurred = true;
consecutive_failures++;
}
else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS)
{
LOG_WARN("[RTMP] EOS received on '" + key + "'");
error_occurred = true;
consecutive_failures++;
}
gst_message_unref(msg);
if (error_occurred)
break;
}
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
gst_object_unref(bus);
if (!error_occurred)
break;
if (consecutive_failures >= MAX_RETRIES)
{
LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts.");
// 可以选择将 running 置为 false让上层知道失败
std::lock_guard<std::mutex> lock(streams_mutex);
streams[key].running = false;
break;
}
LOG_WARN("[RTMP] Reconnecting '" + key + "' in 3s...");
std::this_thread::sleep_for(std::chrono::seconds(3));
}
LOG_INFO("[RTMP] Stream thread exited for '" + key + "'");
}
void RTMPManager::start_camera(const Camera &cam, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam.name, type);
if (streams.count(key) && streams[key].running)
{
LOG_WARN("[RTMP] Camera '" + key + "' already streaming.");
return;
}
StreamContext ctx;
ctx.running = true;
ctx.thread = std::thread([cam, type]()
{ RTMPManager::stream_loop(cam, type); });
streams[key] = std::move(ctx);
}
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam_name, type);
auto it = streams.find(key);
if (it == streams.end())
{
LOG_WARN("[RTMP] Camera '" + key + "' not found.");
return;
}
it->second.running = false;
LOG_INFO("[RTMP] Stopping camera '" + key + "'...");
if (it->second.thread.joinable())
it->second.thread.join();
streams.erase(it);
}
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam_name, type);
auto it = streams.find(key);
return it != streams.end() && it->second.running;
}
bool RTMPManager::is_any_streaming()
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
if (kv.second.running)
return true;
return false;
}
void RTMPManager::stop_all()
{
std::vector<std::string> names;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
names.push_back(kv.first);
}
for (auto &name : names)
{
// name 中包含后缀,但是 stop_camera 需要 cam_name + StreamType
// 我们可以解析后缀,或新增一个 stop_by_key。为简单起见解析
if (name.size() > 5 && name.find("_sub") == name.size() - 4)
{
std::string cam_name = name.substr(0, name.size() - 4);
stop_camera(cam_name, StreamType::SUB);
}
else if (name.size() > 6 && name.find("_main") == name.size() - 5)
{
std::string cam_name = name.substr(0, name.size() - 5);
stop_camera(cam_name, StreamType::MAIN);
}
else
{
// fallback: stop by treating as MAIN
stop_camera(name, StreamType::MAIN);
}
}
}
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)
{
// 根据你本地 RTMP 服务地址组装 URL和 create_pipeline 的 stream_name 保持一致
std::string stream_name = cam_name + stream_type_suffix(type);
return std::string("rtmp://127.0.0.1/live/") + stream_name;
}

View File

@ -1,279 +0,0 @@
// rtsp_manager.cpp
#include "rtsp_manager.hpp"
#include "logger.hpp"
#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>
// 静态变量定义
GMainLoop *RTSPManager::loop = nullptr;
GMainContext *RTSPManager::main_context = nullptr;
GstRTSPServer *RTSPManager::server = nullptr;
std::unordered_map<std::string, bool> RTSPManager::streaming_status;
std::unordered_map<std::string, GstRTSPMediaFactory *> RTSPManager::mounted_factories;
std::mutex RTSPManager::mounted_factories_mutex;
std::unordered_map<std::string, std::vector<GstRTSPMedia *>> RTSPManager::media_map;
std::mutex RTSPManager::media_map_mutex;
void RTSPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTSP] GStreamer initialized.");
}
// 创建 media factory
GstRTSPMediaFactory *RTSPManager::create_media_factory(const Camera &cam)
{
// 输出分辨率直接用相机原始分辨率
int out_width = cam.width;
int out_height = cam.height;
// 构建 pipeline
std::string launch_str =
"( v4l2src device=" + cam.device +
" ! video/x-raw,format=NV12,width=" + std::to_string(out_width) +
",height=" + std::to_string(out_height) +
",framerate=" + std::to_string(cam.fps) + "/1"
" ! queue max-size-buffers=1 leaky=downstream" // 最小队列
" ! mpph264enc bps=" +
std::to_string(cam.bitrate) +
" gop=" + std::to_string(cam.fps) + // GOP 设置为 1 秒
" ! h264parse"
" ! rtph264pay name=pay0 pt=96 config-interval=1 )"; // 每秒发送 SPS/PPS
GstRTSPMediaFactory *factory = gst_rtsp_media_factory_new();
gst_rtsp_media_factory_set_launch(factory, launch_str.c_str());
gst_rtsp_media_factory_set_shared(factory, TRUE);
// 使用 media-configure 信号代替 media-created
g_signal_connect_data(factory, "media-configure", G_CALLBACK(on_media_created),
g_strdup(cam.name.c_str()), (GClosureNotify)g_free, static_cast<GConnectFlags>(0));
return factory;
}
// 启动 RTSP server
void RTSPManager::start(const std::vector<Camera> &cameras)
{
server = gst_rtsp_server_new();
gst_rtsp_server_set_service(server, "8554");
loop = g_main_loop_new(nullptr, FALSE);
main_context = g_main_loop_get_context(loop);
gst_rtsp_server_attach(server, nullptr);
LOG_INFO("[RTSP] Server running on rtsp://localhost:8554");
g_main_loop_run(loop);
if (server)
{
g_object_unref(server);
server = nullptr;
}
if (loop)
{
g_main_loop_unref(loop);
loop = nullptr;
}
LOG_INFO("[RTSP] Server stopped.");
}
// media-configure 信号处理
void RTSPManager::on_media_created(GstRTSPMediaFactory *factory, GstRTSPMedia *media, gpointer user_data)
{
const char *cam_name = static_cast<const char *>(user_data);
g_object_ref(media); // 增加引用计数,防止被提前销毁
{
std::lock_guard<std::mutex> lock(media_map_mutex);
media_map[cam_name].push_back(media);
}
// 连接 unprepared 信号,当 pipeline 被销毁时移除
g_signal_connect_data(media, "unprepared", G_CALLBACK(on_media_unprepared),
g_strdup(cam_name), (GClosureNotify)g_free, static_cast<GConnectFlags>(0));
}
// unprepared 信号处理
void RTSPManager::on_media_unprepared(GstRTSPMedia *media, gpointer user_data)
{
const char *cam_name = static_cast<const char *>(user_data);
{
std::lock_guard<std::mutex> lock(media_map_mutex);
auto it = media_map.find(cam_name);
if (it != media_map.end())
{
auto &media_list = it->second;
media_list.erase(std::remove(media_list.begin(), media_list.end(), media), media_list.end());
if (media_list.empty())
{
media_map.erase(it);
}
}
}
g_object_unref(media); // 释放引用
}
// 挂载摄像头
gboolean RTSPManager::mount_camera_in_main(gpointer data)
{
Camera *cam = static_cast<Camera *>(data);
if (!cam || !server)
{
delete cam;
return G_SOURCE_REMOVE;
}
GstRTSPMountPoints *mounts = gst_rtsp_server_get_mount_points(server);
if (!mounts)
{
delete cam;
return G_SOURCE_REMOVE;
}
std::string mount_point = "/" + cam->name;
GstRTSPMediaFactory *factory = create_media_factory(*cam);
if (!factory)
{
g_object_unref(mounts);
delete cam;
return G_SOURCE_REMOVE;
}
gst_rtsp_mount_points_add_factory(mounts, mount_point.c_str(), factory);
g_object_unref(mounts);
{
std::lock_guard<std::mutex> lock(mounted_factories_mutex);
mounted_factories[cam->name] = factory;
streaming_status[cam->name] = true;
}
LOG_INFO("[RTSP] Camera '" + cam->name + "' mounted at rtsp://localhost:8554" + mount_point);
delete cam;
return G_SOURCE_REMOVE;
}
// 卸载摄像头
gboolean RTSPManager::unmount_camera_in_main(gpointer data)
{
Camera *cam = static_cast<Camera *>(data);
if (!cam || !server)
{
delete cam;
return G_SOURCE_REMOVE;
}
std::string cam_name = cam->name;
std::string mount_point = "/" + cam_name;
// 停止所有媒体
{
std::lock_guard<std::mutex> lock(media_map_mutex);
auto it = media_map.find(cam_name);
if (it != media_map.end())
{
for (GstRTSPMedia *media : it->second)
{
gst_element_set_state(gst_rtsp_media_get_element(media), GST_STATE_NULL);
// g_object_unref(media);
}
it->second.clear();
media_map.erase(it);
}
}
// 卸载 factory
GstRTSPMountPoints *mounts = gst_rtsp_server_get_mount_points(server);
if (mounts)
{
gst_rtsp_mount_points_remove_factory(mounts, mount_point.c_str());
g_object_unref(mounts);
}
{
std::lock_guard<std::mutex> lock(mounted_factories_mutex);
auto it = mounted_factories.find(cam_name);
if (it != mounted_factories.end())
{
if (it->second && G_IS_OBJECT(it->second))
g_object_unref(it->second);
mounted_factories.erase(it);
}
streaming_status[cam_name] = false;
}
LOG_INFO("[RTSP] Camera '" + cam_name + "' unmounted.");
delete cam;
return G_SOURCE_REMOVE;
}
// 公共挂载/卸载接口
void RTSPManager::mount_camera(const Camera &cam)
{
Camera *camCopy = new Camera(cam);
g_main_context_invoke(main_context, [](gpointer data) -> gboolean
{ return RTSPManager::mount_camera_in_main(data); }, camCopy);
}
void RTSPManager::unmount_camera(const Camera &cam)
{
Camera *camCopy = new Camera(cam);
g_main_context_invoke(main_context, [](gpointer data) -> gboolean
{ return RTSPManager::unmount_camera_in_main(data); }, camCopy);
}
// 是否正在流
bool RTSPManager::is_streaming(const std::string &cam_name)
{
std::lock_guard<std::mutex> lock(mounted_factories_mutex);
auto it = streaming_status.find(cam_name);
return it != streaming_status.end() ? it->second : false;
}
// 停止 server
void RTSPManager::stop()
{
// 先卸载所有挂载摄像头
std::vector<std::string> cams_to_unmount;
{
std::lock_guard<std::mutex> lock(mounted_factories_mutex);
for (const auto &kv : mounted_factories)
cams_to_unmount.push_back(kv.first);
}
for (const auto &cam_name : cams_to_unmount)
{
Camera cam;
cam.name = cam_name;
unmount_camera(cam);
}
// 等待所有流停止(最多 5 秒)
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
while (is_any_streaming() && std::chrono::steady_clock::now() < deadline)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
// 退出 main loop
if (loop)
{
g_main_context_invoke(main_context, [](gpointer data) -> gboolean
{
g_main_loop_quit(static_cast<GMainLoop *>(data));
return G_SOURCE_REMOVE; }, loop);
}
}
// 新增接口:检查是否还有摄像头在流
bool RTSPManager::is_any_streaming()
{
std::lock_guard<std::mutex> lock(mounted_factories_mutex);
for (const auto &kv : streaming_status)
if (kv.second)
return true;
return false;
}