Compare commits

...

5 Commits

Author SHA1 Message Date
lyq
9cc4998c71 修改配置文件和路径文件 2026-01-13 14:05:25 +08:00
lyq
d24ea2ece9 分离mqtt和任务管理;增加mqtt断线重连 2026-01-13 14:04:17 +08:00
lyq
d69cc29ec9 调整打印信息 2026-01-13 13:59:44 +08:00
lyq
d5b01517a0 调整优先级 2026-01-13 13:59:43 +08:00
lyq
cf5a698eb9 修改滚刷推杆不下沉的bug 2026-01-13 13:59:43 +08:00
20 changed files with 1877 additions and 116 deletions

View File

@ -9,12 +9,10 @@
"external_net_port": 19683,
"mqtt_user": "zxwl",
"mqtt_password": "zxwl1234@",
"pub_gps_topic": "/zxwl/vehicle/V060003/gps",
"remote_topic": "/zxwl/vehicle/V060003/ctrl",
"mqtt_topic_push_status": "/zxwl/vehicle/V060003/task/status",
"mqtt_topic_sub_task": "/zxwl/vehicle/V060003/task",
"info_topic": "/zxwl/vehicle/V060003/info",
"fault_topic": "/zxwl/vehicle/V060003/fault"
"pub_gps_topic": "/zxwl/sweeper/V060003/gps",
"remote_topic": "/zxwl/sweeper/V060003/ctrl",
"mqtt_topic_push_status": "/zxwl/sweeper/V060003/task/status",
"mqtt_topic_sub_task": "/zxwl/sweeper/V060003/task"
},
"detect_line_tolerance": 3.06,
"detect_head_tolerance": 2,

View File

@ -1,68 +1,308 @@
32.0309247030
120.91507181700
175.167999
32.0309270710
120.91508380700
174.522003
0.000000
32.0309153960
120.91507298400
169.643997
32.0309177630
120.91508508700
170.716995
0.000000
32.0309064620
120.91507499500
172.268997
32.0309083630
120.91508691600
167.014008
0.000000
32.0308971460
120.91507587000
177.580002
32.0308991500
120.91509013100
158.218994
0.000000
32.0308879100
120.91507735600
152.240005
32.0308909680
120.91509463700
151.005997
0.000000
32.0308810980
120.91508470900
102.825996
32.0308825810
120.91510033500
147.225006
0.000000
32.0308796330
120.91509556100
94.540001
32.0308741660
120.91510631000
156.934006
0.000000
32.0308790170
120.91510617400
84.416000
32.0308652190
120.91510999400
166.878998
0.000000
32.0308824190
120.91511639600
37.883999
32.0308561900
120.91511247100
165.128998
0.000000
32.0308908460
120.91512016300
353.354004
32.0308470540
120.91511526000
164.408005
0.000000
32.0308998390
120.91511894700
353.295013
32.0308384050
120.91511818800
165.382996
0.000000
32.0309090620
120.91511812300
358.222992
32.0308288240
120.91512052600
167.072998
0.000000
32.0309183710
120.91511749800
346.201996
32.0308193630
120.91512325900
164.526993
0.000000
32.0309261310
120.91511212000
297.071991
32.0308099890
120.91512646200
160.466995
0.000000
32.0309276550
120.91510141900
249.197006
32.0308016850
120.91513058700
151.772003
0.000000
32.0309219690
120.91509306000
200.531006
32.0307940750
120.91513631900
140.869995
0.000000
32.0309129310
120.91509103000
176.186005
32.0307872200
120.91514318900
132.024994
0.000000
32.0307816190
120.91515177200
123.033997
0.000000
32.0307767850
120.91516175300
111.536003
0.000000
32.0307737970
120.91517181200
104.806999
0.000000
32.0307716670
120.91518215100
100.636002
0.000000
32.0307703680
120.91519282400
89.806000
0.000000
32.0307709200
120.91520367800
81.512001
0.000000
32.0307728420
120.91521413600
69.341003
0.000000
32.0307771010
120.91522443200
52.085999
0.000000
32.0307837600
120.91523281900
41.723999
0.000000
32.0307914160
120.91523974900
31.273001
0.000000
32.0307994650
120.91524466500
21.080000
0.000000
32.0308081020
120.91524794900
11.581000
0.000000
32.0308169840
120.91524990900
6.804000
0.000000
32.0308266650
120.91525087500
2.044000
0.000000
32.0308363640
120.91525102500
357.092987
0.000000
32.0308454740
120.91525076200
0.180000
0.000000
32.0308545400
120.91525048300
358.631012
0.000000
32.0308642580
120.91525064500
1.479000
0.000000
32.0308733660
120.91525116000
3.574000
0.000000
32.0308823780
120.91525174100
1.308000
0.000000
32.0308915670
120.91525213700
3.472000
0.000000
32.0309013400
120.91525273600
1.245000
0.000000
32.0309103910
120.91525290600
1.489000
0.000000
32.0309201180
120.91525361800
1.429000
0.000000
32.0309291640
120.91525371800
359.424011
0.000000
32.0309388660
120.91525320500
356.106995
0.000000
32.0309478980
120.91525251700
352.549988
0.000000
32.0309576240
120.91525108400
350.079010
0.000000
32.0309665940
120.91524933800
352.509003
0.000000
32.0309755100
120.91524791200
351.528015
0.000000
32.0309844990
120.91524603700
348.252014
0.000000
32.0309940620
120.91524366200
346.699005
0.000000
32.0310034720
120.91524080400
344.282990
0.000000
32.0310120850
120.91523778100
340.821014
0.000000
32.0310205650
120.91523415800
335.459015
0.000000
32.0310285570
120.91522924400
325.936005
0.000000
32.0310358980
120.91522309900
322.156006
0.000000
32.0310433430
120.91521577300
312.940002
0.000000
32.0310496090
120.91520717900
303.739990
0.000000
32.0310549300
120.91519746200
298.476013
0.000000
32.0310588260
120.91518787300
291.746002
0.000000
32.0310619800
120.91517768500
284.023010
0.000000
32.0310634770
120.91516693600
275.184998
0.000000
32.0310638860
120.91515611400
264.816010
0.000000
32.0310623280
120.91514564900
253.266998
0.000000
32.0310587730
120.91513498100
240.110001
0.000000
32.0310535320
120.91512525600
233.132004
0.000000
32.0310479220
120.91511694700
226.000000
0.000000
32.0310409290
120.91510900500
220.576004
0.000000
32.0310339060
120.91510237400
214.026001
0.000000
32.0310256190
120.91509629900
203.143997
0.000000
32.0310169390
120.91509284600
191.787994
0.000000
32.0310074340
120.91509122200
179.445999
0.000000
32.0309977060
120.91509169800
172.662994
0.000000
32.0309887870
120.91509335300
167.395004
0.000000
32.0309799010
120.91509565100
168.294998
0.000000
32.0309702280
120.91509780900
172.145996
0.000000
32.0309611820
120.91509877900
176.160995
0.000000
32.0309514570
120.91509948100
176.261993
0.000000
32.0309423130
120.91510028200
174.684998
0.000000

