This commit is contained in:
cxh 2025-10-15 08:39:35 +08:00
parent 8ae850dd63
commit 92dc5087bd

View File

@ -81,7 +81,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
{
{
std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running)
if (!streams[key].running.load())
break;
}
@ -109,7 +109,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
if (!streams[key].running)
if (!streams[key].running.load())
break;
}
@ -140,8 +140,8 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
}
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
gst_object_unref(bus);
gst_object_unref(pipeline);
if (!error_occurred)
break;
@ -150,7 +150,7 @@ void RTMPManager::stream_loop(Camera cam, StreamType type)
{
LOG_ERROR("[RTMP] Max retries reached for '" + key + "'. Stopping reconnection attempts.");
std::lock_guard<std::mutex> lock(streams_mutex);
streams[key].running = false;
streams[key].running.store(false);
break;
}
@ -165,22 +165,25 @@ void RTMPManager::start_camera(const Camera &cam, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam.name, type);
if (streams.count(key) && streams[key].running)
if (streams.count(key) && streams[key].running.load())
{
LOG_WARN("[RTMP] Camera '" + key + "' already streaming.");
return;
}
StreamContext ctx;
ctx.running = true;
ctx.running.store(true);
ctx.thread = std::thread([cam, type]()
{ RTMPManager::stream_loop(cam, type); });
streams.emplace(key, std::move(ctx));
// 修复 emplace 构造失败问题
streams.emplace(std::move(key), std::move(ctx));
}
void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
{
std::lock_guard<std::mutex> lock(streams_mutex);
std::unique_lock<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam_name, type);
auto it = streams.find(key);
if (it == streams.end())
@ -189,11 +192,13 @@ void RTMPManager::stop_camera(const std::string &cam_name, StreamType type)
return;
}
it->second.running = false;
if (it->second.thread.joinable())
it->second.thread.join();
it->second.running.store(false);
auto th = std::move(it->second.thread);
streams.erase(it);
lock.unlock();
if (th.joinable())
th.join();
}
bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
@ -201,42 +206,35 @@ bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type)
std::lock_guard<std::mutex> lock(streams_mutex);
std::string key = make_stream_key(cam_name, type);
auto it = streams.find(key);
return it != streams.end() && it->second.running;
return it != streams.end() && it->second.running.load();
}
bool RTMPManager::is_any_streaming()
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
if (kv.second.running)
if (kv.second.running.load())
return true;
return false;
}
void RTMPManager::stop_all()
{
std::vector<std::string> keys;
std::vector<std::pair<std::string, StreamType>> targets;
{
std::lock_guard<std::mutex> lock(streams_mutex);
for (auto &kv : streams)
keys.push_back(kv.first);
{
if (kv.first.size() > 4 && kv.first.find("_sub") == kv.first.size() - 4)
targets.emplace_back(kv.first.substr(0, kv.first.size() - 4), StreamType::SUB);
else
targets.emplace_back(kv.first.substr(0, kv.first.size() - 5), StreamType::MAIN);
}
}
for (auto &key : keys)
{
if (key.size() > 4 && key.find("_sub") == key.size() - 4)
{
stop_camera(key.substr(0, key.size() - 4), StreamType::SUB);
}
else if (key.size() > 5 && key.find("_main") == key.size() - 5)
{
stop_camera(key.substr(0, key.size() - 5), StreamType::MAIN);
}
else
{
stop_camera(key, StreamType::MAIN);
}
}
for (auto &[name, type] : targets)
stop_camera(name, type);
}
std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type)