修改task_manager节点获取vid和话题的方式

This commit is contained in:
lyq 2026-01-19 14:36:36 +08:00
parent cec7a19971
commit 9c73d52dce
8 changed files with 227 additions and 969 deletions

View File

@ -10,8 +10,8 @@
"remote_topic": "/zxwl/sweeper/{vid}/ctrl",
"upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload",
"download_url_pre": "http://36.153.162.171:9510/api/ccp-web/file/",
"mqtt_topic_push_status": "/zxwl/sweeper/V060003/task/status",
"mqtt_topic_sub_task": "/zxwl/sweeper/V060003/task"
"mqtt_topic_push_status": "/zxwl/sweeper/{vid}/task/status",
"mqtt_topic_sub_task": "/zxwl/sweeper/{vid}/task"
},
"detect_line_tolerance": 3.06,
"detect_head_tolerance": 2,

View File

@ -16,7 +16,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options(-w) #
add_compile_options(-w) #
endif()
# find dependencies
@ -26,39 +26,30 @@ find_package(std_msgs REQUIRED)
find_package(sweeper_interfaces REQUIRED)
find_package(CURL REQUIRED)
include_directories(
include/task_manager
include/paho_mqtt_3c
${catkin_INCLUDE_DIRS}
)
include_directories(include/task_manager include/paho_mqtt_3c ${catkin_INCLUDE_DIRS})
add_executable(task_manager_node
add_executable(
task_manager_node
src/task_manager_main.cpp
src/task_manager.cpp
src/mqtt_manager.cpp
src/jsoncpp.cpp
src/md5.cpp
)
ament_target_dependencies(task_manager_node rclcpp std_msgs sweeper_interfaces CURL)
src/md5.cpp)
ament_target_dependencies(
task_manager_node
rclcpp
std_msgs
sweeper_interfaces
CURL)
if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64)
target_link_libraries(
task_manager_node
${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a
)
target_link_libraries(task_manager_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a)
else()
target_link_libraries(
task_manager_node
${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a
)
target_link_libraries(task_manager_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a)
endif()
install(TARGETS
task_manager_node
DESTINATION lib/${PROJECT_NAME}
)
install(TARGETS task_manager_node DESTINATION lib/${PROJECT_NAME})
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)

View File

@ -1,41 +1,43 @@
#ifndef __MQTT_MANAGER_H__
#define __MQTT_MANAGER_H__
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include "MQTTClient.h"
using namespace std;
class MQTTManager
{
public:
public:
// 消息回调函数类型
using MessageCallback = std::function<int(char *topicName, int topicLen, MQTTClient_message *message)>;
using ConnLostCallback = std::function<void(char *cause)>;
using MessageCallback = std::function<int(char* topicName, int topicLen, MQTTClient_message* message)>;
using ConnLostCallback = std::function<void(char* cause)>;
using DeliveredCallback = std::function<void(MQTTClient_deliveryToken dt)>;
using ConnRestoredCallback = std::function<void(const char* cause)>;
MQTTManager();
~MQTTManager();
// 初始化MQTT连接
bool init(const string &address, const string &clientId,
const string &username, const string &password);
bool init(const string& address, const string& clientId, const string& username, const string& password);
// 订阅主题
bool subscribe(const string &topic, int qos = 0);
bool subscribe(const string& topic, int qos = 0);
// 发布消息
bool publish(const string &topic, const string &payload, int qos = 0);
bool publish(const string& topic, const string& payload, int qos = 0);
// 设置回调函数
void setMessageCallback(MessageCallback cb) { message_callback_ = cb; }
void setConnLostCallback(ConnLostCallback cb) { conn_lost_callback_ = cb; }
void setDeliveredCallback(DeliveredCallback cb) { delivered_callback_ = cb; }
void setConnRestoredCallback(ConnRestoredCallback cb) { conn_restored_callback_ = cb; }
// 获取连接状态
bool isConnected();
@ -51,7 +53,7 @@ public:
void setMaxReconnectAttempts(int attempts) { max_reconnect_attempts_ = attempts; }
void setHeartbeatTimeout(int milliseconds) { heartbeat_timeout_ = milliseconds; }
private:
private:
// MQTT连接循环线程函数
void mqttLoop();
@ -59,9 +61,9 @@ private:
bool reconnect();
// 内部MQTT回调static函数
static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message);
static void onConnectionLost(void *context, char *cause);
static void onDelivered(void *context, MQTTClient_deliveryToken dt);
static int onMessageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message);
static void onConnectionLost(void* context, char* cause);
static void onDelivered(void* context, MQTTClient_deliveryToken dt);
MQTTClient client_;
MQTTClient_connectOptions conn_opts_;
@ -76,24 +78,25 @@ private:
MessageCallback message_callback_;
ConnLostCallback conn_lost_callback_;
DeliveredCallback delivered_callback_;
ConnRestoredCallback conn_restored_callback_;
bool is_running_;
std::thread mqtt_thread_;
std::mutex client_mutex_;
// 断线重连参数
int reconnect_interval_; // 重连间隔(秒)
int max_reconnect_attempts_; // 最大重连次数0为无限
int reconnect_interval_; // 重连间隔(秒)
int max_reconnect_attempts_; // 最大重连次数0为无限
int reconnect_attempts_;
std::chrono::steady_clock::time_point last_reconnect_time_;
// 心跳检测
std::chrono::steady_clock::time_point last_message_time_;
int heartbeat_timeout_; // 心跳超时时间(毫秒)
int heartbeat_timeout_; // 心跳超时时间(毫秒)
bool is_heartbeat_lost_;
constexpr static int TIMEOUT = 10000L;
constexpr static int QOS = 0;
};
#endif // __MQTT_MANAGER_H__
#endif // __MQTT_MANAGER_H__

View File

