sweeper_video/src/rtmp_manager.cpp
2025-10-15 13:31:50 +08:00

340 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rtmp_manager.cpp
#include "rtmp_manager.hpp"
#include "logger.hpp"
#include <chrono>
#include <thread>
#include <iostream>
#include <utility>
// Registry of stream contexts, keyed by the string produced by make_stream_key().
// Every access in this file is made while holding streams_mutex.
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
// Guards `streams` (and the per-stream status stored inside each context).
std::mutex RTMPManager::streams_mutex;
// Optional observer invoked by update_status(); empty until set_status_callback() is called.
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
/**
 * Build the registry key / RTMP stream name for a camera stream.
 *
 * @param cam_name  Camera name used as the key prefix.
 * @param type      Stream variant; its suffix comes from stream_type_suffix().
 * @return cam_name followed by the suffix for `type`.
 */
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
{
    std::string stream_key = cam_name;
    stream_key += stream_type_suffix(type);
    return stream_key;
}
/**
 * Initialise GStreamer for this process.
 *
 * No command-line arguments are forwarded to GStreamer.
 */
void RTMPManager::init()
{
    int *no_argc = nullptr;
    char ***no_argv = nullptr;
    gst_init(no_argc, no_argv);
    LOG_INFO("[RTMP] GStreamer initialized.");
}
/**
 * Install the observer that update_status() notifies on every status change.
 *
 * @param cb  New callback; it replaces whatever was installed before.
 */
void RTMPManager::set_status_callback(StreamCallback cb)
{
    // After the swap `cb` holds the previous callback, which is released
    // when this function returns.
    std::swap(status_callback, cb);
}
/**
 * Record a new status for a stream and notify the status callback.
 *
 * @param key     Stream key as produced by make_stream_key().
 * @param status  Status to store and forward.
 *
 * The stored status is only updated if the stream is still registered; the
 * callback (when set) is invoked either way, after streams_mutex is released.
 */
void RTMPManager::update_status(const std::string &key, const StreamStatus &status)
{
    {
        std::lock_guard<std::mutex> guard(streams_mutex);
        auto entry = streams.find(key);
        if (entry != streams.end())
        {
            entry->second->status = status;
        }
    }

    if (!status_callback)
    {
        return;
    }
    status_callback(key, status);
}
/**
 * Build (without starting) the GStreamer pipeline for one camera stream:
 * v4l2src -> NV12 caps -> leaky queue -> mpph264enc -> h264parse -> flvmux -> rtmpsink.
 *
 * For StreamType::SUB the camera's width, height, fps and bitrate are halved,
 * with lower bounds of 160x120, 10 fps and 300000 respectively.
 *
 * @param cam   Camera settings (device, name, width, height, fps, bitrate).
 * @param type  Stream variant to build.
 * @return The parsed pipeline, owned by the caller (release it with
 *         gst_object_unref), or nullptr if parsing reported an error.
 */
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
{
    int width = cam.width;
    int height = cam.height;
    int fps = cam.fps;
    int bitrate = cam.bitrate;
    if (type == StreamType::SUB)
    {
        width = std::max(160, width / 2);
        height = std::max(120, height / 2);
        fps = std::max(10, fps / 2);
        bitrate = std::max(300000, bitrate / 2);
    }

    // Use the shared helpers so the URL pushed to here is always the same one
    // that get_stream_url() reports to clients.
    const std::string stream_name = make_stream_key(cam.name, type);
    const std::string pipeline_str =
        "v4l2src device=" + cam.device +
        " ! video/x-raw,format=NV12,width=" + std::to_string(width) +
        ",height=" + std::to_string(height) +
        ",framerate=" + std::to_string(fps) +
        "/1 ! queue max-size-buffers=1 leaky=downstream "
        "! mpph264enc bps=" +
        std::to_string(bitrate) +
        " gop=" + std::to_string(fps) + // one keyframe per second of video
        " ! h264parse ! flvmux streamable=true name=mux "
        "! rtmpsink location=\"" +
        get_stream_url(cam.name, type) + " live=1\" sync=false";
    LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "': " + pipeline_str);

    GError *error = nullptr;
    GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
    if (error)
    {
        LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message));
        g_error_free(error);
        // gst_parse_launch may still hand back a partially constructed
        // pipeline alongside the error; release it so it is not leaked.
        if (pipeline)
            gst_object_unref(pipeline);
        return nullptr;
    }
    return pipeline;
}
void RTMPManager::publish_stream_status(const std::string &cam_name, StreamType type, const std::string &seqNo, bool ok, const std::string &reason)
{
// 找 camera index
int cam_index = -1;
Camera *cam_ptr = nullptr;
for (size_t i = 0; i < g_app_config.cameras.size(); ++i)
{
if (g_app_config.cameras[i].name == cam_name)
{
cam_index = static_cast<int>(i);
cam_ptr = &g_app_config.cameras[i];
break;
}
}
nlohmann::json ch_resp;
ch_resp["loc"] = cam_index;
ch_resp["url"] = ok && cam_ptr ? get_stream_url(cam_ptr->name, type) : "";
ch_resp["result"] = ok ? 0 : 1;
ch_resp["reason"] = reason;
nlohmann::json reply;
reply["type"] = "response";
reply["seqNo"] = seqNo;
reply["data"] = nlohmann::json::array({ch_resp});
mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump());
}
/**
 * Worker-thread body for one stream: build the pipeline, run it, watch its
 * bus, and rebuild it after a failure until the stream is stopped or the
 * retry budget is used up.
 *
 * @param cam    Camera to stream (taken by value; the thread owns its copy).
 * @param type   Stream variant.
 * @param seqNo  Sequence number of the originating request; echoed in every
 *               status report published from this loop.
 *
 * The loop runs only while the stream's context is registered in `streams`
 * with `running` set. Failures (pipeline creation, bus ERROR/EOS, or the
 * pipeline not reaching PLAYING within the timeout) are reported through
 * update_status() and publish_stream_status(), then retried after a short
 * delay, at most MAX_RETRIES times over the lifetime of the loop.
 *
 * NOTE(review): when retries are exhausted this function returns without
 * clearing `running` or removing the context, so is_streaming() keeps
 * returning true until stop_camera()/stop_all() is called — confirm whether
 * that is intended.
 */
