This commit is contained in:
cxh 2025-10-15 08:57:40 +08:00
parent 17abcd0f78
commit eca18de463
2 changed files with 96 additions and 41 deletions

View File

@ -8,6 +8,7 @@
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <memory>
#include "app_config.hpp" #include "app_config.hpp"
class RTMPManager class RTMPManager
@ -47,9 +48,15 @@ private:
std::atomic<bool> running{false}; std::atomic<bool> running{false};
std::thread thread; std::thread thread;
StreamStatus status; StreamStatus status;
StreamContext() = default;
// non-copyable
StreamContext(const StreamContext &) = delete;
StreamContext &operator=(const StreamContext &) = delete;
// movable? we avoid moving by using unique_ptr
}; };
static std::unordered_map<std::string, StreamContext> streams; // store unique_ptr to avoid copy/move issues with atomic/thread
static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams;
static std::mutex streams_mutex; static std::mutex streams_mutex;
static StreamCallback status_callback; static StreamCallback status_callback;

View File

@ -5,8 +5,10 @@
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include <algorithm> #include <algorithm>
#include <utility> // for move
#include <memory>
std::unordered_map<std::string, RTMPManager::StreamContext> RTMPManager::streams; std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
std::mutex RTMPManager::streams_mutex; std::mutex RTMPManager::streams_mutex;
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
@ -33,10 +35,12 @@ void RTMPManager::set_status_callback(StreamCallback cb)
void RTMPManager::update_status(const std::string &key, const StreamStatus &status) void RTMPManager::update_status(const std::string &key, const StreamStatus &status)
{ {
std::lock_guard<std::mutex> lock(streams_mutex); {
auto it = streams.find(key); std::lock_guard<std::mutex> lock(streams_mutex);
if (it != streams.end()) auto it = streams.find(key);
it->second.status = status; if (it != streams.end())
it->second->status = status;
}
if (status_callback) if (status_callback)
status_callback(key, status); status_callback(key, status);
} }
@ -95,7 +99,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
{ {
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running) auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load())
break; break;
} }
@ -125,7 +130,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running) auto it = streams.find(key);
if (it == streams.end() || !it->second->running.load())
{ {
stop_flag = true; stop_flag = true;
break; break;
@ -143,8 +149,10 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
std::string err_msg = err ? err->message : "Unknown GStreamer error"; std::string err_msg = err ? err->message : "Unknown GStreamer error";
LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg); LOG_ERROR("[RTMP] Error in " + key + ": " + err_msg);
update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg}); update_status(key, {false, StreamResult::CONNECTION_FAIL, err_msg});
g_error_free(err); if (err)
g_free(debug); g_error_free(err);
if (debug)
g_free(debug);
stop_flag = true; stop_flag = true;
} }
else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) else if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS)
@ -160,7 +168,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
} }
gst_element_set_state(pipeline, GST_STATE_NULL); gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(bus); if (bus)
gst_object_unref(bus);
gst_object_unref(pipeline); gst_object_unref(pipeline);
if (!stop_flag) if (!stop_flag)
@ -183,59 +192,98 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
void RTMPManager::start_camera(const Camera &cam, StreamType type) void RTMPManager::start_camera(const Camera &cam, StreamType type)
{ {
std::string key = make_stream_key(cam.name, type); std::string key = make_stream_key(cam.name, type);
std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key);
if (it != streams.end() && it->second.running)
{ {
LOG_WARN("[RTMP] " + key + " already streaming."); std::lock_guard<std::mutex> lock(streams_mutex);
return; auto it = streams.find(key);
if (it != streams.end() && it->second->running.load())
{
LOG_WARN("[RTMP] " + key + " already streaming.");
return;
}
// create a new context on heap and keep a unique_ptr in map
auto ctx = std::make_unique<StreamContext>();
ctx->running.store(true);
ctx->status = {true, StreamResult::OK, ""};
// start thread after moving unique_ptr into map (to avoid racing)
// insert placeholder first to reserve key, then start thread
streams.emplace(key, std::move(ctx));
} }
StreamContext ctx; // actually start thread outside the lock (we need to get pointer again)
ctx.running = true; {
ctx.thread = std::thread([cam, type]() std::lock_guard<std::mutex> lock(streams_mutex);
{ stream_loop(cam, type); }); auto it = streams.find(key);
streams.emplace(key, std::move(ctx)); if (it != streams.end())
{
// start the thread that runs stream_loop; capture cam and type by value
it->second->thread = std::thread([cam, type]()
{ stream_loop(cam, type); });
}
}
LOG_INFO("[RTMP] start_camera requested for " + key);
} }
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type) void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{ {
std::string key = make_stream_key(cam_name, type); std::string key = make_stream_key(cam_name, type);
std::unique_lock<std::mutex> lock(streams_mutex);
auto it = streams.find(key); std::unique_ptr<StreamContext> taken_ctx;
if (it == streams.end())
{ {
LOG_WARN("[RTMP] " + key + " not found."); std::lock_guard<std::mutex> lock(streams_mutex);
return; auto it = streams.find(key);
if (it == streams.end())
{
LOG_WARN("[RTMP] " + key + " not found.");
return;
}
// mark it to stop and take ownership
it->second->running.store(false);
taken_ctx = std::move(it->second);
streams.erase(it);
} }
it->second.running = false; // join outside lock
auto th = std::move(it->second.thread); if (taken_ctx && taken_ctx->thread.joinable())
streams.erase(it); taken_ctx->thread.join();
lock.unlock();
if (th.joinable())
th.join();
update_status(key, {false, StreamResult::OK, "Stopped manually"}); update_status(key, {false, StreamResult::OK, "Stopped manually"});
LOG_INFO("[RTMP] stop_camera finished for " + key);
} }
void RTMPManager::stop_all() void RTMPManager::stop_all()
{ {
std::vector<std::unique_ptr<StreamContext>> taken;
std::vector<std::string> keys; std::vector<std::string> keys;
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) for (auto &kv : streams)
{
keys.push_back(kv.first); keys.push_back(kv.first);
kv.second->running.store(false);
}
// move all contexts out
for (auto &k : keys)
{
auto it = streams.find(k);
if (it != streams.end())
taken.push_back(std::move(it->second));
streams.erase(k);
}
} }
for (auto &key : keys)
// join threads
for (size_t i = 0; i < taken.size(); ++i)
{ {
if (key.find("_main") != std::string::npos) if (taken[i] && taken[i]->thread.joinable())
stop_camera(key.substr(0, key.size() - 5), StreamType::MAIN); taken[i]->thread.join();
else
stop_camera(key.substr(0, key.size() - 4), StreamType::SUB);
} }
LOG_INFO("[RTMP] stop_all completed.");
} }
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
@ -243,14 +291,14 @@ bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto key = make_stream_key(cam_name, type); auto key = make_stream_key(cam_name, type);
auto it = streams.find(key); auto it = streams.find(key);
return it != streams.end() && it->second.running; return it != streams.end() && it->second->running.load();
} }
bool RTMPManager::is_any_streaming() bool RTMPManager::is_any_streaming()
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams) for (auto &kv : streams)
if (kv.second.running) if (kv.second->running.load())
return true; return true;
return false; return false;
} }