This commit is contained in:
cxh 2025-10-17 15:52:47 +08:00
parent b1d61be69d
commit f56d94dbca
2 changed files with 130 additions and 40 deletions

View File

@ -24,32 +24,42 @@ static void send_heartbeat()
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count(); auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
// 获取当前所有通道状态 // 获取 RTMP 状态
auto status_list = RTMPManager::get_all_status(); auto channels_info = RTMPManager::get_all_channels_status();
int total = static_cast<int>(status_list.size());
int running_count = 0;
nlohmann::json channels = nlohmann::json::array(); nlohmann::json channels = nlohmann::json::array();
int total = static_cast<int>(channels_info.size());
int running_count = 0;
for (int i = 0; i < total; ++i) for (const auto &ch : channels_info)
{ {
const auto &[key, running] = status_list[i];
if (running) running_count++;
nlohmann::json item; nlohmann::json item;
item["loc"] = i; // 摄像头位置索引 item["loc"] = ch.loc;
item["url"] = RTMPManager::get_stream_url(g_app_config.cameras[i].name); item["running"] = ch.running;
item["running"] = running;
if (ch.running)
{
item["url"] = ch.url;
running_count++;
}
else
{
item["reason"] = ch.reason.empty() ? "Unknown error" : ch.reason;
}
channels.push_back(item); channels.push_back(item);
} }
nlohmann::json hb; nlohmann::json hb;
hb["timestamp"] = ms; hb["timestamp"] = ms;
hb["status"] = (running_count == total) ? 1 : (running_count == 0 ? 0 : 2); hb["status"] = (running_count == 0) ? 0 // 全部失败
: (running_count == total ? 1 : 2); // 全部正常 or 部分异常
hb["channels"] = channels; hb["channels"] = channels;
mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump()); // 发布心跳
LOG_INFO("[MQTT] Sent video heartbeat: " + hb.dump()); mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump(), 0);
LOG_INFO("[MQTT] Sent video heartbeat (" + std::to_string(running_count) + "/" + std::to_string(total) +
" running): " + hb.dump());
} }
// MQTT 回调 // MQTT 回调

View File

