sweeper_video/src/rtmp_manager.cpp
2025-10-15 14:14:00 +08:00

355 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtmp_manager.cpp
#include "rtmp_manager.hpp"
#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
#include "logger.hpp"
// Suffix appended to a camera name to form its stream key ("_main" or "_sub").
static inline std::string stream_type_suffix(StreamType type)
{
    if (type == StreamType::MAIN)
    {
        return "_main";
    }
    return "_sub";
}
// static members
// Registry of all active streams, keyed by make_stream_key(); guarded by streams_mutex.
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
// Protects every access to `streams` (and the status stored inside each context).
std::mutex RTMPManager::streams_mutex;
// Optional observer notified on every status change; invoked without the lock held.
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
// Build the registry key identifying one (camera, stream-type) pair,
// e.g. "cam0_main" / "cam0_sub".
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
{
    std::string key{cam_name};
    key += stream_type_suffix(type);
    return key;
}
// One-time GStreamer initialization; must run before any pipeline is created.
void RTMPManager::init()
{
gst_init(nullptr, nullptr);
LOG_INFO("[RTMP] GStreamer initialized.");
}
// Register the observer invoked whenever a stream's status changes.
void RTMPManager::set_status_callback(StreamCallback cb)
{
    status_callback = std::move(cb);
}
// Record the latest status for `key` (if the stream is still registered) and
// then notify the observer. The callback runs with streams_mutex released so
// it may safely call back into this class.
void RTMPManager::update_status(const std::string &key, const StreamStatus &status)
{
    {
        std::lock_guard<std::mutex> lock(streams_mutex);
        auto entry = streams.find(key);
        if (entry != streams.end())
        {
            entry->second->status = status;
        }
    }
    if (status_callback)
    {
        status_callback(key, status);
    }
}
// Build the v4l2 -> mpph264enc -> flvmux -> rtmpsink pipeline for one camera.
// SUB streams get half resolution / frame rate / bitrate, clamped to minimums.
// Returns a new pipeline (caller owns it) or nullptr on parse failure.
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
{
    int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate;
    if (type == StreamType::SUB)
    {
        width = std::max(160, width / 2);
        height = std::max(120, height / 2);
        fps = std::max(10, fps / 2);
        bitrate = std::max(300000, bitrate / 2);
    }
    std::string stream_name = cam.name + stream_type_suffix(type);
    std::string pipeline_str = "v4l2src device=" + cam.device +
    " ! video/x-raw,format=NV12,width=" + std::to_string(width) +
    ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) +
    "/1 ! queue max-size-buffers=1 leaky=downstream "
    "! mpph264enc bps=" +
    std::to_string(bitrate) + " gop=" + std::to_string(fps) +
    " ! h264parse ! flvmux streamable=true name=mux "
    "! rtmpsink location=\"rtmp://127.0.0.1/live/" +
    stream_name + " live=1\" sync=false";
    LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str);
    GError *error = nullptr;
    GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
    if (error)
    {
        LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message));
        g_error_free(error);
        // gst_parse_launch can return a partially-constructed pipeline even when
        // it reports a recoverable error; release it here to avoid leaking it,
        // since we treat any parse error as fatal.
        if (pipeline) gst_object_unref(pipeline);
        return nullptr;
    }
    return pipeline;
}
void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok,
const std::string &reason)
{
// 找 camera index
int cam_index = -1;
Camera *cam_ptr = nullptr;
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
if (g_app_config.cameras[i].name == cam_name)
{
cam_index = static_cast<int>(i);
cam_ptr = &g_app_config.cameras[i];
break;
}
}
nlohmann::json ch_resp;
ch_resp["loc"] = cam_index;
ch_resp["url"] = (ok && cam_ptr) ? get_stream_url(cam_ptr->name, type) : "";
ch_resp["result"] = ok ? 0 : 1;
ch_resp["reason"] = reason;
nlohmann::json reply;
reply["type"] = "response";
reply["seqNo"] = seqNo;
reply["data"] = nlohmann::json::array({ch_resp});
if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump());
}
// Worker-thread body for one stream. Builds the GStreamer pipeline, pumps its
// bus until the stream stops or fails, optionally retries, and finally removes
// this stream's own entry from the registry (unless stop_camera already took it).
// `cam` is taken by value so the thread owns an independent copy.
void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &seqNo)
{
std::string key = make_stream_key(cam.name, type);
LOG_INFO("[RTMP] Stream loop started for " + key);
const int MAX_RETRIES = 0; // 0 => a failed pipeline is not rebuilt
int retry_count = 0;
const int FIRST_FRAME_TIMEOUT_SEC = 5; // max seconds to wait for the first frame
while (true)
{
// Exit promptly if stop_camera()/stop_all() cleared our running flag or
// removed this stream from the registry.
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load()) break;
}
GstElement *pipeline = create_pipeline(cam, type);
if (!pipeline)
{
std::string reason = "Failed to create pipeline (device unavailable or resolution unsupported)";
update_status(key, {false, StreamResult::PIPELINE_ERROR, reason});
publish_stream_status(cam.name, type, seqNo, false, reason);
if (++retry_count > MAX_RETRIES) break;
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
GstBus *bus = gst_element_get_bus(pipeline);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
bool first_frame_received = false;
bool stop_flag = false;
auto first_frame_start = std::chrono::steady_clock::now();
// Bus-pumping loop: poll for pipeline messages in 100 ms slices so the
// stop flag and first-frame timeout are checked frequently.
while (!stop_flag)
{
GstMessage *msg = gst_bus_timed_pop_filtered(
bus, 100 * GST_MSECOND,
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));
// Re-check the external stop request on every poll slice.
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load())
{
stop_flag = true;
break;
}
}
// check first-frame timeout
if (!first_frame_received)
{
auto elapsed = std::chrono::steady_clock::now() - first_frame_start;
if (std::chrono::duration_cast<std::chrono::seconds>(elapsed).count() > FIRST_FRAME_TIMEOUT_SEC)
{
std::string reason = "No frames received within timeout";
LOG_WARN("[RTMP] " + key + ": " + reason);
update_status(key, {false, StreamResult::TIMEOUT, reason});
publish_stream_status(cam.name, type, seqNo, false, reason);
stop_flag = true;
}
}
if (!msg) continue;
switch (GST_MESSAGE_TYPE(msg))
{
case GST_MESSAGE_ERROR:
{
// Pipeline error (e.g. RTMP connection refused): report and tear down.
GError *err = nullptr;
gchar *debug = nullptr;
gst_message_parse_error(msg, &err, &debug);
std::string err_msg = err ? err->message : "Unknown GStreamer error";
LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg);
update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg});
publish_stream_status(cam.name, type, seqNo, false, err_msg);
if (err) g_error_free(err);
if (debug) g_free(debug);
stop_flag = true;
break;
}
case GST_MESSAGE_EOS:
LOG_WARN("[RTMP] EOS on " + key);
update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"});
publish_stream_status(cam.name, type, seqNo, false, "EOS received");
stop_flag = true;
break;
case GST_MESSAGE_STATE_CHANGED:
{
// The pipeline reaching PLAYING is treated as "first frame received";
// this is when success is reported back over MQTT.
GstState old_state, new_state;
gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING &&
!first_frame_received)
{
first_frame_received = true;
LOG_INFO("[RTMP] First frame received for " + key);
update_status(key, {true, StreamResult::OK, ""});
publish_stream_status(cam.name, type, seqNo, true, "Streaming OK");
}
break;
}
default:
break;
}
gst_message_unref(msg);
}
// Tear down the pipeline before deciding whether to retry.
gst_element_set_state(pipeline, GST_STATE_NULL);
if (bus) gst_object_unref(bus);
gst_object_unref(pipeline);
if (!stop_flag) break;
if (++retry_count > MAX_RETRIES)
{
LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up.");
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// ensure we remove ourselves from streams map if still present and belong to this thread
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end())
{
// If thread stored in map matches current thread id, erase it.
// This avoids racing with stop_camera which may already have moved/erased it.
try
{
if (it->second && it->second->thread.get_id() == std::this_thread::get_id())
{
streams.erase(it);
}
}
catch (...)
{
// in case thread id comparisons throw (shouldn't), just ignore
streams.erase(it);
}
}
}
update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"});
LOG_INFO("[RTMP] Stream loop ended for " + key);
}
// start_camera: takes an extra seqNo parameter (echoed back in the MQTT response)
// Start an RTMP stream for `cam`, or acknowledge one that is already running.
// `seqNo` is echoed back in the MQTT response so the peer can match replies.
void RTMPManager::start_camera(const Camera &cam, StreamType type, const std::string &seqNo)
{
    const std::string key = make_stream_key(cam.name, type);
    {
        std::lock_guard<std::mutex> lock(streams_mutex);
        auto existing = streams.find(key);
        if (existing != streams.end() && existing->second->running.load())
        {
            LOG_WARN("[RTMP] " + key + " already streaming.");
            // Protocol: an already-live stream is reported as success, with its URL.
            publish_stream_status(cam.name, type, seqNo, true, "Already streaming");
            return;
        }
        // Build the context fully (including its thread) before inserting it,
        // so no half-initialized entry is ever visible. The new thread blocks
        // on streams_mutex until this scope releases it.
        auto context = std::make_unique<StreamContext>();
        context->running.store(true);
        context->status = {true, StreamResult::UNKNOWN, ""};
        context->thread = std::thread([cam, type, seqNo]() { stream_loop(cam, type, seqNo); });
        streams.emplace(key, std::move(context));
    }
    LOG_INFO("[RTMP] start_camera requested for " + key);
}
// Stop one running stream and report the outcome over MQTT. Joins the worker
// thread, so this must not be called from the stream's own thread.
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, const std::string &seqNo)
{
    const std::string key = make_stream_key(cam_name, type);
    std::unique_ptr<StreamContext> owned;
    {
        std::lock_guard<std::mutex> lock(streams_mutex);
        auto entry = streams.find(key);
        if (entry == streams.end())
        {
            LOG_WARN("[RTMP] " + key + " not found.");
            // Protocol: stopping a stream that is not running is still reported
            // as success (nothing is done).
            publish_stream_status(cam_name, type, seqNo, true, "Not streaming");
            return;
        }
        // Signal the worker, then take ownership so we can join outside the lock.
        entry->second->running.store(false);
        owned = std::move(entry->second);
        streams.erase(entry);
    }
    if (owned->thread.joinable())
    {
        owned->thread.join();
    }
    update_status(key, {false, StreamResult::OK, "Stopped manually"});
    LOG_INFO("[RTMP] stop_camera completed: " + key);
    // Report the successful manual stop over MQTT.
    publish_stream_status(cam_name, type, seqNo, true, "Stopped manually");
}
void RTMPManager::stop_all()
{
std::vector<std::unique_ptr<StreamContext>> ctxs;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) kv.second->running.store(false);
for (auto &kv : streams) ctxs.push_back(std::move(kv.second));
streams.clear();
}
for (auto &ctx : ctxs)
if (ctx->thread.joinable()) ctx->thread.join();
LOG_INFO("[RTMP] stop_all completed.");
}
// True if the given camera/stream-type pair currently has a live worker.
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
{
    std::lock_guard<std::mutex> lock(streams_mutex);
    const auto entry = streams.find(make_stream_key(cam_name, type));
    if (entry == streams.end())
    {
        return false;
    }
    return entry->second->running.load();
}
// True if at least one registered stream is still running.
bool RTMPManager::is_any_streaming()
{
    std::lock_guard<std::mutex> lock(streams_mutex);
    for (const auto &entry : streams)
    {
        if (entry.second->running.load())
        {
            return true;
        }
    }
    return false;
}
// Compose the public RTMP playback URL for a camera stream.
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)
{
    std::string url{"rtmp://127.0.0.1/live/"};
    url += make_stream_key(cam_name, type);
    return url;
}