// rtmp_manager.cpp #include "rtmp_manager.hpp" #include #include "logger.hpp" std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; std::queue RTMPManager::request_queue; std::mutex RTMPManager::queue_mutex; std::condition_variable RTMPManager::queue_cv; std::thread RTMPManager::worker_thread; std::atomic RTMPManager::running{false}; void RTMPManager::init() { gst_init(nullptr, nullptr); running = true; worker_thread = std::thread(rtmp_worker_thread); LOG_INFO("[RTMP] Initialized and worker thread started."); } void RTMPManager::stop_all() { running = false; queue_cv.notify_all(); // 停止所有流 { std::lock_guard lock(streams_mutex); for (auto &kv : streams) kv.second->running = false; } if (worker_thread.joinable()) worker_thread.join(); LOG_INFO("[RTMP] stop_all completed."); } void RTMPManager::enqueue_video_push_request(const VideoPushRequest &req) { { std::lock_guard lock(queue_mutex); request_queue.push(req); } queue_cv.notify_one(); } void RTMPManager::rtmp_worker_thread() { while (running) { VideoPushRequest req; { std::unique_lock lock(queue_mutex); queue_cv.wait(lock, [] { return !request_queue.empty() || !running; }); if (!running) break; req = request_queue.front(); request_queue.pop(); } nlohmann::json results = nlohmann::json::array(); for (auto &item : req.data) { for (int ch : item.channels) { if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) continue; Camera &cam = g_app_config.cameras[ch]; StreamResultInfo res; if (item.switchVal == 0) res = start_camera(cam, item.streamType); else res = stop_camera(cam, item.streamType); res.loc = ch; res.url = "rtmp://127.0.0.1/live/" + cam.name; results.push_back({{"loc", res.loc}, {"url", res.url}, {"result", res.result}, {"reason", res.reason}}); } } // 上报 MQTT nlohmann::json reply; reply["type"] = "response"; reply["seqNo"] = req.seqNo; reply["data"] = results; if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump()); } } // 简化 start_camera / stop_camera,不阻塞 MQTT 回调 RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, int streamType) { StreamResultInfo res; std::string key = cam.name + (streamType == 0 ? "_main" : "_sub"); { std::lock_guard lock(streams_mutex); if (streams.find(key) != streams.end()) { res.result = 0; res.reason = "Already streaming"; return res; } auto ctx = std::make_unique(); ctx->running = true; ctx->thread = std::thread( [cam, streamType, ctx_ptr = ctx.get()] { // TODO: GStreamer pipeline loop std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟启动 StreamResultInfo r; r.result = 0; r.reason = "Started OK"; ctx_ptr->start_result.set_value(r); }); streams[key] = std::move(ctx); } // 非阻塞:直接返回 future 的结果即可 try { res = streams[key]->start_result.get_future().get(); } catch (...) { res.result = 1; res.reason = "Exception in start_camera"; } return res; } RTMPManager::StreamResultInfo RTMPManager::stop_camera(const Camera &cam, int streamType) { StreamResultInfo res; std::string key = cam.name + (streamType == 0 ? "_main" : "_sub"); std::unique_ptr ctx; { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end()) { res.result = 0; res.reason = "Not streaming"; return res; } ctx = std::move(it->second); streams.erase(it); ctx->running = false; } if (ctx->thread.joinable()) ctx->thread.join(); res.result = 0; res.reason = "Stopped OK"; return res; }