// mqtt_client_wrapper.cpp #include "mqtt_client_wrapper.hpp" #include #include #include #include #include #include #include #include "record_manager.hpp" std::shared_ptr mqtt_client; extern std::atomic g_running; std::atomic g_streaming{false}; std::string g_dispatch_id; std::mutex g_dispatch_id_mutex; static void send_heartbeat() { if (!mqtt_client || !mqtt_client->isConnected()) return; // 获取当前时间戳(13位毫秒) auto now = std::chrono::system_clock::now(); auto ms = std::chrono::duration_cast(now.time_since_epoch()).count(); // 获取 RTMP 状态 auto channels_info = RTMPManager::get_all_channels_status(); nlohmann::json channels = nlohmann::json::array(); int total = static_cast(channels_info.size()); int running_count = 0; for (const auto& ch : channels_info) { nlohmann::json item; item["loc"] = ch.loc; item["running"] = ch.running; if (ch.running) { item["reason"] = nullptr; running_count++; } else { item["reason"] = ch.reason.empty() ? "Unknown error" : ch.reason; } channels.push_back(item); } nlohmann::json hb; hb["timestamp"] = ms; hb["status"] = (running_count == 0) ? 0 // 全部失败 : (running_count == total ? 1 : 2); // 全部正常 or 部分异常 hb["channels"] = channels; // 发布心跳 mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump(), 0); // LOG_INFO("[MQTT] Sent video heartbeat (" + std::to_string(running_count) + "/" + std::to_string(total) + // " running): " + hb.dump()); } // MQTT 回调 static void on_mqtt_connected() { LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip); mqtt_client->subscribe(g_app_config.mqtt.topics.video_down); mqtt_client->subscribe(g_app_config.mqtt.topics.record_query); mqtt_client->subscribe(g_app_config.mqtt.topics.record_play); } static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); } static void handle_video_down_request(const nlohmann::json& req) { if (!req.contains("data") || !req["data"].is_object()) { LOG_WARN("[video_down] Missing data"); return; } int sw = req["data"].value("switch", 0); if (sw != 1) { LOG_INFO("[video_down] switch != 1, ignore"); return; } // 取得当前所有 RTMP 播放地址 auto channels_info = RTMPManager::get_all_channels_status(); // 构造响应 nlohmann::json resp; resp["type"] = "response"; resp["seqNo"] = req.value("seqNo", "0"); resp["data"] = nlohmann::json::array(); int loc = 0; for (const auto& ch : channels_info) { resp["data"].push_back({{"loc", loc}, {"url", ch.url}}); loc++; } // 发布回复 mqtt_client->publish(g_app_config.mqtt.topics.video_down, resp.dump(-1), 1); } static void handle_record_query_request(const nlohmann::json& req) { // 1. 解析字段 if (!req.contains("data") || !req["data"].is_object()) { LOG_WARN("[record_query] missing data"); return; } const auto& d = req["data"]; int loc = d.value("loc", -1); int64_t start = d.value("startTime", -1LL); int64_t end = d.value("endTime", -1LL); if (loc < 0 || start <= 0 || end <= 0) { LOG_WARN("[record_query] invalid parameters"); return; } LOG_INFO("[record_query] loc=" + std::to_string(loc) + " start=" + std::to_string(start) + " end=" + std::to_string(end)); // 2. 计算 stream 名 // loc → stream,比如: // 0 → "AHD1_main" std::string stream = RTMPManager::loc_to_stream(loc); if (stream.empty()) { LOG_WARN("[record_query] invalid loc: " + std::to_string(loc)); return; } // 3. 向 RecordManager 查询录像段 auto segs = g_record_manager->querySegments(stream, start, end); LOG_INFO("[record_query] Found segments=" + std::to_string(segs.size())); // 4. 构造响应 JSON nlohmann::json resp; resp["type"] = "response"; resp["seqNo"] = req.value("seqNo", "0"); nlohmann::json data; data["loc"] = loc; data["segments"] = nlohmann::json::array(); int idx = 1; for (const auto& seg : segs) { nlohmann::json s; s["index"] = idx++; s["segmentId"] = seg.segment_id; s["startTime"] = seg.start_ms; s["endTime"] = seg.end_ms; data["segments"].push_back(s); } resp["data"] = data; // 5. 发送 MQTT mqtt_client->publish(g_app_config.mqtt.topics.record_query, resp.dump(-1), 1); } static void handle_record_play_request(const nlohmann::json& req) {} static void on_mqtt_message_received(const std::string& topic, const std::string& message) { try { auto j = nlohmann::json::parse(message); // 必须是 request if (!j.contains("type") || j["type"] != "request") { LOG_INFO("[MQTT] Ignored non-request message"); return; } LOG_INFO("[MQTT] Received request on [" + topic + "]"); LOG_INFO("[MQTT] Payload: " + j.dump(-1)); // 根据 topic 分发 if (topic == g_app_config.mqtt.topics.video_down) { handle_video_down_request(j); } else if (topic == g_app_config.mqtt.topics.record_query) { handle_record_query_request(j); } else if (topic == g_app_config.mqtt.topics.record_play) { handle_record_play_request(j); } else { LOG_WARN("[MQTT] Unknown topic: " + topic); } } catch (const std::exception& e) { LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); } } void mqtt_client_thread_func() { const auto& cfg = g_app_config.mqtt; auto heartbeat_interval = std::chrono::milliseconds(static_cast(cfg.keep_alive * 0.9)); while (g_running) { mqtt_client = std::make_unique(cfg); mqtt_client->setConnectCallback(on_mqtt_connected); mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setMessageCallback(on_mqtt_message_received); // 等待连接 while (g_running && !mqtt_client->isConnected()) { mqtt_client->connect(); for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++) std::this_thread::sleep_for(std::chrono::milliseconds(50)); if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect(); } // 主循环:心跳 while (g_running && mqtt_client->isConnected()) { send_heartbeat(); auto sleep_time = heartbeat_interval; while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected()) { auto chunk = std::min(sleep_time, std::chrono::milliseconds(50)); std::this_thread::sleep_for(chunk); sleep_time -= chunk; } if (!g_running && mqtt_client->isConnected()) mqtt_client->force_disconnect(); } // 清理 if (mqtt_client) { if (g_running) mqtt_client->disconnect(); else mqtt_client->force_disconnect(); mqtt_client.reset(); } if (!g_running) break; // 短暂等待再重连 for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200)); } LOG_INFO("[MQTT] Client thread exiting."); }