kunlang_video/src/rtmp_manager.cpp
2025-10-17 17:26:38 +08:00

374 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtsp_manager.cpp
#include "rtmp_manager.hpp"
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include <chrono>
#include <cstring>
#include <thread>
// ========== 工具函数 ==========
static bool device_exists(const std::string &path)
{
struct stat st;
return (stat(path.c_str(), &st) == 0);
}
// 动态获取指定网卡 IPv4 地址
static std::string get_ip_address(const std::string &ifname)
{
struct ifaddrs *ifaddr, *ifa;
char ip[INET_ADDRSTRLEN] = {0};
if (getifaddrs(&ifaddr) == -1)
{
perror("getifaddrs");
return "";
}
for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == nullptr) continue;
if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name)
{
void *addr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
if (inet_ntop(AF_INET, addr, ip, sizeof(ip)))
{
freeifaddrs(ifaddr);
return ip;
}
}
}
freeifaddrs(ifaddr);
return "";
}
// ========== 静态成员 ==========
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex;
// ========== 初始化 ==========
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized.");
}
std::string RTMPManager::make_key(const std::string &name) { return name + "_main"; }
// ========== 创建推流管线 ==========
GstElement *RTMPManager::create_pipeline(const Camera &cam)
{
const int width = cam.width;
const int height = cam.height;
const int fps = cam.fps;
const int bitrate = cam.bitrate;
const std::string stream_name = cam.name + "_main";
const std::string app = "camera";
const std::string location = "rtmp://127.0.0.1:1935/" + app + "/" + stream_name + "?vhost=" + app;
// 给关键元素起名字src, vc, enc, par, mux, sink, tee
std::string pipeline_str = "v4l2src name=src device=" + cam.device +
" ! video/x-raw,width=" + std::to_string(width) + ",height=" + std::to_string(height) +
",framerate=" + std::to_string(fps) +
"/1 "
"! videoconvert name=vc ! video/x-raw,format=NV12 "
"! tee name=t "
"t. ! queue "
"! mpph264enc name=enc bps=" +
std::to_string(bitrate) + " gop=" + std::to_string(fps) +
" "
"! h264parse name=par "
"! flvmux name=mux streamable=true "
"audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. "
"mux. ! rtmpsink name=sink location=\"" +
location +
"\" sync=false "
// 预留第二路 tee 分支(以后加叠加/探测都行),先丢掉
"t. ! queue ! fakesink sync=false";
GError *error = nullptr;
GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
if (error)
{
LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message);
g_error_free(error);
return nullptr;
}
return pipeline;
}
// ========== 主推流循环 ==========
void RTMPManager::stream_loop(Camera cam, StreamContext *ctx)
{
const std::string key = make_key(cam.name);
while (ctx->running)
{
// 设备是否存在
struct stat st{};
if (stat(cam.device.c_str(), &st) != 0)
{
ctx->status.running = false;
ctx->status.last_error = "Device not found: " + cam.device;
LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 创建管线
GstElement *pipeline = create_pipeline(cam);
if (!pipeline)
{
ctx->status.running = false;
ctx->status.last_error = "Pipeline creation failed";
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
gst_element_set_name(pipeline, key.c_str());
GstBus *bus = gst_element_get_bus(pipeline);
// 在 videoconvert 的 src pad 上挂 probe 判断是否有帧流过
bool got_frame = false;
{
GstElement *vc = gst_bin_get_by_name(GST_BIN(pipeline), "vc");
if (vc)
{
GstPad *pad = gst_element_get_static_pad(vc, "src");
if (pad)
{
gst_pad_add_probe(
pad, GST_PAD_PROBE_TYPE_BUFFER,
[](GstPad *, GstPadProbeInfo *, gpointer user_data) -> GstPadProbeReturn
{
*static_cast<bool *>(user_data) = true;
return GST_PAD_PROBE_OK;
},
&got_frame, nullptr);
gst_object_unref(pad);
}
else
{
LOG_WARN("[RTMP] " + key + " - vc has no src pad?");
}
gst_object_unref(vc);
}
else
{
LOG_WARN("[RTMP] " + key + " - cannot find element 'vc' for pad-probe");
}
}
// 启动
LOG_INFO("[RTMP] Starting stream: " + key);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
// 等待进入 PLAYING最长 5s
bool confirmed_running = false;
{
GstState state = GST_STATE_NULL, pending = GST_STATE_NULL;
// 注意第三个参数单位是纳秒5s=5*GST_SECOND
if (gst_element_get_state(pipeline, &state, &pending, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS &&
state == GST_STATE_PLAYING)
{
confirmed_running = true;
ctx->status.running = true;
ctx->status.last_error.clear();
LOG_INFO("[RTMP] " + key + " confirmed PLAYING");
}
else
{
ctx->status.running = false;
ctx->status.last_error = "Pipeline failed to confirm PLAYING";
LOG_WARN("[RTMP] " + key + " - " + ctx->status.last_error);
}
}
if (!confirmed_running)
{
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS));
continue;
}
// 运行中5s 内必须看到帧,否则认定无信号重启
const auto start_t = std::chrono::steady_clock::now();
bool need_restart = false;
while (ctx->running)
{
// 先检查帧
if (!got_frame)
{
auto elapsed =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_t)
.count();
if (elapsed > 5)
{
ctx->status.running = false;
ctx->status.last_error = "No frames detected (no video signal)";
LOG_ERROR("[RTMP] " + key + " - " + ctx->status.last_error);
need_restart = true;
break;
}
}
else
{
// 一旦有帧,保持 running=true
ctx->status.running = true;
ctx->status.last_error.clear();
}
// 监听错误/EOS
GstMessage *msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND,
(GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
if (!msg) continue;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ERROR:
{
GError *err = nullptr;
gst_message_parse_error(msg, &err, nullptr);
ctx->status.running = false;
ctx->status.last_error = err ? err->message : "GStreamer error";
LOG_ERROR("[RTMP] " + key + " stream error: " + ctx->status.last_error);
if (err) g_error_free(err);
need_restart = true;
break;
}
case GST_MESSAGE_EOS:
ctx->status.running = false;
ctx->status.last_error = "End of stream (EOS)";
LOG_WARN("[RTMP] " + key + " reached EOS");
need_restart = true;
break;
default:
break;
}
gst_message_unref(msg);
if (need_restart) break;
}
// 收尾
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
if (ctx->running)
{
LOG_WARN("[RTMP] Restarting " + key + " in 3s...");
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS));
}
}
ctx->status.running = false;
LOG_INFO("[RTMP] Stream thread exited for " + key);
}
// ========== 启停与状态 ==========
void RTMPManager::start_all()
{
LOG_INFO("[RTMP] Starting all record streams...");
std::lock_guard<std::mutex> lock(streams_mutex);
int delay_ms = 0;
for (auto &cam : g_app_config.cameras)
{
auto key = make_key(cam.name);
if (streams.find(key) != streams.end())
{
LOG_INFO("[RTMP] Stream already running: " + key);
continue;
}
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
ctx->thread = std::thread(
[cam, ptr = ctx.get(), delay_ms]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
stream_loop(cam, ptr);
});
streams.emplace(key, std::move(ctx));
delay_ms += 200; // 每路错开 200ms
}
}
void RTMPManager::stop_all()
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) kv.second->running.store(false);
for (auto &kv : streams)
if (kv.second->thread.joinable()) kv.second->thread.join();
streams.clear();
}
bool RTMPManager::is_streaming(const std::string &cam_name)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_key(cam_name));
return (it != streams.end() && it->second->status.running);
}
std::string RTMPManager::get_stream_url(const std::string &cam_name)
{
std::string ip = get_ip_address("enP2p33s0");
if (ip.empty()) ip = "192.168.3.211"; // fallback
return "http://" + ip + ":1985/rtc/v1/whep/?app=record&stream=" + cam_name + "_main";
}
// ========== 汇总状态 ==========
std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
{
std::vector<ChannelInfo> result;
std::lock_guard<std::mutex> lock(streams_mutex);
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
const auto &cam = g_app_config.cameras[i];
auto key = make_key(cam.name);
ChannelInfo ch;
ch.loc = static_cast<int>(i);
ch.url.clear();
ch.running = false;
ch.reason = "Not started";
auto it = streams.find(key);
if (it != streams.end())
{
auto &status = it->second->status;
ch.running = status.running;
if (status.running)
{
ch.url = get_stream_url(cam.name);
ch.reason.clear();
}
else
{
ch.reason = status.last_error.empty() ? "Unknown error" : status.last_error;
}
}
else
{
ch.reason = "Context missing";
}
result.push_back(ch);
}
return result;
}