修正先订阅topic再获取vid bug

This commit is contained in:
lyq 2026-02-02 14:13:33 +08:00
parent 93afcdebbb
commit 689c10bd91

View File

@ -359,13 +359,35 @@ int handleMqttMessage(char* topicName, int topicLen, MQTTClient_message* message
void try_subscribe_task_topic() void try_subscribe_task_topic()
{ {
if (!identity_ready.load()) return; if (!identity_ready.load()) {
if (mqtt_topic_sub_task.empty()) return; LOG_DEBUG("[TASK] Identity not ready, skipping subscribe");
return;
}
if (mqtt_topic_sub_task.empty()) {
LOG_DEBUG("[TASK] MQTT topic not configured, skipping subscribe");
return;
}
if (subscribed.exchange(true)) return; // 已经订阅过了 if (!mqtt_manager.isConnected()) {
LOG_DEBUG("[TASK] MQTT not connected, skipping subscribe");
subscribed.store(false); // 确保下次可以重试
return;
}
if (subscribed.load()) {
LOG_DEBUG("[TASK] Already subscribed, skipping");
return;
}
LOG_INFO("[TASK] subscribe MQTT topic: %s", mqtt_topic_sub_task.c_str()); LOG_INFO("[TASK] subscribe MQTT topic: %s", mqtt_topic_sub_task.c_str());
mqtt_manager.subscribe(mqtt_topic_sub_task, 0); bool success = mqtt_manager.subscribe(mqtt_topic_sub_task, 0);
if (success) {
subscribed.store(true);
LOG_INFO("[TASK] Subscribe successful");
} else {
LOG_ERROR("[TASK] Subscribe failed, will retry later");
subscribed.store(false); // 订阅失败,允许重试
}
} }
// 周期性上报任务状态到MQTT200ms间隔持续上报 // 周期性上报任务状态到MQTT200ms间隔持续上报
@ -420,7 +442,10 @@ class TaskManagerNode : public rclcpp::Node
vid = msg->vid; vid = msg->vid;
identity_ready.store(!vid.empty()); identity_ready.store(!vid.empty());
if (!identity_ready.load()) return; if (!identity_ready.load()) {
LOG_WARN("[TASK] Invalid VID received: %s", msg->vid.c_str());
return;
}
// 拼 topic // 拼 topic
mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl; mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl;
@ -435,6 +460,8 @@ class TaskManagerNode : public rclcpp::Node
LOG_INFO("[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), mqtt_topic_sub_task.c_str(), LOG_INFO("[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), mqtt_topic_sub_task.c_str(),
mqtt_topic_push_status.c_str()); mqtt_topic_push_status.c_str());
// 重置订阅状态,确保可以重新尝试订阅
subscribed.store(false);
try_subscribe_task_topic(); try_subscribe_task_topic();
}); });
} }
@ -521,6 +548,7 @@ int main(int argc, char** argv)
{ {
LOG_INFO("[TASK] MQTT reconnected: %s", cause); LOG_INFO("[TASK] MQTT reconnected: %s", cause);
subscribed.store(false); subscribed.store(false);
// 尝试重新订阅任务话题
try_subscribe_task_topic(); try_subscribe_task_topic();
}); });
@ -539,9 +567,28 @@ int main(int argc, char** argv)
rclcpp::executors::SingleThreadedExecutor executor; rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node); executor.add_node(node);
// 用于定期检查订阅状态的定时器
auto last_check_time = std::chrono::steady_clock::now();
const std::chrono::seconds SUBSCRIBE_CHECK_INTERVAL = std::chrono::seconds(5);
while (rclcpp::ok() && !signal_received) while (rclcpp::ok() && !signal_received)
{ {
executor.spin_some(); executor.spin_some();
// 定期检查并尝试订阅任务话题
auto now = std::chrono::steady_clock::now();
if (now - last_check_time >= SUBSCRIBE_CHECK_INTERVAL)
{
last_check_time = now;
// 如果已经准备好身份但尚未成功订阅,则尝试订阅
if (identity_ready.load() && !subscribed.load())
{
LOG_INFO("[TASK] Periodic check: identity ready but not subscribed, attempting subscribe");
try_subscribe_task_topic();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }