分离mqtt和任务管理;增加mqtt断线重连

This commit is contained in:
lyq 2026-01-12 16:33:13 +08:00
parent d69cc29ec9
commit d24ea2ece9
8 changed files with 1228 additions and 22 deletions

View File

@ -5,7 +5,7 @@
#include <fstream>
/* Type define */
typedef unsigned char byte;
typedef unsigned char MD5_BYTE;
typedef unsigned int uint32;
using std::ifstream;
@ -22,30 +22,30 @@ public:
void update(const void *input, size_t length);
void update(const string &str);
void update(ifstream &in);
const byte *digest();
const MD5_BYTE *digest();
string toString();
void reset();
private:
void update(const byte *input, size_t length);
void update(const MD5_BYTE *input, size_t length);
void final();
void transform(const byte block[64]);
void encode(const uint32 *input, byte *output, size_t length);
void decode(const byte *input, uint32 *output, size_t length);
string bytesToHexString(const byte *input, size_t length);
void transform(const MD5_BYTE block[64]);
void encode(const uint32 *input, MD5_BYTE *output, size_t length);
void decode(const MD5_BYTE *input, uint32 *output, size_t length);
string bytesToHexString(const MD5_BYTE *input, size_t length);
/* class uncopyable */
MD5(const MD5 &);
MD5 &operator=(const MD5 &);
private:
uint32 _state[4]; /* state (ABCD) */
uint32 _count[2]; /* number of bits, modulo 2^64 (low-order word first) */
byte _buffer[64]; /* input buffer */
byte _digest[16]; /* message digest */
bool _finished; /* calculate finished ? */
uint32 _state[4]; /* state (ABCD) */
uint32 _count[2]; /* number of bits, modulo 2^64 (low-order word first) */
MD5_BYTE _buffer[64]; /* input buffer */
MD5_BYTE _digest[16]; /* message digest */
bool _finished; /* calculate finished ? */
static const byte PADDING[64]; /* padding for calculate */
static const MD5_BYTE PADDING[64]; /* padding for calculate */
static const char HEX[16];
enum
{

View File

@ -32,8 +32,10 @@ include_directories(
${catkin_INCLUDE_DIRS}
)
add_executable(task_manager_node
src/task_manager_node.cpp
add_executable(task_manager_node
src/task_manager_main.cpp
src/task_manager.cpp
src/mqtt_manager.cpp
src/jsoncpp.cpp
src/md5.cpp
)

View File

@ -0,0 +1,99 @@
#ifndef __MQTT_MANAGER_H__
#define __MQTT_MANAGER_H__
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <functional>
#include <iostream>
#include "MQTTClient.h"
using namespace std;
class MQTTManager
{
public:
// 消息回调函数类型
using MessageCallback = std::function<int(char *topicName, int topicLen, MQTTClient_message *message)>;
using ConnLostCallback = std::function<void(char *cause)>;
using DeliveredCallback = std::function<void(MQTTClient_deliveryToken dt)>;
MQTTManager();
~MQTTManager();
// 初始化MQTT连接
bool init(const string &address, const string &clientId,
const string &username, const string &password);
// 订阅主题
bool subscribe(const string &topic, int qos = 0);
// 发布消息
bool publish(const string &topic, const string &payload, int qos = 0);
// 设置回调函数
void setMessageCallback(MessageCallback cb) { message_callback_ = cb; }
void setConnLostCallback(ConnLostCallback cb) { conn_lost_callback_ = cb; }
void setDeliveredCallback(DeliveredCallback cb) { delivered_callback_ = cb; }
// 获取连接状态
bool isConnected();
// 启动MQTT循环线程
void start();
// 停止MQTT
void stop();
// 设置重连参数
void setReconnectInterval(int seconds) { reconnect_interval_ = seconds; }
void setMaxReconnectAttempts(int attempts) { max_reconnect_attempts_ = attempts; }
void setHeartbeatTimeout(int milliseconds) { heartbeat_timeout_ = milliseconds; }
private:
// MQTT连接循环线程函数
void mqttLoop();
// 重连逻辑
bool reconnect();
// 内部MQTT回调static函数
static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message);
static void onConnectionLost(void *context, char *cause);
static void onDelivered(void *context, MQTTClient_deliveryToken dt);
MQTTClient client_;
MQTTClient_connectOptions conn_opts_;
MQTTClient_deliveryToken token_;
string address_;
string client_id_;
string username_;
string password_;
string sub_topic_;
MessageCallback message_callback_;
ConnLostCallback conn_lost_callback_;
DeliveredCallback delivered_callback_;
bool is_running_;
std::thread mqtt_thread_;
std::mutex client_mutex_;
// 断线重连参数
int reconnect_interval_; // 重连间隔(秒)
int max_reconnect_attempts_; // 最大重连次数0为无限
int reconnect_attempts_;
std::chrono::steady_clock::time_point last_reconnect_time_;
// 心跳检测
std::chrono::steady_clock::time_point last_message_time_;
int heartbeat_timeout_; // 心跳超时时间(毫秒)
bool is_heartbeat_lost_;
constexpr static int TIMEOUT = 10000L;
constexpr static int QOS = 0;
};
#endif // __MQTT_MANAGER_H__

View File

@ -0,0 +1,87 @@
#ifndef __TASK_MANAGER_H__
#define __TASK_MANAGER_H__
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <boost/any.hpp>
#include "task_manager_node.hpp"
using namespace std;
class TaskManager
{
public:
TaskManager();
~TaskManager();
// 启动任务处理线程
void start();
// 停止任务处理
void stop();
// 添加任务到队列
void addTask(const MQTT_JSON &mqtt_json);
// 获取当前任务状态
TaskStatus getCurrentTaskStatus();
// 获取当前任务ID
int getCurrentTaskId();
// 更新当前任务
void updateCurrentTask(int taskId, TaskStatus status);
// 获取任务队列大小
int getQueueSize();
private:
// 任务处理线程函数
void processTasksLoop();
// 处理单个start任务
bool processStartTask(const JSON_DATA &json_data, long seqNo);
// 处理单个stop任务
bool processStopTask(long taskId, long seqNo);
// 下载文件
bool downloadFile(const string &url, const string &expected_md5, const string &filename);
// 计算MD5
string calculate_md5(const string &filename);
// 复制文件
void copyFileAndOverwrite(const string &sourceFilePath, const string &destinationFilePath);
std::queue<MQTT_JSON> task_queue_;
std::mutex queue_mutex_;
std::condition_variable queue_cv_;
bool is_running_;
std::thread task_thread_;
CurrentTask current_task_;
std::mutex task_mutex_;
string destination_file_path_;
int task_status_;
// 回调函数用于发送MQTT响应
std::function<void(const string &topic, int seqNo, int code, const string &msg)> send_response_callback_;
public:
void setSendResponseCallback(
std::function<void(const string &topic, int seqNo, int code, const string &msg)> cb)
{
send_response_callback_ = cb;
}
int getTaskStatus() const { return task_status_; }
void setTaskStatus(int status) { task_status_ = status; }
};
#endif // __TASK_MANAGER_H__

View File

@ -1,4 +1,3 @@
#include <string>
#include <boost/any.hpp>
#include <curl/curl.h>
@ -16,6 +15,7 @@
#include <iomanip>
#include <pthread.h>
#include <cstdlib>
#ifndef __SUB_NODE_H__
#define __SUB_NODE_H__
using namespace std;
@ -89,10 +89,4 @@ struct CurrentTask
CurrentTask() : id(0), status(PENDING) {}
};
// extern TASK_VALUE task_value;
// extern ROUTE_INFO routeInfo;
// extern MQTT_JSON mqtt_json;
// extern ROUTE_INFO routeInfo;
#endif

View File

@ -0,0 +1,274 @@
#include "mqtt_manager.hpp"
#include <iostream>
#include <iomanip>
#include <memory>
MQTTManager::MQTTManager()
: client_(nullptr), is_running_(false), reconnect_interval_(5),
max_reconnect_attempts_(0), reconnect_attempts_(0), heartbeat_timeout_(500),
is_heartbeat_lost_(false)
{
conn_opts_ = MQTTClient_connectOptions_initializer;
}
MQTTManager::~MQTTManager()
{
stop();
}
bool MQTTManager::init(const string &address, const string &clientId,
const string &username, const string &password)
{
address_ = address;
client_id_ = clientId;
username_ = username;
password_ = password;
int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to create MQTT client, return code " << rc << std::endl;
return false;
}
// 设置回调
rc = MQTTClient_setCallbacks(client_, this, onConnectionLost,
onMessageArrived, onDelivered);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to set MQTT callbacks, return code " << rc << std::endl;
MQTTClient_destroy(&client_);
return false;
}
// 配置连接选项
conn_opts_.keepAliveInterval = 20;
conn_opts_.cleansession = 1;
conn_opts_.username = username_.c_str();
conn_opts_.password = password_.c_str();
// 初始化时间戳
last_message_time_ = std::chrono::steady_clock::now();
last_reconnect_time_ = std::chrono::steady_clock::now();
return true;
}
bool MQTTManager::reconnect()
{
std::lock_guard<std::mutex> lock(client_mutex_);
std::cout << "Attempting to reconnect to MQTT server..." << std::endl;
int rc = MQTTClient_connect(client_, &conn_opts_);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to reconnect, return code " << rc << std::endl;
reconnect_attempts_++;
if (max_reconnect_attempts_ > 0 && reconnect_attempts_ >= max_reconnect_attempts_)
{
std::cerr << "Max reconnection attempts reached." << std::endl;
return false;
}
return false;
}
std::cout << "Successfully reconnected to MQTT server." << std::endl;
reconnect_attempts_ = 0;
is_heartbeat_lost_ = false;
last_message_time_ = std::chrono::steady_clock::now();
return true;
}
bool MQTTManager::subscribe(const string &topic, int qos)
{
std::lock_guard<std::mutex> lock(client_mutex_);
if (!isConnected())
{
std::cerr << "MQTT client is not connected." << std::endl;
return false;
}
sub_topic_ = topic;
int rc = MQTTClient_subscribe(client_, topic.c_str(), qos);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to subscribe, return code " << rc << std::endl;
return false;
}
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
return true;
}
bool MQTTManager::publish(const string &topic, const string &payload, int qos)
{
std::lock_guard<std::mutex> lock(client_mutex_);
if (!isConnected())
{
std::cerr << "MQTT client is not connected." << std::endl;
return false;
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void *)payload.c_str();
pubmsg.payloadlen = payload.length();
pubmsg.qos = qos;
pubmsg.retained = 0;
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token_);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to publish message, return code " << rc << std::endl;
return false;
}
rc = MQTTClient_waitForCompletion(client_, token_, TIMEOUT);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to wait for message completion, return code " << rc << std::endl;
return false;
}
return true;
}
bool MQTTManager::isConnected()
{
return MQTTClient_isConnected(client_) == 1;
}
void MQTTManager::start()
{
if (is_running_)
{
std::cout << "MQTT manager is already running." << std::endl;
return;
}
// 首先建立连接
if (!reconnect())
{
std::cerr << "Failed to establish initial MQTT connection." << std::endl;
return;
}
is_running_ = true;
mqtt_thread_ = std::thread(&MQTTManager::mqttLoop, this);
std::cout << "MQTT manager started." << std::endl;
}
void MQTTManager::stop()
{
if (!is_running_)
return;
is_running_ = false;
if (mqtt_thread_.joinable())
{
mqtt_thread_.join();
}
std::lock_guard<std::mutex> lock(client_mutex_);
if (isConnected())
{
MQTTClient_disconnect(client_, 10000);
}
MQTTClient_destroy(&client_);
std::cout << "MQTT manager stopped." << std::endl;
}
void MQTTManager::mqttLoop()
{
while (is_running_)
{
// 检查连接状态
if (!isConnected())
{
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
now - last_reconnect_time_)
.count();
// 如果距离上次重连已过指定间隔,则尝试重连
if (duration >= reconnect_interval_)
{
last_reconnect_time_ = now;
if (!reconnect())
{
// 重连失败,继续等待
is_heartbeat_lost_ = true;
}
}
}
else
{
// 连接正常,检查心跳
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_message_time_)
.count();
if (duration > heartbeat_timeout_ && !is_heartbeat_lost_)
{
is_heartbeat_lost_ = true;
std::cout << "Heartbeat timeout: No message received in "
<< heartbeat_timeout_ << "ms." << std::endl;
if (conn_lost_callback_)
{
conn_lost_callback_((char *)"Heartbeat timeout");
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen,
MQTTClient_message *message)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
// 更新最后消息时间
pThis->last_message_time_ = std::chrono::steady_clock::now();
pThis->is_heartbeat_lost_ = false;
std::cout << "Message arrived on topic: " << topicName << std::endl;
if (pThis->message_callback_)
{
return pThis->message_callback_(topicName, topicLen, message);
}
return 1;
}
void MQTTManager::onConnectionLost(void *context, char *cause)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
std::cout << "Connection lost. Cause: " << (cause ? cause : "Unknown") << std::endl;
if (pThis->conn_lost_callback_)
{
pThis->conn_lost_callback_(cause);
}
}
void MQTTManager::onDelivered(void *context, MQTTClient_deliveryToken dt)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
std::cout << "Message with token " << dt << " delivered." << std::endl;
if (pThis->delivered_callback_)
{
pThis->delivered_callback_(dt);
}
}

View File

@ -0,0 +1,272 @@
#include "task_manager.hpp"
#include "md5.h"
#include <iostream>
#include <fstream>
#include <cstdlib>
#include <unistd.h>
TaskManager::TaskManager()
: is_running_(false), task_status_(0),
destination_file_path_("./gps_load_now.txt")
{
current_task_.id = 0;
current_task_.status = TaskStatus::PENDING;
}
TaskManager::~TaskManager()
{
stop();
}
void TaskManager::start()
{
if (is_running_)
{
std::cout << "TaskManager is already running." << std::endl;
return;
}
is_running_ = true;
task_thread_ = std::thread(&TaskManager::processTasksLoop, this);
std::cout << "TaskManager started." << std::endl;
}
void TaskManager::stop()
{
if (!is_running_)
return;
{
std::lock_guard<std::mutex> lock(queue_mutex_);
is_running_ = false;
}
queue_cv_.notify_one();
if (task_thread_.joinable())
{
task_thread_.join();
}
std::cout << "TaskManager stopped." << std::endl;
}
void TaskManager::addTask(const MQTT_JSON &mqtt_json)
{
{
std::lock_guard<std::mutex> lock(queue_mutex_);
task_queue_.push(mqtt_json);
}
queue_cv_.notify_one();
}
int TaskManager::getQueueSize()
{
std::lock_guard<std::mutex> lock(queue_mutex_);
return task_queue_.size();
}
TaskStatus TaskManager::getCurrentTaskStatus()
{
std::lock_guard<std::mutex> lock(task_mutex_);
return current_task_.status;
}
int TaskManager::getCurrentTaskId()
{
std::lock_guard<std::mutex> lock(task_mutex_);
return current_task_.id;
}
void TaskManager::updateCurrentTask(int taskId, TaskStatus status)
{
std::lock_guard<std::mutex> lock(task_mutex_);
current_task_.id = taskId;
current_task_.status = status;
current_task_.lastUpdateTime = std::chrono::steady_clock::now();
std::cout << "Task updated: ID=" << taskId << ", Status=" << status << std::endl;
}
string TaskManager::calculate_md5(const string &filename)
{
MD5 md5;
std::ifstream file;
file.open(filename, std::ios::binary);
if (!file)
{
std::cerr << "Failed to open file for MD5 calculation: " << filename << std::endl;
return "";
}
md5.update(file);
return md5.toString();
}
bool TaskManager::downloadFile(const string &url, const string &expected_md5,
const string &filename)
{
if (url.empty())
{
std::cerr << "Download URL is empty." << std::endl;
return false;
}
std::string command = "wget -O routes/" + filename + " \"" + url + "\"";
std::cout << "Executing command: " << command << std::endl;
int result = system(command.c_str());
if (result != 0)
{
std::cerr << "Download failed: " << url << std::endl;
return false;
}
// 验证MD5
std::string filepath = "routes/" + filename;
std::string actual_md5 = calculate_md5(filepath);
if (actual_md5 != expected_md5)
{
std::cerr << "MD5 verification failed. Expected: " << expected_md5
<< ", Actual: " << actual_md5 << std::endl;
remove(filepath.c_str());
return false;
}
std::cout << "File downloaded and verified: " << filepath << std::endl;
return true;
}
void TaskManager::copyFileAndOverwrite(const string &sourceFilePath,
const string &destinationFilePath)
{
std::ifstream src(sourceFilePath, std::ios::binary);
std::ofstream dst(destinationFilePath, std::ios::binary);
if (!src)
{
std::cerr << "Failed to open source file: " << sourceFilePath << std::endl;
return;
}
if (!dst)
{
std::cerr << "Failed to open destination file: " << destinationFilePath << std::endl;
return;
}
dst << src.rdbuf();
if (!dst)
{
std::cerr << "Error while copying file." << std::endl;
}
}
bool TaskManager::processStartTask(const JSON_DATA &json_data, long seqNo)
{
std::cout << "Processing start task ID: " << json_data.value.id << std::endl;
std::cout << "URL: " << json_data.value.routeInfo.url << std::endl;
std::cout << "MD5: " << json_data.value.routeInfo.md5 << std::endl;
// 更新当前任务ID
updateCurrentTask(json_data.value.id, TaskStatus::RUNNING);
string downFileName = json_data.value.routeInfo.fileName;
string filepath = "routes/" + downFileName;
// 检查文件是否已存在
if (access(filepath.c_str(), F_OK) == -1)
{
std::cout << "File not found, downloading: " << downFileName << std::endl;
if (!downloadFile(json_data.value.routeInfo.url,
json_data.value.routeInfo.md5, downFileName))
{
std::cout << "Download failed, skipping task." << std::endl;
updateCurrentTask(json_data.value.id, TaskStatus::FAILED);
if (send_response_callback_)
{
// 发送响应到状态主题而不是task_response
send_response_callback_("task_response", seqNo, 500, "Download failed");
}
return false;
}
}
else
{
std::cout << "File already exists, skipping download: " << filepath << std::endl;
}
// 复制文件
copyFileAndOverwrite(filepath, destination_file_path_);
// 启动任务
task_status_ = 1;
updateCurrentTask(json_data.value.id, TaskStatus::RUNNING);
std::cout << "Task started successfully: " << json_data.value.id << std::endl;
// 发送响应(响应主题由调用方决定)
if (send_response_callback_)
{
send_response_callback_("task_response", seqNo, 200, "Task started successfully");
}
return true;
}
bool TaskManager::processStopTask(long taskId, long seqNo)
{
std::cout << "Processing stop task ID: " << taskId << std::endl;
task_status_ = 0;
updateCurrentTask(taskId, TaskStatus::COMPLETED);
if (send_response_callback_)
{
send_response_callback_("task_response", seqNo, 200, "Task stopped successfully");
}
return true;
}
void TaskManager::processTasksLoop()
{
while (true)
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// 等待队列中有任务或线程需要停止
queue_cv_.wait(lock, [this]
{ return !task_queue_.empty() || !is_running_; });
// 检查是否需要退出
if (!is_running_ && task_queue_.empty())
{
break;
}
if (!task_queue_.empty())
{
MQTT_JSON mqtt_json = task_queue_.front();
task_queue_.pop();
lock.unlock();
try
{
if (mqtt_json.type == "request")
{
JSON_DATA json_data = boost::any_cast<JSON_DATA>(mqtt_json.data);
if (json_data.command == "start")
{
processStartTask(json_data, mqtt_json.seqNo);
}
}
else if (mqtt_json.type == "stop")
{
long taskId = boost::any_cast<long>(mqtt_json.data);
processStopTask(taskId, mqtt_json.seqNo);
}
}
catch (const boost::bad_any_cast &e)
{
std::cerr << "Bad any_cast in task processing: " << e.what() << std::endl;
}
}
}
}

