From 689c10bd912f7e562f3a628290a0a481489072b5 Mon Sep 17 00:00:00 2001 From: lyq Date: Mon, 2 Feb 2026 14:13:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E5=85=88=E8=AE=A2=E9=98=85to?= =?UTF-8?q?pic=E5=86=8D=E8=8E=B7=E5=8F=96vid=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task_manager/src/task_manager_main.cpp | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/src/communication/task_manager/src/task_manager_main.cpp b/src/communication/task_manager/src/task_manager_main.cpp index a5f456f..b95a8fa 100644 --- a/src/communication/task_manager/src/task_manager_main.cpp +++ b/src/communication/task_manager/src/task_manager_main.cpp @@ -359,13 +359,35 @@ int handleMqttMessage(char* topicName, int topicLen, MQTTClient_message* message void try_subscribe_task_topic() { - if (!identity_ready.load()) return; - if (mqtt_topic_sub_task.empty()) return; + if (!identity_ready.load()) { + LOG_DEBUG("[TASK] Identity not ready, skipping subscribe"); + return; + } + if (mqtt_topic_sub_task.empty()) { + LOG_DEBUG("[TASK] MQTT topic not configured, skipping subscribe"); + return; + } - if (subscribed.exchange(true)) return; // 已经订阅过了 + if (!mqtt_manager.isConnected()) { + LOG_DEBUG("[TASK] MQTT not connected, skipping subscribe"); + subscribed.store(false); // 确保下次可以重试 + return; + } + + if (subscribed.load()) { + LOG_DEBUG("[TASK] Already subscribed, skipping"); + return; + } LOG_INFO("[TASK] subscribe MQTT topic: %s", mqtt_topic_sub_task.c_str()); - mqtt_manager.subscribe(mqtt_topic_sub_task, 0); + bool success = mqtt_manager.subscribe(mqtt_topic_sub_task, 0); + if (success) { + subscribed.store(true); + LOG_INFO("[TASK] Subscribe successful"); + } else { + LOG_ERROR("[TASK] Subscribe failed, will retry later"); + subscribed.store(false); // 订阅失败,允许重试 + } } // 周期性上报任务状态到MQTT(200ms间隔持续上报) @@ -420,7 +442,10 @@ class TaskManagerNode : public rclcpp::Node vid = msg->vid; identity_ready.store(!vid.empty()); - if (!identity_ready.load()) return; + if (!identity_ready.load()) { + LOG_WARN("[TASK] Invalid VID received: %s", msg->vid.c_str()); + return; + } // 拼 topic mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl; @@ -435,6 +460,8 @@ class TaskManagerNode : public rclcpp::Node LOG_INFO("[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), mqtt_topic_sub_task.c_str(), mqtt_topic_push_status.c_str()); + // 重置订阅状态,确保可以重新尝试订阅 + subscribed.store(false); try_subscribe_task_topic(); }); } @@ -521,6 +548,7 @@ int main(int argc, char** argv) { LOG_INFO("[TASK] MQTT reconnected: %s", cause); subscribed.store(false); + // 尝试重新订阅任务话题 try_subscribe_task_topic(); }); @@ -539,9 +567,28 @@ int main(int argc, char** argv) rclcpp::executors::SingleThreadedExecutor executor; executor.add_node(node); + // 用于定期检查订阅状态的定时器 + auto last_check_time = std::chrono::steady_clock::now(); + const std::chrono::seconds SUBSCRIBE_CHECK_INTERVAL = std::chrono::seconds(5); + while (rclcpp::ok() && !signal_received) { executor.spin_some(); + + // 定期检查并尝试订阅任务话题 + auto now = std::chrono::steady_clock::now(); + if (now - last_check_time >= SUBSCRIBE_CHECK_INTERVAL) + { + last_check_time = now; + + // 如果已经准备好身份但尚未成功订阅,则尝试订阅 + if (identity_ready.load() && !subscribed.load()) + { + LOG_INFO("[TASK] Periodic check: identity ready but not subscribed, attempting subscribe"); + try_subscribe_task_topic(); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); }