void RTMPManager::stream_loop(Camera cam, StreamType type, const std::string &seqNo)
{
    const std::string key = make_stream_key(cam.name, type);
    LOG_INFO("[RTMP] Stream loop started for " + key);

    const int MAX_RETRIES = 3;
    const auto FIRST_FRAME_TIMEOUT = std::chrono::seconds(5); // max wait for the pipeline to reach PLAYING
    const auto RETRY_DELAY = std::chrono::seconds(1);

    // True while this stream is still registered and has not been asked to stop.
    const auto still_wanted = [&key]()
    {
        std::lock_guard<std::mutex> lock(streams_mutex);
        auto it = streams.find(key);
        return it != streams.end() && it->second->running.load();
    };

    int retry_count = 0;
    while (still_wanted())
    {
        GstElement *pipeline = create_pipeline(cam, type);
        if (!pipeline)
        {
            std::string reason = "Failed to create pipeline (device unavailable or resolution unsupported)";
            update_status(key, {false, StreamResult::PIPELINE_ERROR, reason});
            publish_stream_status(cam.name, type, seqNo, false, reason);
            if (++retry_count > MAX_RETRIES)
            {
                LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up.");
                break;
            }
            std::this_thread::sleep_for(RETRY_DELAY);
            continue;
        }

        GstBus *bus = gst_element_get_bus(pipeline);
        gst_element_set_state(pipeline, GST_STATE_PLAYING);

        bool first_frame_received = false;
        bool stop_requested = false; // stream was stopped/removed by stop_camera()/stop_all()
        bool failed = false;         // pipeline must be torn down and retried
        const auto first_frame_start = std::chrono::steady_clock::now();

        while (!stop_requested && !failed)
        {
            // Short poll so a stop request is noticed within ~100 ms.
            GstMessage *msg = gst_bus_timed_pop_filtered(
                bus, 100 * GST_MSECOND,
                static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED));

            if (!still_wanted())
            {
                stop_requested = true;
            }
            else if (!first_frame_received &&
                     std::chrono::steady_clock::now() - first_frame_start > FIRST_FRAME_TIMEOUT)
            {
                // Compare durations directly: truncating to whole seconds
                // would delay the timeout by up to one extra second.
                std::string reason = "No frames received within timeout";
                LOG_WARN("[RTMP] " + key + ": " + reason);
                update_status(key, {false, StreamResult::TIMEOUT, reason});
                publish_stream_status(cam.name, type, seqNo, false, reason);
                failed = true;
            }

            if (!msg)
                continue;

            if (stop_requested || failed)
            {
                // The outcome of this run is already decided; release the
                // message instead of leaking it or emitting a second,
                // possibly contradictory, status report.
                gst_message_unref(msg);
                break;
            }

            switch (GST_MESSAGE_TYPE(msg))
            {
            case GST_MESSAGE_ERROR:
            {
                GError *err = nullptr;
                gchar *debug = nullptr;
                gst_message_parse_error(msg, &err, &debug);
                std::string err_msg = err ? err->message : "Unknown GStreamer error";
                LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg);
                update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg});
                publish_stream_status(cam.name, type, seqNo, false, err_msg);
                if (err)
                    g_error_free(err);
                if (debug)
                    g_free(debug);
                failed = true;
                break;
            }
            case GST_MESSAGE_EOS:
                LOG_WARN("[RTMP] EOS on " + key);
                update_status(key, {false, StreamResult::EOS_RECEIVED, "EOS"});
                publish_stream_status(cam.name, type, seqNo, false, "EOS received");
                failed = true;
                break;
            case GST_MESSAGE_STATE_CHANGED:
            {
                GstState old_state, new_state;
                gst_message_parse_state_changed(msg, &old_state, &new_state, nullptr);
                // Only the top-level pipeline (not a child element) reaching
                // PLAYING is treated as the "first frame" signal.
                if (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipeline) && new_state == GST_STATE_PLAYING && !first_frame_received)
                {
                    first_frame_received = true;
                    LOG_INFO("[RTMP] First frame received for " + key);
                    update_status(key, {true, StreamResult::OK, ""});
                    publish_stream_status(cam.name, type, seqNo, true, "Streaming OK");
                }
                break;
            }
            default:
                break;
            }
            gst_message_unref(msg);
        }

        gst_element_set_state(pipeline, GST_STATE_NULL);
        if (bus)
            gst_object_unref(bus);
        gst_object_unref(pipeline);

        // A stop request is not a failure: leave immediately, without
        // consuming a retry or sleeping while the stopper waits in join().
        if (stop_requested)
            break;

        if (++retry_count > MAX_RETRIES)
        {
            LOG_ERROR("[RTMP] " + key + " reached max retries. Giving up.");
            break;
        }
        std::this_thread::sleep_for(RETRY_DELAY);
    }

    update_status(key, {false, StreamResult::UNKNOWN, "Stream loop exited"});
    LOG_INFO("[RTMP] Stream loop ended for " + key);
}
// 修改 start_camera新增 seqNo 参数
void RTMPManager::start_camera(const Camera &cam, StreamType type, const std::string &seqNo)
{
std::string key = make_stream_key(cam.name, type);
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end() && it->second->running.load())
{
LOG_WARN("[RTMP] " + key + " already streaming.");
// 上报已经在请求状态
publish_stream_status(cam.name, type, seqNo, false, "Already streaming");
return;
}
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
ctx->thread = std::thread([cam, type, seqNo]()
{
stream_loop(cam, type, seqNo); // stream_loop 内部处理 MQTT 上报
});
streams[key] = std::move(ctx);
LOG_INFO("[RTMP] start_camera requested for " + key);
}
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type, const std::string &seqNo)
{
std::string key = make_stream_key(cam_name, type);
std::unique_ptr<StreamContext> ctx;
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it == streams.end())
{
LOG_WARN("[RTMP] " + key + " not found.");
// 上报停止失败(未找到)
publish_stream_status(cam_name, type, seqNo, false, "Stream not found");
return;
}
it->second->running.store(false);
ctx = std::move(it->second);
streams.erase(it);
}
if (ctx->thread.joinable())
ctx->thread.join();
update_status(key, {false, StreamResult::OK, "Stopped manually"});
LOG_INFO("[RTMP] stop_camera completed: " + key);
// MQTT 上报手动停止成功
publish_stream_status(cam_name, type, seqNo, true, "Stopped manually");
}
void RTMPManager::stop_all()
{
std::vector<std::unique_ptr<StreamContext>> ctxs;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
kv.second->running.store(false);
for (auto &kv : streams)
ctxs.push_back(std::move(kv.second));
streams.clear();
}
for (auto &ctx : ctxs)
if (ctx->thread.joinable())
ctx->thread.join();
LOG_INFO("[RTMP] stop_all completed.");
}
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(make_stream_key(cam_name, type));
return it != streams.end() && it->second->running.load();
}
/**
 * Report whether at least one registered stream has its running flag set.
 *
 * @return true as soon as one running stream is found, false otherwise.
 */
bool RTMPManager::is_any_streaming()
{
    std::lock_guard<std::mutex> guard(streams_mutex);
    for (auto it = streams.begin(); it != streams.end(); ++it)
    {
        if (it->second->running.load())
        {
            return true;
        }
    }
    return false;
}
/**
 * Build the RTMP URL under which a camera stream is published.
 *
 * @param cam_name  Camera name.
 * @param type      Stream variant.
 * @return "rtmp://127.0.0.1/live/" followed by the stream key.
 */
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)
{
    std::string url = "rtmp://127.0.0.1/live/";
    url += make_stream_key(cam_name, type);
    return url;
}