sweeper_video/src/rtmp_manager.cpp

431 lines
14 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtmp_manager.cpp
#include "rtmp_manager.hpp"
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include <chrono>
#include <cstring>
#include <memory>
#include <thread>
std::atomic<bool> RTMPManager::g_live_enabled{false};
// =======================================================
// 工具函数
// =======================================================
static bool device_exists(const std::string& path)
{
struct stat st;
return stat(path.c_str(), &st) == 0;
}
// 获取指定网卡 IPv4
std::string get_ip_address(const std::string& ifname)
{
struct ifaddrs* ifaddr = nullptr;
if (getifaddrs(&ifaddr) != 0) return "";
char ip[INET_ADDRSTRLEN] = {0};
for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next)
{
if (!ifa->ifa_addr) continue;
if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name)
{
auto* addr = &reinterpret_cast<sockaddr_in*>(ifa->ifa_addr)->sin_addr;
inet_ntop(AF_INET, addr, ip, sizeof(ip));
break;
}
}
freeifaddrs(ifaddr);
return ip;
}
// =======================================================
// 静态成员
// =======================================================
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex;
// =======================================================
// 初始化
// =======================================================
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized");
}
std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; }
// =======================================================
// 创建推流管线(稳定版)
// =======================================================
GstElement* RTMPManager::create_pipeline(const Camera& cam)
{
const std::string stream = cam.name + "_main";
const std::string app = "camera";
const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live";
const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record";
std::string pipeline_str = "v4l2src device=" + cam.device +
" io-mode=dmabuf "
"! video/x-raw,format=NV12,width=1920,height=1080,framerate=" +
std::to_string(cam.fps) +
"/1 "
"! videoscale "
"! video/x-raw,width=" +
std::to_string(cam.width) + ",height=" + std::to_string(cam.height) +
" "
"! queue max-size-buffers=2 leaky=downstream "
"! mpph264enc rc-mode=cbr bps=" +
std::to_string(cam.bitrate) + " gop=" + std::to_string(cam.fps) +
" header-mode=each-idr profile=baseline "
"! h264parse config-interval=1 "
"! tee name=t "
// ===== record永远稳定 =====
"t. ! queue max-size-buffers=8 leaky=downstream "
"! flvmux name=rec_mux streamable=true "
"! rtmpsink name=rec_sink location=\"" +
record_rtmp +
"\" sync=false async=false "
// ===== live可随意开关 =====
"t. ! queue max-size-buffers=8 leaky=downstream "
"! valve name=live_valve drop=true "
"! queue max-size-buffers=8 leaky=downstream "
"! flvmux name=live_mux streamable=true "
"! rtmpsink name=live_sink location=\"" +
live_rtmp + "\" sync=false async=false ";
GError* err = nullptr;
GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &err);
if (err)
{
LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(err->message));
g_error_free(err);
return nullptr;
}
return pipeline;
}
// =======================================================
// 主推流线程
// =======================================================
void RTMPManager::stream_loop(Camera cam, StreamContext* ctx)
{
const std::string key = make_key(cam.name);
while (ctx->thread_running)
{
// 1. 设备存在性
if (!device_exists(cam.device))
{
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "Device not found: " + cam.device;
}
LOG_WARN("[RTMP] " + key + " - device not found");
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 2. 创建 pipeline
GstElement* pipeline = create_pipeline(cam);
if (!pipeline)
{
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "Pipeline creation failed";
}
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 2.1 获取 live_valve用于 MQTT 控制)
GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve");
if (!live_valve)
{
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "live_valve not found";
}
LOG_ERROR("[RTMP] " + key + " - live_valve not found");
gst_object_unref(pipeline);
std::this_thread::sleep_for(std::chrono::seconds(3));
continue;
}
// 保存到 ctx
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (ctx->live_valve) gst_object_unref(ctx->live_valve);
ctx->live_valve = live_valve; // ⚠️ live_valve 引用交给 ctx 管理
}
g_object_set(G_OBJECT(ctx->live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr);
GstBus* bus = gst_element_get_bus(pipeline);
LOG_INFO("[RTMP] Starting stream: " + key);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
// 3. 等待进入 PLAYING
GstState state = GST_STATE_NULL;
if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS &&
state == GST_STATE_PLAYING)
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = true;
ctx->status.last_error.clear();
LOG_INFO("[RTMP] " + key + " confirmed PLAYING");
}
else
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "Failed to reach PLAYING";
LOG_ERROR("[RTMP] " + key + " failed to PLAY");
goto cleanup;
}
// 4. 正常运行:只监听 ERROR / EOS
while (ctx->thread_running)
{
GstMessage* msg = gst_bus_timed_pop_filtered(
bus, 300 * GST_MSECOND, static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
if (!msg)
{
// 心跳pipeline 没死就认为在跑
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = true;
continue;
}
if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR)
{
GError* err = nullptr;
gst_message_parse_error(msg, &err, nullptr);
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = err ? err->message : "Unknown GStreamer error";
}
LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown"));
if (err) g_error_free(err);
gst_message_unref(msg);
break;
}
if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS)
{
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
ctx->status.last_error = "EOS";
}
LOG_WARN("[RTMP] " + key + " EOS");
gst_message_unref(msg);
break;
}
gst_message_unref(msg);
}
cleanup:
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (ctx->live_valve)
{
gst_object_unref(ctx->live_valve);
ctx->live_valve = nullptr;
}
}
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
if (ctx->thread_running)
{
LOG_WARN("[RTMP] Restarting " + key + " in 3s...");
std::this_thread::sleep_for(std::chrono::seconds(3));
}
}
{
std::lock_guard<std::mutex> lk(ctx->status_mutex);
ctx->status.running = false;
}
LOG_INFO("[RTMP] Stream thread exited for " + key);
}
// =======================================================
// 启停管理
// =======================================================
void RTMPManager::start_all()
{
LOG_INFO("[RTMP] Starting enabled streams...");
std::lock_guard<std::mutex> lock(streams_mutex);
int delay_ms = 0;
for (const auto& cam : g_app_config.cameras)
{
if (!cam.enabled)
{
LOG_INFO("[RTMP] Skip disabled camera: " + cam.name);
continue;
}
const auto key = make_key(cam.name);
if (streams.count(key)) continue;
auto ctx = std::make_unique<StreamContext>();
ctx->thread_running.store(true);
ctx->thread = std::thread(
[cam, ptr = ctx.get(), delay_ms]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
stream_loop(cam, ptr);
});
streams.emplace(key, std::move(ctx));
delay_ms += 200;
}
}
void RTMPManager::stop_all()
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto& kv : streams)
{
kv.second->thread_running.store(false);
std::lock_guard<std::mutex> lk(kv.second->status_mutex);
if (kv.second->live_valve)
{
gst_object_unref(kv.second->live_valve);
kv.second->live_valve = nullptr;
}
}
for (auto& kv : streams)
if (kv.second->thread.joinable()) kv.second->thread.join();
streams.clear();
}
// =======================================================
// 状态 & URL
// =======================================================
bool RTMPManager::is_streaming(const std::string& cam_name)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_key(cam_name));
if (it == streams.end()) return false;
std::lock_guard<std::mutex> lk(it->second->status_mutex);
return it->second->status.running;
}
std::string RTMPManager::get_stream_url(const std::string& cam_name)
{
std::string ip = get_ip_address("enP2p33s0");
if (ip.empty()) ip = "192.168.4.194";
return "http://" + ip + ":11985/rtc/v1/whep/?app=camera&stream=" + cam_name + "_main&vhost=live";
}
// =======================================================
// 汇总状态
// =======================================================
std::vector<RTMPManager::ChannelInfo> RTMPManager::get_all_channels_status()
{
std::vector<ChannelInfo> result;
std::lock_guard<std::mutex> lock(streams_mutex);
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
const auto& cam = g_app_config.cameras[i];
ChannelInfo ch{};
ch.loc = static_cast<int>(i);
if (!cam.enabled)
{
ch.running = false;
ch.reason = "Disabled by config";
result.push_back(ch);
continue;
}
auto it = streams.find(make_key(cam.name));
if (it != streams.end())
{
std::lock_guard<std::mutex> lk(it->second->status_mutex);
ch.running = it->second->status.running;
ch.reason = it->second->status.last_error;
if (ch.running) ch.url = get_stream_url(cam.name);
}
else
{
ch.running = false;
ch.reason = "Not started";
}
result.push_back(ch);
}
return result;
}
void RTMPManager::set_live_enabled_all(bool enable)
{
std::lock_guard<std::mutex> lock(streams_mutex);
g_live_enabled.store(enable);
for (auto& kv : streams)
{
auto* ctx = kv.second.get();
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (!ctx->live_valve) continue;
// enable=true → drop=false放行 live
g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr);
}
LOG_INFO(std::string("[RTMP] Live ") + (enable ? "ENABLED" : "DISABLED") + " for all streams");
}
void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_key(cam_name));
if (it == streams.end()) return;
auto* ctx = it->second.get();
std::lock_guard<std::mutex> lk(ctx->status_mutex);
if (!ctx->live_valve) return;
g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr);
LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name);
}