View File

@ -0,0 +1,478 @@
#include "rclcpp/rclcpp.hpp"
#include "sweeper_interfaces/msg/task.hpp"
#include "mqtt_manager.hpp"
#include "task_manager.hpp"
#include "task_manager_node.hpp"
#include "json.h"
#include <signal.h>
#include <fstream>
#include <random>
#include <iomanip>
using namespace std;
// 全局变量
volatile std::sig_atomic_t signal_received = 0;
MQTTManager mqtt_manager;
TaskManager task_manager;
int last_status = 0;
TaskStatus status_up = TaskStatus::PENDING;
std::thread status_report_thread;
// 配置变量
struct Config
{
string mqtt_vid;
string mqtt_address;
string mqtt_username;
string mqtt_password;
string mqtt_topic_sub_task;
string mqtt_topic_push_status;
} config;
// 前置声明
void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg);
void sendTaskResponse(long seqNo, int code, const string &msg);
string generate_mqtt_client_id();
// 信号处理函数
void signalHandler(int signum)
{
std::cout << "Interrupt signal (" << signum << ") received." << std::endl;
signal_received = signum;
if (signum == SIGINT && rclcpp::ok())
{
std::cout << "Shutting down ROS2 node..." << std::endl;
rclcpp::shutdown();
}
}
// 生成MQTT客户端ID
string generate_mqtt_client_id()
{
auto now = std::chrono::system_clock::now();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count();
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1000, 9999);
int random_num = dis(gen);
std::ostringstream oss;
oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << random_num;
return oss.str();
}
// 从JSON解析ROUTE_INFO
ROUTE_INFO get_route_info(const Json::Value &root)
{
ROUTE_INFO route_info;
if (root.isObject())
{
if (root.isMember("routeName") && root["routeName"].isString())
route_info.routeName = root["routeName"].asString();
else
std::cerr << "Missing or invalid 'routeName' field" << std::endl;
if (root.isMember("fileName") && root["fileName"].isString())
route_info.fileName = root["fileName"].asString();
else
std::cerr << "Missing or invalid 'fileName' field" << std::endl;
if (root.isMember("url") && root["url"].isString())
route_info.url = root["url"].asString();
else
std::cerr << "Missing or invalid 'url' field" << std::endl;
if (root.isMember("md5") && root["md5"].isString())
route_info.md5 = root["md5"].asString();
else
std::cerr << "Missing or invalid 'md5' field" << std::endl;
}
else
{
std::cerr << "routeInfo is not a valid JSON object" << std::endl;
}
return route_info;
}
// 从JSON解析TASK_VALUE
TASK_VALUE get_task_value(const Json::Value &root)
{
TASK_VALUE task_value;
if (root.isObject())
{
if (root.isMember("id") && root["id"].isInt())
task_value.id = root["id"].asInt();
else
std::cerr << "Missing or invalid 'id' field" << std::endl;
if (root.isMember("name") && root["name"].isString())
task_value.name = root["name"].asString();
else
std::cerr << "Missing or invalid 'name' field" << std::endl;
if (root.isMember("mode") && root["mode"].isInt())
task_value.mode = root["mode"].asInt();
else
std::cerr << "Missing or invalid 'mode' field" << std::endl;
if (root.isMember("count") && root["count"].isInt())
task_value.count = root["count"].asInt();
else
std::cerr << "Missing or invalid 'count' field" << std::endl;
if (root.isMember("routeInfo") && root["routeInfo"].isObject())
task_value.routeInfo = get_route_info(root["routeInfo"]);
else
std::cerr << "Missing or invalid 'routeInfo' field" << std::endl;
}
else
{
std::cerr << "task value is not a valid JSON object" << std::endl;
}
return task_value;
}
// 加载配置文件
bool loadConfig(const string &config_file)
{
Json::Reader reader;
Json::Value root;
std::ifstream in(config_file, std::ios::binary);
if (!in.is_open())
{
std::cout << "Failed to read config file: " << config_file << std::endl;
return false;
}
if (!reader.parse(in, root))
{
std::cout << "Failed to parse config file." << std::endl;
return false;
}
config.mqtt_vid = root["mqtt"]["vid"].asString();
config.mqtt_username = root["mqtt"]["mqtt_user"].asString();
config.mqtt_password = root["mqtt"]["mqtt_password"].asString();
config.mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString();
config.mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString();
string external_addr = root["mqtt"]["external_net_address"].asString();
int external_port = root["mqtt"]["external_net_port"].asInt();
config.mqtt_address = external_addr + ":" + std::to_string(external_port);
in.close();
std::cout << "Config loaded successfully." << std::endl;
std::cout << "MQTT Address: " << config.mqtt_address << std::endl;
std::cout << "Topic Sub: " << config.mqtt_topic_sub_task << std::endl;
std::cout << "Topic Pub: " << config.mqtt_topic_push_status << std::endl;
return true;
}
// 发送通用应答消息(触发式)
void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg)
{
Json::Value responseData;
responseData["type"] = "response";
responseData["seqNo"] = seqNo;
Json::Value data;
data["code"] = code;
data["msg"] = msg;
responseData["data"] = data;
Json::FastWriter writer;
string responseJson = writer.write(responseData);
std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code
<< ", msg=" << msg << std::endl;
std::cout << "Response JSON: " << responseJson << std::endl;
bool success = mqtt_manager.publish(topic, responseJson, 0);
std::cout << "General response publish to [" << topic << "] "
<< (success ? "[OK]" : "[FAILED]") << std::endl;
}
// 发送任务专用应答(包含任务信息)
void sendTaskResponse(long seqNo, int code, const string &msg)
{
Json::Value responseData;
responseData["type"] = "response";
responseData["seqNo"] = seqNo;
Json::Value data;
data["code"] = code;
data["msg"] = msg;
int current_task_id = task_manager.getCurrentTaskId();
TaskStatus current_status = task_manager.getCurrentTaskStatus();
data["taskId"] = current_task_id;
data["taskStatus"] = static_cast<int>(current_status);
responseData["data"] = data;
Json::FastWriter writer;
string responseJson = writer.write(responseData);
std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code
<< ", taskId=" << current_task_id << ", taskStatus=" << (int)current_status << std::endl;
std::cout << "Response JSON: " << responseJson << std::endl;
bool success = mqtt_manager.publish(config.mqtt_topic_sub_task, responseJson, 0);
std::cout << "Task response publish to [" << config.mqtt_topic_sub_task << "] "
<< (success ? "[OK]" : "[FAILED]") << std::endl;
}
// MQTT消息回调
int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message)
{
const int BUFFER_SIZE = 4096;
char *buffer = new char[BUFFER_SIZE];
memset(buffer, 0, BUFFER_SIZE);
size_t copy_size = std::min((size_t)message->payloadlen, (size_t)(BUFFER_SIZE - 1));
memcpy(buffer, message->payload, copy_size);
Json::Reader reader;
Json::Value root;
if (!reader.parse(buffer, root))
{
std::cout << "Failed to parse JSON from MQTT message." << std::endl;
delete[] buffer;
return 1;
}
std::cout << "MQTT message received on topic: " << topicName << std::endl;
if (root.isMember("type") && root["type"].asString() == "request")
{
long seqNo = 0;
if (root.isMember("seqNo"))
{
seqNo = root["seqNo"].asInt64();
}
// 立即发送通用应答
sendGeneralResponse(string(topicName), seqNo, 200, "Message received");
Json::Value data = root["data"];
JSON_DATA json_data;
if (data.isMember("command") && data["command"].isString())
{
json_data.command = data["command"].asString();
if (json_data.command == "start")
{
try
{
json_data.value = get_task_value(data["value"]);
MQTT_JSON mqtt_json;
mqtt_json.type = "request";
mqtt_json.seqNo = seqNo;
mqtt_json.data = json_data;
std::cout << "Start task: " << json_data.value.id << std::endl;
task_manager.addTask(mqtt_json);
}
catch (const std::exception &e)
{
std::cerr << "Error processing start command: " << e.what() << std::endl;
sendTaskResponse(seqNo, 400, "Failed to process start command");
}
}
else if (json_data.command == "stop")
{
try
{
long stop_id = data["value"].asInt();
MQTT_JSON mqtt_json;
mqtt_json.type = "stop";
mqtt_json.seqNo = seqNo;
mqtt_json.data = stop_id;
std::cout << "Stop task: " << stop_id << std::endl;
task_manager.addTask(mqtt_json);
sendTaskResponse(seqNo, 200, "Task stop command received");
}
catch (const std::exception &e)
{
std::cerr << "Error processing stop command: " << e.what() << std::endl;
sendTaskResponse(seqNo, 400, "Failed to process stop command");
}
}
else
{
sendGeneralResponse(string(topicName), seqNo, 400, "Unknown command: " + json_data.command);
}
}
else
{
sendGeneralResponse(string(topicName), seqNo, 400, "Missing command field");
}
}
else if (root.isMember("type") && root["type"].asString() == "noReply")
{
std::cout << "NoReply message received" << std::endl;
}
else
{
std::cout << "Unknown message type" << std::endl;
}
delete[] buffer;
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
// 周期性上报任务状态到MQTT200ms间隔持续上报
void reportTaskStatusToMqtt()
{
std::cout << "DEBUG: Status report thread started" << std::endl;
while (rclcpp::ok() && !signal_received)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
int current_id = task_manager.getCurrentTaskId();
TaskStatus current_status = task_manager.getCurrentTaskStatus();
if (current_id > 0)
{
Json::Value statusMsg;
statusMsg["id"] = current_id;
statusMsg["status"] = static_cast<int>(current_status);
Json::FastWriter writer;
string statusJson = writer.write(statusMsg);
mqtt_manager.publish(config.mqtt_topic_push_status, statusJson, 0);
}
}
std::cout << "DEBUG: Status report thread exiting" << std::endl;
}
// ROS2节点类
class TaskManagerNode : public rclcpp::Node
{
public:
TaskManagerNode(string name) : Node(name)
{
RCLCPP_INFO(this->get_logger(), "Node %s started.", name.c_str());
msg_publisher_ = this->create_publisher<sweeper_interfaces::msg::Task>(
"task_message/download", 10);
task_subscribe_ = this->create_subscription<sweeper_interfaces::msg::Task>(
"task_message/upload", 10,
std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1));
timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&TaskManagerNode::timer_callback, this));
}
private:
void timer_callback()
{
sweeper_interfaces::msg::Task msg;
msg.task_status = task_manager.getTaskStatus();
if (msg.task_status != last_status)
{
msg_publisher_->publish(msg);
RCLCPP_INFO(this->get_logger(), "Task status published: %d", msg.task_status);
}
last_status = msg.task_status;
}
void task_callback(const sweeper_interfaces::msg::Task::SharedPtr msg)
{
if (msg->task_status >= TaskStatus::PENDING && msg->task_status <= TaskStatus::FAILED)
{
status_up = static_cast<TaskStatus>(msg->task_status);
}
else
{
RCLCPP_WARN(this->get_logger(), "Invalid task status: %d", msg->task_status);
}
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<sweeper_interfaces::msg::Task>::SharedPtr msg_publisher_;
rclcpp::Subscription<sweeper_interfaces::msg::Task>::SharedPtr task_subscribe_;
};
int main(int argc, char **argv)
{
signal(SIGINT, signalHandler);
if (!loadConfig("config.json"))
{
std::cerr << "Failed to load configuration." << std::endl;
return 1;
}
string client_id = generate_mqtt_client_id();
if (!mqtt_manager.init(config.mqtt_address, client_id,
config.mqtt_username, config.mqtt_password))
{
std::cerr << "Failed to initialize MQTT manager." << std::endl;
return 1;
}
mqtt_manager.setHeartbeatTimeout(2000);
mqtt_manager.setReconnectInterval(5);
mqtt_manager.setMaxReconnectAttempts(0);
mqtt_manager.setMessageCallback(handleMqttMessage);
mqtt_manager.start();
mqtt_manager.subscribe(config.mqtt_topic_sub_task, 0);
task_manager.setSendResponseCallback([](const string &topic, int seqNo, int code, const string &msg)
{ sendTaskResponse(seqNo, code, msg); });
task_manager.start();
rclcpp::init(argc, argv);
auto node = std::make_shared<TaskManagerNode>("task_manager_node");
std::cout << "Starting status report thread..." << std::endl;
status_report_thread = std::thread(reportTaskStatusToMqtt);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
while (rclcpp::ok() && !signal_received)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "Shutting down components..." << std::endl;
task_manager.stop();
mqtt_manager.stop();
if (status_report_thread.joinable())
{
status_report_thread.join();
}
return 0;
}