kunlang_video/src/rtmp_manager.cpp
2025-10-14 17:46:07 +08:00

258 lines
8.0 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtmp_manager.cpp
#include "rtmp_manager.hpp"
#include "logger.hpp"
#include <iostream>
#include <chrono>
#include <thread>
// Registry of per-stream contexts, keyed by the string built in make_stream_key().
std::unordered_map<std::string, RTMPManager::StreamContext> RTMPManager::streams;
// Mutex the RTMPManager functions in this file lock before touching `streams`.
std::mutex RTMPManager::streams_mutex;
// Returns the suffix appended to a camera name to form its stream name:
// "_main" for StreamType::MAIN, "_sub" for any other value.
static inline std::string stream_type_suffix(StreamType type)
{
    if (type == StreamType::MAIN)
    {
        return "_main";
    }
    return "_sub";
}
// Builds the registry key / stream name for a camera: the camera name
// followed by the suffix that stream_type_suffix() yields for `type`.
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
{
    std::string key = cam_name;
    key += stream_type_suffix(type);
    return key;
}
// Initializes GStreamer (no command-line arguments are forwarded) and logs
// that initialization has been performed.
void RTMPManager::init()
{
    // Both argc and argv are passed as null: no options are handed to GStreamer.
    gst_init(nullptr, nullptr);

    LOG_INFO("[RTMP] GStreamer initialized.");
}
// Builds (but does not start) the GStreamer pipeline that captures from the
// camera's V4L2 device, encodes H.264 and publishes it to the local RTMP
// server under the name produced by make_stream_key().
//
// For StreamType::SUB the camera's width, height, fps and bitrate are halved,
// each clamped to a lower bound.
//
// @param cam   Camera description (name, device, width, height, fps, bitrate).
// @param type  Which stream variant to build.
// @return A new pipeline the caller must release with gst_object_unref(), or
//         nullptr if the description could not be parsed.
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
{
    // Start from the camera's configured parameters.
    int width = cam.width;
    int height = cam.height;
    int fps = cam.fps;
    int bitrate = cam.bitrate;
    if (type == StreamType::SUB)
    {
        // Simple one-size-fits-all policy: halve resolution, frame rate and
        // bitrate, never going below the given floors.
        width = std::max(160, width / 2);
        height = std::max(120, height / 2);
        fps = std::max(10, fps / 2);
        bitrate = std::max(300000, bitrate / 2); // at least 300 kbps
    }

    // Use the shared key builder so the published name always matches the
    // registry key and get_stream_url().
    const std::string stream_name = make_stream_key(cam.name, type);

    std::string pipeline_str =
        "v4l2src device=" + cam.device +
        " ! video/x-raw,format=NV12,width=" + std::to_string(width) +
        ",height=" + std::to_string(height) +
        ",framerate=" + std::to_string(fps) + "/1"
        " ! queue max-size-buffers=1 leaky=downstream "
        " ! mpph264enc bps=" +
        std::to_string(bitrate) +
        " gop=" + std::to_string(fps) +
        " ! h264parse "
        " ! flvmux streamable=true name=mux "
        " ! rtmpsink location=\"rtmp://127.0.0.1/live/" +
        stream_name +
        " live=1\" sync=false";

    LOG_INFO("[RTMP] Creating pipeline for '" + cam.name + "' (" +
             (type == StreamType::MAIN ? "MAIN" : "SUB") + ") -> " + pipeline_str);

    GError *error = nullptr;
    GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
    if (error)
    {
        LOG_ERROR("[RTMP] Failed to parse pipeline: " + std::string(error->message));
        g_error_free(error);
        // gst_parse_launch may hand back a partially built pipeline together
        // with an error; release it so it is not leaked.
        if (pipeline)
            gst_object_unref(pipeline);
        return nullptr;
    }
    return pipeline;
}
// Worker-thread body for one stream: builds the pipeline, runs it, and
// rebuilds it after an error or EOS until either the stream's `running` flag
// is cleared or too many consecutive attempts fail.
//
// @param cam   Camera to stream (taken by value; the thread owns its copy).
// @param type  Stream variant to publish.
//
// On reaching MAX_RETRIES consecutive failures the function clears the
// stream's `running` flag itself and returns; the registry entry is left in
// place.
void RTMPManager::stream_loop(Camera cam, StreamType type)
{
    const std::string key = make_stream_key(cam.name, type);
    LOG_INFO("[RTMP] Stream thread started for '" + key + "'");

    const int MAX_RETRIES = 5;
    const std::chrono::seconds RETRY_DELAY(3);
    // Heuristic: a pipeline that survived this long is considered to have
    // been streaming successfully, so a later failure starts a new streak.
    const std::chrono::seconds STABLE_RUN(30);
    // Bounded bus wait so the `running` flag is re-checked regularly instead
    // of only when an error or EOS happens to arrive.
    const GstClockTime POLL_INTERVAL = 200 * GST_MSECOND;

    // Reads the flag under the lock; uses find() so the worker never
    // default-inserts a registry entry.
    auto is_running = [&key]() -> bool
    {
        std::lock_guard<std::mutex> lock(streams_mutex);
        auto it = streams.find(key);
        return it != streams.end() && it->second.running;
    };

    // Must live outside the reconnect loop, otherwise it is reset on every
    // attempt and MAX_RETRIES can never be reached.
    int consecutive_failures = 0;

    while (is_running())
    {
        GstElement *pipeline = create_pipeline(cam, type);
        if (!pipeline)
        {
            LOG_ERROR("[RTMP] Failed to create pipeline for '" + key + "', retrying in 3s...");
            std::this_thread::sleep_for(RETRY_DELAY);
            continue;
        }

        GstBus *bus = gst_element_get_bus(pipeline);
        gst_element_set_state(pipeline, GST_STATE_PLAYING);
        LOG_INFO("[RTMP] Camera '" + key + "' streaming...");
        const auto started_at = std::chrono::steady_clock::now();

        bool error_occurred = false;
        while (!error_occurred && is_running())
        {
            GstMessage *msg = gst_bus_timed_pop_filtered(
                bus, POLL_INTERVAL,
                static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
            if (!msg)
                continue; // timeout: nothing happened, re-check the flag

            if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR)
            {
                GError *err = nullptr;
                gchar *debug = nullptr;
                gst_message_parse_error(msg, &err, &debug);
                LOG_ERROR("[RTMP] Error on '" + key + "': " +
                          std::string(err ? err->message : "unknown error"));
                if (err)
                    g_error_free(err);
                g_free(debug);
                error_occurred = true;
            }
            else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS)
            {
                LOG_WARN("[RTMP] EOS received on '" + key + "'");
                error_occurred = true;
            }
            // Every popped message is released, including the one that ends
            // the loop.
            gst_message_unref(msg);
        }

        gst_element_set_state(pipeline, GST_STATE_NULL);
        gst_object_unref(bus);
        gst_object_unref(pipeline);

        // Left the inner loop without an error: a stop was requested.
        if (!error_occurred)
            break;

        if (std::chrono::steady_clock::now() - started_at >= STABLE_RUN)
            consecutive_failures = 0;
        ++consecutive_failures;

        if (consecutive_failures >= MAX_RETRIES)
        {
            LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts.");
            // Clear `running` so callers can observe that the stream gave up.
            std::lock_guard<std::mutex> lock(streams_mutex);
            auto it = streams.find(key);
            if (it != streams.end())
                it->second.running = false;
            break;
        }

        LOG_WARN("[RTMP] Reconnecting '" + key + "' in 3s...");
        std::this_thread::sleep_for(RETRY_DELAY);
    }

    LOG_INFO("[RTMP] Stream thread exited for '" + key + "'");
}
// Starts a worker thread that streams `cam` as the given stream type and
// registers it under make_stream_key(cam.name, type).
//
// Does nothing (beyond a warning) if that stream is already running. A
// leftover entry whose worker has stopped on its own is reaped first, so the
// new context can actually be inserted.
//
// @param cam   Camera to stream; copied into the worker thread.
// @param type  Stream variant to start.
void RTMPManager::start_camera(const Camera &cam, StreamType type)
{
    std::lock_guard<std::mutex> lock(streams_mutex);
    const std::string key = make_stream_key(cam.name, type);

    auto it = streams.find(key);
    if (it != streams.end())
    {
        if (it->second.running)
        {
            LOG_WARN("[RTMP] Camera '" + key + "' already streaming.");
            return;
        }
        if (!it->second.thread.joinable())
        {
            // Stopped entry without a joinable thread handle: something else
            // is tearing it down. Starting now would race with that teardown.
            LOG_WARN("[RTMP] Camera '" + key + "' is still being stopped.");
            return;
        }
        // Stale entry: the worker cleared `running` itself (under this mutex)
        // and does not lock again before returning, so joining here cannot
        // deadlock. Without this, emplace() below would fail on the existing
        // key and destroying the joinable thread would call std::terminate.
        it->second.thread.join();
        streams.erase(it);
    }

    StreamContext ctx;
    ctx.running = true;
    // The worker's first action is to take streams_mutex, so it cannot look
    // up its entry before the emplace below has completed.
    ctx.thread = std::thread([cam, type]()
                             { RTMPManager::stream_loop(cam, type); });
    streams.emplace(key, std::move(ctx));
}
// Requests the given stream to stop, waits for its worker thread to finish
// and removes it from the registry. Logs a warning if no such stream exists.
//
// @param cam_name  Camera name (without the stream-type suffix).
// @param type      Stream variant to stop.
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{
    const std::string key = make_stream_key(cam_name, type);

    std::thread worker;
    {
        std::lock_guard<std::mutex> lock(streams_mutex);
        auto it = streams.find(key);
        if (it == streams.end())
        {
            LOG_WARN("[RTMP] Camera '" + key + "' not found.");
            return;
        }
        it->second.running = false;
        LOG_INFO("[RTMP] Stopping camera '" + key + "'...");
        // Take the thread handle out so it can be joined without the lock.
        worker = std::move(it->second.thread);
    }

    // The worker locks streams_mutex to read `running`; joining while holding
    // that mutex would deadlock, so the join happens unlocked.
    if (worker.joinable())
        worker.join();

    std::lock_guard<std::mutex> lock(streams_mutex);
    auto it = streams.find(key);
    // Only erase the entry this call stopped; leave anything that has been
    // replaced in the meantime.
    if (it != streams.end() && !it->second.running && !it->second.thread.joinable())
        streams.erase(it);
}
// Reports whether the stream identified by (cam_name, type) is registered
// and has its `running` flag set.
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
{
    const std::string wanted = make_stream_key(cam_name, type);

    std::lock_guard<std::mutex> guard(streams_mutex);
    const auto entry = streams.find(wanted);
    if (entry == streams.end())
    {
        return false;
    }
    return entry->second.running;
}
// Reports whether at least one registered stream has its `running` flag set.
bool RTMPManager::is_any_streaming()
{
    std::lock_guard<std::mutex> guard(streams_mutex);
    for (auto entry = streams.begin(); entry != streams.end(); ++entry)
    {
        if (entry->second.running)
        {
            return true;
        }
    }
    return false;
}
void RTMPManager::stop_all()
{
std::vector<std::string> names;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
names.push_back(kv.first);
}
for (auto &name : names)
{
// name 中包含后缀,但是 stop_camera 需要 cam_name + StreamType
// 我们可以解析后缀,或新增一个 stop_by_key。为简单起见解析
if (name.size() > 5 && name.find("_sub") == name.size() - 4)
{
std::string cam_name = name.substr(0, name.size() - 4);
stop_camera(cam_name, StreamType::SUB);
}
else if (name.size() > 6 && name.find("_main") == name.size() - 5)
{
std::string cam_name = name.substr(0, name.size() - 5);
stop_camera(cam_name, StreamType::MAIN);
}
else
{
// fallback: stop by treating as MAIN
stop_camera(name, StreamType::MAIN);
}
}
}
// Returns the RTMP URL of a stream on the local server. The path component
// is the camera name plus its stream-type suffix, i.e. the same name that
// create_pipeline() publishes to.
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)
{
    std::string url = "rtmp://127.0.0.1/live/";
    url += cam_name;
    url += stream_type_suffix(type);
    return url;
}