From 9c73d52dceb858c3ceec1ca8234ca91be2f8050e Mon Sep 17 00:00:00 2001 From: lyq Date: Mon, 19 Jan 2026 14:36:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9task=5Fmanager=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E8=8E=B7=E5=8F=96vid=E5=92=8C=E8=AF=9D=E9=A2=98?= =?UTF-8?q?=E7=9A=84=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 4 +- src/communication/task_manager/CMakeLists.txt | 49 +- .../include/task_manager/mqtt_manager.hpp | 39 +- .../include/task_manager/task_manager.hpp | 38 +- .../{task_manager_node.hpp => task_types.hpp} | 0 .../task_manager/src/mqtt_manager.cpp | 70 +- .../task_manager/src/task_manager_main.cpp | 189 ++-- .../task_manager/src/task_manager_node.cpp | 807 ------------------ 8 files changed, 227 insertions(+), 969 deletions(-) rename src/communication/task_manager/include/task_manager/{task_manager_node.hpp => task_types.hpp} (100%) delete mode 100644 src/communication/task_manager/src/task_manager_node.cpp diff --git a/config.json b/config.json index b0e76e1..91a4ae3 100644 --- a/config.json +++ b/config.json @@ -10,8 +10,8 @@ "remote_topic": "/zxwl/sweeper/{vid}/ctrl", "upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload", "download_url_pre": "http://36.153.162.171:9510/api/ccp-web/file/", - "mqtt_topic_push_status": "/zxwl/sweeper/V060003/task/status", - "mqtt_topic_sub_task": "/zxwl/sweeper/V060003/task" + "mqtt_topic_push_status": "/zxwl/sweeper/{vid}/task/status", + "mqtt_topic_sub_task": "/zxwl/sweeper/{vid}/task" }, "detect_line_tolerance": 3.06, "detect_head_tolerance": 2, diff --git a/src/communication/task_manager/CMakeLists.txt b/src/communication/task_manager/CMakeLists.txt index 4a58552..b960838 100644 --- a/src/communication/task_manager/CMakeLists.txt +++ b/src/communication/task_manager/CMakeLists.txt @@ -16,7 +16,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") endif() if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - add_compile_options(-w) # 禁用所有警告 + add_compile_options(-w) # 禁用所有警告 endif() # find dependencies @@ -26,39 +26,30 @@ find_package(std_msgs REQUIRED) find_package(sweeper_interfaces REQUIRED) find_package(CURL REQUIRED) -include_directories( - include/task_manager - include/paho_mqtt_3c - ${catkin_INCLUDE_DIRS} -) +include_directories(include/task_manager include/paho_mqtt_3c ${catkin_INCLUDE_DIRS}) -add_executable(task_manager_node +add_executable( + task_manager_node src/task_manager_main.cpp src/task_manager.cpp src/mqtt_manager.cpp src/jsoncpp.cpp - src/md5.cpp -) + src/md5.cpp) -ament_target_dependencies(task_manager_node rclcpp std_msgs sweeper_interfaces CURL) - - -if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64) - target_link_libraries( - task_manager_node - ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a - ) -else() - target_link_libraries( - task_manager_node - ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a - ) -endif() - -install(TARGETS +ament_target_dependencies( task_manager_node - DESTINATION lib/${PROJECT_NAME} -) + rclcpp + std_msgs + sweeper_interfaces + CURL) + +if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64) + target_link_libraries(task_manager_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a) +else() + target_link_libraries(task_manager_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a) +endif() + +install(TARGETS task_manager_node DESTINATION lib/${PROJECT_NAME}) if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) @@ -69,5 +60,5 @@ if(BUILD_TESTING) # uncomment the line when this package is not in a git repo #set(ament_cmake_cpplint_FOUND TRUE) ament_lint_auto_find_test_dependencies() -endif() -ament_package() \ No newline at end of file +endif() +ament_package() diff --git a/src/communication/task_manager/include/task_manager/mqtt_manager.hpp b/src/communication/task_manager/include/task_manager/mqtt_manager.hpp index e01eb83..9d2b6f0 100644 --- a/src/communication/task_manager/include/task_manager/mqtt_manager.hpp +++ b/src/communication/task_manager/include/task_manager/mqtt_manager.hpp @@ -1,41 +1,43 @@ #ifndef __MQTT_MANAGER_H__ #define __MQTT_MANAGER_H__ -#include -#include -#include #include #include #include +#include +#include +#include + #include "MQTTClient.h" using namespace std; class MQTTManager { -public: + public: // 消息回调函数类型 - using MessageCallback = std::function; - using ConnLostCallback = std::function; + using MessageCallback = std::function; + using ConnLostCallback = std::function; using DeliveredCallback = std::function; + using ConnRestoredCallback = std::function; MQTTManager(); ~MQTTManager(); // 初始化MQTT连接 - bool init(const string &address, const string &clientId, - const string &username, const string &password); + bool init(const string& address, const string& clientId, const string& username, const string& password); // 订阅主题 - bool subscribe(const string &topic, int qos = 0); + bool subscribe(const string& topic, int qos = 0); // 发布消息 - bool publish(const string &topic, const string &payload, int qos = 0); + bool publish(const string& topic, const string& payload, int qos = 0); // 设置回调函数 void setMessageCallback(MessageCallback cb) { message_callback_ = cb; } void setConnLostCallback(ConnLostCallback cb) { conn_lost_callback_ = cb; } void setDeliveredCallback(DeliveredCallback cb) { delivered_callback_ = cb; } + void setConnRestoredCallback(ConnRestoredCallback cb) { conn_restored_callback_ = cb; } // 获取连接状态 bool isConnected(); @@ -51,7 +53,7 @@ public: void setMaxReconnectAttempts(int attempts) { max_reconnect_attempts_ = attempts; } void setHeartbeatTimeout(int milliseconds) { heartbeat_timeout_ = milliseconds; } -private: + private: // MQTT连接循环线程函数 void mqttLoop(); @@ -59,9 +61,9 @@ private: bool reconnect(); // 内部MQTT回调(static函数) - static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message); - static void onConnectionLost(void *context, char *cause); - static void onDelivered(void *context, MQTTClient_deliveryToken dt); + static int onMessageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message); + static void onConnectionLost(void* context, char* cause); + static void onDelivered(void* context, MQTTClient_deliveryToken dt); MQTTClient client_; MQTTClient_connectOptions conn_opts_; @@ -76,24 +78,25 @@ private: MessageCallback message_callback_; ConnLostCallback conn_lost_callback_; DeliveredCallback delivered_callback_; + ConnRestoredCallback conn_restored_callback_; bool is_running_; std::thread mqtt_thread_; std::mutex client_mutex_; // 断线重连参数 - int reconnect_interval_; // 重连间隔(秒) - int max_reconnect_attempts_; // 最大重连次数,0为无限 + int reconnect_interval_; // 重连间隔(秒) + int max_reconnect_attempts_; // 最大重连次数,0为无限 int reconnect_attempts_; std::chrono::steady_clock::time_point last_reconnect_time_; // 心跳检测 std::chrono::steady_clock::time_point last_message_time_; - int heartbeat_timeout_; // 心跳超时时间(毫秒) + int heartbeat_timeout_; // 心跳超时时间(毫秒) bool is_heartbeat_lost_; constexpr static int TIMEOUT = 10000L; constexpr static int QOS = 0; }; -#endif // __MQTT_MANAGER_H__ \ No newline at end of file +#endif // __MQTT_MANAGER_H__ \ No newline at end of file diff --git a/src/communication/task_manager/include/task_manager/task_manager.hpp b/src/communication/task_manager/include/task_manager/task_manager.hpp index e842179..113e1ee 100644 --- a/src/communication/task_manager/include/task_manager/task_manager.hpp +++ b/src/communication/task_manager/include/task_manager/task_manager.hpp @@ -1,20 +1,21 @@ #ifndef __TASK_MANAGER_H__ #define __TASK_MANAGER_H__ -#include -#include -#include -#include -#include -#include #include -#include "task_manager_node.hpp" +#include +#include +#include +#include +#include +#include + +#include "task_types.hpp" using namespace std; class TaskManager { -public: + public: TaskManager(); ~TaskManager(); @@ -25,7 +26,7 @@ public: void stop(); // 添加任务到队列 - void addTask(const MQTT_JSON &mqtt_json); + void addTask(const MQTT_JSON& mqtt_json); // 获取当前任务状态 TaskStatus getCurrentTaskStatus(); @@ -39,24 +40,24 @@ public: // 获取任务队列大小 int getQueueSize(); -private: + private: // 任务处理线程函数 void processTasksLoop(); // 处理单个start任务 - bool processStartTask(const JSON_DATA &json_data, long seqNo); + bool processStartTask(const JSON_DATA& json_data, long seqNo); // 处理单个stop任务 bool processStopTask(long taskId, long seqNo); // 下载文件 - bool downloadFile(const string &url, const string &expected_md5, const string &filename); + bool downloadFile(const string& url, const string& expected_md5, const string& filename); // 计算MD5 - string calculate_md5(const string &filename); + string calculate_md5(const string& filename); // 复制文件 - void copyFileAndOverwrite(const string &sourceFilePath, const string &destinationFilePath); + void copyFileAndOverwrite(const string& sourceFilePath, const string& destinationFilePath); std::queue task_queue_; std::mutex queue_mutex_; @@ -71,11 +72,10 @@ private: int task_status_; // 回调函数用于发送MQTT响应 - std::function send_response_callback_; + std::function send_response_callback_; -public: - void setSendResponseCallback( - std::function cb) + public: + void setSendResponseCallback(std::function cb) { send_response_callback_ = cb; } @@ -84,4 +84,4 @@ public: void setTaskStatus(int status) { task_status_ = status; } }; -#endif // __TASK_MANAGER_H__ \ No newline at end of file +#endif // __TASK_MANAGER_H__ \ No newline at end of file diff --git a/src/communication/task_manager/include/task_manager/task_manager_node.hpp b/src/communication/task_manager/include/task_manager/task_types.hpp similarity index 100% rename from src/communication/task_manager/include/task_manager/task_manager_node.hpp rename to src/communication/task_manager/include/task_manager/task_types.hpp diff --git a/src/communication/task_manager/src/mqtt_manager.cpp b/src/communication/task_manager/src/mqtt_manager.cpp index 9981fd1..2bbe002 100644 --- a/src/communication/task_manager/src/mqtt_manager.cpp +++ b/src/communication/task_manager/src/mqtt_manager.cpp @@ -1,31 +1,31 @@ #include "mqtt_manager.hpp" -#include + #include +#include #include MQTTManager::MQTTManager() - : client_(nullptr), is_running_(false), reconnect_interval_(5), - max_reconnect_attempts_(0), reconnect_attempts_(0), heartbeat_timeout_(500), + : client_(nullptr), + is_running_(false), + reconnect_interval_(5), + max_reconnect_attempts_(0), + reconnect_attempts_(0), + heartbeat_timeout_(500), is_heartbeat_lost_(false) { conn_opts_ = MQTTClient_connectOptions_initializer; } -MQTTManager::~MQTTManager() -{ - stop(); -} +MQTTManager::~MQTTManager() { stop(); } -bool MQTTManager::init(const string &address, const string &clientId, - const string &username, const string &password) +bool MQTTManager::init(const string& address, const string& clientId, const string& username, const string& password) { address_ = address; client_id_ = clientId; username_ = username; password_ = password; - int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(), - MQTTCLIENT_PERSISTENCE_NONE, NULL); + int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); if (rc != MQTTCLIENT_SUCCESS) { std::cerr << "Failed to create MQTT client, return code " << rc << std::endl; @@ -33,8 +33,7 @@ bool MQTTManager::init(const string &address, const string &clientId, } // 设置回调 - rc = MQTTClient_setCallbacks(client_, this, onConnectionLost, - onMessageArrived, onDelivered); + rc = MQTTClient_setCallbacks(client_, this, onConnectionLost, onMessageArrived, onDelivered); if (rc != MQTTCLIENT_SUCCESS) { std::cerr << "Failed to set MQTT callbacks, return code " << rc << std::endl; @@ -80,10 +79,15 @@ bool MQTTManager::reconnect() is_heartbeat_lost_ = false; last_message_time_ = std::chrono::steady_clock::now(); + if (conn_restored_callback_) + { + conn_restored_callback_("MQTT reconnected"); + } + return true; } -bool MQTTManager::subscribe(const string &topic, int qos) +bool MQTTManager::subscribe(const string& topic, int qos) { std::lock_guard lock(client_mutex_); @@ -105,7 +109,7 @@ bool MQTTManager::subscribe(const string &topic, int qos) return true; } -bool MQTTManager::publish(const string &topic, const string &payload, int qos) +bool MQTTManager::publish(const string& topic, const string& payload, int qos) { std::lock_guard lock(client_mutex_); @@ -116,7 +120,7 @@ bool MQTTManager::publish(const string &topic, const string &payload, int qos) } MQTTClient_message pubmsg = MQTTClient_message_initializer; - pubmsg.payload = (void *)payload.c_str(); + pubmsg.payload = (void*)payload.c_str(); pubmsg.payloadlen = payload.length(); pubmsg.qos = qos; pubmsg.retained = 0; @@ -138,10 +142,7 @@ bool MQTTManager::publish(const string &topic, const string &payload, int qos) return true; } -bool MQTTManager::isConnected() -{ - return MQTTClient_isConnected(client_) == 1; -} +bool MQTTManager::isConnected() { return MQTTClient_isConnected(client_) == 1; } void MQTTManager::start() { @@ -165,8 +166,7 @@ void MQTTManager::start() void MQTTManager::stop() { - if (!is_running_) - return; + if (!is_running_) return; is_running_ = false; @@ -193,9 +193,7 @@ void MQTTManager::mqttLoop() if (!isConnected()) { auto now = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast( - now - last_reconnect_time_) - .count(); + auto duration = std::chrono::duration_cast(now - last_reconnect_time_).count(); // 如果距离上次重连已过指定间隔,则尝试重连 if (duration >= reconnect_interval_) @@ -212,18 +210,15 @@ void MQTTManager::mqttLoop() { // 连接正常,检查心跳 auto now = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast( - now - last_message_time_) - .count(); + auto duration = std::chrono::duration_cast(now - last_message_time_).count(); if (duration > heartbeat_timeout_ && !is_heartbeat_lost_) { is_heartbeat_lost_ = true; - std::cout << "Heartbeat timeout: No message received in " - << heartbeat_timeout_ << "ms." << std::endl; + std::cout << "Heartbeat timeout: No message received in " << heartbeat_timeout_ << "ms." << std::endl; if (conn_lost_callback_) { - conn_lost_callback_((char *)"Heartbeat timeout"); + conn_lost_callback_((char*)"Heartbeat timeout"); } } } @@ -232,10 +227,9 @@ void MQTTManager::mqttLoop() } } -int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen, - MQTTClient_message *message) +int MQTTManager::onMessageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message) { - MQTTManager *pThis = static_cast(context); + MQTTManager* pThis = static_cast(context); // 更新最后消息时间 pThis->last_message_time_ = std::chrono::steady_clock::now(); @@ -251,9 +245,9 @@ int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen, return 1; } -void MQTTManager::onConnectionLost(void *context, char *cause) +void MQTTManager::onConnectionLost(void* context, char* cause) { - MQTTManager *pThis = static_cast(context); + MQTTManager* pThis = static_cast(context); std::cout << "Connection lost. Cause: " << (cause ? cause : "Unknown") << std::endl; if (pThis->conn_lost_callback_) @@ -262,9 +256,9 @@ void MQTTManager::onConnectionLost(void *context, char *cause) } } -void MQTTManager::onDelivered(void *context, MQTTClient_deliveryToken dt) +void MQTTManager::onDelivered(void* context, MQTTClient_deliveryToken dt) { - MQTTManager *pThis = static_cast(context); + MQTTManager* pThis = static_cast(context); std::cout << "Message with token " << dt << " delivered." << std::endl; if (pThis->delivered_callback_) diff --git a/src/communication/task_manager/src/task_manager_main.cpp b/src/communication/task_manager/src/task_manager_main.cpp index 187cd1b..17a7fda 100644 --- a/src/communication/task_manager/src/task_manager_main.cpp +++ b/src/communication/task_manager/src/task_manager_main.cpp @@ -1,13 +1,17 @@ +#include + +#include +#include +#include +#include + +#include "json.h" +#include "mqtt_manager.hpp" #include "rclcpp/rclcpp.hpp" #include "sweeper_interfaces/msg/task.hpp" -#include "mqtt_manager.hpp" +#include "sweeper_interfaces/msg/vehicle_identity.hpp" #include "task_manager.hpp" -#include "task_manager_node.hpp" -#include "json.h" -#include -#include -#include -#include +#include "task_types.hpp" using namespace std; @@ -19,20 +23,30 @@ int last_status = 0; TaskStatus status_up = TaskStatus::PENDING; std::thread status_report_thread; +// =============== 动态 MQTT topic + 身份控制 =============== +std::string mqtt_topic_sub_task; +std::string mqtt_topic_push_status; + +std::atomic identity_ready{false}; +std::atomic subscribed{false}; + +std::string vid; + // 配置变量 struct Config { - string mqtt_vid; string mqtt_address; string mqtt_username; string mqtt_password; - string mqtt_topic_sub_task; - string mqtt_topic_push_status; + + // 改为模板 + string mqtt_topic_sub_task_tmpl; + string mqtt_topic_push_status_tmpl; } config; // 前置声明 -void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg); -void sendTaskResponse(long seqNo, int code, const string &msg); +void sendGeneralResponse(const string& topic, long seqNo, int code, const string& msg); +void sendTaskResponse(long seqNo, int code, const string& msg); string generate_mqtt_client_id(); // 信号处理函数 @@ -52,9 +66,7 @@ void signalHandler(int signum) string generate_mqtt_client_id() { auto now = std::chrono::system_clock::now(); - auto millis = std::chrono::duration_cast( - now.time_since_epoch()) - .count(); + auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); std::random_device rd; std::mt19937 gen(rd()); @@ -67,7 +79,7 @@ string generate_mqtt_client_id() } // 从JSON解析ROUTE_INFO -ROUTE_INFO get_route_info(const Json::Value &root) +ROUTE_INFO get_route_info(const Json::Value& root) { ROUTE_INFO route_info; if (root.isObject()) @@ -101,7 +113,7 @@ ROUTE_INFO get_route_info(const Json::Value &root) } // 从JSON解析TASK_VALUE -TASK_VALUE get_task_value(const Json::Value &root) +TASK_VALUE get_task_value(const Json::Value& root) { TASK_VALUE task_value; @@ -141,7 +153,7 @@ TASK_VALUE get_task_value(const Json::Value &root) } // 加载配置文件 -bool loadConfig(const string &config_file) +bool loadConfig(const string& config_file) { Json::Reader reader; Json::Value root; @@ -159,11 +171,10 @@ bool loadConfig(const string &config_file) return false; } - config.mqtt_vid = root["mqtt"]["vid"].asString(); - config.mqtt_username = root["mqtt"]["mqtt_user"].asString(); - config.mqtt_password = root["mqtt"]["mqtt_password"].asString(); - config.mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString(); - config.mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString(); + config.mqtt_username = root["mqtt"]["username"].asString(); + config.mqtt_password = root["mqtt"]["password"].asString(); + config.mqtt_topic_sub_task_tmpl = root["mqtt"]["mqtt_topic_sub_task"].asString(); + config.mqtt_topic_push_status_tmpl = root["mqtt"]["mqtt_topic_push_status"].asString(); string external_addr = root["mqtt"]["external_net_address"].asString(); int external_port = root["mqtt"]["external_net_port"].asInt(); @@ -173,14 +184,14 @@ bool loadConfig(const string &config_file) std::cout << "Config loaded successfully." << std::endl; std::cout << "MQTT Address: " << config.mqtt_address << std::endl; - std::cout << "Topic Sub: " << config.mqtt_topic_sub_task << std::endl; - std::cout << "Topic Pub: " << config.mqtt_topic_push_status << std::endl; + std::cout << "Topic Sub template: " << config.mqtt_topic_sub_task_tmpl << std::endl; + std::cout << "Topic Pub template: " << config.mqtt_topic_push_status_tmpl << std::endl; return true; } // 发送通用应答消息(触发式) -void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg) +void sendGeneralResponse(const string& topic, long seqNo, int code, const string& msg) { Json::Value responseData; responseData["type"] = "response"; @@ -195,17 +206,15 @@ void sendGeneralResponse(const string &topic, long seqNo, int code, const string Json::FastWriter writer; string responseJson = writer.write(responseData); - std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code - << ", msg=" << msg << std::endl; + std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code << ", msg=" << msg << std::endl; std::cout << "Response JSON: " << responseJson << std::endl; bool success = mqtt_manager.publish(topic, responseJson, 0); - std::cout << "General response publish to [" << topic << "] " - << (success ? "[OK]" : "[FAILED]") << std::endl; + std::cout << "General response publish to [" << topic << "] " << (success ? "[OK]" : "[FAILED]") << std::endl; } // 发送任务专用应答(包含任务信息) -void sendTaskResponse(long seqNo, int code, const string &msg) +void sendTaskResponse(long seqNo, int code, const string& msg) { Json::Value responseData; responseData["type"] = "response"; @@ -226,20 +235,26 @@ void sendTaskResponse(long seqNo, int code, const string &msg) Json::FastWriter writer; string responseJson = writer.write(responseData); - std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code - << ", taskId=" << current_task_id << ", taskStatus=" << (int)current_status << std::endl; + std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code << ", taskId=" << current_task_id + << ", taskStatus=" << (int)current_status << std::endl; std::cout << "Response JSON: " << responseJson << std::endl; - bool success = mqtt_manager.publish(config.mqtt_topic_sub_task, responseJson, 0); - std::cout << "Task response publish to [" << config.mqtt_topic_sub_task << "] " - << (success ? "[OK]" : "[FAILED]") << std::endl; + if (mqtt_topic_sub_task.empty()) + { + std::cout << "[TASK] response topic not ready, drop response. seqNo=" << seqNo << std::endl; + return; + } + + bool success = mqtt_manager.publish(mqtt_topic_sub_task, responseJson, 0); + std::cout << "Task response publish to [" << mqtt_topic_sub_task << "] " << (success ? "[OK]" : "[FAILED]") + << std::endl; } // MQTT消息回调 -int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message) +int handleMqttMessage(char* topicName, int topicLen, MQTTClient_message* message) { const int BUFFER_SIZE = 4096; - char *buffer = new char[BUFFER_SIZE]; + char* buffer = new char[BUFFER_SIZE]; memset(buffer, 0, BUFFER_SIZE); size_t copy_size = std::min((size_t)message->payloadlen, (size_t)(BUFFER_SIZE - 1)); @@ -289,7 +304,7 @@ int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message std::cout << "Start task: " << json_data.value.id << std::endl; task_manager.addTask(mqtt_json); } - catch (const std::exception &e) + catch (const std::exception& e) { std::cerr << "Error processing start command: " << e.what() << std::endl; sendTaskResponse(seqNo, 400, "Failed to process start command"); @@ -311,7 +326,7 @@ int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message sendTaskResponse(seqNo, 200, "Task stop command received"); } - catch (const std::exception &e) + catch (const std::exception& e) { std::cerr << "Error processing stop command: " << e.what() << std::endl; sendTaskResponse(seqNo, 400, "Failed to process stop command"); @@ -342,6 +357,17 @@ int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message return 1; } +void try_subscribe_task_topic() +{ + if (!identity_ready.load()) return; + if (mqtt_topic_sub_task.empty()) return; + + if (subscribed.exchange(true)) return; // 已经订阅过了 + + std::cout << "[TASK] subscribe MQTT topic: " << mqtt_topic_sub_task << std::endl; + mqtt_manager.subscribe(mqtt_topic_sub_task, 0); +} + // 周期性上报任务状态到MQTT(200ms间隔持续上报) void reportTaskStatusToMqtt() { @@ -363,7 +389,10 @@ void reportTaskStatusToMqtt() Json::FastWriter writer; string statusJson = writer.write(statusMsg); - mqtt_manager.publish(config.mqtt_topic_push_status, statusJson, 0); + if (!mqtt_topic_push_status.empty()) + { + mqtt_manager.publish(mqtt_topic_push_status, statusJson, 0); + } } } @@ -373,21 +402,44 @@ void reportTaskStatusToMqtt() // ROS2节点类 class TaskManagerNode : public rclcpp::Node { -public: + public: TaskManagerNode(string name) : Node(name) { RCLCPP_INFO(this->get_logger(), "Node %s started.", name.c_str()); - msg_publisher_ = this->create_publisher( - "task_message/download", 10); + msg_publisher_ = this->create_publisher("task_message/download", 10); task_subscribe_ = this->create_subscription( - "task_message/upload", 10, - std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1)); + "task_message/upload", 10, std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1)); - timer_ = this->create_wall_timer(std::chrono::milliseconds(500), - std::bind(&TaskManagerNode::timer_callback, this)); + timer_ = + this->create_wall_timer(std::chrono::milliseconds(500), std::bind(&TaskManagerNode::timer_callback, this)); + + identity_sub_ = this->create_subscription( + "/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(), + [this](const sweeper_interfaces::msg::VehicleIdentity::SharedPtr msg) + { + vid = msg->vid; + identity_ready.store(!vid.empty()); + + if (!identity_ready.load()) return; + + // 拼 topic + mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl; + mqtt_topic_push_status = config.mqtt_topic_push_status_tmpl; + + auto pos = mqtt_topic_sub_task.find("{vid}"); + if (pos != std::string::npos) mqtt_topic_sub_task.replace(pos, 5, vid); + + pos = mqtt_topic_push_status.find("{vid}"); + if (pos != std::string::npos) mqtt_topic_push_status.replace(pos, 5, vid); + + RCLCPP_INFO(this->get_logger(), "[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), + mqtt_topic_sub_task.c_str(), mqtt_topic_push_status.c_str()); + + try_subscribe_task_topic(); + }); } -private: + private: void timer_callback() { sweeper_interfaces::msg::Task msg; @@ -417,21 +469,39 @@ private: rclcpp::TimerBase::SharedPtr timer_; rclcpp::Publisher::SharedPtr msg_publisher_; rclcpp::Subscription::SharedPtr task_subscribe_; + + rclcpp::Subscription::SharedPtr identity_sub_; }; -int main(int argc, char **argv) +int main(int argc, char** argv) { signal(SIGINT, signalHandler); - if (!loadConfig("config.json")) + // 默认配置路径 + std::string config_path = "config.json"; + + // 支持 --config xxx.json + for (int i = 1; i < argc; ++i) + { + std::string arg = argv[i]; + if (arg == "--config" && i + 1 < argc) + { + config_path = argv[i + 1]; + ++i; + } + } + + std::cout << "[TASK] Using config: " << config_path << std::endl; + + if (!loadConfig(config_path)) { std::cerr << "Failed to load configuration." << std::endl; return 1; } + // ================= 初始化 MQTT ================= string client_id = generate_mqtt_client_id(); - if (!mqtt_manager.init(config.mqtt_address, client_id, - config.mqtt_username, config.mqtt_password)) + if (!mqtt_manager.init(config.mqtt_address, client_id, config.mqtt_username, config.mqtt_password)) { std::cerr << "Failed to initialize MQTT manager." << std::endl; return 1; @@ -443,10 +513,17 @@ int main(int argc, char **argv) mqtt_manager.setMessageCallback(handleMqttMessage); - mqtt_manager.start(); - mqtt_manager.subscribe(config.mqtt_topic_sub_task, 0); + mqtt_manager.setConnRestoredCallback( + [](const char* cause) + { + std::cout << "[TASK] MQTT reconnected: " << cause << std::endl; + subscribed.store(false); + try_subscribe_task_topic(); + }); - task_manager.setSendResponseCallback([](const string &topic, int seqNo, int code, const string &msg) + mqtt_manager.start(); + + task_manager.setSendResponseCallback([](const string& topic, int seqNo, int code, const string& msg) { sendTaskResponse(seqNo, code, msg); }); task_manager.start(); diff --git a/src/communication/task_manager/src/task_manager_node.cpp b/src/communication/task_manager/src/task_manager_node.cpp deleted file mode 100644 index 57ee823..0000000 --- a/src/communication/task_manager/src/task_manager_node.cpp +++ /dev/null @@ -1,807 +0,0 @@ -#include "MQTTClient.h" -#include "rclcpp/rclcpp.hpp" -#include "md5.h" -#include "sweeper_interfaces/msg/task.hpp" -#include "json.h" -#include "task_manager_node.hpp" -#include "sweeper_interfaces/msg/sub.hpp" -#include // 添加信号处理头文件 -#include - -using namespace std; - -// 信号标志 -volatile std::sig_atomic_t signal_received = 0; -// 任务队列和相关同步原语 -std::queue task_queue; -std::mutex queue_mutex; -std::condition_variable queue_cv; -bool thread_running = true; - -MQTTClient client; -MQTTClient_deliveryToken token_d_m; -MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; -MQTTClient_message pubmsg_d_m = MQTTClient_message_initializer; -// mqtt相关 -std::string mqtt_vid; -std::string mqtt_inter_net_address; -int mqtt_inter_net_port; -std::string mqtt_external_net_address; -int mqtt_external_net_port; -std::string mqtt_user; -std::string mqtt_password; -std::string mqtt_topic_sub_task; -std::string mqtt_topic_push_status; -std::string sub_topic; - -// 修复变量声明和初始化 -std::string ADDRESS; -std::string CLIENTID_SUB; // 客户端ID -std::string destinationFilePath1 = "./gps_load_now.txt"; -std::chrono::steady_clock::time_point last_message_time = std::chrono::steady_clock::now(); -constexpr auto QOS = 0; -constexpr auto TIMEOUT = 10000L; -volatile MQTTClient_deliveryToken deliveredtoken; -bool isDisconn = false; -int task_status = 0; -int last_status = 0; -TaskStatus status_up = TaskStatus::PENDING; - -CurrentTask currentTask; -std::mutex taskMutex; - -std::string generate_mqtt_client_id() -{ - // 获取当前时间戳(以毫秒为单位) - auto now = std::chrono::system_clock::now(); - auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); - - // 生成一个 4 位随机数 - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution<> dis(1000, 9999); - int random_num = dis(gen); - - // 拼接成 client ID - std::ostringstream oss; - oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << random_num; - - return oss.str(); -} - -// 信号处理函数 -void signalHandler(int signum) -{ - std::cout << "Interrupt signal (" << signum << ") received." << std::endl; - signal_received = signum; - - // 第一次收到信号时,触发ROS2关闭 - if (signum == SIGINT && rclcpp::ok()) - { - std::cout << "Shutting down ROS2 node..." << std::endl; - rclcpp::shutdown(); - } -} - -void copyFileAndOverwrite(const std::string &sourceFilePath, const std::string &destinationFilePath) -{ - std::ifstream src(sourceFilePath, std::ios::binary); - std::ofstream dst(destinationFilePath, std::ios::binary); - - if (!src) - { - std::cerr << "无法打开源文件: " << sourceFilePath << std::endl; - return; - } - - if (!dst) - { - std::cerr << "无法打开目标文件: " << destinationFilePath << std::endl; - return; - } - - dst << src.rdbuf(); // 将源文件内容复制到目标文件 - - if (!dst) - { - std::cerr << "复制文件时出错" << std::endl; - } -} - -string calculate_md5(string filename) -{ - MD5 md5; - ifstream file; - file.open(filename, ios::binary); - md5.update(file); - cout << md5.toString() << endl; - return md5.toString(); -} - -bool downloadFile(const std::string &url, const std::string &expected_md5) -{ - // 确保URL不为空 - if (url.empty()) - { - std::cerr << "下载URL为空" << std::endl; - return false; - } - // 使用系统调用下载文件 - std::string filename = url.substr(url.find_last_of('/') + 1); - std::string command = "wget -O routes/" + filename + " \"" + url + "\""; - cout << command << endl; - int result = system(command.c_str()); - if (result != 0) - { - std::cerr << "下载失败: " << url << std::endl; - return false; - } - - // 计算下载文件的MD5并与预期的MD5比较 - std::string filepath = "routes/" + filename; - std::string actual_md5 = calculate_md5(filepath); - if (actual_md5 != expected_md5) - { - cout << actual_md5 << " " << expected_md5 << endl; - std::cerr << "MD5校验失败: " << filepath << std::endl; - return false; - } - - std::cout << filepath << " 下载并校验成功" << std::endl; - return true; -} - -ROUTE_INFO get_route_info(const Json::Value &root) -{ - ROUTE_INFO route_info; - if (root.isObject()) - { - if (root.isMember("routeName") && root["routeName"].isString()) - route_info.routeName = root["routeName"].asString(); - else - std::cerr << "Missing or invalid 'routeName' field" << std::endl; - - if (root.isMember("fileName") && root["fileName"].isString()) - route_info.fileName = root["fileName"].asString(); - else - std::cerr << "Missing or invalid 'fileName' field" << std::endl; - - if (root.isMember("url") && root["url"].isString()) - route_info.url = root["url"].asString(); - else - std::cerr << "Missing or invalid 'url' field" << std::endl; - - if (root.isMember("md5") && root["md5"].isString()) - route_info.md5 = root["md5"].asString(); - else - std::cerr << "Missing or invalid 'md5' field" << std::endl; - } - else - { - std::cerr << "routeInfo is not a valid JSON object" << std::endl; - } - - return route_info; -} - -TASK_VALUE get_task_value(const Json::Value &root) -{ - TASK_VALUE task_value; - - if (root.isObject()) - { - if (root.isMember("id") && root["id"].isInt()) - task_value.id = root["id"].asInt(); - else - std::cerr << "Missing or invalid 'id' field" << std::endl; - - if (root.isMember("name") && root["name"].isString()) - task_value.name = root["name"].asString(); - else - std::cerr << "Missing or invalid 'name' field" << std::endl; - - if (root.isMember("mode") && root["mode"].isInt()) - task_value.mode = root["mode"].asInt(); - else - std::cerr << "Missing or invalid 'mode' field" << std::endl; - - if (root.isMember("count") && root["count"].isInt()) - task_value.count = root["count"].asInt(); - else - std::cerr << "Missing or invalid 'count' field" << std::endl; - - if (root.isMember("routeInfo") && root["routeInfo"].isObject()) - task_value.routeInfo = get_route_info(root["routeInfo"]); - else - std::cerr << "Missing or invalid 'routeInfo' field" << std::endl; - } - else - { - std::cerr << "task value is not a valid JSON object" << std::endl; - } - - return task_value; -} - -void sendResponse(MQTTClient client, const std::string &topic, int seqNo, int code, std::string msg) -{ - // 使用JSON库构建响应数据 - Json::Value responseData; - responseData["type"] = "response"; - responseData["seqNo"] = seqNo; - - Json::Value data; - data["code"] = code; - data["msg"] = msg; - - responseData["data"] = data; - - // 使用JSON writer生成字符串 - Json::FastWriter writer; - std::string responseJson = writer.write(responseData); - - // 发布MQTT消息 - pubmsg_d_m.payload = (void *)responseJson.c_str(); - pubmsg_d_m.payloadlen = responseJson.length(); - pubmsg_d_m.qos = QOS; - pubmsg_d_m.retained = 0; - - MQTTClient_publishMessage(client, topic.c_str(), &pubmsg_d_m, &token_d_m); - MQTTClient_waitForCompletion(client, token_d_m, TIMEOUT); - printf("Message with delivery token %d delivered\n", token_d_m); -} - -// 更新当前任务信息 -void updateCurrentTask(int taskId, TaskStatus status) -{ - std::lock_guard lock(taskMutex); - currentTask.id = taskId; - currentTask.status = status; - currentTask.lastUpdateTime = std::chrono::steady_clock::now(); - std::cout << "Task updated: ID=" << taskId << ", Status=" << status << std::endl; -} - -// 从MQTT消息中提取任务ID并更新任务状态 -void extractTaskIdFromMessage(const Json::Value &root) -{ - if (root.isMember("data") && root["data"].isObject()) - { - Json::Value data = root["data"]; - - if (data.isMember("value") && data["value"].isObject()) - { - Json::Value value = data["value"]; - - if (value.isMember("id") && value["id"].isInt()) - { - int taskId = value["id"].asInt(); - - // 根据命令类型设置初始状态 - if (data.isMember("command") && data["command"].isString()) - { - std::string command = data["command"].asString(); - - if (command == "start") - { - updateCurrentTask(taskId, RUNNING); - } - else if (command == "stop") - { - updateCurrentTask(taskId, PENDING); - } - } - } - } - } -} - -void updateTaskStatus(int taskId, int status) -{ - // 构建JSON消息体 - Json::Value message; - message["id"] = taskId; - message["status"] = status; - - // 转换为字符串 - Json::FastWriter writer; - std::string jsonStr = writer.write(message); - - // 发布MQTT消息 - MQTTClient_message pubmsg = MQTTClient_message_initializer; - pubmsg.payload = (void *)jsonStr.c_str(); - pubmsg.payloadlen = jsonStr.length(); - pubmsg.qos = QOS; - pubmsg.retained = 0; - - MQTTClient_deliveryToken token; - int rc = MQTTClient_publishMessage(client, mqtt_topic_push_status.c_str(), &pubmsg, &token); - if (rc != MQTTCLIENT_SUCCESS) - { - std::cerr << "Failed to publish task status, return code " << rc << std::endl; - } - else - { - MQTTClient_waitForCompletion(client, token, TIMEOUT); - std::cout << "Task status updated: ID=" << taskId << ", Status=" << status << std::endl; - } -} - -// 周期性上报任务状态 -void *reportTaskStatus(void *arg) -{ - (void)arg; - - while (thread_running) - { - // 200ms周期 - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - - // 获取当前任务信息 - int taskId; - TaskStatus status; - - { - std::lock_guard lock(taskMutex); - taskId = currentTask.id; - status = status_up; - - // 仅在任务ID有效时上报 - if (taskId > 0) - { - currentTask.status = status; - } - } - - // 上报任务状态 - if (taskId > 0) - { - updateTaskStatus(taskId, static_cast(status)); - } - } - - return NULL; -} - -// 任务处理线程函数 -void *process_tasks(void *arg) -{ - (void)arg; - - while (thread_running) - { - std::unique_lock lock(queue_mutex); - - // 等待队列中有任务或线程终止信号 - queue_cv.wait(lock, [] - { return !task_queue.empty() || !thread_running; }); - - // 检查是否需要终止线程 - if (!thread_running && task_queue.empty()) - { - break; - } - - // 处理队列中的任务 - if (!task_queue.empty()) - { - MQTT_JSON mqtt_json = task_queue.front(); - task_queue.pop(); - lock.unlock(); // 释放锁,允许其他线程操作队列 - - // 处理任务 - if (mqtt_json.type == "request") - { - try - { - // 从boost::any中提取数据 - JSON_DATA json_data = boost::any_cast(mqtt_json.data); - if (json_data.command == "start") - { - int flag_ok = 0; - // sleep(10); - cout << "即将开始任务" << json_data.value.id << endl; - cout << "url: " << json_data.value.routeInfo.url << endl; - cout << "md5: " << json_data.value.routeInfo.md5 << endl; - string downFileName = json_data.value.routeInfo.fileName; - if (access(("routes/" + downFileName).c_str(), F_OK) == -1) - { - std::cout << "文件不存在,正在下载" << downFileName << std::endl; - bool isdown = downloadFile(json_data.value.routeInfo.url, json_data.value.routeInfo.md5); - if (!isdown) - { - std::cout << "下载失败,跳过" << std::endl; - remove(("routes/" + downFileName).c_str()); - } - else - { - std::cout << downFileName << "下载完成" << std::endl; - std::string sourceFilePath1 = ("routes/" + downFileName).c_str(); - copyFileAndOverwrite(sourceFilePath1, destinationFilePath1); - flag_ok = 1; - } - } - else - { - // 文件已存在,跳过下载 - std::cout << "文件已存在,跳过下载: routes/" << downFileName << std::endl; - std::string sourceFilePath1 = ("routes/" + downFileName).c_str(); - copyFileAndOverwrite(sourceFilePath1, destinationFilePath1); - flag_ok = 1; - } - if (flag_ok == 1) - { - // 启动任务 - cout << "开始执行任务" << json_data.value.id << endl; - task_status = 1; - } - // 回复 - sendResponse(client, mqtt_topic_sub_task, mqtt_json.seqNo, 200, "Task started successfully"); - } - else if (json_data.command == "stop") - { - // task_status = 0; - // 回复 - // sendResponse(client, mqtt_topic_sub_task, mqtt_json.seqNo, 200, "Task stopped successfully"); - } - } - catch (const boost::bad_any_cast &e) - { - std::cerr << "Bad any_cast: " << e.what() << std::endl; - } - } - } - } - - return NULL; -} - -void delivered(void *context, MQTTClient_deliveryToken dt) -{ - (void)context; - - printf("Message with token value %d delivery confirmed\n", dt); - deliveredtoken = dt; -} - -int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) -{ - (void)context; - (void)topicLen; - - printf("Message arrived\n"); - last_message_time = std::chrono::steady_clock::now(); // 更新时间戳 - printf("topic: %s\n", topicName); - printf("message: %.*s\n", message->payloadlen, (char *)message->payload); - - // 增加缓冲区大小 - const int BUFFER_SIZE = 4096; // 增大缓冲区 - char *buffer = new char[BUFFER_SIZE]; - memset(buffer, 0, BUFFER_SIZE); - - // 确保不溢出 - size_t copy_size = std::min(message->payloadlen, BUFFER_SIZE - 1); - memcpy(buffer, message->payload, copy_size); - - Json::Reader reader; - Json::Value root; - - if (!reader.parse(buffer, root)) - { - printf("recv json fail\n"); - return 1; - } - else - { - isDisconn = false; - // 创建MQTT_JSON对象 - if (root.isMember("type") && root["type"].isString() && root["type"].asString() == "request") - { - // 提取任务ID - extractTaskIdFromMessage(root); - - MQTT_JSON mqtt_json; - mqtt_json.type = root["type"].asString(); - mqtt_json.seqNo = root["seqNo"].asInt(); - Json::Value data = root["data"]; - // 创建JSON_DATA对象 - JSON_DATA json_data; - if (data.isMember("command") && data["command"].isString()) - { - json_data.command = data["command"].asString(); - if (json_data.command.compare("start") == 0) - { - try - { - json_data.value = get_task_value(data["value"]); - // 将JSON_DATA对象存入boost::any - mqtt_json.data = json_data; - cout << "即将开始任务" << json_data.value.id << endl; - // 将任务添加到队列 - std::lock_guard lock(queue_mutex); - task_queue.push(mqtt_json); - queue_cv.notify_one(); // 通知工作线程有新任务 - } - catch (const std::exception &e) - { - std::cerr << e.what() << '\n'; - } - } - else if (json_data.command.compare("stop") == 0) - { - try - { - long stop_id = data["value"].asInt(); - mqtt_json.data = stop_id; - cout << "stop_id:" << stop_id << "\t 即将停止任务" << endl; - - task_status = 0; - // 回复 - sendResponse(client, mqtt_topic_sub_task, mqtt_json.seqNo, 200, "Task stopped successfully"); - - // 将stop任务也加入队列处理 - std::lock_guard lock(queue_mutex); - task_queue.push(mqtt_json); - queue_cv.notify_one(); - } - catch (const std::exception &e) - { - std::cerr << e.what() << '\n'; - } - } - } - else - std::cerr << "Missing or invalid 'command' field" << std::endl; - } - else if (root["type"].asString() == "response") - { - } - else - { - std::cerr << "Missing or invalid 'type' field" << std::endl; - } - } - - MQTTClient_freeMessage(&message); - MQTTClient_free(topicName); - return 1; -} - -void connlost(void *context, char *cause) -{ - (void)context; - isDisconn = 1; - printf("\nConnection lost\n"); - printf("cause: %s\n", cause); -} - -void *mqtt_sub(void *arg) -{ - (void)arg; - - const char *SUB_TOPIC = sub_topic.c_str(); - - int rc; - // 修正:使用const char*而非char* - const char *username = mqtt_user.c_str(); - const char *password = mqtt_password.c_str(); - - if ((rc = MQTTClient_create(&client, ADDRESS.c_str(), CLIENTID_SUB.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to create client, return code %d\n", rc); - MQTTClient_destroy(&client); - return NULL; - } - - if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to set callbacks, return code %d\n", rc); - MQTTClient_destroy(&client); - return NULL; - } - - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - conn_opts.username = username; - conn_opts.password = password; - - if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to connect, return code %d\n", rc); - MQTTClient_destroy(&client); - return NULL; - } - - printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", SUB_TOPIC, CLIENTID_SUB.c_str(), QOS); - if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to subscribe, return code %d\n", rc); - MQTTClient_destroy(&client); - return NULL; - } - - while (1) - { - usleep(200000); - // 检查连接状态,如果连接丢失,则尝试重新连接 - if (MQTTClient_isConnected(client) == 0) - { - printf("MQTT connection lost, trying to reconnect...\n"); - - if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) - { - isDisconn = true; - printf("Failed to reconnect, return code %d\n", rc); - // 处理连接失败的情况,例如等待一段时间后再次尝试 - } - else - { - isDisconn = false; - printf("Reconnected to MQTT server.\n"); - // 重新订阅主题 - if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to resubscribe, return code %d\n", rc); - } - } - } - else - { - isDisconn = false; - auto current_time = std::chrono::steady_clock::now(); - if (std::chrono::duration_cast(current_time - last_message_time).count() > 500) - { - isDisconn = true; - // printf("Heartbeat timeout: No message received in 500ms.\n"); - // 执行心跳超时的处理逻辑 - } - } - } - - if ((rc = MQTTClient_unsubscribe(client, SUB_TOPIC)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to unsubscribe, return code %d\n", rc); - MQTTClient_destroy(&client); - return NULL; - } - - if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to disconnect, return code %d\n", rc); - MQTTClient_destroy(&client); - return NULL; - } - - MQTTClient_destroy(&client); - return NULL; -} - -class task_manager_node : public rclcpp::Node -{ -public: - // 构造函数,有一个参数为节点名称 - task_manager_node(std::string name) : Node(name) - { - RCLCPP_INFO(this->get_logger(), "%s节点已经启动.", name.c_str()); - msg_publisher_ = this->create_publisher("task_message/download", 10); - task_subscribe_ = this->create_subscription("task_message/upload", 10, std::bind(&task_manager_node::task_callback, this, std::placeholders::_1)); - - timer_ = this->create_wall_timer(std::chrono::milliseconds(500), std::bind(&task_manager_node::timer_callback, this)); - } - -private: - void timer_callback() - { - // 发布任务消息 - sweeper_interfaces::msg::Task msg; - msg.task_status = task_status; - - if (task_status != last_status) - { - msg_publisher_->publish(msg); - RCLCPP_INFO(this->get_logger(), "发布任务消息"); - } - - last_status = task_status; - - // RCLCPP_INFO(this->get_logger(), "msg.task_status : %d", msg.task_status); - // RCLCPP_INFO(this->get_logger(), "task_status : %d", task_status); - // RCLCPP_INFO(this->get_logger(), "last_status : %d", last_status); - // cout << endl; - } - - void task_callback(const sweeper_interfaces::msg::Task::SharedPtr msg) - { - // 安全转换:检查枚举值是否有效 - if (msg->task_status >= TaskStatus::PENDING && msg->task_status <= TaskStatus::FAILED) - { - status_up = static_cast(msg->task_status); - } - else - { - // 处理无效状态(例如设为默认值或记录错误) - RCLCPP_WARN(this->get_logger(), "Invalid task status received: %d", msg->task_status); - status_up = TaskStatus::FAILED; - } - } - - rclcpp::TimerBase::SharedPtr timer_; - rclcpp::Publisher::SharedPtr msg_publisher_; - rclcpp::Subscription::SharedPtr task_subscribe_; -}; - -void init_main() -{ - Json::Reader reader; - Json::Value root; - std::ifstream in("config.json", std::ios::binary); - if (!in.is_open()) - { - std::cout << "read config file error" << std::endl; - return; - } - if (reader.parse(in, root)) - { - mqtt_vid = root["mqtt"]["vid"].asString(); - mqtt_inter_net_address = root["mqtt"]["inter_net_address"].asString(); - mqtt_inter_net_port = root["mqtt"]["inter_net_port"].asInt(); - mqtt_external_net_address = root["mqtt"]["external_net_address"].asString(); - mqtt_external_net_port = root["mqtt"]["external_net_port"].asInt(); - mqtt_user = root["mqtt"]["mqtt_user"].asString(); - mqtt_password = root["mqtt"]["mqtt_password"].asString(); - mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString(); - mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString(); - // 添加对sub_topic的初始化 - sub_topic = mqtt_topic_sub_task; - CLIENTID_SUB = generate_mqtt_client_id(); - // ADDRESS = mqtt_inter_net_address + ":" + std::to_string(mqtt_inter_net_port); - ADDRESS = mqtt_external_net_address + ":" + std::to_string(mqtt_external_net_port); - cout << "ADDRESS: " << ADDRESS << endl; - cout << "CLIENTID_SUB: " << CLIENTID_SUB << endl; - cout << "mqtt_vid: " << mqtt_vid << endl; - } - in.close(); // 关闭文件流 -} - -int main(int argc, char **argv) -{ - // 注册信号处理函数 - signal(SIGINT, signalHandler); - - init_main(); - pthread_t mqtt_sub_thread_t; - pthread_create(&mqtt_sub_thread_t, NULL, mqtt_sub, NULL); - - // 创建任务处理线程 - pthread_t task_thread; - pthread_create(&task_thread, NULL, process_tasks, NULL); - - pthread_t reportThread; - pthread_create(&reportThread, NULL, reportTaskStatus, NULL); - - rclcpp::init(argc, argv); - /*创建对应节点的共享指针对象*/ - auto node = std::make_shared("task_manager_node"); - - // 使用非阻塞的spin - rclcpp::executors::SingleThreadedExecutor executor; - executor.add_node(node); - - // 循环执行spin_some,允许检查信号 - while (rclcpp::ok() && !signal_received) - { - executor.spin_some(); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - - // 优雅地停止线程 - { - std::lock_guard lock(queue_mutex); - thread_running = false; - } - queue_cv.notify_one(); - pthread_join(task_thread, NULL); - pthread_join(mqtt_sub_thread_t, NULL); - pthread_join(reportThread, NULL); - - return 0; -}