temp
This commit is contained in:
parent
fd75b753d1
commit
cb37ca9eb3
@ -9,8 +9,7 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <condition_variable>
|
#include <algorithm>
|
||||||
#include <queue>
|
|
||||||
#include "app_config.hpp"
|
#include "app_config.hpp"
|
||||||
|
|
||||||
class RTMPManager
|
class RTMPManager
|
||||||
@ -41,7 +40,6 @@ public:
|
|||||||
static bool is_streaming(const std::string &cam_name, StreamType type);
|
static bool is_streaming(const std::string &cam_name, StreamType type);
|
||||||
static bool is_any_streaming();
|
static bool is_any_streaming();
|
||||||
static std::string get_stream_url(const std::string &cam_name, StreamType type);
|
static std::string get_stream_url(const std::string &cam_name, StreamType type);
|
||||||
|
|
||||||
static void set_status_callback(StreamCallback cb);
|
static void set_status_callback(StreamCallback cb);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -51,19 +49,10 @@ private:
|
|||||||
std::thread thread;
|
std::thread thread;
|
||||||
StreamStatus status;
|
StreamStatus status;
|
||||||
StreamContext() = default;
|
StreamContext() = default;
|
||||||
// non-copyable
|
|
||||||
StreamContext(const StreamContext &) = delete;
|
StreamContext(const StreamContext &) = delete;
|
||||||
StreamContext &operator=(const StreamContext &) = delete;
|
StreamContext &operator=(const StreamContext &) = delete;
|
||||||
// movable? we avoid moving by using unique_ptr
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// RTMPManager 内部增加
|
|
||||||
static std::mutex stop_queue_mutex;
|
|
||||||
static std::condition_variable stop_cv;
|
|
||||||
static std::queue<std::unique_ptr<StreamContext>> stop_queue;
|
|
||||||
static std::atomic<bool> stop_thread_running;
|
|
||||||
static std::thread stop_thread;
|
|
||||||
// store unique_ptr to avoid copy/move issues with atomic/thread
|
|
||||||
static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams;
|
static std::unordered_map<std::string, std::unique_ptr<StreamContext>> streams;
|
||||||
static std::mutex streams_mutex;
|
static std::mutex streams_mutex;
|
||||||
static StreamCallback status_callback;
|
static StreamCallback status_callback;
|
||||||
@ -72,4 +61,8 @@ private:
|
|||||||
static GstElement *create_pipeline(const Camera &cam, StreamType type);
|
static GstElement *create_pipeline(const Camera &cam, StreamType type);
|
||||||
static std::string make_stream_key(const std::string &cam_name, StreamType type);
|
static std::string make_stream_key(const std::string &cam_name, StreamType type);
|
||||||
static void update_status(const std::string &key, const StreamStatus &status);
|
static void update_status(const std::string &key, const StreamStatus &status);
|
||||||
|
static inline std::string stream_type_suffix(StreamType type)
|
||||||
|
{
|
||||||
|
return (type == StreamType::MAIN) ? "_main" : "_sub";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@ -4,24 +4,12 @@
|
|||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <algorithm>
|
#include <utility>
|
||||||
#include <utility> // for move
|
|
||||||
#include <memory>
|
|
||||||
|
|
||||||
std::mutex RTMPManager::stop_queue_mutex;
|
|
||||||
std::condition_variable RTMPManager::stop_cv;
|
|
||||||
std::queue<std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::stop_queue;
|
|
||||||
std::atomic<bool> RTMPManager::stop_thread_running{false};
|
|
||||||
std::thread RTMPManager::stop_thread;
|
|
||||||
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
|
std::unordered_map<std::string, std::unique_ptr<RTMPManager::StreamContext>> RTMPManager::streams;
|
||||||
std::mutex RTMPManager::streams_mutex;
|
std::mutex RTMPManager::streams_mutex;
|
||||||
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
|
RTMPManager::StreamCallback RTMPManager::status_callback = nullptr;
|
||||||
|
|
||||||
static inline std::string stream_type_suffix(StreamType type)
|
|
||||||
{
|
|
||||||
return (type == StreamType::MAIN) ? "_main" : "_sub";
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
|
std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type)
|
||||||
{
|
{
|
||||||
return cam_name + stream_type_suffix(type);
|
return cam_name + stream_type_suffix(type);
|
||||||
@ -31,31 +19,6 @@ void RTMPManager::init()
|
|||||||
{
|
{
|
||||||
gst_init(nullptr, nullptr);
|
gst_init(nullptr, nullptr);
|
||||||
LOG_INFO("[RTMP] GStreamer initialized.");
|
LOG_INFO("[RTMP] GStreamer initialized.");
|
||||||
|
|
||||||
// 启动 stop_worker_thread
|
|
||||||
stop_thread_running.store(true);
|
|
||||||
stop_thread = std::thread([]
|
|
||||||
{
|
|
||||||
while (stop_thread_running.load())
|
|
||||||
{
|
|
||||||
std::unique_ptr<StreamContext> ctx;
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(RTMPManager::stop_queue_mutex);
|
|
||||||
RTMPManager::stop_cv.wait(lock, []{
|
|
||||||
return !RTMPManager::stop_queue.empty() || !RTMPManager::stop_thread_running.load();
|
|
||||||
});
|
|
||||||
if (!RTMPManager::stop_queue.empty())
|
|
||||||
{
|
|
||||||
ctx = std::move(RTMPManager::stop_queue.front());
|
|
||||||
RTMPManager::stop_queue.pop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ctx && ctx->thread.joinable())
|
|
||||||
{
|
|
||||||
ctx->thread.join();
|
|
||||||
LOG_INFO("[RTMP] Stream thread joined asynchronously");
|
|
||||||
}
|
|
||||||
} });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTMPManager::set_status_callback(StreamCallback cb)
|
void RTMPManager::set_status_callback(StreamCallback cb)
|
||||||
@ -77,10 +40,7 @@ void RTMPManager::update_status(const std::string &key, const StreamStatus &stat
|
|||||||
|
|
||||||
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
|
GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
|
||||||
{
|
{
|
||||||
int width = cam.width;
|
int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate;
|
||||||
int height = cam.height;
|
|
||||||
int fps = cam.fps;
|
|
||||||
int bitrate = cam.bitrate;
|
|
||||||
|
|
||||||
if (type == StreamType::SUB)
|
if (type == StreamType::SUB)
|
||||||
{
|
{
|
||||||
@ -119,7 +79,7 @@ GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type)
|
|||||||
|
|
||||||
void RTMPManager::stream_loop(Camera cam, StreamType type)
|
void RTMPManager::stream_loop(Camera cam, StreamType type)
|
||||||
{
|
{
|
||||||
const std::string key = make_stream_key(cam.name, type);
|
std::string key = make_stream_key(cam.name, type);
|
||||||
LOG_INFO("[RTMP] Stream loop started for " + key);
|
LOG_INFO("[RTMP] Stream loop started for " + key);
|
||||||
|
|
||||||
const int MAX_RETRIES = 5;
|
const int MAX_RETRIES = 5;
|
||||||
@ -154,8 +114,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
msg = gst_bus_timed_pop_filtered(
|
msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
|
||||||
bus, GST_CLOCK_TIME_NONE,
|
|
||||||
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
|
static_cast<GstMessageType>(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -222,6 +181,12 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
|
|||||||
void RTMPManager::start_camera(const Camera &cam, StreamType type)
|
void RTMPManager::start_camera(const Camera &cam, StreamType type)
|
||||||
{
|
{
|
||||||
std::string key = make_stream_key(cam.name, type);
|
std::string key = make_stream_key(cam.name, type);
|
||||||
|
auto ctx = std::make_unique<StreamContext>();
|
||||||
|
ctx->running.store(true);
|
||||||
|
ctx->status = {true, StreamResult::OK, ""};
|
||||||
|
ctx->thread = std::thread([cam, type]()
|
||||||
|
{ stream_loop(cam, type); });
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
auto it = streams.find(key);
|
auto it = streams.find(key);
|
||||||
@ -230,36 +195,16 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type)
|
|||||||
LOG_WARN("[RTMP] " + key + " already streaming.");
|
LOG_WARN("[RTMP] " + key + " already streaming.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new context on heap and keep a unique_ptr in map
|
|
||||||
auto ctx = std::make_unique<StreamContext>();
|
|
||||||
ctx->running.store(true);
|
|
||||||
ctx->status = {true, StreamResult::OK, ""};
|
|
||||||
|
|
||||||
// start thread after moving unique_ptr into map (to avoid racing)
|
|
||||||
// insert placeholder first to reserve key, then start thread
|
|
||||||
streams.emplace(key, std::move(ctx));
|
streams.emplace(key, std::move(ctx));
|
||||||
}
|
}
|
||||||
|
|
||||||
// actually start thread outside the lock (we need to get pointer again)
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
|
||||||
auto it = streams.find(key);
|
|
||||||
if (it != streams.end())
|
|
||||||
{
|
|
||||||
// start the thread that runs stream_loop; capture cam and type by value
|
|
||||||
it->second->thread = std::thread([cam, type]()
|
|
||||||
{ stream_loop(cam, type); });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_INFO("[RTMP] start_camera requested for " + key);
|
LOG_INFO("[RTMP] start_camera requested for " + key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
|
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
|
||||||
{
|
{
|
||||||
std::string key = make_stream_key(cam_name, type);
|
std::string key = make_stream_key(cam_name, type);
|
||||||
std::unique_ptr<StreamContext> taken_ctx;
|
std::unique_ptr<StreamContext> ctx;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
@ -269,54 +214,34 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
|
|||||||
LOG_WARN("[RTMP] " + key + " not found.");
|
LOG_WARN("[RTMP] " + key + " not found.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
it->second->running.store(false);
|
it->second->running.store(false);
|
||||||
taken_ctx = std::move(it->second);
|
ctx = std::move(it->second);
|
||||||
streams.erase(it);
|
streams.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 异步加入 stop 队列,由 stop_worker_thread join
|
if (ctx->thread.joinable())
|
||||||
if (taken_ctx)
|
ctx->thread.join();
|
||||||
{
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(stop_queue_mutex);
|
|
||||||
stop_queue.push(std::move(taken_ctx));
|
|
||||||
}
|
|
||||||
stop_cv.notify_one();
|
|
||||||
}
|
|
||||||
|
|
||||||
update_status(key, {false, StreamResult::OK, "Stopped manually"});
|
update_status(key, {false, StreamResult::OK, "Stopped manually"});
|
||||||
LOG_INFO("[RTMP] stop_camera queued for async stop: " + key);
|
LOG_INFO("[RTMP] stop_camera completed: " + key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RTMPManager::stop_all()
|
void RTMPManager::stop_all()
|
||||||
{
|
{
|
||||||
std::vector<std::unique_ptr<StreamContext>> taken;
|
std::vector<std::unique_ptr<StreamContext>> ctxs;
|
||||||
std::vector<std::string> keys;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
for (auto &kv : streams)
|
for (auto &kv : streams)
|
||||||
{
|
|
||||||
keys.push_back(kv.first);
|
|
||||||
kv.second->running.store(false);
|
kv.second->running.store(false);
|
||||||
}
|
for (auto &kv : streams)
|
||||||
// move all contexts out
|
ctxs.push_back(std::move(kv.second));
|
||||||
for (auto &k : keys)
|
streams.clear();
|
||||||
{
|
|
||||||
auto it = streams.find(k);
|
|
||||||
if (it != streams.end())
|
|
||||||
taken.push_back(std::move(it->second));
|
|
||||||
streams.erase(k);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// join threads
|
for (auto &ctx : ctxs)
|
||||||
for (size_t i = 0; i < taken.size(); ++i)
|
if (ctx->thread.joinable())
|
||||||
{
|
ctx->thread.join();
|
||||||
if (taken[i] && taken[i]->thread.joinable())
|
|
||||||
taken[i]->thread.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_INFO("[RTMP] stop_all completed.");
|
LOG_INFO("[RTMP] stop_all completed.");
|
||||||
}
|
}
|
||||||
@ -324,8 +249,7 @@ void RTMPManager::stop_all()
|
|||||||
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
|
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(streams_mutex);
|
std::lock_guard<std::mutex> lock(streams_mutex);
|
||||||
auto key = make_stream_key(cam_name, type);
|
auto it = streams.find(make_stream_key(cam_name, type));
|
||||||
auto it = streams.find(key);
|
|
||||||
return it != streams.end() && it->second->running.load();
|
return it != streams.end() && it->second->running.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user