diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 2bd21a9..42df80a 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -6,6 +6,41 @@ std::atomic mqtt_restart_required{false}; extern std::atomic g_running; +std::atomic g_dispatch_id{""}; +auto last_heartbeat = std::chrono::steady_clock::now(); +auto heartbeat_interval = std::chrono::milliseconds(static_cast(cfg.keep_alive * 0.9)); + +static void send_heartbeat() +{ + if (!mqtt_client || !mqtt_client->isConnected()) + return; + + // 判断当前状态 + int status = 2; // 默认等待中 + for (const auto &cam : g_app_config.cameras) + { + if (cam.enabled && RTSPManager::is_streaming(cam.name)) + { + status = 0; // 有推流摄像头 + break; + } + } + + nlohmann::json hb_data; + hb_data["time"] = Logger::get_current_time_utc8(); + if (!g_dispatch_id.load().empty()) + hb_data["dispatchId"] = g_dispatch_id.load(); + hb_data["status"] = status; + + nlohmann::json msg; + msg["data"] = hb_data; + msg["isEnc"] = 0; + msg["type"] = 0; + + mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump()); + LOG_INFO("[MQTT] Sent heartbeat: " + msg.dump()); +} + // MQTT 回调定义 static void on_mqtt_connected() { @@ -39,6 +74,11 @@ static void on_mqtt_message_received(const std::string &topic, const std::string return; } + if (j["data"].contains("dispatchId")) + { + g_dispatch_id = j["data"]["dispatchId"].get(); + } + int status = j["data"]["status"].get(); std::string seqNo = j["data"]["seqNo"].get(); bool success = true; // 标记是否操作成功 @@ -172,7 +212,21 @@ void mqtt_client_thread_func() while (!mqtt_restart_required && g_running) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto now = std::chrono::steady_clock::now(); + if (now - last_heartbeat >= heartbeat_interval) + { + try + { + send_heartbeat(); + } + catch (const std::exception &e) + { + LOG_ERROR(std::string("[MQTT] Heartbeat error: ") + e.what()); + } + last_heartbeat = now; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 轻量轮询 } // 需要重启或退出