From d24ea2ece932fca6d3ecd10c65849b369ad41e8e Mon Sep 17 00:00:00 2001 From: lyq Date: Mon, 12 Jan 2026 16:33:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E7=A6=BBmqtt=E5=92=8C=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=EF=BC=9B=E5=A2=9E=E5=8A=A0mqtt?= =?UTF-8?q?=E6=96=AD=E7=BA=BF=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/communication/sub/include/sub/md5.h | 26 +- src/communication/task_manager/CMakeLists.txt | 6 +- .../include/task_manager/mqtt_manager.hpp | 99 ++++ .../include/task_manager/task_manager.hpp | 87 ++++ .../task_manager/task_manager_node.hpp | 8 +- .../task_manager/src/mqtt_manager.cpp | 274 ++++++++++ .../task_manager/src/task_manager.cpp | 272 ++++++++++ .../task_manager/src/task_manager_main.cpp | 478 ++++++++++++++++++ 8 files changed, 1228 insertions(+), 22 deletions(-) create mode 100644 src/communication/task_manager/include/task_manager/mqtt_manager.hpp create mode 100644 src/communication/task_manager/include/task_manager/task_manager.hpp create mode 100644 src/communication/task_manager/src/mqtt_manager.cpp create mode 100644 src/communication/task_manager/src/task_manager.cpp create mode 100644 src/communication/task_manager/src/task_manager_main.cpp diff --git a/src/communication/sub/include/sub/md5.h b/src/communication/sub/include/sub/md5.h index df10a8c..8fa6880 100644 --- a/src/communication/sub/include/sub/md5.h +++ b/src/communication/sub/include/sub/md5.h @@ -5,7 +5,7 @@ #include /* Type define */ -typedef unsigned char byte; +typedef unsigned char MD5_BYTE; typedef unsigned int uint32; using std::ifstream; @@ -22,30 +22,30 @@ public: void update(const void *input, size_t length); void update(const string &str); void update(ifstream &in); - const byte *digest(); + const MD5_BYTE *digest(); string toString(); void reset(); private: - void update(const byte *input, size_t length); + void update(const MD5_BYTE *input, size_t length); void final(); - void transform(const byte block[64]); - void encode(const uint32 *input, byte *output, size_t length); - void decode(const byte *input, uint32 *output, size_t length); - string bytesToHexString(const byte *input, size_t length); + void transform(const MD5_BYTE block[64]); + void encode(const uint32 *input, MD5_BYTE *output, size_t length); + void decode(const MD5_BYTE *input, uint32 *output, size_t length); + string bytesToHexString(const MD5_BYTE *input, size_t length); /* class uncopyable */ MD5(const MD5 &); MD5 &operator=(const MD5 &); private: - uint32 _state[4]; /* state (ABCD) */ - uint32 _count[2]; /* number of bits, modulo 2^64 (low-order word first) */ - byte _buffer[64]; /* input buffer */ - byte _digest[16]; /* message digest */ - bool _finished; /* calculate finished ? */ + uint32 _state[4]; /* state (ABCD) */ + uint32 _count[2]; /* number of bits, modulo 2^64 (low-order word first) */ + MD5_BYTE _buffer[64]; /* input buffer */ + MD5_BYTE _digest[16]; /* message digest */ + bool _finished; /* calculate finished ? */ - static const byte PADDING[64]; /* padding for calculate */ + static const MD5_BYTE PADDING[64]; /* padding for calculate */ static const char HEX[16]; enum { diff --git a/src/communication/task_manager/CMakeLists.txt b/src/communication/task_manager/CMakeLists.txt index f40a727..4a58552 100644 --- a/src/communication/task_manager/CMakeLists.txt +++ b/src/communication/task_manager/CMakeLists.txt @@ -32,8 +32,10 @@ include_directories( ${catkin_INCLUDE_DIRS} ) -add_executable(task_manager_node - src/task_manager_node.cpp +add_executable(task_manager_node + src/task_manager_main.cpp + src/task_manager.cpp + src/mqtt_manager.cpp src/jsoncpp.cpp src/md5.cpp ) diff --git a/src/communication/task_manager/include/task_manager/mqtt_manager.hpp b/src/communication/task_manager/include/task_manager/mqtt_manager.hpp new file mode 100644 index 0000000..e01eb83 --- /dev/null +++ b/src/communication/task_manager/include/task_manager/mqtt_manager.hpp @@ -0,0 +1,99 @@ +#ifndef __MQTT_MANAGER_H__ +#define __MQTT_MANAGER_H__ + +#include +#include +#include +#include +#include +#include +#include "MQTTClient.h" + +using namespace std; + +class MQTTManager +{ +public: + // 消息回调函数类型 + using MessageCallback = std::function; + using ConnLostCallback = std::function; + using DeliveredCallback = std::function; + + MQTTManager(); + ~MQTTManager(); + + // 初始化MQTT连接 + bool init(const string &address, const string &clientId, + const string &username, const string &password); + + // 订阅主题 + bool subscribe(const string &topic, int qos = 0); + + // 发布消息 + bool publish(const string &topic, const string &payload, int qos = 0); + + // 设置回调函数 + void setMessageCallback(MessageCallback cb) { message_callback_ = cb; } + void setConnLostCallback(ConnLostCallback cb) { conn_lost_callback_ = cb; } + void setDeliveredCallback(DeliveredCallback cb) { delivered_callback_ = cb; } + + // 获取连接状态 + bool isConnected(); + + // 启动MQTT循环线程 + void start(); + + // 停止MQTT + void stop(); + + // 设置重连参数 + void setReconnectInterval(int seconds) { reconnect_interval_ = seconds; } + void setMaxReconnectAttempts(int attempts) { max_reconnect_attempts_ = attempts; } + void setHeartbeatTimeout(int milliseconds) { heartbeat_timeout_ = milliseconds; } + +private: + // MQTT连接循环线程函数 + void mqttLoop(); + + // 重连逻辑 + bool reconnect(); + + // 内部MQTT回调(static函数) + static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message); + static void onConnectionLost(void *context, char *cause); + static void onDelivered(void *context, MQTTClient_deliveryToken dt); + + MQTTClient client_; + MQTTClient_connectOptions conn_opts_; + MQTTClient_deliveryToken token_; + + string address_; + string client_id_; + string username_; + string password_; + string sub_topic_; + + MessageCallback message_callback_; + ConnLostCallback conn_lost_callback_; + DeliveredCallback delivered_callback_; + + bool is_running_; + std::thread mqtt_thread_; + std::mutex client_mutex_; + + // 断线重连参数 + int reconnect_interval_; // 重连间隔(秒) + int max_reconnect_attempts_; // 最大重连次数,0为无限 + int reconnect_attempts_; + std::chrono::steady_clock::time_point last_reconnect_time_; + + // 心跳检测 + std::chrono::steady_clock::time_point last_message_time_; + int heartbeat_timeout_; // 心跳超时时间(毫秒) + bool is_heartbeat_lost_; + + constexpr static int TIMEOUT = 10000L; + constexpr static int QOS = 0; +}; + +#endif // __MQTT_MANAGER_H__ \ No newline at end of file diff --git a/src/communication/task_manager/include/task_manager/task_manager.hpp b/src/communication/task_manager/include/task_manager/task_manager.hpp new file mode 100644 index 0000000..e842179 --- /dev/null +++ b/src/communication/task_manager/include/task_manager/task_manager.hpp @@ -0,0 +1,87 @@ +#ifndef __TASK_MANAGER_H__ +#define __TASK_MANAGER_H__ + +#include +#include +#include +#include +#include +#include +#include +#include "task_manager_node.hpp" + +using namespace std; + +class TaskManager +{ +public: + TaskManager(); + ~TaskManager(); + + // 启动任务处理线程 + void start(); + + // 停止任务处理 + void stop(); + + // 添加任务到队列 + void addTask(const MQTT_JSON &mqtt_json); + + // 获取当前任务状态 + TaskStatus getCurrentTaskStatus(); + + // 获取当前任务ID + int getCurrentTaskId(); + + // 更新当前任务 + void updateCurrentTask(int taskId, TaskStatus status); + + // 获取任务队列大小 + int getQueueSize(); + +private: + // 任务处理线程函数 + void processTasksLoop(); + + // 处理单个start任务 + bool processStartTask(const JSON_DATA &json_data, long seqNo); + + // 处理单个stop任务 + bool processStopTask(long taskId, long seqNo); + + // 下载文件 + bool downloadFile(const string &url, const string &expected_md5, const string &filename); + + // 计算MD5 + string calculate_md5(const string &filename); + + // 复制文件 + void copyFileAndOverwrite(const string &sourceFilePath, const string &destinationFilePath); + + std::queue task_queue_; + std::mutex queue_mutex_; + std::condition_variable queue_cv_; + bool is_running_; + std::thread task_thread_; + + CurrentTask current_task_; + std::mutex task_mutex_; + + string destination_file_path_; + int task_status_; + + // 回调函数用于发送MQTT响应 + std::function send_response_callback_; + +public: + void setSendResponseCallback( + std::function cb) + { + send_response_callback_ = cb; + } + + int getTaskStatus() const { return task_status_; } + void setTaskStatus(int status) { task_status_ = status; } +}; + +#endif // __TASK_MANAGER_H__ \ No newline at end of file diff --git a/src/communication/task_manager/include/task_manager/task_manager_node.hpp b/src/communication/task_manager/include/task_manager/task_manager_node.hpp index a7bbe2a..c8cc51a 100644 --- a/src/communication/task_manager/include/task_manager/task_manager_node.hpp +++ b/src/communication/task_manager/include/task_manager/task_manager_node.hpp @@ -1,4 +1,3 @@ - #include #include #include @@ -16,6 +15,7 @@ #include #include #include + #ifndef __SUB_NODE_H__ #define __SUB_NODE_H__ using namespace std; @@ -89,10 +89,4 @@ struct CurrentTask CurrentTask() : id(0), status(PENDING) {} }; -// extern TASK_VALUE task_value; -// extern ROUTE_INFO routeInfo; - -// extern MQTT_JSON mqtt_json; -// extern ROUTE_INFO routeInfo; - #endif \ No newline at end of file diff --git a/src/communication/task_manager/src/mqtt_manager.cpp b/src/communication/task_manager/src/mqtt_manager.cpp new file mode 100644 index 0000000..9981fd1 --- /dev/null +++ b/src/communication/task_manager/src/mqtt_manager.cpp @@ -0,0 +1,274 @@ +#include "mqtt_manager.hpp" +#include +#include +#include + +MQTTManager::MQTTManager() + : client_(nullptr), is_running_(false), reconnect_interval_(5), + max_reconnect_attempts_(0), reconnect_attempts_(0), heartbeat_timeout_(500), + is_heartbeat_lost_(false) +{ + conn_opts_ = MQTTClient_connectOptions_initializer; +} + +MQTTManager::~MQTTManager() +{ + stop(); +} + +bool MQTTManager::init(const string &address, const string &clientId, + const string &username, const string &password) +{ + address_ = address; + client_id_ = clientId; + username_ = username; + password_ = password; + + int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(), + MQTTCLIENT_PERSISTENCE_NONE, NULL); + if (rc != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to create MQTT client, return code " << rc << std::endl; + return false; + } + + // 设置回调 + rc = MQTTClient_setCallbacks(client_, this, onConnectionLost, + onMessageArrived, onDelivered); + if (rc != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to set MQTT callbacks, return code " << rc << std::endl; + MQTTClient_destroy(&client_); + return false; + } + + // 配置连接选项 + conn_opts_.keepAliveInterval = 20; + conn_opts_.cleansession = 1; + conn_opts_.username = username_.c_str(); + conn_opts_.password = password_.c_str(); + + // 初始化时间戳 + last_message_time_ = std::chrono::steady_clock::now(); + last_reconnect_time_ = std::chrono::steady_clock::now(); + + return true; +} + +bool MQTTManager::reconnect() +{ + std::lock_guard lock(client_mutex_); + + std::cout << "Attempting to reconnect to MQTT server..." << std::endl; + int rc = MQTTClient_connect(client_, &conn_opts_); + + if (rc != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to reconnect, return code " << rc << std::endl; + reconnect_attempts_++; + + if (max_reconnect_attempts_ > 0 && reconnect_attempts_ >= max_reconnect_attempts_) + { + std::cerr << "Max reconnection attempts reached." << std::endl; + return false; + } + return false; + } + + std::cout << "Successfully reconnected to MQTT server." << std::endl; + reconnect_attempts_ = 0; + is_heartbeat_lost_ = false; + last_message_time_ = std::chrono::steady_clock::now(); + + return true; +} + +bool MQTTManager::subscribe(const string &topic, int qos) +{ + std::lock_guard lock(client_mutex_); + + if (!isConnected()) + { + std::cerr << "MQTT client is not connected." << std::endl; + return false; + } + + sub_topic_ = topic; + int rc = MQTTClient_subscribe(client_, topic.c_str(), qos); + if (rc != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to subscribe, return code " << rc << std::endl; + return false; + } + + std::cout << "Successfully subscribed to topic: " << topic << std::endl; + return true; +} + +bool MQTTManager::publish(const string &topic, const string &payload, int qos) +{ + std::lock_guard lock(client_mutex_); + + if (!isConnected()) + { + std::cerr << "MQTT client is not connected." << std::endl; + return false; + } + + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = (void *)payload.c_str(); + pubmsg.payloadlen = payload.length(); + pubmsg.qos = qos; + pubmsg.retained = 0; + + int rc = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token_); + if (rc != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to publish message, return code " << rc << std::endl; + return false; + } + + rc = MQTTClient_waitForCompletion(client_, token_, TIMEOUT); + if (rc != MQTTCLIENT_SUCCESS) + { + std::cerr << "Failed to wait for message completion, return code " << rc << std::endl; + return false; + } + + return true; +} + +bool MQTTManager::isConnected() +{ + return MQTTClient_isConnected(client_) == 1; +} + +void MQTTManager::start() +{ + if (is_running_) + { + std::cout << "MQTT manager is already running." << std::endl; + return; + } + + // 首先建立连接 + if (!reconnect()) + { + std::cerr << "Failed to establish initial MQTT connection." << std::endl; + return; + } + + is_running_ = true; + mqtt_thread_ = std::thread(&MQTTManager::mqttLoop, this); + std::cout << "MQTT manager started." << std::endl; +} + +void MQTTManager::stop() +{ + if (!is_running_) + return; + + is_running_ = false; + + if (mqtt_thread_.joinable()) + { + mqtt_thread_.join(); + } + + std::lock_guard lock(client_mutex_); + if (isConnected()) + { + MQTTClient_disconnect(client_, 10000); + } + MQTTClient_destroy(&client_); + + std::cout << "MQTT manager stopped." << std::endl; +} + +void MQTTManager::mqttLoop() +{ + while (is_running_) + { + // 检查连接状态 + if (!isConnected()) + { + auto now = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + now - last_reconnect_time_) + .count(); + + // 如果距离上次重连已过指定间隔,则尝试重连 + if (duration >= reconnect_interval_) + { + last_reconnect_time_ = now; + if (!reconnect()) + { + // 重连失败,继续等待 + is_heartbeat_lost_ = true; + } + } + } + else + { + // 连接正常,检查心跳 + auto now = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + now - last_message_time_) + .count(); + + if (duration > heartbeat_timeout_ && !is_heartbeat_lost_) + { + is_heartbeat_lost_ = true; + std::cout << "Heartbeat timeout: No message received in " + << heartbeat_timeout_ << "ms." << std::endl; + if (conn_lost_callback_) + { + conn_lost_callback_((char *)"Heartbeat timeout"); + } + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} + +int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen, + MQTTClient_message *message) +{ + MQTTManager *pThis = static_cast(context); + + // 更新最后消息时间 + pThis->last_message_time_ = std::chrono::steady_clock::now(); + pThis->is_heartbeat_lost_ = false; + + std::cout << "Message arrived on topic: " << topicName << std::endl; + + if (pThis->message_callback_) + { + return pThis->message_callback_(topicName, topicLen, message); + } + + return 1; +} + +void MQTTManager::onConnectionLost(void *context, char *cause) +{ + MQTTManager *pThis = static_cast(context); + std::cout << "Connection lost. Cause: " << (cause ? cause : "Unknown") << std::endl; + + if (pThis->conn_lost_callback_) + { + pThis->conn_lost_callback_(cause); + } +} + +void MQTTManager::onDelivered(void *context, MQTTClient_deliveryToken dt) +{ + MQTTManager *pThis = static_cast(context); + std::cout << "Message with token " << dt << " delivered." << std::endl; + + if (pThis->delivered_callback_) + { + pThis->delivered_callback_(dt); + } +} \ No newline at end of file diff --git a/src/communication/task_manager/src/task_manager.cpp b/src/communication/task_manager/src/task_manager.cpp new file mode 100644 index 0000000..ae127c5 --- /dev/null +++ b/src/communication/task_manager/src/task_manager.cpp @@ -0,0 +1,272 @@ +#include "task_manager.hpp" +#include "md5.h" +#include +#include +#include +#include + +TaskManager::TaskManager() + : is_running_(false), task_status_(0), + destination_file_path_("./gps_load_now.txt") +{ + current_task_.id = 0; + current_task_.status = TaskStatus::PENDING; +} + +TaskManager::~TaskManager() +{ + stop(); +} + +void TaskManager::start() +{ + if (is_running_) + { + std::cout << "TaskManager is already running." << std::endl; + return; + } + + is_running_ = true; + task_thread_ = std::thread(&TaskManager::processTasksLoop, this); + std::cout << "TaskManager started." << std::endl; +} + +void TaskManager::stop() +{ + if (!is_running_) + return; + + { + std::lock_guard lock(queue_mutex_); + is_running_ = false; + } + queue_cv_.notify_one(); + + if (task_thread_.joinable()) + { + task_thread_.join(); + } + + std::cout << "TaskManager stopped." << std::endl; +} + +void TaskManager::addTask(const MQTT_JSON &mqtt_json) +{ + { + std::lock_guard lock(queue_mutex_); + task_queue_.push(mqtt_json); + } + queue_cv_.notify_one(); +} + +int TaskManager::getQueueSize() +{ + std::lock_guard lock(queue_mutex_); + return task_queue_.size(); +} + +TaskStatus TaskManager::getCurrentTaskStatus() +{ + std::lock_guard lock(task_mutex_); + return current_task_.status; +} + +int TaskManager::getCurrentTaskId() +{ + std::lock_guard lock(task_mutex_); + return current_task_.id; +} + +void TaskManager::updateCurrentTask(int taskId, TaskStatus status) +{ + std::lock_guard lock(task_mutex_); + current_task_.id = taskId; + current_task_.status = status; + current_task_.lastUpdateTime = std::chrono::steady_clock::now(); + std::cout << "Task updated: ID=" << taskId << ", Status=" << status << std::endl; +} + +string TaskManager::calculate_md5(const string &filename) +{ + MD5 md5; + std::ifstream file; + file.open(filename, std::ios::binary); + if (!file) + { + std::cerr << "Failed to open file for MD5 calculation: " << filename << std::endl; + return ""; + } + md5.update(file); + return md5.toString(); +} + +bool TaskManager::downloadFile(const string &url, const string &expected_md5, + const string &filename) +{ + if (url.empty()) + { + std::cerr << "Download URL is empty." << std::endl; + return false; + } + + std::string command = "wget -O routes/" + filename + " \"" + url + "\""; + std::cout << "Executing command: " << command << std::endl; + + int result = system(command.c_str()); + if (result != 0) + { + std::cerr << "Download failed: " << url << std::endl; + return false; + } + + // 验证MD5 + std::string filepath = "routes/" + filename; + std::string actual_md5 = calculate_md5(filepath); + if (actual_md5 != expected_md5) + { + std::cerr << "MD5 verification failed. Expected: " << expected_md5 + << ", Actual: " << actual_md5 << std::endl; + remove(filepath.c_str()); + return false; + } + + std::cout << "File downloaded and verified: " << filepath << std::endl; + return true; +} + +void TaskManager::copyFileAndOverwrite(const string &sourceFilePath, + const string &destinationFilePath) +{ + std::ifstream src(sourceFilePath, std::ios::binary); + std::ofstream dst(destinationFilePath, std::ios::binary); + + if (!src) + { + std::cerr << "Failed to open source file: " << sourceFilePath << std::endl; + return; + } + + if (!dst) + { + std::cerr << "Failed to open destination file: " << destinationFilePath << std::endl; + return; + } + + dst << src.rdbuf(); + + if (!dst) + { + std::cerr << "Error while copying file." << std::endl; + } +} + +bool TaskManager::processStartTask(const JSON_DATA &json_data, long seqNo) +{ + std::cout << "Processing start task ID: " << json_data.value.id << std::endl; + std::cout << "URL: " << json_data.value.routeInfo.url << std::endl; + std::cout << "MD5: " << json_data.value.routeInfo.md5 << std::endl; + + // 更新当前任务ID + updateCurrentTask(json_data.value.id, TaskStatus::RUNNING); + + string downFileName = json_data.value.routeInfo.fileName; + string filepath = "routes/" + downFileName; + + // 检查文件是否已存在 + if (access(filepath.c_str(), F_OK) == -1) + { + std::cout << "File not found, downloading: " << downFileName << std::endl; + if (!downloadFile(json_data.value.routeInfo.url, + json_data.value.routeInfo.md5, downFileName)) + { + std::cout << "Download failed, skipping task." << std::endl; + updateCurrentTask(json_data.value.id, TaskStatus::FAILED); + if (send_response_callback_) + { + // 发送响应到状态主题(而不是task_response) + send_response_callback_("task_response", seqNo, 500, "Download failed"); + } + return false; + } + } + else + { + std::cout << "File already exists, skipping download: " << filepath << std::endl; + } + + // 复制文件 + copyFileAndOverwrite(filepath, destination_file_path_); + + // 启动任务 + task_status_ = 1; + updateCurrentTask(json_data.value.id, TaskStatus::RUNNING); + std::cout << "Task started successfully: " << json_data.value.id << std::endl; + + // 发送响应(响应主题由调用方决定) + if (send_response_callback_) + { + send_response_callback_("task_response", seqNo, 200, "Task started successfully"); + } + + return true; +} + +bool TaskManager::processStopTask(long taskId, long seqNo) +{ + std::cout << "Processing stop task ID: " << taskId << std::endl; + task_status_ = 0; + updateCurrentTask(taskId, TaskStatus::COMPLETED); + + if (send_response_callback_) + { + send_response_callback_("task_response", seqNo, 200, "Task stopped successfully"); + } + + return true; +} + +void TaskManager::processTasksLoop() +{ + while (true) + { + std::unique_lock lock(queue_mutex_); + + // 等待队列中有任务或线程需要停止 + queue_cv_.wait(lock, [this] + { return !task_queue_.empty() || !is_running_; }); + + // 检查是否需要退出 + if (!is_running_ && task_queue_.empty()) + { + break; + } + + if (!task_queue_.empty()) + { + MQTT_JSON mqtt_json = task_queue_.front(); + task_queue_.pop(); + lock.unlock(); + + try + { + if (mqtt_json.type == "request") + { + JSON_DATA json_data = boost::any_cast(mqtt_json.data); + if (json_data.command == "start") + { + processStartTask(json_data, mqtt_json.seqNo); + } + } + else if (mqtt_json.type == "stop") + { + long taskId = boost::any_cast(mqtt_json.data); + processStopTask(taskId, mqtt_json.seqNo); + } + } + catch (const boost::bad_any_cast &e) + { + std::cerr << "Bad any_cast in task processing: " << e.what() << std::endl; + } + } + } +} \ No newline at end of file diff --git a/src/communication/task_manager/src/task_manager_main.cpp b/src/communication/task_manager/src/task_manager_main.cpp new file mode 100644 index 0000000..187cd1b --- /dev/null +++ b/src/communication/task_manager/src/task_manager_main.cpp @@ -0,0 +1,478 @@ +#include "rclcpp/rclcpp.hpp" +#include "sweeper_interfaces/msg/task.hpp" +#include "mqtt_manager.hpp" +#include "task_manager.hpp" +#include "task_manager_node.hpp" +#include "json.h" +#include +#include +#include +#include + +using namespace std; + +// 全局变量 +volatile std::sig_atomic_t signal_received = 0; +MQTTManager mqtt_manager; +TaskManager task_manager; +int last_status = 0; +TaskStatus status_up = TaskStatus::PENDING; +std::thread status_report_thread; + +// 配置变量 +struct Config +{ + string mqtt_vid; + string mqtt_address; + string mqtt_username; + string mqtt_password; + string mqtt_topic_sub_task; + string mqtt_topic_push_status; +} config; + +// 前置声明 +void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg); +void sendTaskResponse(long seqNo, int code, const string &msg); +string generate_mqtt_client_id(); + +// 信号处理函数 +void signalHandler(int signum) +{ + std::cout << "Interrupt signal (" << signum << ") received." << std::endl; + signal_received = signum; + + if (signum == SIGINT && rclcpp::ok()) + { + std::cout << "Shutting down ROS2 node..." << std::endl; + rclcpp::shutdown(); + } +} + +// 生成MQTT客户端ID +string generate_mqtt_client_id() +{ + auto now = std::chrono::system_clock::now(); + auto millis = std::chrono::duration_cast( + now.time_since_epoch()) + .count(); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(1000, 9999); + int random_num = dis(gen); + + std::ostringstream oss; + oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << random_num; + return oss.str(); +} + +// 从JSON解析ROUTE_INFO +ROUTE_INFO get_route_info(const Json::Value &root) +{ + ROUTE_INFO route_info; + if (root.isObject()) + { + if (root.isMember("routeName") && root["routeName"].isString()) + route_info.routeName = root["routeName"].asString(); + else + std::cerr << "Missing or invalid 'routeName' field" << std::endl; + + if (root.isMember("fileName") && root["fileName"].isString()) + route_info.fileName = root["fileName"].asString(); + else + std::cerr << "Missing or invalid 'fileName' field" << std::endl; + + if (root.isMember("url") && root["url"].isString()) + route_info.url = root["url"].asString(); + else + std::cerr << "Missing or invalid 'url' field" << std::endl; + + if (root.isMember("md5") && root["md5"].isString()) + route_info.md5 = root["md5"].asString(); + else + std::cerr << "Missing or invalid 'md5' field" << std::endl; + } + else + { + std::cerr << "routeInfo is not a valid JSON object" << std::endl; + } + + return route_info; +} + +// 从JSON解析TASK_VALUE +TASK_VALUE get_task_value(const Json::Value &root) +{ + TASK_VALUE task_value; + + if (root.isObject()) + { + if (root.isMember("id") && root["id"].isInt()) + task_value.id = root["id"].asInt(); + else + std::cerr << "Missing or invalid 'id' field" << std::endl; + + if (root.isMember("name") && root["name"].isString()) + task_value.name = root["name"].asString(); + else + std::cerr << "Missing or invalid 'name' field" << std::endl; + + if (root.isMember("mode") && root["mode"].isInt()) + task_value.mode = root["mode"].asInt(); + else + std::cerr << "Missing or invalid 'mode' field" << std::endl; + + if (root.isMember("count") && root["count"].isInt()) + task_value.count = root["count"].asInt(); + else + std::cerr << "Missing or invalid 'count' field" << std::endl; + + if (root.isMember("routeInfo") && root["routeInfo"].isObject()) + task_value.routeInfo = get_route_info(root["routeInfo"]); + else + std::cerr << "Missing or invalid 'routeInfo' field" << std::endl; + } + else + { + std::cerr << "task value is not a valid JSON object" << std::endl; + } + + return task_value; +} + +// 加载配置文件 +bool loadConfig(const string &config_file) +{ + Json::Reader reader; + Json::Value root; + std::ifstream in(config_file, std::ios::binary); + + if (!in.is_open()) + { + std::cout << "Failed to read config file: " << config_file << std::endl; + return false; + } + + if (!reader.parse(in, root)) + { + std::cout << "Failed to parse config file." << std::endl; + return false; + } + + config.mqtt_vid = root["mqtt"]["vid"].asString(); + config.mqtt_username = root["mqtt"]["mqtt_user"].asString(); + config.mqtt_password = root["mqtt"]["mqtt_password"].asString(); + config.mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString(); + config.mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString(); + + string external_addr = root["mqtt"]["external_net_address"].asString(); + int external_port = root["mqtt"]["external_net_port"].asInt(); + config.mqtt_address = external_addr + ":" + std::to_string(external_port); + + in.close(); + + std::cout << "Config loaded successfully." << std::endl; + std::cout << "MQTT Address: " << config.mqtt_address << std::endl; + std::cout << "Topic Sub: " << config.mqtt_topic_sub_task << std::endl; + std::cout << "Topic Pub: " << config.mqtt_topic_push_status << std::endl; + + return true; +} + +// 发送通用应答消息(触发式) +void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg) +{ + Json::Value responseData; + responseData["type"] = "response"; + responseData["seqNo"] = seqNo; + + Json::Value data; + data["code"] = code; + data["msg"] = msg; + + responseData["data"] = data; + + Json::FastWriter writer; + string responseJson = writer.write(responseData); + + std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code + << ", msg=" << msg << std::endl; + std::cout << "Response JSON: " << responseJson << std::endl; + + bool success = mqtt_manager.publish(topic, responseJson, 0); + std::cout << "General response publish to [" << topic << "] " + << (success ? "[OK]" : "[FAILED]") << std::endl; +} + +// 发送任务专用应答(包含任务信息) +void sendTaskResponse(long seqNo, int code, const string &msg) +{ + Json::Value responseData; + responseData["type"] = "response"; + responseData["seqNo"] = seqNo; + + Json::Value data; + data["code"] = code; + data["msg"] = msg; + + int current_task_id = task_manager.getCurrentTaskId(); + TaskStatus current_status = task_manager.getCurrentTaskStatus(); + + data["taskId"] = current_task_id; + data["taskStatus"] = static_cast(current_status); + + responseData["data"] = data; + + Json::FastWriter writer; + string responseJson = writer.write(responseData); + + std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code + << ", taskId=" << current_task_id << ", taskStatus=" << (int)current_status << std::endl; + std::cout << "Response JSON: " << responseJson << std::endl; + + bool success = mqtt_manager.publish(config.mqtt_topic_sub_task, responseJson, 0); + std::cout << "Task response publish to [" << config.mqtt_topic_sub_task << "] " + << (success ? "[OK]" : "[FAILED]") << std::endl; +} + +// MQTT消息回调 +int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message) +{ + const int BUFFER_SIZE = 4096; + char *buffer = new char[BUFFER_SIZE]; + memset(buffer, 0, BUFFER_SIZE); + + size_t copy_size = std::min((size_t)message->payloadlen, (size_t)(BUFFER_SIZE - 1)); + memcpy(buffer, message->payload, copy_size); + + Json::Reader reader; + Json::Value root; + + if (!reader.parse(buffer, root)) + { + std::cout << "Failed to parse JSON from MQTT message." << std::endl; + delete[] buffer; + return 1; + } + + std::cout << "MQTT message received on topic: " << topicName << std::endl; + + if (root.isMember("type") && root["type"].asString() == "request") + { + long seqNo = 0; + if (root.isMember("seqNo")) + { + seqNo = root["seqNo"].asInt64(); + } + + // 立即发送通用应答 + sendGeneralResponse(string(topicName), seqNo, 200, "Message received"); + + Json::Value data = root["data"]; + + JSON_DATA json_data; + if (data.isMember("command") && data["command"].isString()) + { + json_data.command = data["command"].asString(); + + if (json_data.command == "start") + { + try + { + json_data.value = get_task_value(data["value"]); + + MQTT_JSON mqtt_json; + mqtt_json.type = "request"; + mqtt_json.seqNo = seqNo; + mqtt_json.data = json_data; + + std::cout << "Start task: " << json_data.value.id << std::endl; + task_manager.addTask(mqtt_json); + } + catch (const std::exception &e) + { + std::cerr << "Error processing start command: " << e.what() << std::endl; + sendTaskResponse(seqNo, 400, "Failed to process start command"); + } + } + else if (json_data.command == "stop") + { + try + { + long stop_id = data["value"].asInt(); + + MQTT_JSON mqtt_json; + mqtt_json.type = "stop"; + mqtt_json.seqNo = seqNo; + mqtt_json.data = stop_id; + + std::cout << "Stop task: " << stop_id << std::endl; + task_manager.addTask(mqtt_json); + + sendTaskResponse(seqNo, 200, "Task stop command received"); + } + catch (const std::exception &e) + { + std::cerr << "Error processing stop command: " << e.what() << std::endl; + sendTaskResponse(seqNo, 400, "Failed to process stop command"); + } + } + else + { + sendGeneralResponse(string(topicName), seqNo, 400, "Unknown command: " + json_data.command); + } + } + else + { + sendGeneralResponse(string(topicName), seqNo, 400, "Missing command field"); + } + } + else if (root.isMember("type") && root["type"].asString() == "noReply") + { + std::cout << "NoReply message received" << std::endl; + } + else + { + std::cout << "Unknown message type" << std::endl; + } + + delete[] buffer; + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + +// 周期性上报任务状态到MQTT(200ms间隔持续上报) +void reportTaskStatusToMqtt() +{ + std::cout << "DEBUG: Status report thread started" << std::endl; + + while (rclcpp::ok() && !signal_received) + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + int current_id = task_manager.getCurrentTaskId(); + TaskStatus current_status = task_manager.getCurrentTaskStatus(); + + if (current_id > 0) + { + Json::Value statusMsg; + statusMsg["id"] = current_id; + statusMsg["status"] = static_cast(current_status); + + Json::FastWriter writer; + string statusJson = writer.write(statusMsg); + + mqtt_manager.publish(config.mqtt_topic_push_status, statusJson, 0); + } + } + + std::cout << "DEBUG: Status report thread exiting" << std::endl; +} + +// ROS2节点类 +class TaskManagerNode : public rclcpp::Node +{ +public: + TaskManagerNode(string name) : Node(name) + { + RCLCPP_INFO(this->get_logger(), "Node %s started.", name.c_str()); + msg_publisher_ = this->create_publisher( + "task_message/download", 10); + task_subscribe_ = this->create_subscription( + "task_message/upload", 10, + std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1)); + + timer_ = this->create_wall_timer(std::chrono::milliseconds(500), + std::bind(&TaskManagerNode::timer_callback, this)); + } + +private: + void timer_callback() + { + sweeper_interfaces::msg::Task msg; + msg.task_status = task_manager.getTaskStatus(); + + if (msg.task_status != last_status) + { + msg_publisher_->publish(msg); + RCLCPP_INFO(this->get_logger(), "Task status published: %d", msg.task_status); + } + + last_status = msg.task_status; + } + + void task_callback(const sweeper_interfaces::msg::Task::SharedPtr msg) + { + if (msg->task_status >= TaskStatus::PENDING && msg->task_status <= TaskStatus::FAILED) + { + status_up = static_cast(msg->task_status); + } + else + { + RCLCPP_WARN(this->get_logger(), "Invalid task status: %d", msg->task_status); + } + } + + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Publisher::SharedPtr msg_publisher_; + rclcpp::Subscription::SharedPtr task_subscribe_; +}; + +int main(int argc, char **argv) +{ + signal(SIGINT, signalHandler); + + if (!loadConfig("config.json")) + { + std::cerr << "Failed to load configuration." << std::endl; + return 1; + } + + string client_id = generate_mqtt_client_id(); + if (!mqtt_manager.init(config.mqtt_address, client_id, + config.mqtt_username, config.mqtt_password)) + { + std::cerr << "Failed to initialize MQTT manager." << std::endl; + return 1; + } + + mqtt_manager.setHeartbeatTimeout(2000); + mqtt_manager.setReconnectInterval(5); + mqtt_manager.setMaxReconnectAttempts(0); + + mqtt_manager.setMessageCallback(handleMqttMessage); + + mqtt_manager.start(); + mqtt_manager.subscribe(config.mqtt_topic_sub_task, 0); + + task_manager.setSendResponseCallback([](const string &topic, int seqNo, int code, const string &msg) + { sendTaskResponse(seqNo, code, msg); }); + task_manager.start(); + + rclcpp::init(argc, argv); + auto node = std::make_shared("task_manager_node"); + + std::cout << "Starting status report thread..." << std::endl; + status_report_thread = std::thread(reportTaskStatusToMqtt); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + + while (rclcpp::ok() && !signal_received) + { + executor.spin_some(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::cout << "Shutting down components..." << std::endl; + task_manager.stop(); + mqtt_manager.stop(); + + if (status_report_thread.joinable()) + { + status_report_thread.join(); + } + + return 0; +} \ No newline at end of file