Auto commit at 2025-07-03 09:50:58

This commit is contained in:
cxh 2025-07-03 09:50:59 +08:00
parent 46b7d6e536
commit 4fb4b07b02
2 changed files with 171 additions and 21 deletions

View File

@ -3,13 +3,20 @@
#include "mqtt/async_client.h" #include "mqtt/async_client.h"
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <thread>
#include <atomic>
#include <mutex>
#include <chrono>
#include <condition_variable>
class MQTTClientWrapper class MQTTClientWrapper
{ {
public: public:
MQTTClientWrapper(const std::string &server_address, const std::string &client_id, MQTTClientWrapper(const std::string &server_address, const std::string &client_id,
const std::string &username = "", const std::string &password = "") const std::string &username = "", const std::string &password = "")
: client_(server_address, client_id), connOpts_() : client_(server_address, client_id), connOpts_(),
is_connected_(false), stop_reconnect_(false),
initial_reconnect_delay_ms_(1000), max_reconnect_delay_ms_(60000)
{ {
connOpts_.set_clean_start(true); connOpts_.set_clean_start(true);
if (!username.empty()) if (!username.empty())
@ -17,30 +24,38 @@ public:
connOpts_.set_user_name(username); connOpts_.set_user_name(username);
connOpts_.set_password(password); connOpts_.set_password(password);
} }
// 设置连接丢失回调
client_.set_connection_lost_handler([this](const std::string &cause)
{
std::cerr << "MQTT connection lost: " << cause << std::endl;
is_connected_ = false;
start_reconnect(); });
}
~MQTTClientWrapper()
{
stop_reconnect();
disconnect();
} }
bool connect() bool connect()
{ {
try std::lock_guard<std::mutex> lock(conn_mutex_);
{ return connect_with_retry();
mqtt::token_ptr conntok = client_.connect(connOpts_);
conntok->wait(); // 阻塞直到连接完成
std::cout << "MQTT connected successfully" << std::endl;
return true;
}
catch (const mqtt::exception &e)
{
std::cerr << "MQTT connection failed: " << e.what() << std::endl;
return false;
}
} }
void disconnect() void disconnect()
{ {
stop_reconnect();
try try
{ {
client_.disconnect()->wait(); if (client_.is_connected())
std::cout << "MQTT disconnected." << std::endl; {
client_.disconnect()->wait();
std::cout << "MQTT disconnected." << std::endl;
}
is_connected_ = false;
} }
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
@ -50,6 +65,12 @@ public:
void publish(const std::string &topic, const std::string &payload, int qos = 1, bool retained = false) void publish(const std::string &topic, const std::string &payload, int qos = 1, bool retained = false)
{ {
if (!is_connected())
{
std::cerr << "MQTT not connected, cannot publish." << std::endl;
return;
}
try try
{ {
auto msg = mqtt::make_message(topic, payload); auto msg = mqtt::make_message(topic, payload);
@ -63,7 +84,98 @@ public:
} }
} }
bool is_connected() const
{
return is_connected_.load();
}
void set_reconnect_policy(int initial_delay_ms = 1000, int max_delay_ms = 60000)
{
initial_reconnect_delay_ms_ = initial_delay_ms;
max_reconnect_delay_ms_ = max_delay_ms;
}
private:
bool connect_once()
{
try
{
mqtt::token_ptr conntok = client_.connect(connOpts_);
conntok->wait();
std::cout << "MQTT connected successfully" << std::endl;
is_connected_ = true;
return true;
}
catch (const mqtt::exception &e)
{
std::cerr << "MQTT connection attempt failed: " << e.what() << std::endl;
return false;
}
}
bool connect_with_retry()
{
int delay = initial_reconnect_delay_ms_;
while (!stop_reconnect_)
{
if (connect_once())
{
return true;
}
// 指数退避策略,但有最大延迟限制
delay = std::min(delay * 2, max_reconnect_delay_ms_);
// 等待期间检查停止标志
for (int i = 0; i < delay / 100; i += 100)
{
if (stop_reconnect_)
break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
return false;
}
void start_reconnect()
{
std::lock_guard<std::mutex> lock(reconnect_mutex_);
if (reconnect_thread_.joinable())
{
return; // 重连已在进行中
}
stop_reconnect_ = false;
reconnect_thread_ = std::thread([this]()
{
std::cout << "Starting persistent MQTT reconnection..." << std::endl;
{
std::lock_guard<std::mutex> lock(conn_mutex_);
connect_with_retry();
}
std::cout << "Reconnection thread exiting." << std::endl; });
}
void stop_reconnect()
{
stop_reconnect_ = true;
if (reconnect_thread_.joinable())
{
reconnect_thread_.join();
}
}
private: private:
mqtt::async_client client_; mqtt::async_client client_;
mqtt::connect_options connOpts_; mqtt::connect_options connOpts_;
std::thread reconnect_thread_;
std::mutex conn_mutex_;
std::mutex reconnect_mutex_;
std::atomic<bool> is_connected_;
std::atomic<bool> stop_reconnect_;
int initial_reconnect_delay_ms_;
int max_reconnect_delay_ms_;
}; };

View File

@ -5,6 +5,7 @@
#include "mqtt_report/report_struct.h" #include "mqtt_report/report_struct.h"
#include "mqtt_report/fault_codes.h" #include "mqtt_report/fault_codes.h"
#include <algorithm> #include <algorithm>
#include <mutex>
Config config; // 清扫车配置文件 Config config; // 清扫车配置文件
@ -221,18 +222,23 @@ class CanDataSubscriber : public rclcpp::Node
public: public:
CanDataSubscriber(const Config &config) CanDataSubscriber(const Config &config)
: Node("can_data_subscriber"), : Node("can_data_subscriber"),
mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), generate_mqtt_client_id(), "zxwl", "zxwl1234@"), mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port),
generate_mqtt_client_id(), "zxwl", "zxwl1234@"),
info_topic_(config.info_topic), info_topic_(config.info_topic),
fault_topic_(config.fault_topic) fault_topic_(config.fault_topic),
config_(config) // 保存配置
{ {
if (!mqtt_client_.connect()) // 设置MQTT重连策略可选
{ mqtt_client_.set_reconnect_policy(1000, 60000); // 初始1秒最大60秒
RCLCPP_ERROR(this->get_logger(), "Failed to connect to MQTT server");
}
// 启动MQTT连接非阻塞后台自动重连
mqtt_client_.connect();
// 创建订阅
subscription_ = this->create_subscription<mc::msg::CanFrame>( subscription_ = this->create_subscription<mc::msg::CanFrame>(
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
// 创建定时器
info_timer_ = this->create_wall_timer( info_timer_ = this->create_wall_timer(
std::chrono::milliseconds(100), std::chrono::milliseconds(100),
std::bind(&CanDataSubscriber::info_timer_callback, this)); std::bind(&CanDataSubscriber::info_timer_callback, this));
@ -240,17 +246,36 @@ public:
fault_timer_ = this->create_wall_timer( fault_timer_ = this->create_wall_timer(
std::chrono::milliseconds(500), std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this)); std::bind(&CanDataSubscriber::fault_timer_callback, this));
// 添加MQTT连接状态监控
connection_monitor_ = this->create_wall_timer(
std::chrono::seconds(5),
[this]()
{
if (!mqtt_client_.is_connected())
{
RCLCPP_WARN(this->get_logger(), "MQTT disconnected, attempting to reconnect...");
mqtt_client_.connect(); // 触发重连
}
});
} }
private: private:
void topic_callback(const mc::msg::CanFrame::SharedPtr msg) void topic_callback(const mc::msg::CanFrame::SharedPtr msg)
{ {
// RCLCPP_INFO(this->get_logger(), "Received CAN frame ID: %u, DLC: %u", msg->id, msg->dlc); // RCLCPP_INFO(this->get_logger(), "Received CAN frame ID: %u, DLC: %u", msg->id, msg->dlc);
std::lock_guard<std::mutex> lock(data_mutex_);
Msg_Handler(msg); Msg_Handler(msg);
} }
void info_timer_callback() void info_timer_callback()
{ {
// 确保MQTT连接后再发送
if (!mqtt_client_.is_connected())
{
return;
}
info_report.timestamp = getCurrentTimestampMs(); info_report.timestamp = getCurrentTimestampMs();
std::string json_msg = pack_general_info_to_json(info_report); std::string json_msg = pack_general_info_to_json(info_report);
mqtt_client_.publish(info_topic_, json_msg); mqtt_client_.publish(info_topic_, json_msg);
@ -258,6 +283,12 @@ private:
void fault_timer_callback() void fault_timer_callback()
{ {
// 确保MQTT连接后再发送
if (!mqtt_client_.is_connected())
{
return;
}
for (int code : vehicle_error_code.getAllErrorCodes()) for (int code : vehicle_error_code.getAllErrorCodes())
{ {
std::string fault_json = generateFaultJson(code, config.vid, getCurrentTimestampMs()); std::string fault_json = generateFaultJson(code, config.vid, getCurrentTimestampMs());
@ -271,6 +302,13 @@ private:
std::string fault_topic_; std::string fault_topic_;
rclcpp::TimerBase::SharedPtr info_timer_; rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_; rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_monitor_; // MQTT连接监控定时器
Config config_; // 保存配置
// 用于线程安全的数据存储
std::mutex data_mutex_;
mc::msg::CanFrame last_can_frame_;
}; };
int main(int argc, char **argv) int main(int argc, char **argv)