diff --git a/src/communication/task_manager/src/task_manager_main.cpp b/src/communication/task_manager/src/task_manager_main.cpp index a5f456f..b95a8fa 100644 --- a/src/communication/task_manager/src/task_manager_main.cpp +++ b/src/communication/task_manager/src/task_manager_main.cpp @@ -359,13 +359,35 @@ int handleMqttMessage(char* topicName, int topicLen, MQTTClient_message* message void try_subscribe_task_topic() { - if (!identity_ready.load()) return; - if (mqtt_topic_sub_task.empty()) return; + if (!identity_ready.load()) { + LOG_DEBUG("[TASK] Identity not ready, skipping subscribe"); + return; + } + if (mqtt_topic_sub_task.empty()) { + LOG_DEBUG("[TASK] MQTT topic not configured, skipping subscribe"); + return; + } - if (subscribed.exchange(true)) return; // 已经订阅过了 + if (!mqtt_manager.isConnected()) { + LOG_DEBUG("[TASK] MQTT not connected, skipping subscribe"); + subscribed.store(false); // 确保下次可以重试 + return; + } + + if (subscribed.load()) { + LOG_DEBUG("[TASK] Already subscribed, skipping"); + return; + } LOG_INFO("[TASK] subscribe MQTT topic: %s", mqtt_topic_sub_task.c_str()); - mqtt_manager.subscribe(mqtt_topic_sub_task, 0); + bool success = mqtt_manager.subscribe(mqtt_topic_sub_task, 0); + if (success) { + subscribed.store(true); + LOG_INFO("[TASK] Subscribe successful"); + } else { + LOG_ERROR("[TASK] Subscribe failed, will retry later"); + subscribed.store(false); // 订阅失败,允许重试 + } } // 周期性上报任务状态到MQTT(200ms间隔持续上报) @@ -420,7 +442,10 @@ class TaskManagerNode : public rclcpp::Node vid = msg->vid; identity_ready.store(!vid.empty()); - if (!identity_ready.load()) return; + if (!identity_ready.load()) { + LOG_WARN("[TASK] Invalid VID received: %s", msg->vid.c_str()); + return; + } // 拼 topic mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl; @@ -435,6 +460,8 @@ class TaskManagerNode : public rclcpp::Node LOG_INFO("[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), mqtt_topic_sub_task.c_str(), mqtt_topic_push_status.c_str()); + // 重置订阅状态,确保可以重新尝试订阅 + subscribed.store(false); try_subscribe_task_topic(); }); } @@ -521,6 +548,7 @@ int main(int argc, char** argv) { LOG_INFO("[TASK] MQTT reconnected: %s", cause); subscribed.store(false); + // 尝试重新订阅任务话题 try_subscribe_task_topic(); }); @@ -539,9 +567,28 @@ int main(int argc, char** argv) rclcpp::executors::SingleThreadedExecutor executor; executor.add_node(node); + // 用于定期检查订阅状态的定时器 + auto last_check_time = std::chrono::steady_clock::now(); + const std::chrono::seconds SUBSCRIBE_CHECK_INTERVAL = std::chrono::seconds(5); + while (rclcpp::ok() && !signal_received) { executor.spin_some(); + + // 定期检查并尝试订阅任务话题 + auto now = std::chrono::steady_clock::now(); + if (now - last_check_time >= SUBSCRIBE_CHECK_INTERVAL) + { + last_check_time = now; + + // 如果已经准备好身份但尚未成功订阅,则尝试订阅 + if (identity_ready.load() && !subscribed.load()) + { + LOG_INFO("[TASK] Periodic check: identity ready but not subscribed, attempting subscribe"); + try_subscribe_task_topic(); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); }