@ -11,6 +11,13 @@
#include <cstring> #include <cstring>
#include <thread> #include <thread>
// ========== 工具函数 ==========
static bool device_exists(const std::string &path)
{
struct stat st;
return (stat(path.c_str(), &st) == 0);
}
// 动态获取指定网卡 IPv4 地址 // 动态获取指定网卡 IPv4 地址
static std::string get_ip_address(const std::string &ifname) static std::string get_ip_address(const std::string &ifname)
{ {
@ -39,18 +46,14 @@ static std::string get_ip_address(const std::string &ifname)
} }
freeifaddrs(ifaddr); freeifaddrs(ifaddr);
return ""; // 没找到 return "";
} }
// ========== 静态成员 ==========
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams; std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex; std::mutex RTMPManager::streams_mutex;
static bool device_exists(const std::string &path) // ========== 初始化 ==========
{
struct stat st;
return (stat(path.c_str(), &st) == 0);
}
void RTMPManager::init() void RTMPManager::init()
{ {
gst_init(nullptr, nullptr); gst_init(nullptr, nullptr);
@ -59,6 +62,7 @@ void RTMPManager::init()
std::string RTMPManager::make_key(const std::string &name) { return name + "_main"; } std::string RTMPManager::make_key(const std::string &name) { return name + "_main"; }
// ========== 创建推流管线 ==========
GstElement *RTMPManager::create_pipeline(const Camera &cam) GstElement *RTMPManager::create_pipeline(const Camera &cam)
{ {
int width = cam.width; int width = cam.width;
@ -68,18 +72,22 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam)
std::string stream_name = cam.name + "_main"; std::string stream_name = cam.name + "_main";
std::string app = "record"; std::string app = "record";
std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app; std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app;
std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + // fpsdisplaysink 探测首帧
",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + std::string pipeline_str =
"/1 ! videoconvert ! video/x-raw,format=NV12 " "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) +
"! mpph264enc bps=" + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
std::to_string(bitrate) + " gop=" + std::to_string(fps) + "/1 ! videoconvert ! video/x-raw,format=NV12 "
" ! h264parse ! flvmux name=mux streamable=true " "! tee name=t "
"audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " "t. ! queue ! mpph264enc bps=" +
"mux. ! rtmpsink location=\"" + std::to_string(bitrate) + " gop=" + std::to_string(fps) +
location + "\" sync=false"; " ! h264parse ! flvmux name=mux streamable=true "
"audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. "
"mux. ! rtmpsink location=\"" +
location +
"\" sync=false "
"t. ! queue ! fpsdisplaysink name=fpsprobe text-overlay=false video-sink=fakesink sync=false";
LOG_INFO("[RTMP] Pipeline: " + pipeline_str); LOG_INFO("[RTMP] Pipeline: " + pipeline_str);
@ -94,6 +102,7 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam)
return pipeline; return pipeline;
} }
// ========== 主推流循环 ==========
void RTMPManager::stream_loop(Camera cam, StreamContext *ctx) void RTMPManager::stream_loop(Camera cam, StreamContext *ctx)
{ {
std::string key = make_key(cam.name); std::string key = make_key(cam.name);
@ -121,14 +130,13 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx)
GstBus *bus = gst_element_get_bus(pipeline); GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING); gst_element_set_state(pipeline, GST_STATE_PLAYING);
// -------- 等待状态切换结果 -------- // 等 pipeline 真正进入 PLAYING 状态
GstStateChangeReturn ret = gst_element_get_state(pipeline, nullptr, nullptr, 3 * GST_SECOND); GstStateChangeReturn ret = gst_element_get_state(pipeline, nullptr, nullptr, 3 * GST_SECOND);
if (ret != GST_STATE_CHANGE_SUCCESS) if (ret != GST_STATE_CHANGE_SUCCESS)
{ {
ctx->status.running = false; ctx->status.running = false;
ctx->status.last_error = "Pipeline failed to reach PLAYING"; ctx->status.last_error = "Pipeline failed to reach PLAYING";
LOG_ERROR("[RTMP] " + key + " failed to start (no video signal?)"); LOG_ERROR("[RTMP] " + key + " failed to start (maybe no signal)");
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(bus); gst_object_unref(bus);
gst_object_unref(pipeline); gst_object_unref(pipeline);
@ -136,11 +144,49 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx)
continue; continue;
} }
// 等首帧
bool first_frame = false;
auto start_time = std::chrono::steady_clock::now();
while (!first_frame && std::chrono::steady_clock::now() - start_time < std::chrono::seconds(3))
{
GstMessage *msg = gst_bus_timed_pop_filtered(bus, 300 * GST_MSECOND,
(GstMessageType)(GST_MESSAGE_ELEMENT | GST_MESSAGE_ERROR));
if (!msg) continue;
if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ELEMENT)
{
const GstStructure *s = gst_message_get_structure(msg);
if (s && g_str_has_prefix(gst_structure_get_name(s), "fpsdisplaysink"))
{
first_frame = true;
break;
}
}
else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR)
{
first_frame = false;
break;
}
gst_message_unref(msg);
}
if (!first_frame)
{
ctx->status.running = false;
ctx->status.last_error = "No frames detected";
LOG_ERROR("[RTMP] " + key + " - No frames detected (no video signal)");
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(bus);
gst_object_unref(pipeline);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 进入正常推流状态
ctx->status.running = true; ctx->status.running = true;
ctx->status.last_error.clear(); ctx->status.last_error.clear();
LOG_INFO("[RTMP] Started stream for " + key); LOG_INFO("[RTMP] Started stream for " + key);
// -------- 主循环 --------
bool need_restart = false; bool need_restart = false;
while (ctx->running) while (ctx->running)
{ {
@ -189,6 +235,7 @@ void RTMPManager::stream_loop(Camera cam, StreamContext *ctx)
ctx->status.running = false; ctx->status.running = false;
} }
// ========== 启停与状态 ==========
void RTMPManager::start_all() void RTMPManager::start_all()
{ {
LOG_INFO("[RTMP] Starting all record streams..."); LOG_INFO("[RTMP] Starting all record streams...");
@ -222,16 +269,49 @@ bool RTMPManager::is_streaming(const std::string &cam_name)
std::string RTMPManager::get_stream_url(const std::string &cam_name) std::string RTMPManager::get_stream_url(const std::string &cam_name)
{ {
// 优先动态检测网卡 IP失败则使用固定值
std::string ip = get_ip_address("enP2p33s0"); std::string ip = get_ip_address("enP2p33s0");
if (ip.empty()) ip = "127.0.0.1"; // 或用动态获取网卡 IP if (ip.empty()) ip = "192.168.3.211"; // fallback
return "http://" + ip + ":1985/rtc/v1/whep/?app=record&stream=" + cam_name + "_main"; return "http://" + ip + ":1985/rtc/v1/whep/?app=record&stream=" + cam_name + "_main";
} }
std::vector<std::pair<std::string, bool>> RTMPManager::get_all_status() // ========== 汇总状态 ==========
std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
{ {
std::vector<std::pair<std::string, bool>> result; std::vector<ChannelInfo> result;
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) result.emplace_back(kv.first, kv.second->status.running);
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
const auto &cam = g_app_config.cameras[i];
auto key = make_key(cam.name);
ChannelInfo ch;
ch.loc = static_cast<int>(i);
ch.url.clear();
ch.running = false;
ch.reason = "Not started";
auto it = streams.find(key);
if (it != streams.end())
{
auto &status = it->second->status;
ch.running = status.running;
if (status.running)
{
ch.url = get_stream_url(cam.name);
ch.reason.clear();
}
else
{
ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error;
}
}
else
{
ch.reason = "Context missing";
}
result.push_back(ch);
}
return result; return result;
} }