kunlang_video/src/rtmp_manager.cpp
2025-10-16 13:38:23 +08:00

417 lines
13 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtsp_manager.cpp
#include "rtmp_manager.hpp"
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <net/if.h>
#include <netinet/in.h>
#include <chrono>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include "logger.hpp"
static std::string get_ip_address(const std::string &ifname)
{
struct ifaddrs *ifaddr, *ifa;
char ip[INET_ADDRSTRLEN] = {0};
if (getifaddrs(&ifaddr) == -1)
{
perror("getifaddrs");
return "";
}
for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == nullptr) continue;
// 只看 IPv4
if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name)
{
void *addr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
if (inet_ntop(AF_INET, addr, ip, sizeof(ip)))
{
freeifaddrs(ifaddr);
return ip;
}
}
}
freeifaddrs(ifaddr);
return ""; // 没找到
}
static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; }
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex;
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
{
return cam_name + stream_type_suffix(type);
}
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized.");
}
void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); }
void RTMPManager::update_status(const std::string &key, const StreamStatus &status)
{
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end()) it->second->status = status;
}
if (status_callback) status_callback(key, status);
}
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
{
int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate;
if (type == StreamType::SUB)
{
width = std::max(160, width / 2);
height = std::max(120, height / 2);
fps = std::max(10, fps / 2);
bitrate = std::max(300000, bitrate / 2);
}
std::string stream_name = cam.name + stream_type_suffix(type);
std::string pipeline_str = "v4l2src device=" + cam.device +
" ! video/x-raw,format=NV12,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
"/1 ! queue max-size-buffers=1 leaky=downstream "
"! mpph264enc bps=" +
std::to_string(bitrate) + " gop=" + std::to_string(fps) +
" ! h264parse ! flvmux streamable=true name=mux "
"! rtmpsink location=\"rtmp://127.0.0.1/live/" +
stream_name + " live=1\" sync=false";
LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str);
GError *error = nullptr;
GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
if (error)
{
LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message));
g_error_free(error);
return nullptr;
}
return pipeline;
}
int get_camera_index(const std::string &name)
{
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
if (g_app_config.cameras[i].name == name) return static_cast<int>(i);
return -1;
}
RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type)
{
StreamResultInfo res;
res.loc = get_camera_index(cam.name);
res.url = get_stream_url(cam.name, type);
if (res.loc < 0 || res.loc >= static_cast<int>(g_app_config.cameras.size()))
{
res.result = 1;
res.reason = "Invalid channel index";
return res;
}
const std::string key = make_stream_key(cam.name, type);
std::unique_ptr<StreamContext> ctx;
std::future<StreamStatus> status_future;
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end() && it->second->running.load())
{
res.result = 0;
res.reason = "Already streaming";
return res;
}
ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
// 重要:在把 ctx 放入 map 之前,先从 ctx->start_promise 拿到 future
status_future = ctx->start_promise.get_future();
StreamContext *ctx_ptr = ctx.get();
ctx->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); });
streams.emplace(key, std::move(ctx));
}
// 等待首帧或错误,最多 7 秒
if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready)
{
const StreamStatus s = status_future.get();
res.result = s.running ? 0 : 1;
res.reason = s.running ? "Started OK" : (s.last_error.empty() ? "Failed to start" : s.last_error);
}
else
{
res.result = 1;
res.reason = "Timeout waiting for stream";
}
return res;
}
RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{
StreamResultInfo res;
res.loc = get_camera_index(cam_name);
res.url = get_stream_url(cam_name, type);
// 检查通道号是否合法
if (res.loc < 0 || res.loc >= static_cast<int>(g_app_config.cameras.size()))
{
res.result = 1;
res.reason = "Invalid channel index";
return res;
}
std::unique_ptr<StreamContext> ctx;
std::string key = make_stream_key(cam_name, type);
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it == streams.end())
{
// 没有这个流(可能从未推过或启动失败)→ 视为成功,因为状态已经是“未推”
res.result = 0;
res.reason = "Already stopped (no active stream)";
return res;
}
// 找到了正在推的流 → 停止
ctx = std::move(it->second);
streams.erase(it);
}
bool was_running = ctx->running.load();
ctx->running.store(false);
if (ctx->thread.joinable()) ctx->thread.join();
res.result = 0;
res.reason = was_running ? "Stopped manually" : "Already stopped";
return res;
}
void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx)
{
const std::string key = make_stream_key(cam.name, type);
auto try_set_start = [&](const StreamStatus &st)
{
bool expected = false;
if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true))
{
try
{
ctx->start_promise.set_value(st);
}
catch (...)
{
}
}
};
StreamStatus status;
status.running = false;
status.last_result = StreamResult::UNKNOWN;
status.last_error.clear();
GstElement *pipeline = create_pipeline(cam, type);
if (!pipeline)
{
status.last_result = StreamResult::PIPELINE_ERROR;
status.last_error = "Failed to create pipeline";
try_set_start(status);
return;
}
GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
bool first_frame = false;
const auto t0 = std::chrono::steady_clock::now();
while (true)
{
// 手动停止
if (!ctx->running.load())
{
status.running = false;
status.last_result = StreamResult::UNKNOWN;
status.last_error = "Stream stopped manually";
try_set_start(status); // 若启动阶段还没返回,这里补一次
break;
}
// 首帧超时5s
if (!first_frame &&
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - t0).count() > 5)
{
status.running = false;
status.last_result = StreamResult::TIMEOUT;
status.last_error = "No frames received within timeout";
try_set_start(status);
break;
}
GstMessage *msg = gst_bus_timed_pop_filtered(
bus, 100 * GST_MSECOND,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
if (!msg) continue;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_STATE_CHANGED:
{
GstState old_state, new_state;
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && !first_frame)
{
first_frame = true;
status.running = true;
status.last_result = StreamResult::OK;
status.last_error.clear();
try_set_start(status); // 首帧成功:通知 start_camera
}
break;
}
case GST_MESSAGE_ERROR:
{
GError *err = nullptr;
gst_message_parse_error(msg, &err, nullptr);
status.running = false;
status.last_result = StreamResult::CONNECTION_FAIL;
status.last_error = err ? err->message : "GStreamer error";
if (err) g_error_free(err);
try_set_start(status);
break;
}
case GST_MESSAGE_EOS:
{
status.running = false;
status.last_result = StreamResult::EOS_RECEIVED;
status.last_error = "EOS received";
try_set_start(status);
break;
}
default:
break;
}
gst_message_unref(msg);
// 错误/EOS 后退出循环
if (status.last_result == StreamResult::CONNECTION_FAIL || status.last_result == StreamResult::EOS_RECEIVED)
{
break;
}
}
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
// 线程收尾:这里不要删 map 里的 ctxstop_camera 已经负责转移并 join
}
void RTMPManager::stop_all()
{
std::vector<std::unique_ptr<StreamContext>> ctxs;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) kv.second->running.store(false);
for (auto &kv : streams) ctxs.push_back(std::move(kv.second));
streams.clear();
}
for (auto &ctx : ctxs)
if (ctx->thread.joinable()) ctx->thread.join();
LOG_INFO("[RTMP] stop_all completed.");
}
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_stream_key(cam_name, type));
return it != streams.end() && it->second->running.load();
}
bool RTMPManager::is_any_streaming()
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
if (kv.second->running.load()) return true;
return false;
}
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)
{
// 获取当前设备 enp1s0 网卡 IP或者改成你希望的接口
std::string ip = get_ip_address("enP2p33s0");
if (ip.empty()) ip = "127.0.0.1";
// 构建流名,例如 AHD2_main
std::string stream_name = make_stream_key(cam_name, type);
// 最终返回 WebRTC 拉流地址(不加 .flv
return "http://" + ip + ":1985/rtc/v1/whep/?app=live&stream=" + stream_name;
}
std::vector<RTMPManager::StreamResultInfo> RTMPManager::process_push_request(const VideoPushRequest &req)
{
std::vector<StreamResultInfo> results;
for (const auto &item : req.data)
{
StreamType type = (item.streamType == 0) ? StreamType::MAIN : StreamType::SUB;
for (int ch : item.channels)
{
if (ch < 0 || ch >= static_cast<int>(g_app_config.cameras.size())) continue;
const auto &cam = g_app_config.cameras[ch];
StreamResultInfo info;
if (item.switchVal == 0)
{
// 开启推流
info = start_camera(cam, type);
}
else
{
// 停止推流
info = stop_camera(cam.name, type);
}
results.push_back(info);
}
}
return results;
}