@ -1,20 +1,21 @@
#ifndef __TASK_MANAGER_H__
#define __TASK_MANAGER_H__
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <boost/any.hpp>
#include "task_manager_node.hpp"
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include "task_types.hpp"
using namespace std;
class TaskManager
{
public:
public:
TaskManager();
~TaskManager();
@ -25,7 +26,7 @@ public:
void stop();
// 添加任务到队列
void addTask(const MQTT_JSON &mqtt_json);
void addTask(const MQTT_JSON& mqtt_json);
// 获取当前任务状态
TaskStatus getCurrentTaskStatus();
@ -39,24 +40,24 @@ public:
// 获取任务队列大小
int getQueueSize();
private:
private:
// 任务处理线程函数
void processTasksLoop();
// 处理单个start任务
bool processStartTask(const JSON_DATA &json_data, long seqNo);
bool processStartTask(const JSON_DATA& json_data, long seqNo);
// 处理单个stop任务
bool processStopTask(long taskId, long seqNo);
// 下载文件
bool downloadFile(const string &url, const string &expected_md5, const string &filename);
bool downloadFile(const string& url, const string& expected_md5, const string& filename);
// 计算MD5
string calculate_md5(const string &filename);
string calculate_md5(const string& filename);
// 复制文件
void copyFileAndOverwrite(const string &sourceFilePath, const string &destinationFilePath);
void copyFileAndOverwrite(const string& sourceFilePath, const string& destinationFilePath);
std::queue<MQTT_JSON> task_queue_;
std::mutex queue_mutex_;
@ -71,11 +72,10 @@ private:
int task_status_;
// 回调函数用于发送MQTT响应
std::function<void(const string &topic, int seqNo, int code, const string &msg)> send_response_callback_;
std::function<void(const string& topic, int seqNo, int code, const string& msg)> send_response_callback_;
public:
void setSendResponseCallback(
std::function<void(const string &topic, int seqNo, int code, const string &msg)> cb)
public:
void setSendResponseCallback(std::function<void(const string& topic, int seqNo, int code, const string& msg)> cb)
{
send_response_callback_ = cb;
}
@ -84,4 +84,4 @@ public:
void setTaskStatus(int status) { task_status_ = status; }
};
#endif // __TASK_MANAGER_H__
#endif // __TASK_MANAGER_H__

View File

@ -1,31 +1,31 @@
#include "mqtt_manager.hpp"
#include <iostream>
#include <iomanip>
#include <iostream>
#include <memory>
MQTTManager::MQTTManager()
: client_(nullptr), is_running_(false), reconnect_interval_(5),
max_reconnect_attempts_(0), reconnect_attempts_(0), heartbeat_timeout_(500),
: client_(nullptr),
is_running_(false),
reconnect_interval_(5),
max_reconnect_attempts_(0),
reconnect_attempts_(0),
heartbeat_timeout_(500),
is_heartbeat_lost_(false)
{
conn_opts_ = MQTTClient_connectOptions_initializer;
}
MQTTManager::~MQTTManager()
{
stop();
}
MQTTManager::~MQTTManager() { stop(); }
bool MQTTManager::init(const string &address, const string &clientId,
const string &username, const string &password)
bool MQTTManager::init(const string& address, const string& clientId, const string& username, const string& password)
{
address_ = address;
client_id_ = clientId;
username_ = username;
password_ = password;
int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, NULL);
int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to create MQTT client, return code " << rc << std::endl;
@ -33,8 +33,7 @@ bool MQTTManager::init(const string &address, const string &clientId,
}
// 设置回调
rc = MQTTClient_setCallbacks(client_, this, onConnectionLost,
onMessageArrived, onDelivered);
rc = MQTTClient_setCallbacks(client_, this, onConnectionLost, onMessageArrived, onDelivered);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to set MQTT callbacks, return code " << rc << std::endl;
@ -80,10 +79,15 @@ bool MQTTManager::reconnect()
is_heartbeat_lost_ = false;
last_message_time_ = std::chrono::steady_clock::now();
if (conn_restored_callback_)
{
conn_restored_callback_("MQTT reconnected");
}
return true;
}
bool MQTTManager::subscribe(const string &topic, int qos)
bool MQTTManager::subscribe(const string& topic, int qos)
{
std::lock_guard<std::mutex> lock(client_mutex_);
@ -105,7 +109,7 @@ bool MQTTManager::subscribe(const string &topic, int qos)
return true;
}
bool MQTTManager::publish(const string &topic, const string &payload, int qos)
bool MQTTManager::publish(const string& topic, const string& payload, int qos)
{
std::lock_guard<std::mutex> lock(client_mutex_);
@ -116,7 +120,7 @@ bool MQTTManager::publish(const string &topic, const string &payload, int qos)
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void *)payload.c_str();
pubmsg.payload = (void*)payload.c_str();
pubmsg.payloadlen = payload.length();
pubmsg.qos = qos;
pubmsg.retained = 0;
@ -138,10 +142,7 @@ bool MQTTManager::publish(const string &topic, const string &payload, int qos)
return true;
}
bool MQTTManager::isConnected()
{
return MQTTClient_isConnected(client_) == 1;
}
bool MQTTManager::isConnected() { return MQTTClient_isConnected(client_) == 1; }
void MQTTManager::start()
{
@ -165,8 +166,7 @@ void MQTTManager::start()
void MQTTManager::stop()
{
if (!is_running_)
return;
if (!is_running_) return;
is_running_ = false;
@ -193,9 +193,7 @@ void MQTTManager::mqttLoop()
if (!isConnected())
{
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
now - last_reconnect_time_)
.count();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - last_reconnect_time_).count();
// 如果距离上次重连已过指定间隔,则尝试重连
if (duration >= reconnect_interval_)
@ -212,18 +210,15 @@ void MQTTManager::mqttLoop()
{
// 连接正常,检查心跳
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_message_time_)
.count();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_message_time_).count();
if (duration > heartbeat_timeout_ && !is_heartbeat_lost_)
{
is_heartbeat_lost_ = true;
std::cout << "Heartbeat timeout: No message received in "
<< heartbeat_timeout_ << "ms." << std::endl;
std::cout << "Heartbeat timeout: No message received in " << heartbeat_timeout_ << "ms." << std::endl;
if (conn_lost_callback_)
{
conn_lost_callback_((char *)"Heartbeat timeout");
conn_lost_callback_((char*)"Heartbeat timeout");
}
}
}
@ -232,10 +227,9 @@ void MQTTManager::mqttLoop()
}
}
int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen,
MQTTClient_message *message)
int MQTTManager::onMessageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
MQTTManager* pThis = static_cast<MQTTManager*>(context);
// 更新最后消息时间
pThis->last_message_time_ = std::chrono::steady_clock::now();
@ -251,9 +245,9 @@ int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen,
return 1;
}
void MQTTManager::onConnectionLost(void *context, char *cause)
void MQTTManager::onConnectionLost(void* context, char* cause)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
MQTTManager* pThis = static_cast<MQTTManager*>(context);
std::cout << "Connection lost. Cause: " << (cause ? cause : "Unknown") << std::endl;
if (pThis->conn_lost_callback_)
@ -262,9 +256,9 @@ void MQTTManager::onConnectionLost(void *context, char *cause)
}
}
void MQTTManager::onDelivered(void *context, MQTTClient_deliveryToken dt)
void MQTTManager::onDelivered(void* context, MQTTClient_deliveryToken dt)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
MQTTManager* pThis = static_cast<MQTTManager*>(context);
std::cout << "Message with token " << dt << " delivered." << std::endl;
if (pThis->delivered_callback_)