View File

@ -0,0 +1,308 @@
32.0309270710
120.91508380700
174.522003
0.000000
32.0309177630
120.91508508700
170.716995
0.000000
32.0309083630
120.91508691600
167.014008
0.000000
32.0308991500
120.91509013100
158.218994
0.000000
32.0308909680
120.91509463700
151.005997
0.000000
32.0308825810
120.91510033500
147.225006
0.000000
32.0308741660
120.91510631000
156.934006
0.000000
32.0308652190
120.91510999400
166.878998
0.000000
32.0308561900
120.91511247100
165.128998
0.000000
32.0308470540
120.91511526000
164.408005
0.000000
32.0308384050
120.91511818800
165.382996
0.000000
32.0308288240
120.91512052600
167.072998
0.000000
32.0308193630
120.91512325900
164.526993
0.000000
32.0308099890
120.91512646200
160.466995
0.000000
32.0308016850
120.91513058700
151.772003
0.000000
32.0307940750
120.91513631900
140.869995
0.000000
32.0307872200
120.91514318900
132.024994
0.000000
32.0307816190
120.91515177200
123.033997
0.000000
32.0307767850
120.91516175300
111.536003
0.000000
32.0307737970
120.91517181200
104.806999
0.000000
32.0307716670
120.91518215100
100.636002
0.000000
32.0307703680
120.91519282400
89.806000
0.000000
32.0307709200
120.91520367800
81.512001
0.000000
32.0307728420
120.91521413600
69.341003
0.000000
32.0307771010
120.91522443200
52.085999
0.000000
32.0307837600
120.91523281900
41.723999
0.000000
32.0307914160
120.91523974900
31.273001
0.000000
32.0307994650
120.91524466500
21.080000
0.000000
32.0308081020
120.91524794900
11.581000
0.000000
32.0308169840
120.91524990900
6.804000
0.000000
32.0308266650
120.91525087500
2.044000
0.000000
32.0308363640
120.91525102500
357.092987
0.000000
32.0308454740
120.91525076200
0.180000
0.000000
32.0308545400
120.91525048300
358.631012
0.000000
32.0308642580
120.91525064500
1.479000
0.000000
32.0308733660
120.91525116000
3.574000
0.000000
32.0308823780
120.91525174100
1.308000
0.000000
32.0308915670
120.91525213700
3.472000
0.000000
32.0309013400
120.91525273600
1.245000
0.000000
32.0309103910
120.91525290600
1.489000
0.000000
32.0309201180
120.91525361800
1.429000
0.000000
32.0309291640
120.91525371800
359.424011
0.000000
32.0309388660
120.91525320500
356.106995
0.000000
32.0309478980
120.91525251700
352.549988
0.000000
32.0309576240
120.91525108400
350.079010
0.000000
32.0309665940
120.91524933800
352.509003
0.000000
32.0309755100
120.91524791200
351.528015
0.000000
32.0309844990
120.91524603700
348.252014
0.000000
32.0309940620
120.91524366200
346.699005
0.000000
32.0310034720
120.91524080400
344.282990
0.000000
32.0310120850
120.91523778100
340.821014
0.000000
32.0310205650
120.91523415800
335.459015
0.000000
32.0310285570
120.91522924400
325.936005
0.000000
32.0310358980
120.91522309900
322.156006
0.000000
32.0310433430
120.91521577300
312.940002
0.000000
32.0310496090
120.91520717900
303.739990
0.000000
32.0310549300
120.91519746200
298.476013
0.000000
32.0310588260
120.91518787300
291.746002
0.000000
32.0310619800
120.91517768500
284.023010
0.000000
32.0310634770
120.91516693600
275.184998
0.000000
32.0310638860
120.91515611400
264.816010
0.000000
32.0310623280
120.91514564900
253.266998
0.000000
32.0310587730
120.91513498100
240.110001
0.000000
32.0310535320
120.91512525600
233.132004
0.000000
32.0310479220
120.91511694700
226.000000
0.000000
32.0310409290
120.91510900500
220.576004
0.000000
32.0310339060
120.91510237400
214.026001
0.000000
32.0310256190
120.91509629900
203.143997
0.000000
32.0310169390
120.91509284600
191.787994
0.000000
32.0310074340
120.91509122200
179.445999
0.000000
32.0309977060
120.91509169800
172.662994
0.000000
32.0309887870
120.91509335300
167.395004
0.000000
32.0309799010
120.91509565100
168.294998
0.000000
32.0309702280
120.91509780900
172.145996
0.000000
32.0309611820
120.91509877900
176.160995
0.000000
32.0309514570
120.91509948100
176.261993
0.000000
32.0309423130
120.91510028200
174.684998
0.000000

View File

@ -1,7 +1,7 @@
FU:
ros__parameters:
# 遇障停车功能 False or True
enable_obstacle_stop: False
enable_obstacle_stop: True
# 绕障功能 False or True
enable_obstacle_avoid: False

View File

@ -133,11 +133,13 @@ private:
if (!isCollecting)
{
cout << "等待采集...." << endl;
// cout << "等待采集...." << endl;
RCLCPP_INFO(this->get_logger(), "等待采集.....");
}
else
{
cout << "平台采集中" << endl;
// cout << "平台采集中" << endl;
RCLCPP_INFO(this->get_logger(), "平台采集中.....");
}
}
void sub_callback(const sweeper_interfaces::msg::Sub::SharedPtr msg)
@ -146,21 +148,25 @@ private:
{
std::lock_guard<std::mutex> lock(collect_mutex); // 加锁
isCollecting = true;
cout << "平台开始采集" << endl;
// cout << "平台开始采集" << endl;
RCLCPP_INFO(this->get_logger(), "平台开始采集");
pthread_create(&route_thread_t, NULL, route_thread, NULL);
}
else if (!msg->get_route && isCollecting)
{
isCollecting = false;
cout << "平台结束采集" << endl;
// cout << "平台结束采集" << endl;
RCLCPP_INFO(this->get_logger(), "平台结束采集");
pthread_cancel(route_thread_t);
if (upload_file(filename))
{
cout << "上传成功" << endl;
// cout << "上传成功" << endl;
RCLCPP_INFO(this->get_logger(), "上传成功");
}
else
{
cout << "上传失败" << endl;
// cout << "上传失败" << endl;
RCLCPP_INFO(this->get_logger(), "上传失败");
}
}
}

