From c6b3dedfdc34e8288e15e5c96d9374a84bf38f32 Mon Sep 17 00:00:00 2001 From: cxh Date: Mon, 5 Jan 2026 14:01:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8F=96=E6=B6=88=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/mqtt_client.hpp | 27 +++++++++++++++------------ src/mqtt_client.cpp | 15 +++++++++++++++ src/mqtt_client_wrapper.cpp | 18 +++++++++++++++++- 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/include/mqtt_client.hpp b/include/mqtt_client.hpp index 69ad3c4..360db52 100644 --- a/include/mqtt_client.hpp +++ b/include/mqtt_client.hpp @@ -1,23 +1,25 @@ // mqtt_client.hpp #pragma once -#include +#include + +#include #include #include -#include #include -#include +#include + #include "app_config.hpp" #include "logger.hpp" class MQTTClient : public virtual mqtt::callback { -public: + public: using ConnectCallback = std::function; using DisconnectCallback = std::function; - using MessageCallback = std::function; + using MessageCallback = std::function; - explicit MQTTClient(const MQTTConfig &config); + explicit MQTTClient(const MQTTConfig& config); ~MQTTClient(); void setConnectCallback(ConnectCallback cb); @@ -27,18 +29,19 @@ public: void connect(); void disconnect(); void force_disconnect(); - void publish(const std::string &topic, const std::string &payload, int qos = -1); - void subscribe(const std::string &topic, int qos = -1); - void switchServer(const MQTTConfig &newConfig); + void publish(const std::string& topic, const std::string& payload, int qos = -1); + void subscribe(const std::string& topic, int qos = -1); + void unsubscribe(const std::string& topic); + void switchServer(const MQTTConfig& newConfig); bool isConnected() const; -private: + private: void initializeClient(); // mqtt::callback 实现 - void connection_lost(const std::string &cause) override; - void connected(const std::string &cause) override; + void connection_lost(const std::string& cause) override; + void connected(const std::string& cause) override; void message_arrived(mqtt::const_message_ptr msg) override; void delivery_complete(mqtt::delivery_token_ptr token) override {} diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index 9b6f477..5b743d2 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -137,6 +137,21 @@ void MQTTClient::subscribe(const std::string& topic, int qos) } } +void MQTTClient::unsubscribe(const std::string& topic) +{ + if (!connected_ || topic.empty()) return; + + try + { + client_->unsubscribe(topic); + LOG_INFO("[MQTT] Unsubscribed from topic: " + topic); + } + catch (const mqtt::exception& e) + { + LOG_WARN(std::string("[MQTT] Unsubscribe failed: ") + e.what()); + } +} + void MQTTClient::switchServer(const MQTTConfig& newConfig) { std::lock_guard lock(mutex_); diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 0622a91..dbf5bef 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -25,15 +25,31 @@ static std::string g_last_vid; static bool try_activate_mqtt() { if (g_app_config.runtime_vid.empty()) return false; + if (!mqtt_client || !mqtt_client->isConnected()) return false; + // 如果已激活且 VID 未变化,什么都不做 if (g_mqtt_activated.load() && g_last_vid == g_app_config.runtime_vid) return true; - // VID 首次 or 发生变化 + // ---------- 1. 如果是 VID 变化,先取消旧订阅 ---------- + if (g_mqtt_activated.load()) + { + LOG_INFO("[MQTT] VID changed, unsubscribe old topics: " + g_last_vid); + + mqtt_client->unsubscribe(g_prev_topics.video_down); + mqtt_client->unsubscribe(g_prev_topics.record_query); + mqtt_client->unsubscribe(g_prev_topics.record_play); + mqtt_client->unsubscribe(g_prev_topics.vehicle_ctrl); + mqtt_client->unsubscribe(g_prev_topics.heartbeat_up); + } + + // ---------- 2. 切换到新 VID ---------- g_app_config.mqtt.topics.fill_with_veh_id(g_app_config.runtime_vid); + g_prev_topics = g_app_config.mqtt.topics; g_last_vid = g_app_config.runtime_vid; LOG_INFO("[MQTT] Activated with VID=" + g_last_vid); + // ---------- 3. 订阅新 topic ---------- mqtt_client->subscribe(g_app_config.mqtt.topics.video_down); mqtt_client->subscribe(g_app_config.mqtt.topics.record_query); mqtt_client->subscribe(g_app_config.mqtt.topics.record_play);