View File

@ -1,13 +1,17 @@
#include <signal.h>
#include <atomic>
#include <fstream>
#include <iomanip>
#include <random>
#include "json.h"
#include "mqtt_manager.hpp"
#include "rclcpp/rclcpp.hpp"
#include "sweeper_interfaces/msg/task.hpp"
#include "mqtt_manager.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
#include "task_manager.hpp"
#include "task_manager_node.hpp"
#include "json.h"
#include <signal.h>
#include <fstream>
#include <random>
#include <iomanip>
#include "task_types.hpp"
using namespace std;
@ -19,20 +23,30 @@ int last_status = 0;
TaskStatus status_up = TaskStatus::PENDING;
std::thread status_report_thread;
// =============== 动态 MQTT topic + 身份控制 ===============
std::string mqtt_topic_sub_task;
std::string mqtt_topic_push_status;
std::atomic<bool> identity_ready{false};
std::atomic<bool> subscribed{false};
std::string vid;
// 配置变量
struct Config
{
string mqtt_vid;
string mqtt_address;
string mqtt_username;
string mqtt_password;
string mqtt_topic_sub_task;
string mqtt_topic_push_status;
// 改为模板
string mqtt_topic_sub_task_tmpl;
string mqtt_topic_push_status_tmpl;
} config;
// 前置声明
void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg);
void sendTaskResponse(long seqNo, int code, const string &msg);
void sendGeneralResponse(const string& topic, long seqNo, int code, const string& msg);
void sendTaskResponse(long seqNo, int code, const string& msg);
string generate_mqtt_client_id();
// 信号处理函数
@ -52,9 +66,7 @@ void signalHandler(int signum)
string generate_mqtt_client_id()
{
auto now = std::chrono::system_clock::now();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
std::random_device rd;
std::mt19937 gen(rd());
@ -67,7 +79,7 @@ string generate_mqtt_client_id()
}
// 从JSON解析ROUTE_INFO
ROUTE_INFO get_route_info(const Json::Value &root)
ROUTE_INFO get_route_info(const Json::Value& root)
{
ROUTE_INFO route_info;
if (root.isObject())
@ -101,7 +113,7 @@ ROUTE_INFO get_route_info(const Json::Value &root)
}
// 从JSON解析TASK_VALUE
TASK_VALUE get_task_value(const Json::Value &root)
TASK_VALUE get_task_value(const Json::Value& root)
{
TASK_VALUE task_value;
@ -141,7 +153,7 @@ TASK_VALUE get_task_value(const Json::Value &root)
}
// 加载配置文件
bool loadConfig(const string &config_file)
bool loadConfig(const string& config_file)
{
Json::Reader reader;
Json::Value root;
@ -159,11 +171,10 @@ bool loadConfig(const string &config_file)
return false;
}
config.mqtt_vid = root["mqtt"]["vid"].asString();
config.mqtt_username = root["mqtt"]["mqtt_user"].asString();
config.mqtt_password = root["mqtt"]["mqtt_password"].asString();
config.mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString();
config.mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString();
config.mqtt_username = root["mqtt"]["username"].asString();
config.mqtt_password = root["mqtt"]["password"].asString();
config.mqtt_topic_sub_task_tmpl = root["mqtt"]["mqtt_topic_sub_task"].asString();
config.mqtt_topic_push_status_tmpl = root["mqtt"]["mqtt_topic_push_status"].asString();
string external_addr = root["mqtt"]["external_net_address"].asString();
int external_port = root["mqtt"]["external_net_port"].asInt();
@ -173,14 +184,14 @@ bool loadConfig(const string &config_file)
std::cout << "Config loaded successfully." << std::endl;
std::cout << "MQTT Address: " << config.mqtt_address << std::endl;
std::cout << "Topic Sub: " << config.mqtt_topic_sub_task << std::endl;
std::cout << "Topic Pub: " << config.mqtt_topic_push_status << std::endl;
std::cout << "Topic Sub template: " << config.mqtt_topic_sub_task_tmpl << std::endl;
std::cout << "Topic Pub template: " << config.mqtt_topic_push_status_tmpl << std::endl;
return true;
}
// 发送通用应答消息(触发式)
void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg)
void sendGeneralResponse(const string& topic, long seqNo, int code, const string& msg)
{
Json::Value responseData;
responseData["type"] = "response";
@ -195,17 +206,15 @@ void sendGeneralResponse(const string &topic, long seqNo, int code, const string
Json::FastWriter writer;
string responseJson = writer.write(responseData);
std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code
<< ", msg=" << msg << std::endl;
std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code << ", msg=" << msg << std::endl;
std::cout << "Response JSON: " << responseJson << std::endl;
bool success = mqtt_manager.publish(topic, responseJson, 0);
std::cout << "General response publish to [" << topic << "] "
<< (success ? "[OK]" : "[FAILED]") << std::endl;
std::cout << "General response publish to [" << topic << "] " << (success ? "[OK]" : "[FAILED]") << std::endl;
}
// 发送任务专用应答(包含任务信息)
void sendTaskResponse(long seqNo, int code, const string &msg)
void sendTaskResponse(long seqNo, int code, const string& msg)
{
Json::Value responseData;
responseData["type"] = "response";
@ -226,20 +235,26 @@ void sendTaskResponse(long seqNo, int code, const string &msg)
Json::FastWriter writer;
string responseJson = writer.write(responseData);
std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code
<< ", taskId=" << current_task_id << ", taskStatus=" << (int)current_status << std::endl;
std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code << ", taskId=" << current_task_id
<< ", taskStatus=" << (int)current_status << std::endl;
std::cout << "Response JSON: " << responseJson << std::endl;
bool success = mqtt_manager.publish(config.mqtt_topic_sub_task, responseJson, 0);
std::cout << "Task response publish to [" << config.mqtt_topic_sub_task << "] "
<< (success ? "[OK]" : "[FAILED]") << std::endl;
if (mqtt_topic_sub_task.empty())
{
std::cout << "[TASK] response topic not ready, drop response. seqNo=" << seqNo << std::endl;
return;
}
bool success = mqtt_manager.publish(mqtt_topic_sub_task, responseJson, 0);
std::cout << "Task response publish to [" << mqtt_topic_sub_task << "] " << (success ? "[OK]" : "[FAILED]")
<< std::endl;
}
// MQTT消息回调
int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message)
int handleMqttMessage(char* topicName, int topicLen, MQTTClient_message* message)
{
const int BUFFER_SIZE = 4096;
char *buffer = new char[BUFFER_SIZE];
char* buffer = new char[BUFFER_SIZE];
memset(buffer, 0, BUFFER_SIZE);
size_t copy_size = std::min((size_t)message->payloadlen, (size_t)(BUFFER_SIZE - 1));
@ -289,7 +304,7 @@ int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message
std::cout << "Start task: " << json_data.value.id << std::endl;
task_manager.addTask(mqtt_json);
}
catch (const std::exception &e)
catch (const std::exception& e)
{
std::cerr << "Error processing start command: " << e.what() << std::endl;
sendTaskResponse(seqNo, 400, "Failed to process start command");
@ -311,7 +326,7 @@ int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message
sendTaskResponse(seqNo, 200, "Task stop command received");
}
catch (const std::exception &e)
catch (const std::exception& e)
{
std::cerr << "Error processing stop command: " << e.what() << std::endl;
sendTaskResponse(seqNo, 400, "Failed to process stop command");
@ -342,6 +357,17 @@ int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message
return 1;
}
void try_subscribe_task_topic()
{
if (!identity_ready.load()) return;
if (mqtt_topic_sub_task.empty()) return;
if (subscribed.exchange(true)) return; // 已经订阅过了
std::cout << "[TASK] subscribe MQTT topic: " << mqtt_topic_sub_task << std::endl;
mqtt_manager.subscribe(mqtt_topic_sub_task, 0);
}
// 周期性上报任务状态到MQTT200ms间隔持续上报
void reportTaskStatusToMqtt()
{
@ -363,7 +389,10 @@ void reportTaskStatusToMqtt()
Json::FastWriter writer;
string statusJson = writer.write(statusMsg);
mqtt_manager.publish(config.mqtt_topic_push_status, statusJson, 0);
if (!mqtt_topic_push_status.empty())
{
mqtt_manager.publish(mqtt_topic_push_status, statusJson, 0);
}
}
}
@ -373,21 +402,44 @@ void reportTaskStatusToMqtt()
// ROS2节点类
class TaskManagerNode : public rclcpp::Node
{
public:
public:
TaskManagerNode(string name) : Node(name)
{
RCLCPP_INFO(this->get_logger(), "Node %s started.", name.c_str());
msg_publisher_ = this->create_publisher<sweeper_interfaces::msg::Task>(
"task_message/download", 10);
msg_publisher_ = this->create_publisher<sweeper_interfaces::msg::Task>("task_message/download", 10);
task_subscribe_ = this->create_subscription<sweeper_interfaces::msg::Task>(
"task_message/upload", 10,
std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1));
"task_message/upload", 10, std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1));
timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&TaskManagerNode::timer_callback, this));
timer_ =
this->create_wall_timer(std::chrono::milliseconds(500), std::bind(&TaskManagerNode::timer_callback, this));
identity_sub_ = this->create_subscription<sweeper_interfaces::msg::VehicleIdentity>(
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
[this](const sweeper_interfaces::msg::VehicleIdentity::SharedPtr msg)
{
vid = msg->vid;
identity_ready.store(!vid.empty());
if (!identity_ready.load()) return;
// 拼 topic
mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl;
mqtt_topic_push_status = config.mqtt_topic_push_status_tmpl;
auto pos = mqtt_topic_sub_task.find("{vid}");
if (pos != std::string::npos) mqtt_topic_sub_task.replace(pos, 5, vid);
pos = mqtt_topic_push_status.find("{vid}");
if (pos != std::string::npos) mqtt_topic_push_status.replace(pos, 5, vid);
RCLCPP_INFO(this->get_logger(), "[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(),
mqtt_topic_sub_task.c_str(), mqtt_topic_push_status.c_str());
try_subscribe_task_topic();
});
}
private:
private:
void timer_callback()
{
sweeper_interfaces::msg::Task msg;
@ -417,21 +469,39 @@ private:
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<sweeper_interfaces::msg::Task>::SharedPtr msg_publisher_;
rclcpp::Subscription<sweeper_interfaces::msg::Task>::SharedPtr task_subscribe_;
rclcpp::Subscription<sweeper_interfaces::msg::VehicleIdentity>::SharedPtr identity_sub_;
};
int main(int argc, char **argv)
int main(int argc, char** argv)
{
signal(SIGINT, signalHandler);
if (!loadConfig("config.json"))
// 默认配置路径
std::string config_path = "config.json";
// 支持 --config xxx.json
for (int i = 1; i < argc; ++i)
{
std::string arg = argv[i];
if (arg == "--config" && i + 1 < argc)
{
config_path = argv[i + 1];
++i;
}
}
std::cout << "[TASK] Using config: " << config_path << std::endl;
if (!loadConfig(config_path))
{
std::cerr << "Failed to load configuration." << std::endl;
return 1;
}
// ================= 初始化 MQTT =================
string client_id = generate_mqtt_client_id();
if (!mqtt_manager.init(config.mqtt_address, client_id,
config.mqtt_username, config.mqtt_password))
if (!mqtt_manager.init(config.mqtt_address, client_id, config.mqtt_username, config.mqtt_password))
{
std::cerr << "Failed to initialize MQTT manager." << std::endl;
return 1;
@ -443,10 +513,17 @@ int main(int argc, char **argv)
mqtt_manager.setMessageCallback(handleMqttMessage);
mqtt_manager.start();
mqtt_manager.subscribe(config.mqtt_topic_sub_task, 0);
mqtt_manager.setConnRestoredCallback(
[](const char* cause)
{
std::cout << "[TASK] MQTT reconnected: " << cause << std::endl;
subscribed.store(false);
try_subscribe_task_topic();
});
task_manager.setSendResponseCallback([](const string &topic, int seqNo, int code, const string &msg)
mqtt_manager.start();
task_manager.setSendResponseCallback([](const string& topic, int seqNo, int code, const string& msg)
{ sendTaskResponse(seqNo, code, msg); });
task_manager.start();

View File

@ -1,807 +0,0 @@
#include "MQTTClient.h"
#include "rclcpp/rclcpp.hpp"
#include "md5.h"
#include "sweeper_interfaces/msg/task.hpp"
#include "json.h"
#include "task_manager_node.hpp"
#include "sweeper_interfaces/msg/sub.hpp"
#include <signal.h> // 添加信号处理头文件
#include <random>
using namespace std;
// 信号标志
volatile std::sig_atomic_t signal_received = 0;
// 任务队列和相关同步原语
std::queue<MQTT_JSON> task_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
bool thread_running = true;
MQTTClient client;
MQTTClient_deliveryToken token_d_m;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg_d_m = MQTTClient_message_initializer;
// mqtt相关
std::string mqtt_vid;
std::string mqtt_inter_net_address;
int mqtt_inter_net_port;
std::string mqtt_external_net_address;
int mqtt_external_net_port;
std::string mqtt_user;
std::string mqtt_password;
std::string mqtt_topic_sub_task;
std::string mqtt_topic_push_status;
std::string sub_topic;
// 修复变量声明和初始化
std::string ADDRESS;
std::string CLIENTID_SUB; // 客户端ID
std::string destinationFilePath1 = "./gps_load_now.txt";
std::chrono::steady_clock::time_point last_message_time = std::chrono::steady_clock::now();
constexpr auto QOS = 0;
constexpr auto TIMEOUT = 10000L;
volatile MQTTClient_deliveryToken deliveredtoken;
bool isDisconn = false;
int task_status = 0;
int last_status = 0;
TaskStatus status_up = TaskStatus::PENDING;
CurrentTask currentTask;
std::mutex taskMutex;
std::string generate_mqtt_client_id()
{
// 获取当前时间戳(以毫秒为单位)
auto now = std::chrono::system_clock::now();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
// 生成一个 4 位随机数
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1000, 9999);
int random_num = dis(gen);
// 拼接成 client ID
std::ostringstream oss;
oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << random_num;
return oss.str();
}
// 信号处理函数
void signalHandler(int signum)
{
std::cout << "Interrupt signal (" << signum << ") received." << std::endl;
signal_received = signum;
// 第一次收到信号时触发ROS2关闭
if (signum == SIGINT && rclcpp::ok())
{
std::cout << "Shutting down ROS2 node..." << std::endl;
rclcpp::shutdown();
}
}
void copyFileAndOverwrite(const std::string &sourceFilePath, const std::string &destinationFilePath)
{
std::ifstream src(sourceFilePath, std::ios::binary);
std::ofstream dst(destinationFilePath, std::ios::binary);
if (!src)
{
std::cerr << "无法打开源文件: " << sourceFilePath << std::endl;
return;
}
if (!dst)
{
std::cerr << "无法打开目标文件: " << destinationFilePath << std::endl;
return;
}
dst << src.rdbuf(); // 将源文件内容复制到目标文件
if (!dst)
{
std::cerr << "复制文件时出错" << std::endl;
}
}
string calculate_md5(string filename)
{
MD5 md5;
ifstream file;
file.open(filename, ios::binary);
md5.update(file);
cout << md5.toString() << endl;
return md5.toString();
}
bool downloadFile(const std::string &url, const std::string &expected_md5)
{
// 确保URL不为空
if (url.empty())
{
std::cerr << "下载URL为空" << std::endl;
return false;
}
// 使用系统调用下载文件
std::string filename = url.substr(url.find_last_of('/') + 1);
std::string command = "wget -O routes/" + filename + " \"" + url + "\"";
cout << command << endl;
int result = system(command.c_str());
if (result != 0)
{
std::cerr << "下载失败: " << url << std::endl;
return false;
}
// 计算下载文件的MD5并与预期的MD5比较
std::string filepath = "routes/" + filename;
std::string actual_md5 = calculate_md5(filepath);
if (actual_md5 != expected_md5)
{
cout << actual_md5 << " " << expected_md5 << endl;
std::cerr << "MD5校验失败: " << filepath << std::endl;
return false;
}
std::cout << filepath << " 下载并校验成功" << std::endl;
return true;
}
ROUTE_INFO get_route_info(const Json::Value &root)
{
ROUTE_INFO route_info;
if (root.isObject())
{
if (root.isMember("routeName") && root["routeName"].isString())
route_info.routeName = root["routeName"].asString();
else
std::cerr << "Missing or invalid 'routeName' field" << std::endl;
if (root.isMember("fileName") && root["fileName"].isString())
route_info.fileName = root["fileName"].asString();
else
std::cerr << "Missing or invalid 'fileName' field" << std::endl;
if (root.isMember("url") && root["url"].isString())
route_info.url = root["url"].asString();
else
std::cerr << "Missing or invalid 'url' field" << std::endl;
if (root.isMember("md5") && root["md5"].isString())
route_info.md5 = root["md5"].asString();
else
std::cerr << "Missing or invalid 'md5' field" << std::endl;
}
else
{
std::cerr << "routeInfo is not a valid JSON object" << std::endl;
}
return route_info;
}
TASK_VALUE get_task_value(const Json::Value &root)
{
TASK_VALUE task_value;
if (root.isObject())
{
if (root.isMember("id") && root["id"].isInt())
task_value.id = root["id"].asInt();
else
std::cerr << "Missing or invalid 'id' field" << std::endl;
if (root.isMember("name") && root["name"].isString())
task_value.name = root["name"].asString();
else
std::cerr << "Missing or invalid 'name' field" << std::endl;
if (root.isMember("mode") && root["mode"].isInt())
task_value.mode = root["mode"].asInt();
else
std::cerr << "Missing or invalid 'mode' field" << std::endl;
if (root.isMember("count") && root["count"].isInt())
task_value.count = root["count"].asInt();
else
std::cerr << "Missing or invalid 'count' field" << std::endl;
if (root.isMember("routeInfo") && root["routeInfo"].isObject())
task_value.routeInfo = get_route_info(root["routeInfo"]);
else
std::cerr << "Missing or invalid 'routeInfo' field" << std::endl;
}
else
{
std::cerr << "task value is not a valid JSON object" << std::endl;
}
return task_value;
}
void sendResponse(MQTTClient client, const std::string &topic, int seqNo, int code, std::string msg)
{
// 使用JSON库构建响应数据
Json::Value responseData;
responseData["type"] = "response";
responseData["seqNo"] = seqNo;
Json::Value data;
data["code"] = code;
data["msg"] = msg;
responseData["data"] = data;
// 使用JSON writer生成字符串
Json::FastWriter writer;
std::string responseJson = writer.write(responseData);
// 发布MQTT消息
pubmsg_d_m.payload = (void *)responseJson.c_str();
pubmsg_d_m.payloadlen = responseJson.length();
pubmsg_d_m.qos = QOS;
pubmsg_d_m.retained = 0;
MQTTClient_publishMessage(client, topic.c_str(), &pubmsg_d_m, &token_d_m);
MQTTClient_waitForCompletion(client, token_d_m, TIMEOUT);
printf("Message with delivery token %d delivered\n", token_d_m);
}
// 更新当前任务信息
void updateCurrentTask(int taskId, TaskStatus status)
{
std::lock_guard<std::mutex> lock(taskMutex);
currentTask.id = taskId;
currentTask.status = status;
currentTask.lastUpdateTime = std::chrono::steady_clock::now();
std::cout << "Task updated: ID=" << taskId << ", Status=" << status << std::endl;
}
// 从MQTT消息中提取任务ID并更新任务状态
void extractTaskIdFromMessage(const Json::Value &root)
{
if (root.isMember("data") && root["data"].isObject())
{
Json::Value data = root["data"];
if (data.isMember("value") && data["value"].isObject())
{
Json::Value value = data["value"];
if (value.isMember("id") && value["id"].isInt())
{
int taskId = value["id"].asInt();
// 根据命令类型设置初始状态
if (data.isMember("command") && data["command"].isString())
{
std::string command = data["command"].asString();
if (command == "start")
{
updateCurrentTask(taskId, RUNNING);
}
else if (command == "stop")
{
updateCurrentTask(taskId, PENDING);
}
}
}
}
}
}
void updateTaskStatus(int taskId, int status)
{
// 构建JSON消息体
Json::Value message;
message["id"] = taskId;
message["status"] = status;
// 转换为字符串
Json::FastWriter writer;
std::string jsonStr = writer.write(message);
// 发布MQTT消息
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void *)jsonStr.c_str();
pubmsg.payloadlen = jsonStr.length();
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_deliveryToken token;
int rc = MQTTClient_publishMessage(client, mqtt_topic_push_status.c_str(), &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to publish task status, return code " << rc << std::endl;
}
else
{
MQTTClient_waitForCompletion(client, token, TIMEOUT);
std::cout << "Task status updated: ID=" << taskId << ", Status=" << status << std::endl;
}
}
// 周期性上报任务状态
void *reportTaskStatus(void *arg)
{
(void)arg;
while (thread_running)
{
// 200ms周期
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// 获取当前任务信息
int taskId;
TaskStatus status;
{
std::lock_guard<std::mutex> lock(taskMutex);
taskId = currentTask.id;
status = status_up;
// 仅在任务ID有效时上报
if (taskId > 0)
{
currentTask.status = status;
}
}
// 上报任务状态
if (taskId > 0)
{
updateTaskStatus(taskId, static_cast<int>(status));
}
}
return NULL;
}
// 任务处理线程函数
void *process_tasks(void *arg)
{
(void)arg;
while (thread_running)
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 等待队列中有任务或线程终止信号
queue_cv.wait(lock, []
{ return !task_queue.empty() || !thread_running; });
// 检查是否需要终止线程
if (!thread_running && task_queue.empty())
{
break;
}
// 处理队列中的任务
if (!task_queue.empty())
{
MQTT_JSON mqtt_json = task_queue.front();
task_queue.pop();
lock.unlock(); // 释放锁,允许其他线程操作队列
// 处理任务
if (mqtt_json.type == "request")
{
try
{
// 从boost::any中提取数据
JSON_DATA json_data = boost::any_cast<JSON_DATA>(mqtt_json.data);
if (json_data.command == "start")
{
int flag_ok = 0;
// sleep(10);
cout << "即将开始任务" << json_data.value.id << endl;
cout << "url: " << json_data.value.routeInfo.url << endl;
cout << "md5: " << json_data.value.routeInfo.md5 << endl;
string downFileName = json_data.value.routeInfo.fileName;
if (access(("routes/" + downFileName).c_str(), F_OK) == -1)
{
std::cout << "文件不存在,正在下载" << downFileName << std::endl;
bool isdown = downloadFile(json_data.value.routeInfo.url, json_data.value.routeInfo.md5);
if (!isdown)
{
std::cout << "下载失败,跳过" << std::endl;
remove(("routes/" + downFileName).c_str());
}
else
{
std::cout << downFileName << "下载完成" << std::endl;
std::string sourceFilePath1 = ("routes/" + downFileName).c_str();
copyFileAndOverwrite(sourceFilePath1, destinationFilePath1);
flag_ok = 1;
}
}
else
{
// 文件已存在,跳过下载
std::cout << "文件已存在,跳过下载: routes/" << downFileName << std::endl;
std::string sourceFilePath1 = ("routes/" + downFileName).c_str();
copyFileAndOverwrite(sourceFilePath1, destinationFilePath1);
flag_ok = 1;
}
if (flag_ok == 1)
{
// 启动任务
cout << "开始执行任务" << json_data.value.id << endl;
task_status = 1;
}
// 回复
sendResponse(client, mqtt_topic_sub_task, mqtt_json.seqNo, 200, "Task started successfully");
}
else if (json_data.command == "stop")
{
// task_status = 0;
// 回复
// sendResponse(client, mqtt_topic_sub_task, mqtt_json.seqNo, 200, "Task stopped successfully");
}
}
catch (const boost::bad_any_cast &e)
{
std::cerr << "Bad any_cast: " << e.what() << std::endl;
}
}
}
}
return NULL;
}
void delivered(void *context, MQTTClient_deliveryToken dt)
{
(void)context;
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
(void)context;
(void)topicLen;
printf("Message arrived\n");
last_message_time = std::chrono::steady_clock::now(); // 更新时间戳
printf("topic: %s\n", topicName);
printf("message: %.*s\n", message->payloadlen, (char *)message->payload);
// 增加缓冲区大小
const int BUFFER_SIZE = 4096; // 增大缓冲区
char *buffer = new char[BUFFER_SIZE];
memset(buffer, 0, BUFFER_SIZE);
// 确保不溢出
size_t copy_size = std::min(message->payloadlen, BUFFER_SIZE - 1);
memcpy(buffer, message->payload, copy_size);
Json::Reader reader;
Json::Value root;
if (!reader.parse(buffer, root))
{
printf("recv json fail\n");
return 1;
}
else
{
isDisconn = false;
// 创建MQTT_JSON对象
if (root.isMember("type") && root["type"].isString() && root["type"].asString() == "request")
{
// 提取任务ID
extractTaskIdFromMessage(root);
MQTT_JSON mqtt_json;
mqtt_json.type = root["type"].asString();
mqtt_json.seqNo = root["seqNo"].asInt();
Json::Value data = root["data"];
// 创建JSON_DATA对象
JSON_DATA json_data;
if (data.isMember("command") && data["command"].isString())
{
json_data.command = data["command"].asString();
if (json_data.command.compare("start") == 0)
{
try
{
json_data.value = get_task_value(data["value"]);
// 将JSON_DATA对象存入boost::any
mqtt_json.data = json_data;
cout << "即将开始任务" << json_data.value.id << endl;
// 将任务添加到队列
std::lock_guard<std::mutex> lock(queue_mutex);
task_queue.push(mqtt_json);
queue_cv.notify_one(); // 通知工作线程有新任务
}
catch (const std::exception &e)
{
std::cerr << e.what() << '\n';
}
}
else if (json_data.command.compare("stop") == 0)
{
try
{
long stop_id = data["value"].asInt();
mqtt_json.data = stop_id;
cout << "stop_id:" << stop_id << "\t 即将停止任务" << endl;
task_status = 0;
// 回复
sendResponse(client, mqtt_topic_sub_task, mqtt_json.seqNo, 200, "Task stopped successfully");
// 将stop任务也加入队列处理
std::lock_guard<std::mutex> lock(queue_mutex);
task_queue.push(mqtt_json);
queue_cv.notify_one();
}
catch (const std::exception &e)
{
std::cerr << e.what() << '\n';
}
}
}
else
std::cerr << "Missing or invalid 'command' field" << std::endl;
}
else if (root["type"].asString() == "response")
{
}
else
{
std::cerr << "Missing or invalid 'type' field" << std::endl;
}
}
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
(void)context;
isDisconn = 1;
printf("\nConnection lost\n");
printf("cause: %s\n", cause);
}
void *mqtt_sub(void *arg)
{
(void)arg;
const char *SUB_TOPIC = sub_topic.c_str();
int rc;
// 修正使用const char*而非char*
const char *username = mqtt_user.c_str();
const char *password = mqtt_password.c_str();
if ((rc = MQTTClient_create(&client, ADDRESS.c_str(), CLIENTID_SUB.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = username;
conn_opts.password = password;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", SUB_TOPIC, CLIENTID_SUB.c_str(), QOS);
if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
while (1)
{
usleep(200000);
// 检查连接状态,如果连接丢失,则尝试重新连接
if (MQTTClient_isConnected(client) == 0)
{
printf("MQTT connection lost, trying to reconnect...\n");
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
isDisconn = true;
printf("Failed to reconnect, return code %d\n", rc);
// 处理连接失败的情况,例如等待一段时间后再次尝试
}
else
{
isDisconn = false;
printf("Reconnected to MQTT server.\n");
// 重新订阅主题
if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to resubscribe, return code %d\n", rc);
}
}
}
else
{
isDisconn = false;
auto current_time = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(current_time - last_message_time).count() > 500)
{
isDisconn = true;
// printf("Heartbeat timeout: No message received in 500ms.\n");
// 执行心跳超时的处理逻辑
}
}
}
if ((rc = MQTTClient_unsubscribe(client, SUB_TOPIC)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to disconnect, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
MQTTClient_destroy(&client);
return NULL;
}
class task_manager_node : public rclcpp::Node
{
public:
// 构造函数,有一个参数为节点名称
task_manager_node(std::string name) : Node(name)
{
RCLCPP_INFO(this->get_logger(), "%s节点已经启动.", name.c_str());
msg_publisher_ = this->create_publisher<sweeper_interfaces::msg::Task>("task_message/download", 10);
task_subscribe_ = this->create_subscription<sweeper_interfaces::msg::Task>("task_message/upload", 10, std::bind(&task_manager_node::task_callback, this, std::placeholders::_1));
timer_ = this->create_wall_timer(std::chrono::milliseconds(500), std::bind(&task_manager_node::timer_callback, this));
}
private:
void timer_callback()
{
// 发布任务消息
sweeper_interfaces::msg::Task msg;
msg.task_status = task_status;
if (task_status != last_status)
{
msg_publisher_->publish(msg);
RCLCPP_INFO(this->get_logger(), "发布任务消息");
}
last_status = task_status;
// RCLCPP_INFO(this->get_logger(), "msg.task_status : %d", msg.task_status);
// RCLCPP_INFO(this->get_logger(), "task_status : %d", task_status);
// RCLCPP_INFO(this->get_logger(), "last_status : %d", last_status);
// cout << endl;
}
void task_callback(const sweeper_interfaces::msg::Task::SharedPtr msg)
{
// 安全转换:检查枚举值是否有效
if (msg->task_status >= TaskStatus::PENDING && msg->task_status <= TaskStatus::FAILED)
{
status_up = static_cast<TaskStatus>(msg->task_status);
}
else
{
// 处理无效状态(例如设为默认值或记录错误)
RCLCPP_WARN(this->get_logger(), "Invalid task status received: %d", msg->task_status);
status_up = TaskStatus::FAILED;
}
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<sweeper_interfaces::msg::Task>::SharedPtr msg_publisher_;
rclcpp::Subscription<sweeper_interfaces::msg::Task>::SharedPtr task_subscribe_;
};
void init_main()
{
Json::Reader reader;
Json::Value root;
std::ifstream in("config.json", std::ios::binary);
if (!in.is_open())
{
std::cout << "read config file error" << std::endl;
return;
}
if (reader.parse(in, root))
{
mqtt_vid = root["mqtt"]["vid"].asString();
mqtt_inter_net_address = root["mqtt"]["inter_net_address"].asString();
mqtt_inter_net_port = root["mqtt"]["inter_net_port"].asInt();
mqtt_external_net_address = root["mqtt"]["external_net_address"].asString();
mqtt_external_net_port = root["mqtt"]["external_net_port"].asInt();
mqtt_user = root["mqtt"]["mqtt_user"].asString();
mqtt_password = root["mqtt"]["mqtt_password"].asString();
mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString();
mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString();
// 添加对sub_topic的初始化
sub_topic = mqtt_topic_sub_task;
CLIENTID_SUB = generate_mqtt_client_id();
// ADDRESS = mqtt_inter_net_address + ":" + std::to_string(mqtt_inter_net_port);
ADDRESS = mqtt_external_net_address + ":" + std::to_string(mqtt_external_net_port);
cout << "ADDRESS: " << ADDRESS << endl;
cout << "CLIENTID_SUB: " << CLIENTID_SUB << endl;
cout << "mqtt_vid: " << mqtt_vid << endl;
}
in.close(); // 关闭文件流
}
int main(int argc, char **argv)
{
// 注册信号处理函数
signal(SIGINT, signalHandler);
init_main();
pthread_t mqtt_sub_thread_t;
pthread_create(&mqtt_sub_thread_t, NULL, mqtt_sub, NULL);
// 创建任务处理线程
pthread_t task_thread;
pthread_create(&task_thread, NULL, process_tasks, NULL);
pthread_t reportThread;
pthread_create(&reportThread, NULL, reportTaskStatus, NULL);
rclcpp::init(argc, argv);
/*创建对应节点的共享指针对象*/
auto node = std::make_shared<task_manager_node>("task_manager_node");
// 使用非阻塞的spin
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
// 循环执行spin_some允许检查信号
while (rclcpp::ok() && !signal_received)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 优雅地停止线程
{
std::lock_guard<std::mutex> lock(queue_mutex);
thread_running = false;
}
queue_cv.notify_one();
pthread_join(task_thread, NULL);
pthread_join(mqtt_sub_thread_t, NULL);
pthread_join(reportThread, NULL);
return 0;
}