View File

@ -293,43 +293,43 @@ struct can_VCU_motor1_cmd
// 设置Byte1
void setByte1(int8_t value)
{
data[1] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[1] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte2
void setByte2(int8_t value)
{
data[2] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[2] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte3
void setByte3(int8_t value)
{
data[3] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[3] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte4
void setByte4(int8_t value)
{
data[4] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[4] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte5
void setByte5(int8_t value)
{
data[5] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[5] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte6
void setByte6(int8_t value)
{
data[6] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[6] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte7
void setByte7(int8_t value)
{
data[7] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[7] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
CANFrame toFrame() const
@ -356,49 +356,49 @@ struct can_VCU_motor2_cmd
// 设置Byte0
void setByte0(int8_t value)
{
data[0] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[0] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte1
void setByte1(int8_t value)
{
data[1] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[1] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte2
void setByte2(int8_t value)
{
data[2] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[2] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte3
void setByte3(int8_t value)
{
data[3] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[3] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte4
void setByte4(int8_t value)
{
data[4] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[4] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte5
void setByte5(int8_t value)
{
data[5] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[5] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte6
void setByte6(int8_t value)
{
data[6] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[6] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
// 设置Byte7
void setByte7(int8_t value)
{
data[7] = std::clamp(value, static_cast<int8_t>(0), static_cast<int8_t>(100));
data[7] = std::clamp(value, static_cast<int8_t>(-100), static_cast<int8_t>(100));
}
CANFrame toFrame() const

View File

@ -55,15 +55,16 @@ void vcuTimerTask(CANDriver &canctl)
if (sweep_state_changed)
{
// 根据清扫状态设置所有部件
uint8_t target_value = msg.sweep ? 100 : 0; // 100表示启动0表示停止
int8_t target_value = msg.sweep ? 100 : 0; // 100表示启动0表示停止
int8_t target_value2 = msg.sweep ? -100 : 100; // -100表示下沉100表示抬升
// ===== 控制0x211报文 (电机M1-M7) =====
vcu_motor1_cmd.setByte0(msg.sweep ? 1 : 0); // 1表示正向运行0表示停止
vcu_motor1_cmd.setByte1(target_value);
vcu_motor1_cmd.setByte2(target_value);
vcu_motor1_cmd.setByte3(target_value);
vcu_motor1_cmd.setByte4(target_value);
vcu_motor1_cmd.setByte5(target_value);
vcu_motor1_cmd.setByte4(target_value2);
vcu_motor1_cmd.setByte5(target_value2);
vcu_motor1_cmd.setByte6(target_value);
vcu_motor1_cmd.setByte7(target_value);
canctl.sendFrame(vcu_motor1_cmd.toFrame());

View File

@ -3,7 +3,7 @@
"mqtt": {
"ip": "36.153.162.171",
"port": 19683,
"info_topic": "/zxwl/vehicle/{vid}/info",
"fault_topic": "/zxwl/vehicle/{vid}/fault"
"info_topic": "/zxwl/sweeper/{vid}/info",
"fault_topic": "/zxwl/sweeper/{vid}/fault"
}
}

View File

@ -179,8 +179,8 @@ void *mqtt_pub_gps(void *arg)
&token_d_m);
if (rc == MQTTCLIENT_SUCCESS)
{
std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic
<< "\n msg: " << json_string << std::endl;
// std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic
// << "\n msg: " << json_string << std::endl;
MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms);
}
else

View File

@ -5,7 +5,7 @@
#include <fstream>
/* Type define */
typedef unsigned char byte;
typedef unsigned char MD5_BYTE;
typedef unsigned int uint32;
using std::ifstream;
@ -22,17 +22,17 @@ public:
void update(const void *input, size_t length);
void update(const string &str);
void update(ifstream &in);
const byte *digest();
const MD5_BYTE *digest();
string toString();
void reset();
private:
void update(const byte *input, size_t length);
void update(const MD5_BYTE *input, size_t length);
void final();
void transform(const byte block[64]);
void encode(const uint32 *input, byte *output, size_t length);
void decode(const byte *input, uint32 *output, size_t length);
string bytesToHexString(const byte *input, size_t length);
void transform(const MD5_BYTE block[64]);
void encode(const uint32 *input, MD5_BYTE *output, size_t length);
void decode(const MD5_BYTE *input, uint32 *output, size_t length);
string bytesToHexString(const MD5_BYTE *input, size_t length);
/* class uncopyable */
MD5(const MD5 &);
@ -41,11 +41,11 @@ private:
private:
uint32 _state[4]; /* state (ABCD) */
uint32 _count[2]; /* number of bits, modulo 2^64 (low-order word first) */
byte _buffer[64]; /* input buffer */
byte _digest[16]; /* message digest */
MD5_BYTE _buffer[64]; /* input buffer */
MD5_BYTE _digest[16]; /* message digest */
bool _finished; /* calculate finished ? */
static const byte PADDING[64]; /* padding for calculate */
static const MD5_BYTE PADDING[64]; /* padding for calculate */
static const char HEX[16];
enum
{

View File

@ -33,7 +33,9 @@ include_directories(
)
add_executable(task_manager_node
src/task_manager_node.cpp
src/task_manager_main.cpp
src/task_manager.cpp
src/mqtt_manager.cpp
src/jsoncpp.cpp
src/md5.cpp
)

View File

@ -0,0 +1,99 @@
#ifndef __MQTT_MANAGER_H__
#define __MQTT_MANAGER_H__
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <functional>
#include <iostream>
#include "MQTTClient.h"
using namespace std;
class MQTTManager
{
public:
// 消息回调函数类型
using MessageCallback = std::function<int(char *topicName, int topicLen, MQTTClient_message *message)>;
using ConnLostCallback = std::function<void(char *cause)>;
using DeliveredCallback = std::function<void(MQTTClient_deliveryToken dt)>;
MQTTManager();
~MQTTManager();
// 初始化MQTT连接
bool init(const string &address, const string &clientId,
const string &username, const string &password);
// 订阅主题
bool subscribe(const string &topic, int qos = 0);
// 发布消息
bool publish(const string &topic, const string &payload, int qos = 0);
// 设置回调函数
void setMessageCallback(MessageCallback cb) { message_callback_ = cb; }
void setConnLostCallback(ConnLostCallback cb) { conn_lost_callback_ = cb; }
void setDeliveredCallback(DeliveredCallback cb) { delivered_callback_ = cb; }
// 获取连接状态
bool isConnected();
// 启动MQTT循环线程
void start();
// 停止MQTT
void stop();
// 设置重连参数
void setReconnectInterval(int seconds) { reconnect_interval_ = seconds; }
void setMaxReconnectAttempts(int attempts) { max_reconnect_attempts_ = attempts; }
void setHeartbeatTimeout(int milliseconds) { heartbeat_timeout_ = milliseconds; }
private:
// MQTT连接循环线程函数
void mqttLoop();
// 重连逻辑
bool reconnect();
// 内部MQTT回调static函数
static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message);
static void onConnectionLost(void *context, char *cause);
static void onDelivered(void *context, MQTTClient_deliveryToken dt);
MQTTClient client_;
MQTTClient_connectOptions conn_opts_;
MQTTClient_deliveryToken token_;
string address_;
string client_id_;
string username_;
string password_;
string sub_topic_;
MessageCallback message_callback_;
ConnLostCallback conn_lost_callback_;
DeliveredCallback delivered_callback_;
bool is_running_;
std::thread mqtt_thread_;
std::mutex client_mutex_;
// 断线重连参数
int reconnect_interval_; // 重连间隔(秒)
int max_reconnect_attempts_; // 最大重连次数0为无限
int reconnect_attempts_;
std::chrono::steady_clock::time_point last_reconnect_time_;
// 心跳检测
std::chrono::steady_clock::time_point last_message_time_;
int heartbeat_timeout_; // 心跳超时时间(毫秒)
bool is_heartbeat_lost_;
constexpr static int TIMEOUT = 10000L;
constexpr static int QOS = 0;
};
#endif // __MQTT_MANAGER_H__

View File

@ -0,0 +1,87 @@
#ifndef __TASK_MANAGER_H__
#define __TASK_MANAGER_H__
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <boost/any.hpp>
#include "task_manager_node.hpp"
using namespace std;
class TaskManager
{
public:
TaskManager();
~TaskManager();
// 启动任务处理线程
void start();
// 停止任务处理
void stop();
// 添加任务到队列
void addTask(const MQTT_JSON &mqtt_json);
// 获取当前任务状态
TaskStatus getCurrentTaskStatus();
// 获取当前任务ID
int getCurrentTaskId();
// 更新当前任务
void updateCurrentTask(int taskId, TaskStatus status);
// 获取任务队列大小
int getQueueSize();
private:
// 任务处理线程函数
void processTasksLoop();
// 处理单个start任务
bool processStartTask(const JSON_DATA &json_data, long seqNo);
// 处理单个stop任务
bool processStopTask(long taskId, long seqNo);
// 下载文件
bool downloadFile(const string &url, const string &expected_md5, const string &filename);
// 计算MD5
string calculate_md5(const string &filename);
// 复制文件
void copyFileAndOverwrite(const string &sourceFilePath, const string &destinationFilePath);
std::queue<MQTT_JSON> task_queue_;
std::mutex queue_mutex_;
std::condition_variable queue_cv_;
bool is_running_;
std::thread task_thread_;
CurrentTask current_task_;
std::mutex task_mutex_;
string destination_file_path_;
int task_status_;
// 回调函数用于发送MQTT响应
std::function<void(const string &topic, int seqNo, int code, const string &msg)> send_response_callback_;
public:
void setSendResponseCallback(
std::function<void(const string &topic, int seqNo, int code, const string &msg)> cb)
{
send_response_callback_ = cb;
}
int getTaskStatus() const { return task_status_; }
void setTaskStatus(int status) { task_status_ = status; }
};
#endif // __TASK_MANAGER_H__

View File

@ -1,4 +1,3 @@
#include <string>
#include <boost/any.hpp>
#include <curl/curl.h>
@ -16,6 +15,7 @@
#include <iomanip>
#include <pthread.h>
#include <cstdlib>
#ifndef __SUB_NODE_H__
#define __SUB_NODE_H__
using namespace std;
@ -89,10 +89,4 @@ struct CurrentTask
CurrentTask() : id(0), status(PENDING) {}
};
// extern TASK_VALUE task_value;
// extern ROUTE_INFO routeInfo;
// extern MQTT_JSON mqtt_json;
// extern ROUTE_INFO routeInfo;
#endif

View File

@ -0,0 +1,274 @@
#include "mqtt_manager.hpp"
#include <iostream>
#include <iomanip>
#include <memory>
MQTTManager::MQTTManager()
: client_(nullptr), is_running_(false), reconnect_interval_(5),
max_reconnect_attempts_(0), reconnect_attempts_(0), heartbeat_timeout_(500),
is_heartbeat_lost_(false)
{
conn_opts_ = MQTTClient_connectOptions_initializer;
}
MQTTManager::~MQTTManager()
{
stop();
}
bool MQTTManager::init(const string &address, const string &clientId,
const string &username, const string &password)
{
address_ = address;
client_id_ = clientId;
username_ = username;
password_ = password;
int rc = MQTTClient_create(&client_, address_.c_str(), client_id_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to create MQTT client, return code " << rc << std::endl;
return false;
}
// 设置回调
rc = MQTTClient_setCallbacks(client_, this, onConnectionLost,
onMessageArrived, onDelivered);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to set MQTT callbacks, return code " << rc << std::endl;
MQTTClient_destroy(&client_);
return false;
}
// 配置连接选项
conn_opts_.keepAliveInterval = 20;
conn_opts_.cleansession = 1;
conn_opts_.username = username_.c_str();
conn_opts_.password = password_.c_str();
// 初始化时间戳
last_message_time_ = std::chrono::steady_clock::now();
last_reconnect_time_ = std::chrono::steady_clock::now();
return true;
}
bool MQTTManager::reconnect()
{
std::lock_guard<std::mutex> lock(client_mutex_);
std::cout << "Attempting to reconnect to MQTT server..." << std::endl;
int rc = MQTTClient_connect(client_, &conn_opts_);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to reconnect, return code " << rc << std::endl;
reconnect_attempts_++;
if (max_reconnect_attempts_ > 0 && reconnect_attempts_ >= max_reconnect_attempts_)
{
std::cerr << "Max reconnection attempts reached." << std::endl;
return false;
}
return false;
}
std::cout << "Successfully reconnected to MQTT server." << std::endl;
reconnect_attempts_ = 0;
is_heartbeat_lost_ = false;
last_message_time_ = std::chrono::steady_clock::now();
return true;
}
bool MQTTManager::subscribe(const string &topic, int qos)
{
std::lock_guard<std::mutex> lock(client_mutex_);
if (!isConnected())
{
std::cerr << "MQTT client is not connected." << std::endl;
return false;
}
sub_topic_ = topic;
int rc = MQTTClient_subscribe(client_, topic.c_str(), qos);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to subscribe, return code " << rc << std::endl;
return false;
}
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
return true;
}
bool MQTTManager::publish(const string &topic, const string &payload, int qos)
{
std::lock_guard<std::mutex> lock(client_mutex_);
if (!isConnected())
{
std::cerr << "MQTT client is not connected." << std::endl;
return false;
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void *)payload.c_str();
pubmsg.payloadlen = payload.length();
pubmsg.qos = qos;
pubmsg.retained = 0;
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token_);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to publish message, return code " << rc << std::endl;
return false;
}
rc = MQTTClient_waitForCompletion(client_, token_, TIMEOUT);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to wait for message completion, return code " << rc << std::endl;
return false;
}
return true;
}
bool MQTTManager::isConnected()
{
return MQTTClient_isConnected(client_) == 1;
}
void MQTTManager::start()
{
if (is_running_)
{
std::cout << "MQTT manager is already running." << std::endl;
return;
}
// 首先建立连接
if (!reconnect())
{
std::cerr << "Failed to establish initial MQTT connection." << std::endl;
return;
}
is_running_ = true;
mqtt_thread_ = std::thread(&MQTTManager::mqttLoop, this);
std::cout << "MQTT manager started." << std::endl;
}
void MQTTManager::stop()
{
if (!is_running_)
return;
is_running_ = false;
if (mqtt_thread_.joinable())
{
mqtt_thread_.join();
}
std::lock_guard<std::mutex> lock(client_mutex_);
if (isConnected())
{
MQTTClient_disconnect(client_, 10000);
}
MQTTClient_destroy(&client_);
std::cout << "MQTT manager stopped." << std::endl;
}
void MQTTManager::mqttLoop()
{
while (is_running_)
{
// 检查连接状态
if (!isConnected())
{
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
now - last_reconnect_time_)
.count();
// 如果距离上次重连已过指定间隔,则尝试重连
if (duration >= reconnect_interval_)
{
last_reconnect_time_ = now;
if (!reconnect())
{
// 重连失败,继续等待
is_heartbeat_lost_ = true;
}
}
}
else
{
// 连接正常,检查心跳
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_message_time_)
.count();
if (duration > heartbeat_timeout_ && !is_heartbeat_lost_)
{
is_heartbeat_lost_ = true;
std::cout << "Heartbeat timeout: No message received in "
<< heartbeat_timeout_ << "ms." << std::endl;
if (conn_lost_callback_)
{
conn_lost_callback_((char *)"Heartbeat timeout");
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
int MQTTManager::onMessageArrived(void *context, char *topicName, int topicLen,
MQTTClient_message *message)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
// 更新最后消息时间
pThis->last_message_time_ = std::chrono::steady_clock::now();
pThis->is_heartbeat_lost_ = false;
std::cout << "Message arrived on topic: " << topicName << std::endl;
if (pThis->message_callback_)
{
return pThis->message_callback_(topicName, topicLen, message);
}
return 1;
}
void MQTTManager::onConnectionLost(void *context, char *cause)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
std::cout << "Connection lost. Cause: " << (cause ? cause : "Unknown") << std::endl;
if (pThis->conn_lost_callback_)
{
pThis->conn_lost_callback_(cause);
}
}
void MQTTManager::onDelivered(void *context, MQTTClient_deliveryToken dt)
{
MQTTManager *pThis = static_cast<MQTTManager *>(context);
std::cout << "Message with token " << dt << " delivered." << std::endl;
if (pThis->delivered_callback_)
{
pThis->delivered_callback_(dt);
}
}

View File

@ -0,0 +1,272 @@
#include "task_manager.hpp"
#include "md5.h"
#include <iostream>
#include <fstream>
#include <cstdlib>
#include <unistd.h>
TaskManager::TaskManager()
: is_running_(false), task_status_(0),
destination_file_path_("./gps_load_now.txt")
{
current_task_.id = 0;
current_task_.status = TaskStatus::PENDING;
}
TaskManager::~TaskManager()
{
stop();
}
void TaskManager::start()
{
if (is_running_)
{
std::cout << "TaskManager is already running." << std::endl;
return;
}
is_running_ = true;
task_thread_ = std::thread(&TaskManager::processTasksLoop, this);
std::cout << "TaskManager started." << std::endl;
}
void TaskManager::stop()
{
if (!is_running_)
return;
{
std::lock_guard<std::mutex> lock(queue_mutex_);
is_running_ = false;
}
queue_cv_.notify_one();
if (task_thread_.joinable())
{
task_thread_.join();
}
std::cout << "TaskManager stopped." << std::endl;
}
void TaskManager::addTask(const MQTT_JSON &mqtt_json)
{
{
std::lock_guard<std::mutex> lock(queue_mutex_);
task_queue_.push(mqtt_json);
}
queue_cv_.notify_one();
}
int TaskManager::getQueueSize()
{
std::lock_guard<std::mutex> lock(queue_mutex_);
return task_queue_.size();
}
TaskStatus TaskManager::getCurrentTaskStatus()
{
std::lock_guard<std::mutex> lock(task_mutex_);
return current_task_.status;
}
int TaskManager::getCurrentTaskId()
{
std::lock_guard<std::mutex> lock(task_mutex_);
return current_task_.id;
}
void TaskManager::updateCurrentTask(int taskId, TaskStatus status)
{
std::lock_guard<std::mutex> lock(task_mutex_);
current_task_.id = taskId;
current_task_.status = status;
current_task_.lastUpdateTime = std::chrono::steady_clock::now();
std::cout << "Task updated: ID=" << taskId << ", Status=" << status << std::endl;
}
string TaskManager::calculate_md5(const string &filename)
{
MD5 md5;
std::ifstream file;
file.open(filename, std::ios::binary);
if (!file)
{
std::cerr << "Failed to open file for MD5 calculation: " << filename << std::endl;
return "";
}
md5.update(file);
return md5.toString();
}
bool TaskManager::downloadFile(const string &url, const string &expected_md5,
const string &filename)
{
if (url.empty())
{
std::cerr << "Download URL is empty." << std::endl;
return false;
}
std::string command = "wget -O routes/" + filename + " \"" + url + "\"";
std::cout << "Executing command: " << command << std::endl;
int result = system(command.c_str());
if (result != 0)
{
std::cerr << "Download failed: " << url << std::endl;
return false;
}
// 验证MD5
std::string filepath = "routes/" + filename;
std::string actual_md5 = calculate_md5(filepath);
if (actual_md5 != expected_md5)
{
std::cerr << "MD5 verification failed. Expected: " << expected_md5
<< ", Actual: " << actual_md5 << std::endl;
remove(filepath.c_str());
return false;
}
std::cout << "File downloaded and verified: " << filepath << std::endl;
return true;
}
void TaskManager::copyFileAndOverwrite(const string &sourceFilePath,
const string &destinationFilePath)
{
std::ifstream src(sourceFilePath, std::ios::binary);
std::ofstream dst(destinationFilePath, std::ios::binary);
if (!src)
{
std::cerr << "Failed to open source file: " << sourceFilePath << std::endl;
return;
}
if (!dst)
{
std::cerr << "Failed to open destination file: " << destinationFilePath << std::endl;
return;
}
dst << src.rdbuf();
if (!dst)
{
std::cerr << "Error while copying file." << std::endl;
}
}
bool TaskManager::processStartTask(const JSON_DATA &json_data, long seqNo)
{
std::cout << "Processing start task ID: " << json_data.value.id << std::endl;
std::cout << "URL: " << json_data.value.routeInfo.url << std::endl;
std::cout << "MD5: " << json_data.value.routeInfo.md5 << std::endl;
// 更新当前任务ID
updateCurrentTask(json_data.value.id, TaskStatus::RUNNING);
string downFileName = json_data.value.routeInfo.fileName;
string filepath = "routes/" + downFileName;
// 检查文件是否已存在
if (access(filepath.c_str(), F_OK) == -1)
{
std::cout << "File not found, downloading: " << downFileName << std::endl;
if (!downloadFile(json_data.value.routeInfo.url,
json_data.value.routeInfo.md5, downFileName))
{
std::cout << "Download failed, skipping task." << std::endl;
updateCurrentTask(json_data.value.id, TaskStatus::FAILED);
if (send_response_callback_)
{
// 发送响应到状态主题而不是task_response
send_response_callback_("task_response", seqNo, 500, "Download failed");
}
return false;
}
}
else
{
std::cout << "File already exists, skipping download: " << filepath << std::endl;
}
// 复制文件
copyFileAndOverwrite(filepath, destination_file_path_);
// 启动任务
task_status_ = 1;
updateCurrentTask(json_data.value.id, TaskStatus::RUNNING);
std::cout << "Task started successfully: " << json_data.value.id << std::endl;
// 发送响应(响应主题由调用方决定)
if (send_response_callback_)
{
send_response_callback_("task_response", seqNo, 200, "Task started successfully");
}
return true;
}
bool TaskManager::processStopTask(long taskId, long seqNo)
{
std::cout << "Processing stop task ID: " << taskId << std::endl;
task_status_ = 0;
updateCurrentTask(taskId, TaskStatus::COMPLETED);
if (send_response_callback_)
{
send_response_callback_("task_response", seqNo, 200, "Task stopped successfully");
}
return true;
}
void TaskManager::processTasksLoop()
{
while (true)
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// 等待队列中有任务或线程需要停止
queue_cv_.wait(lock, [this]
{ return !task_queue_.empty() || !is_running_; });
// 检查是否需要退出
if (!is_running_ && task_queue_.empty())
{
break;
}
if (!task_queue_.empty())
{
MQTT_JSON mqtt_json = task_queue_.front();
task_queue_.pop();
lock.unlock();
try
{
if (mqtt_json.type == "request")
{
JSON_DATA json_data = boost::any_cast<JSON_DATA>(mqtt_json.data);
if (json_data.command == "start")
{
processStartTask(json_data, mqtt_json.seqNo);
}
}
else if (mqtt_json.type == "stop")
{
long taskId = boost::any_cast<long>(mqtt_json.data);
processStopTask(taskId, mqtt_json.seqNo);
}
}
catch (const boost::bad_any_cast &e)
{
std::cerr << "Bad any_cast in task processing: " << e.what() << std::endl;
}
}
}
}

View File

@ -0,0 +1,478 @@
#include "rclcpp/rclcpp.hpp"
#include "sweeper_interfaces/msg/task.hpp"
#include "mqtt_manager.hpp"
#include "task_manager.hpp"
#include "task_manager_node.hpp"
#include "json.h"
#include <signal.h>
#include <fstream>
#include <random>
#include <iomanip>
using namespace std;
// 全局变量
volatile std::sig_atomic_t signal_received = 0;
MQTTManager mqtt_manager;
TaskManager task_manager;
int last_status = 0;
TaskStatus status_up = TaskStatus::PENDING;
std::thread status_report_thread;
// 配置变量
struct Config
{
string mqtt_vid;
string mqtt_address;
string mqtt_username;
string mqtt_password;
string mqtt_topic_sub_task;
string mqtt_topic_push_status;
} config;
// 前置声明
void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg);
void sendTaskResponse(long seqNo, int code, const string &msg);
string generate_mqtt_client_id();
// 信号处理函数
void signalHandler(int signum)
{
std::cout << "Interrupt signal (" << signum << ") received." << std::endl;
signal_received = signum;
if (signum == SIGINT && rclcpp::ok())
{
std::cout << "Shutting down ROS2 node..." << std::endl;
rclcpp::shutdown();
}
}
// 生成MQTT客户端ID
string generate_mqtt_client_id()
{
auto now = std::chrono::system_clock::now();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count();
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1000, 9999);
int random_num = dis(gen);
std::ostringstream oss;
oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << random_num;
return oss.str();
}
// 从JSON解析ROUTE_INFO
ROUTE_INFO get_route_info(const Json::Value &root)
{
ROUTE_INFO route_info;
if (root.isObject())
{
if (root.isMember("routeName") && root["routeName"].isString())
route_info.routeName = root["routeName"].asString();
else
std::cerr << "Missing or invalid 'routeName' field" << std::endl;
if (root.isMember("fileName") && root["fileName"].isString())
route_info.fileName = root["fileName"].asString();
else
std::cerr << "Missing or invalid 'fileName' field" << std::endl;
if (root.isMember("url") && root["url"].isString())
route_info.url = root["url"].asString();
else
std::cerr << "Missing or invalid 'url' field" << std::endl;
if (root.isMember("md5") && root["md5"].isString())
route_info.md5 = root["md5"].asString();
else
std::cerr << "Missing or invalid 'md5' field" << std::endl;
}
else
{
std::cerr << "routeInfo is not a valid JSON object" << std::endl;
}
return route_info;
}
// 从JSON解析TASK_VALUE
TASK_VALUE get_task_value(const Json::Value &root)
{
TASK_VALUE task_value;
if (root.isObject())
{
if (root.isMember("id") && root["id"].isInt())
task_value.id = root["id"].asInt();
else
std::cerr << "Missing or invalid 'id' field" << std::endl;
if (root.isMember("name") && root["name"].isString())
task_value.name = root["name"].asString();
else
std::cerr << "Missing or invalid 'name' field" << std::endl;
if (root.isMember("mode") && root["mode"].isInt())
task_value.mode = root["mode"].asInt();
else
std::cerr << "Missing or invalid 'mode' field" << std::endl;
if (root.isMember("count") && root["count"].isInt())
task_value.count = root["count"].asInt();
else
std::cerr << "Missing or invalid 'count' field" << std::endl;
if (root.isMember("routeInfo") && root["routeInfo"].isObject())
task_value.routeInfo = get_route_info(root["routeInfo"]);
else
std::cerr << "Missing or invalid 'routeInfo' field" << std::endl;
}
else
{
std::cerr << "task value is not a valid JSON object" << std::endl;
}
return task_value;
}
// 加载配置文件
bool loadConfig(const string &config_file)
{
Json::Reader reader;
Json::Value root;
std::ifstream in(config_file, std::ios::binary);
if (!in.is_open())
{
std::cout << "Failed to read config file: " << config_file << std::endl;
return false;
}
if (!reader.parse(in, root))
{
std::cout << "Failed to parse config file." << std::endl;
return false;
}
config.mqtt_vid = root["mqtt"]["vid"].asString();
config.mqtt_username = root["mqtt"]["mqtt_user"].asString();
config.mqtt_password = root["mqtt"]["mqtt_password"].asString();
config.mqtt_topic_sub_task = root["mqtt"]["mqtt_topic_sub_task"].asString();
config.mqtt_topic_push_status = root["mqtt"]["mqtt_topic_push_status"].asString();
string external_addr = root["mqtt"]["external_net_address"].asString();
int external_port = root["mqtt"]["external_net_port"].asInt();
config.mqtt_address = external_addr + ":" + std::to_string(external_port);
in.close();
std::cout << "Config loaded successfully." << std::endl;
std::cout << "MQTT Address: " << config.mqtt_address << std::endl;
std::cout << "Topic Sub: " << config.mqtt_topic_sub_task << std::endl;
std::cout << "Topic Pub: " << config.mqtt_topic_push_status << std::endl;
return true;
}
// 发送通用应答消息(触发式)
void sendGeneralResponse(const string &topic, long seqNo, int code, const string &msg)
{
Json::Value responseData;
responseData["type"] = "response";
responseData["seqNo"] = seqNo;
Json::Value data;
data["code"] = code;
data["msg"] = msg;
responseData["data"] = data;
Json::FastWriter writer;
string responseJson = writer.write(responseData);
std::cout << "[General Response] seqNo=" << seqNo << ", code=" << code
<< ", msg=" << msg << std::endl;
std::cout << "Response JSON: " << responseJson << std::endl;
bool success = mqtt_manager.publish(topic, responseJson, 0);
std::cout << "General response publish to [" << topic << "] "
<< (success ? "[OK]" : "[FAILED]") << std::endl;
}
// 发送任务专用应答(包含任务信息)
void sendTaskResponse(long seqNo, int code, const string &msg)
{
Json::Value responseData;
responseData["type"] = "response";
responseData["seqNo"] = seqNo;
Json::Value data;
data["code"] = code;
data["msg"] = msg;
int current_task_id = task_manager.getCurrentTaskId();
TaskStatus current_status = task_manager.getCurrentTaskStatus();
data["taskId"] = current_task_id;
data["taskStatus"] = static_cast<int>(current_status);
responseData["data"] = data;
Json::FastWriter writer;
string responseJson = writer.write(responseData);
std::cout << "[Task Response] seqNo=" << seqNo << ", code=" << code
<< ", taskId=" << current_task_id << ", taskStatus=" << (int)current_status << std::endl;
std::cout << "Response JSON: " << responseJson << std::endl;
bool success = mqtt_manager.publish(config.mqtt_topic_sub_task, responseJson, 0);
std::cout << "Task response publish to [" << config.mqtt_topic_sub_task << "] "
<< (success ? "[OK]" : "[FAILED]") << std::endl;
}
// MQTT消息回调
int handleMqttMessage(char *topicName, int topicLen, MQTTClient_message *message)
{
const int BUFFER_SIZE = 4096;
char *buffer = new char[BUFFER_SIZE];
memset(buffer, 0, BUFFER_SIZE);
size_t copy_size = std::min((size_t)message->payloadlen, (size_t)(BUFFER_SIZE - 1));
memcpy(buffer, message->payload, copy_size);
Json::Reader reader;
Json::Value root;
if (!reader.parse(buffer, root))
{
std::cout << "Failed to parse JSON from MQTT message." << std::endl;
delete[] buffer;
return 1;
}
std::cout << "MQTT message received on topic: " << topicName << std::endl;
if (root.isMember("type") && root["type"].asString() == "request")
{
long seqNo = 0;
if (root.isMember("seqNo"))
{
seqNo = root["seqNo"].asInt64();
}
// 立即发送通用应答
sendGeneralResponse(string(topicName), seqNo, 200, "Message received");
Json::Value data = root["data"];
JSON_DATA json_data;
if (data.isMember("command") && data["command"].isString())
{
json_data.command = data["command"].asString();
if (json_data.command == "start")
{
try
{
json_data.value = get_task_value(data["value"]);
MQTT_JSON mqtt_json;
mqtt_json.type = "request";
mqtt_json.seqNo = seqNo;
mqtt_json.data = json_data;
std::cout << "Start task: " << json_data.value.id << std::endl;
task_manager.addTask(mqtt_json);
}
catch (const std::exception &e)
{
std::cerr << "Error processing start command: " << e.what() << std::endl;
sendTaskResponse(seqNo, 400, "Failed to process start command");
}
}
else if (json_data.command == "stop")
{
try
{
long stop_id = data["value"].asInt();
MQTT_JSON mqtt_json;
mqtt_json.type = "stop";
mqtt_json.seqNo = seqNo;
mqtt_json.data = stop_id;
std::cout << "Stop task: " << stop_id << std::endl;
task_manager.addTask(mqtt_json);
sendTaskResponse(seqNo, 200, "Task stop command received");
}
catch (const std::exception &e)
{
std::cerr << "Error processing stop command: " << e.what() << std::endl;
sendTaskResponse(seqNo, 400, "Failed to process stop command");
}
}
else
{
sendGeneralResponse(string(topicName), seqNo, 400, "Unknown command: " + json_data.command);
}
}
else
{
sendGeneralResponse(string(topicName), seqNo, 400, "Missing command field");
}
}
else if (root.isMember("type") && root["type"].asString() == "noReply")
{
std::cout << "NoReply message received" << std::endl;
}
else
{
std::cout << "Unknown message type" << std::endl;
}
delete[] buffer;
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
// 周期性上报任务状态到MQTT200ms间隔持续上报
void reportTaskStatusToMqtt()
{
std::cout << "DEBUG: Status report thread started" << std::endl;
while (rclcpp::ok() && !signal_received)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
int current_id = task_manager.getCurrentTaskId();
TaskStatus current_status = task_manager.getCurrentTaskStatus();
if (current_id > 0)
{
Json::Value statusMsg;
statusMsg["id"] = current_id;
statusMsg["status"] = static_cast<int>(current_status);
Json::FastWriter writer;
string statusJson = writer.write(statusMsg);
mqtt_manager.publish(config.mqtt_topic_push_status, statusJson, 0);
}
}
std::cout << "DEBUG: Status report thread exiting" << std::endl;
}
// ROS2节点类
class TaskManagerNode : public rclcpp::Node
{
public:
TaskManagerNode(string name) : Node(name)
{
RCLCPP_INFO(this->get_logger(), "Node %s started.", name.c_str());
msg_publisher_ = this->create_publisher<sweeper_interfaces::msg::Task>(
"task_message/download", 10);
task_subscribe_ = this->create_subscription<sweeper_interfaces::msg::Task>(
"task_message/upload", 10,
std::bind(&TaskManagerNode::task_callback, this, std::placeholders::_1));
timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&TaskManagerNode::timer_callback, this));
}
private:
void timer_callback()
{
sweeper_interfaces::msg::Task msg;
msg.task_status = task_manager.getTaskStatus();
if (msg.task_status != last_status)
{
msg_publisher_->publish(msg);
RCLCPP_INFO(this->get_logger(), "Task status published: %d", msg.task_status);
}
last_status = msg.task_status;
}
void task_callback(const sweeper_interfaces::msg::Task::SharedPtr msg)
{
if (msg->task_status >= TaskStatus::PENDING && msg->task_status <= TaskStatus::FAILED)
{
status_up = static_cast<TaskStatus>(msg->task_status);
}
else
{
RCLCPP_WARN(this->get_logger(), "Invalid task status: %d", msg->task_status);
}
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<sweeper_interfaces::msg::Task>::SharedPtr msg_publisher_;
rclcpp::Subscription<sweeper_interfaces::msg::Task>::SharedPtr task_subscribe_;
};
int main(int argc, char **argv)
{
signal(SIGINT, signalHandler);
if (!loadConfig("config.json"))
{
std::cerr << "Failed to load configuration." << std::endl;
return 1;
}
string client_id = generate_mqtt_client_id();
if (!mqtt_manager.init(config.mqtt_address, client_id,
config.mqtt_username, config.mqtt_password))
{
std::cerr << "Failed to initialize MQTT manager." << std::endl;
return 1;
}
mqtt_manager.setHeartbeatTimeout(2000);
mqtt_manager.setReconnectInterval(5);
mqtt_manager.setMaxReconnectAttempts(0);
mqtt_manager.setMessageCallback(handleMqttMessage);
mqtt_manager.start();
mqtt_manager.subscribe(config.mqtt_topic_sub_task, 0);
task_manager.setSendResponseCallback([](const string &topic, int seqNo, int code, const string &msg)
{ sendTaskResponse(seqNo, code, msg); });
task_manager.start();
rclcpp::init(argc, argv);
auto node = std::make_shared<TaskManagerNode>("task_manager_node");
std::cout << "Starting status report thread..." << std::endl;
status_report_thread = std::thread(reportTaskStatusToMqtt);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
while (rclcpp::ok() && !signal_received)
{
executor.spin_some();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "Shutting down components..." << std::endl;
task_manager.stop();
mqtt_manager.stop();
if (status_report_thread.joinable())
{
status_report_thread.join();
}
return 0;
}

View File

@ -64,12 +64,7 @@ private:
RCLCPP_INFO_THROTTLE(this->get_logger(), *this->get_clock(), 2000, "[ARBITER] Using RADIO control");
return;
}
if (auto_valid_ && (now - auto_last_time_).nanoseconds() < timeout_ms_ * 1000000)
{
publisher_->publish(auto_msg_);
RCLCPP_INFO_THROTTLE(this->get_logger(), *this->get_clock(), 2000, "[ARBITER] Using AUTO control");
return;
}
if (remote_valid_ && (now - remote_last_time_).nanoseconds() < timeout_ms_ * 1000000)
{
publisher_->publish(remote_msg_);
@ -77,6 +72,13 @@ private:
return;
}
if (auto_valid_ && (now - auto_last_time_).nanoseconds() < timeout_ms_ * 1000000)
{
publisher_->publish(auto_msg_);
RCLCPP_INFO_THROTTLE(this->get_logger(), *this->get_clock(), 2000, "[ARBITER] Using AUTO control");
return;
}
// 所有控制源均失联,发布安全默认指令
sweeperMsg::McCtrl safe_msg;
safe_msg.brake = 1;

View File

@ -3,4 +3,4 @@ host = 36.153.162.171
port = 19683
username = zxwl
password = zxwl1234@
vehicle_id = V090001
vehicle_id = V060003