kunlang_video/src/rtmp_manager.cpp
2026-05-09 11:36:51 +08:00

437 lines
15 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtmp_manager.cpp
#include "rtmp_manager.hpp"
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include <chrono>
#include <cstring>
#include <thread>
// ========== 工具函数 ==========
static bool device_exists(const std::string& path)
{
struct stat st;
return (stat(path.c_str(), &st) == 0);
}
// 动态获取指定网卡 IPv4 地址
std::string get_ip_address(const std::string& ifname)
{
struct ifaddrs *ifaddr, *ifa;
char ip[INET_ADDRSTRLEN] = {0};
if (getifaddrs(&ifaddr) == -1)
{
perror("getifaddrs");
return "";
}
for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == nullptr) continue;
if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name)
{
void* addr = &((struct sockaddr_in*)ifa->ifa_addr)->sin_addr;
if (inet_ntop(AF_INET, addr, ip, sizeof(ip)))
{
freeifaddrs(ifaddr);
return ip;
}
}
}
freeifaddrs(ifaddr);
return "";
}
// ========== 静态成员 ==========
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex;
// ========== 初始化 ==========
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized.");
}
std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; }
// ========== 创建推流管线 ==========
GstElement* RTMPManager::create_pipeline(const Camera& cam)
{
const int width = cam.width;
const int height = cam.height;
const int fps = cam.fps;
const int bitrate = cam.bitrate;
const std::string stream_name = cam.name + "_main";
const auto& srs = g_app_config.srs;
const std::string live_rtmp = "rtmp://" + srs.live_host + ":" + std::to_string(srs.live_rtmp_port) + "/" +
srs.stream_app + "/" + stream_name + "?vhost=" + srs.live_vhost;
const std::string record_rtmp = "rtmp://" + srs.record_host + ":" + std::to_string(srs.record_rtmp_port) + "/" +
srs.stream_app + "/" + stream_name + "?vhost=" + srs.record_vhost;
// 双路推流管线:一份流编码后 tee 分支分别送往两个 flvmux+rtmpsink
// std::string pipeline_str =
// "v4l2src name=src device=" + cam.device +
// " ! "
// "video/x-raw,format=NV12,width=" +
// std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
// "/1 ! "
// "mpph264enc bps=" +
// std::to_string(bitrate) + " gop=" + std::to_string(fps) +
// " rc-mode=cbr ! "
// "h264parse ! tee name=t "
// // --- 分支1低延时远控 SRS ---
// "t. ! queue max-size-buffers=5 leaky=downstream ! "
// "flvmux streamable=true name=mux_live "
// "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux_live. "
// "mux_live. ! rtmpsink location=\"" +
// live_rtmp +
// "\" sync=false async=false "
// // --- 分支2录像 SRS ---
// "t. ! queue max-size-buffers=5 leaky=downstream ! "
// "flvmux streamable=true name=mux_record "
// "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux_record. "
// "mux_record. ! rtmpsink location=\"" +
// record_rtmp +
// "\" sync=false async=false "
// // --- 可选第三分支AI/检测 ---
// "t. ! queue ! fakesink sync=false";
std::string pipeline_str = "v4l2src name=src device=" + cam.device +
" ! video/x-raw,format=NV12,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
"/1 "
" ! mpph264enc bps=" +
std::to_string(bitrate) + " gop=" + std::to_string(fps) +
" rc-mode=cbr "
" ! h264parse ! tee name=t ";
if (srs.live_enabled)
{
pipeline_str += "t. ! queue max-size-buffers=5 leaky=downstream "
" ! flvmux streamable=true name=mux_live "
" ! rtmpsink location=\"" +
live_rtmp + "\" sync=false async=false ";
}
if (srs.record_enabled)
{
pipeline_str += "t. ! queue max-size-buffers=5 leaky=downstream "
" ! flvmux streamable=true name=mux_record "
" ! rtmpsink location=\"" +
record_rtmp + "\" sync=false async=false ";
}
pipeline_str += "t. ! queue ! fakesink sync=false";
GError* error = nullptr;
GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
if (error)
{
LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message);
g_error_free(error);
return nullptr;
}
return pipeline;
}
// ========== 主推流循环 ==========
void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
{
const std::string key = make_key(cam.name);
while (ctx->thread_running)
{
// 1. 检查设备节点是否存在
struct stat st{};
if (stat(cam.device.c_str(), &st) != 0)
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "Device not found: " + cam.device;
LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 2. 创建 GStreamer 管线
GstElement* pipeline = create_pipeline(cam);
if (!pipeline)
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "Pipeline creation failed";
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
gst_element_set_name(pipeline, key.c_str());
GstBus* bus = gst_element_get_bus(pipeline);
// 3. 在 v4l2src 的 src pad 上挂探测 probe
bool got_frame = false;
{
GstElement* src = gst_bin_get_by_name(GST_BIN(pipeline), "src");
if (src)
{
GstPad* pad = gst_element_get_static_pad(src, "src");
if (pad)
{
gst_pad_add_probe(
pad, GST_PAD_PROBE_TYPE_BUFFER,
[](GstPad*, GstPadProbeInfo*, gpointer user_data) -> GstPadProbeReturn
{
*static_cast<bool*>(user_data) = true;
return GST_PAD_PROBE_OK;
},
&got_frame, nullptr);
gst_object_unref(pad);
}
else
{
LOG_WARN("[RTMP] " + key + " - src has no 'src' pad?");
}
gst_object_unref(src);
}
else
{
LOG_WARN("[RTMP] " + key + " - cannot find element 'src' for pad-probe");
}
}
// 4. 启动播放
LOG_INFO("[RTMP] Starting stream: " + key);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
// 等待进入 PLAYING 状态(最长 5s
GstState state = GST_STATE_NULL, pending = GST_STATE_NULL;
bool confirmed_running = false;
if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS &&
state == GST_STATE_PLAYING)
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
confirmed_running = true;
ctx->status.running = true;
ctx->status.last_error.clear();
LOG_INFO("[RTMP] " + key + " confirmed PLAYING");
}
else
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "Pipeline failed to confirm PLAYING";
LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error);
}
if (!confirmed_running)
{
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS));
continue;
}
// 5. 运行阶段:监测帧和错误
const auto start_t = std::chrono::steady_clock::now();
bool need_restart = false;
while (ctx->thread_running)
{
// 检查是否收到帧(前 5s 内)
if (!got_frame)
{
auto elapsed =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_t)
.count();
if (elapsed > 5)
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "No frames detected (no video signal)";
LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error);
need_restart = true;
break;
}
}
else
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = true;
ctx->status.last_error.clear();
}
// 等待错误或 EOS 消息
GstMessage* msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND,
(GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
if (!msg) continue;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ERROR:
{
GError* err = nullptr;
gst_message_parse_error(msg, &err, nullptr);
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = err ? err->message : "GStreamer error";
LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error);
if (err) g_error_free(err);
need_restart = true;
break;
}
case GST_MESSAGE_EOS:
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "End of stream (EOS)";
LOG_WARN("[RTMP] " + key + " reached EOS");
need_restart = true;
break;
}
default:
break;
}
gst_message_unref(msg);
if (need_restart) break;
}
// 6. 收尾清理
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
// 7. 若仍在运行状态,准备重启
if (ctx->thread_running)
{
LOG_WARN("[RTMP] Restarting " + key + " in 3s...");
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS));
}
}
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
}
LOG_INFO("[RTMP] Stream thread exited for " + key);
}
// ========== 启停与状态 ==========
void RTMPManager::start_all()
{
LOG_INFO("[RTMP] Starting all record streams...");
std::lock_guard<std::mutex> lock(streams_mutex);
int delay_ms = 0;
for (auto& cam : g_app_config.cameras)
{
if (!cam.enabled)
{
LOG_INFO("[RTMP] Camera disabled, skip: " + cam.name);
continue;
}
auto key = make_key(cam.name);
if (streams.find(key) != streams.end())
{
LOG_INFO("[RTMP] Stream already running: " + key);
continue;
}
auto ctx = std::make_unique<StreamContext>();
ctx->thread_running.store(true);
ctx->thread = std::thread(
[cam, ptr = ctx.get(), delay_ms]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
stream_loop(cam, ptr);
});
streams.emplace(key, std::move(ctx));
delay_ms += 200; // 每路错开 200ms
}
}
void RTMPManager::stop_all()
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto& kv : streams) kv.second->thread_running.store(false);
for (auto& kv : streams)
if (kv.second->thread.joinable()) kv.second->thread.join();
streams.clear();
}
bool RTMPManager::is_streaming(const std::string& cam_name)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_key(cam_name));
if (it == streams.end()) return false;
auto& ctx = *(it->second);
std::lock_guard<std::mutex> lk(ctx.status_mutex);
return ctx.status.running;
}
std::string RTMPManager::get_stream_url(const std::string& cam_name)
{
const auto& srs = g_app_config.srs;
std::string ip = get_ip_address(srs.public_interface);
if (ip.empty()) ip = "127.0.0.1";
return "http://" + ip + ":" + std::to_string(srs.live_http_api_port) + "/rtc/v1/whep/?app=" + srs.stream_app +
"&stream=" + cam_name + "_main&vhost=" + srs.live_vhost;
}
// ========== 汇总状态 ==========
std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
{
std::vector<ChannelInfo> result;
std::lock_guard<std::mutex> lock(streams_mutex);
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
const auto& cam = g_app_config.cameras[i];
auto key = make_key(cam.name);
ChannelInfo ch;
ch.loc = static_cast<int>(i);
ch.url.clear();
ch.running = false;
ch.reason = "Not started";
auto it = streams.find(key);
if (it != streams.end())
{
auto& ctx = *(it->second);
std::lock_guard<std::mutex> lk(ctx.status_mutex);
auto& status = it->second->status;
ch.running = status.running;
if (status.running)
{
ch.url = get_stream_url(cam.name);
ch.reason.clear();
}
else
{
ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error;
}
}
result.push_back(ch);
}
return result;
}