sweeper_video/src/rtmp_manager.cpp
2025-10-17 15:58:54 +08:00

311 lines
9.4 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtsp_manager.cpp
#include "rtmp_manager.hpp"
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include <chrono>
#include <cstring>
#include <thread>
// ========== 工具函数 ==========
static bool device_exists(const std::string &path)
{
struct stat st;
return (stat(path.c_str(), &st) == 0);
}
// 动态获取指定网卡 IPv4 地址
static std::string get_ip_address(const std::string &ifname)
{
struct ifaddrs *ifaddr, *ifa;
char ip[INET_ADDRSTRLEN] = {0};
if (getifaddrs(&ifaddr) == -1)
{
perror("getifaddrs");
return "";
}
for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == nullptr) continue;
if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name)
{
void *addr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
if (inet_ntop(AF_INET, addr, ip, sizeof(ip)))
{
freeifaddrs(ifaddr);
return ip;
}
}
}
freeifaddrs(ifaddr);
return "";
}
// ========== 静态成员 ==========
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex;
// ========== 初始化 ==========
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized.");
}
std::string RTMPManager::make_key(const std::string &name) { return name + "_main"; }
// ========== 创建推流管线 ==========
GstElement *RTMPManager::create_pipeline(const Camera &cam)
{
int width = cam.width;
int height = cam.height;
int fps = cam.fps;
int bitrate = cam.bitrate;
std::string stream_name = cam.name + "_main";
std::string app = "record";
std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app;
// fpsdisplaysink 探测首帧
std::string pipeline_str =
"v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
"/1 ! videoconvert ! video/x-raw,format=NV12 "
"! tee name=t "
"t. ! queue ! mpph264enc bps=" +
std::to_string(bitrate) + " gop=" + std::to_string(fps) +
" ! h264parse ! flvmux name=mux streamable=true "
"audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. "
"mux. ! rtmpsink location=\"" +
location +
"\" sync=false "
"t. ! queue ! fpsdisplaysink name=fpsprobe text-overlay=false video-sink=fakesink sync=false";
LOG_INFO("[RTMP] Pipeline: " + pipeline_str);
GError *error = nullptr;
GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
if (error)
{
LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(error->message));
g_error_free(error);
return nullptr;
}
return pipeline;
}
// ========== 主推流循环 ==========
void RTMPManager::stream_loop(Camera cam, StreamContext *ctx)
{
std::string key = make_key(cam.name);
while (ctx->running)
{
// 检查设备是否存在
struct stat st;
if (stat(cam.device.c_str(), &st) != 0)
{
ctx->status.running = false;
ctx->status.last_error = "Device not found: " + cam.device;
LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 创建 pipeline
GstElement *pipeline = create_pipeline(cam);
if (!pipeline)
{
ctx->status.running = false;
ctx->status.last_error = "Pipeline creation failed";
LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
LOG_INFO("[RTMP] Starting stream: " + key);
ctx->status.running = true;
ctx->status.last_error.clear();
bool got_frame = false;
bool need_restart = false;
auto start_time = std::chrono::steady_clock::now();
while (ctx->running)
{
GstMessage *msg = gst_bus_timed_pop_filtered(
bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_ELEMENT));
// 检查帧超时3 秒内没有检测到帧
auto elapsed =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time).count();
if (!got_frame && elapsed > 3)
{
ctx->status.running = false;
ctx->status.last_error = "No frames detected (no video signal)";
LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error);
need_restart = true;
break;
}
if (!msg) continue;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ELEMENT:
// 检测 fpsdisplaysink 输出帧
if (gst_message_has_name(msg, "fpsprobe"))
{
got_frame = true;
}
break;
case GST_MESSAGE_ERROR:
{
GError *err = nullptr;
gst_message_parse_error(msg, &err, nullptr);
ctx->status.running = false;
ctx->status.last_error = err ? std::string(err->message) : "GStreamer error";
LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error);
if (err) g_error_free(err);
need_restart = true;
break;
}
case GST_MESSAGE_EOS:
ctx->status.running = false;
ctx->status.last_error = "End of stream (EOS)";
LOG_WARN("[RTMP] " + key + " reached EOS");
need_restart = true;
break;
default:
break;
}
gst_message_unref(msg);
if (need_restart) break;
}
// 清理资源
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
if (ctx->running)
{
LOG_WARN("[RTMP] Restarting " + key + " in 3s...");
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS));
}
}
ctx->status.running = false;
LOG_INFO("[RTMP] Stream thread exited for " + key);
}
// ========== 启停与状态 ==========
void RTMPManager::start_all()
{
LOG_INFO("[RTMP] Starting all record streams...");
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &cam : g_app_config.cameras)
{
auto key = make_key(cam.name);
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
ctx->thread = std::thread([cam, ptr = ctx.get()]() { stream_loop(cam, ptr); });
streams.emplace(key, std::move(ctx));
}
}
void RTMPManager::start_all()
{
LOG_INFO("[RTMP] Starting all record streams...");
std::lock_guard<std::mutex> lock(streams_mutex);
int delay_ms = 0;
for (auto &cam : g_app_config.cameras)
{
auto key = make_key(cam.name);
if (streams.find(key) != streams.end())
{
LOG_INFO("[RTMP] Stream already running: " + key);
continue;
}
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
ctx->thread = std::thread(
[cam, ptr = ctx.get(), delay_ms]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
stream_loop(cam, ptr);
});
streams.emplace(key, std::move(ctx));
delay_ms += 200; // 每路错开 200ms
}
}
std::string RTMPManager::get_stream_url(const std::string &cam_name)
{
std::string ip = get_ip_address("enP2p33s0");
if (ip.empty()) ip = "192.168.3.211"; // fallback
return "http://" + ip + ":1985/rtc/v1/whep/?app=record&stream=" + cam_name + "_main";
}
// ========== 汇总状态 ==========
std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
{
std::vector<ChannelInfo> result;
std::lock_guard<std::mutex> lock(streams_mutex);
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
const auto &cam = g_app_config.cameras[i];
auto key = make_key(cam.name);
ChannelInfo ch;
ch.loc = static_cast<int>(i);
ch.url.clear();
ch.running = false;
ch.reason = "Not started";
auto it = streams.find(key);
if (it != streams.end())
{
auto &status = it->second->status;
ch.running = status.running;
if (status.running)
{
ch.url = get_stream_url(cam.name);
ch.reason.clear();
}
else
{
ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error;
}
}
else
{
ch.reason = "Context missing";
}
result.push_back(ch);
}
return result;
}