This commit is contained in:
cxh 2025-10-17 09:00:12 +08:00
parent ba8d3ece68
commit b2e59af1bf
3 changed files with 72 additions and 26 deletions

View File

@ -67,7 +67,6 @@ class RTMPManager
std::atomic<bool> running{false}; std::atomic<bool> running{false};
std::thread thread; std::thread thread;
// 新增:把启动阶段的 promise 放到上下文里,避免悬垂引用
std::promise<StreamStatus> start_promise; std::promise<StreamStatus> start_promise;
std::atomic<bool> start_promise_set{false}; std::atomic<bool> start_promise_set{false};
@ -76,6 +75,22 @@ class RTMPManager
StreamContext() = default; StreamContext() = default;
StreamContext(const StreamContext &) = delete; StreamContext(const StreamContext &) = delete;
StreamContext &operator=(const StreamContext &) = delete; StreamContext &operator=(const StreamContext &) = delete;
~StreamContext()
{
try
{
running.store(false);
if (thread.joinable())
{
thread.join(); // 防止 std::terminate()
}
}
catch (...)
{
// 安全兜底,不抛异常
}
}
}; };
static std::string make_stream_key(const std::string &cam_name, StreamType type); static std::string make_stream_key(const std::string &cam_name, StreamType type);

View File

@ -79,10 +79,12 @@ static void on_mqtt_message_received(const std::string &topic, const std::string
} }
} }
// 异步执行推流任务
std::thread( std::thread(
[req]() [req]()
{ {
// 调用 RTMP 模块执行批量推流启停 try
{
auto results = RTMPManager::process_push_request(req); auto results = RTMPManager::process_push_request(req);
// 组装 MQTT 回复 // 组装 MQTT 回复
@ -101,9 +103,17 @@ static void on_mqtt_message_received(const std::string &topic, const std::string
reply["data"].push_back(item); reply["data"].push_back(item);
} }
// 发送回复到 MQTT
mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(), 1); mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(), 1);
LOG_INFO("[MQTT] Sent RTMP response: " + reply.dump()); LOG_INFO("[MQTT] Sent RTMP response: " + reply.dump());
}
catch (const std::exception &e)
{
LOG_ERROR(std::string("[MQTT] Exception in RTMP thread: ") + e.what());
}
catch (...)
{
LOG_ERROR("[MQTT] Unknown exception in RTMP thread");
}
}) })
.detach(); .detach();
} }

View File

@ -162,11 +162,17 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
{ {
std::lock_guard<std::mutex> lock(streams_mutex); std::lock_guard<std::mutex> lock(streams_mutex);
auto it = streams.find(key); auto it = streams.find(key);
if (it != streams.end() && it->second->running.load())
// 若旧线程仍在运行,先安全停止
if (it != streams.end())
{ {
res.result = 1; // 修改为 1表示重复启动不是真成功 if (it->second->running.load())
res.reason = "Already streaming"; {
return res; LOG_WARN("[RTMP] Restarting existing stream: " + key);
it->second->running.store(false);
if (it->second->thread.joinable()) it->second->thread.join();
}
streams.erase(it);
} }
ctx = std::make_unique<StreamContext>(); ctx = std::make_unique<StreamContext>();
@ -174,7 +180,22 @@ RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, Strea
status_future = ctx->start_promise.get_future(); status_future = ctx->start_promise.get_future();
StreamContext *ctx_ptr = ctx.get(); StreamContext *ctx_ptr = ctx.get();
ctx->thread = std::thread([cam, type, ctx_ptr]() { RTMPManager::stream_loop(cam, type, ctx_ptr); }); ctx->thread = std::thread(
[cam, type, ctx_ptr]()
{
try
{
RTMPManager::stream_loop(cam, type, ctx_ptr);
}
catch (const std::exception &e)
{
LOG_ERROR("[RTMP] Exception in stream thread: " + std::string(e.what()));
}
catch (...)
{
LOG_ERROR("[RTMP] Unknown exception in stream thread");
}
});
streams.emplace(key, std::move(ctx)); streams.emplace(key, std::move(ctx));
} }