增加mqtt断线重连机制

This commit is contained in:
lyq 2025-10-25 12:25:15 +08:00
parent dea1c5c797
commit be514b6d83
3 changed files with 285 additions and 69 deletions

View File

@ -3,18 +3,23 @@
#include "paho_mqtt_3c/MQTTClient.h"
#include <string>
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
class MQTTClientWrapper
{
public:
MQTTClientWrapper(const std::string &server_address, const std::string &client_id,
const std::string &username = "", const std::string &password = "")
const std::string &username = "", const std::string &password = "",
int reconnect_interval_ms = 5000) // 重连间隔(毫秒)
: server_address_(server_address), client_id_(client_id),
username_(username), password_(password), client_(nullptr)
username_(username), password_(password), client_(nullptr),
is_connected_(false), reconnect_interval_ms_(reconnect_interval_ms)
{
// 初始化客户端
int rc = MQTTClient_create(&client_, server_address.c_str(), client_id.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr);
MQTTCLIENT_PERSISTENCE_NONE, nullptr);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to create MQTT client, return code " << rc << std::endl;
@ -24,6 +29,7 @@ public:
~MQTTClientWrapper()
{
std::lock_guard<std::mutex> lock(mtx_);
if (client_)
{
disconnect();
@ -32,8 +38,10 @@ public:
}
}
bool connect()
// 连接服务器,支持最大重试次数
bool connect(int max_retries = 5)
{
std::lock_guard<std::mutex> lock(mtx_);
if (!client_)
{
std::cerr << "MQTT client not initialized" << std::endl;
@ -44,27 +52,48 @@ public:
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if (!username_.empty())
{
conn_opts.username = username_.c_str();
conn_opts.password = password_.c_str();
}
int rc = MQTTClient_connect(client_, &conn_opts);
if (rc != MQTTCLIENT_SUCCESS)
int rc;
int retry_count = 0;
while (retry_count < max_retries)
{
std::cerr << "Failed to connect, return code " << rc << std::endl;
return false;
rc = MQTTClient_connect(client_, &conn_opts);
if (rc == MQTTCLIENT_SUCCESS)
{
is_connected_ = true;
std::cout << "MQTT connected successfully" << std::endl;
return true;
}
retry_count++;
std::cerr << "Failed to connect (attempt " << retry_count << "/" << max_retries
<< "), return code " << rc << std::endl;
if (retry_count < max_retries)
{
// 等待重连间隔前,检查是否需要退出程序
if (!rclcpp::ok())
{
std::cerr << "Program exiting, stop reconnection" << std::endl;
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_ms_));
}
}
std::cout << "MQTT connected successfully" << std::endl;
return true;
is_connected_ = false;
return false;
}
void disconnect()
{
if (client_)
std::lock_guard<std::mutex> lock(mtx_);
if (client_ && is_connected_)
{
int rc = MQTTClient_disconnect(client_, 10000);
if (rc != MQTTCLIENT_SUCCESS)
@ -75,21 +104,35 @@ public:
{
std::cout << "MQTT disconnected." << std::endl;
}
is_connected_ = false;
}
}
void publish(const std::string &topic, const std::string &payload, int qos = 0, bool retained = false)
// 发布消息,自动处理重连
bool publish(const std::string &topic, const std::string &payload, int qos = 0, bool retained = false)
{
std::lock_guard<std::mutex> lock(mtx_);
if (!client_)
{
std::cerr << "MQTT client not initialized" << std::endl;
return;
return false;
}
// 检查连接状态,断开则尝试重连
if (!is_connected_ || !check_actual_connection())
{
std::cerr << "MQTT connection lost, trying to reconnect..." << std::endl;
if (!connect()) // 使用默认重试次数重连
{
std::cerr << "Reconnection failed, cannot publish message" << std::endl;
return false;
}
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
pubmsg.payload = const_cast<char*>(payload.c_str());
pubmsg.payload = const_cast<char *>(payload.c_str());
pubmsg.payloadlen = payload.length();
pubmsg.qos = qos;
pubmsg.retained = retained ? 1 : 0;
@ -98,26 +141,55 @@ public:
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to publish message, return code " << rc << std::endl;
return;
is_connected_ = false; // 发布失败标记为断开
return false;
}
// 等待发布完成
rc = MQTTClient_waitForCompletion(client_, token, 10000L);
rc = MQTTClient_waitForCompletion(client_, token, 1000L); // 超时1秒
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to wait for completion, return code " << rc << std::endl;
is_connected_ = false; // 确认失败标记为断开
return false;
}
else
{
// 可选:取消注释以打印发布信息
// std::cout << "Published to " << topic << std::endl;
}
// 可选:取消注释以打印发布信息
// std::cout << "Published to " << topic << std::endl;
return true;
}
// 检查实际连接状态通过MQTTClient API
bool is_connected()
{
std::lock_guard<std::mutex> lock(mtx_);
return check_actual_connection();
}
// 设置重连间隔(毫秒)
void set_reconnect_interval(int interval_ms)
{
std::lock_guard<std::mutex> lock(mtx_);
reconnect_interval_ms_ = interval_ms;
}
private:
// 内部检查连接状态的实现
bool check_actual_connection()
{
if (!client_)
return false;
int state = MQTTClient_isConnected(client_);
is_connected_ = (state == 1);
return is_connected_;
}
std::string server_address_;
std::string client_id_;
std::string username_;
std::string password_;
MQTTClient client_;
bool is_connected_; // 连接状态标记
int reconnect_interval_ms_; // 重连间隔(毫秒)
std::mutex mtx_; // 线程安全锁
};

View File

@ -1,18 +1,19 @@
#include <rclcpp/rclcpp.hpp>
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "mqtt_report/mqtt_client.hpp"
#include "mqtt_report/mqtt_client.hpp" // 包含上面修改后的MQTTClientWrapper
#include "mqtt_report/get_config.h"
#include "mqtt_report/report_struct.h"
#include "mqtt_report/fault_codes.h"
#include <algorithm>
#include <bitset>
#include <iostream>
#include <chrono>
namespace sweeperMsg = sweeper_interfaces::msg;
Config config; // 清扫车配置文件
GeneralMsg info_report; // 常规消息上报
Config config; // 清扫车配置文件
GeneralMsg info_report; // 常规消息上报
int mqtt_reconnect_interval = 5000; // 重连间隔(ms)
// 解析can报文做消息上报
void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
@ -25,7 +26,6 @@ void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
info_report.power = msg->data[4] * 0.5f;
info_report.chargeStatus = (msg->data[3] == 0x61);
std::cout << "[0x1821E5F1] power : " << info_report.power << std::endl;
// std::cout << "[0x1821E5F1] chargeStatus : " << info_report.chargeStatus << std::endl;
break;
}
@ -81,7 +81,7 @@ void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
if (is_fault && faultMap.count(error_code)) // 故障存在且在映射表中
{
vehicle_error_code.addErrorCode(error_code);
updateFaultLevel(error_code, 1); // 协议未定义故障等级默认1级(可根据需求调整)
updateFaultLevel(error_code, 1); // 协议未定义故障等级默认1级
}
else // 故障不存在
{
@ -97,17 +97,16 @@ void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
{
// 解析控制器温度data[2]偏移量40℃1℃/bit
int raw_ctrl_temp = static_cast<int>(msg->data[2]);
info_report.controllerTemp = raw_ctrl_temp - 40; // 实际温度=原始值-40,量程:-40~215℃
info_report.controllerTemp = raw_ctrl_temp - 40; // 实际温度=原始值-40
// 解析电机温度data[3]偏移量30℃1℃/bit
int raw_motor_temp = static_cast<int>(msg->data[3]);
info_report.motorTemp = raw_motor_temp - 30; // 实际温度=原始值-30,量程:-30~225℃
info_report.motorTemp = raw_motor_temp - 30; // 实际温度=原始值-30
// 解析控制器状态data[4]BIT1-BIT0命令状态BIT3-BIT2反馈状态
uint8_t ctrl_status = msg->data[4];
// 反馈状态BIT3-BIT20=空挡1=前进2=后退3=保留
int feedback_gear = (ctrl_status >> 2) & 0x03;
// 档位优先级:命令状态优先(空挡时以反馈状态为准,可根据需求调整)
info_report.gear = feedback_gear;
std::cout << "[0x0CF11F00] controllerTemp : " << info_report.controllerTemp << std::endl;
@ -254,13 +253,17 @@ class CanDataSubscriber : public rclcpp::Node
public:
CanDataSubscriber(const Config &config)
: Node("can_data_subscriber"),
mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), generate_mqtt_client_id(), "zxwl", "zxwl1234@"),
mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port),
generate_mqtt_client_id(),
"zxwl", "zxwl1234@",
mqtt_reconnect_interval), // 传入重连间隔
info_topic_(config.info_topic),
fault_topic_(config.fault_topic)
{
if (!mqtt_client_.connect())
// 初始连接(带重试)
if (!mqtt_client_.connect(5)) // 最多重试5次
{
RCLCPP_ERROR(this->get_logger(), "Failed to connect to MQTT server");
RCLCPP_WARN(this->get_logger(), "Initial MQTT connection failed, will retry in background");
}
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
@ -273,30 +276,67 @@ public:
fault_timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this));
// 添加定期连接检查定时器每10秒
connection_check_timer_ = this->create_wall_timer(
std::chrono::seconds(10),
std::bind(&CanDataSubscriber::check_connection_callback, this));
}
~CanDataSubscriber()
{
// 停止所有定时器,避免回调继续执行
info_timer_->cancel();
fault_timer_->cancel();
connection_check_timer_->cancel();
// 断开MQTT连接
mqtt_client_.disconnect();
}
private:
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
{
// RCLCPP_INFO(this->get_logger(), "Received CAN frame ID: %u, DLC: %u", msg->id, msg->dlc);
Msg_Handler(msg);
}
void info_timer_callback()
{
if (!rclcpp::ok())
{ // 优先检查退出信号
return;
}
info_report.timestamp = getCurrentTimestampMs();
std::string json_msg = pack_general_info_to_json(info_report);
mqtt_client_.publish(info_topic_, json_msg);
// std::cout << "topic : " << info_topic_ << std::endl;
// std::cout << "json_msg : " << json_msg << std::endl;
if (!mqtt_client_.publish(info_topic_, json_msg, 1))
{
RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT");
}
}
void fault_timer_callback()
{
if (!rclcpp::ok())
{ // 优先检查退出信号
return;
}
for (int code : vehicle_error_code.getAllErrorCodes())
{
std::string fault_json = generateFaultJson(code, config.vid, getCurrentTimestampMs());
mqtt_client_.publish(fault_topic_, fault_json);
if (!mqtt_client_.publish(fault_topic_, fault_json, 1)) // QoS=1
{
RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code);
}
}
}
// 定期检查连接状态,主动重连
void check_connection_callback()
{
if (!mqtt_client_.is_connected())
{
RCLCPP_WARN(this->get_logger(), "MQTT connection check failed, attempting to reconnect");
mqtt_client_.connect(3); // 检查时重试3次
}
}
@ -306,6 +346,7 @@ private:
std::string fault_topic_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_; // 新增:连接检查定时器
};
int main(int argc, char **argv)
@ -318,10 +359,19 @@ int main(int argc, char **argv)
return 1;
}
// 确保配置中有重连间隔默认5000ms
if (mqtt_reconnect_interval <= 0)
{
mqtt_reconnect_interval = 5000;
}
auto node = std::make_shared<CanDataSubscriber>(config);
RCLCPP_INFO(node->get_logger(), "info_topic : %s", config.info_topic.c_str());
RCLCPP_INFO(node->get_logger(), "fault_topic : %s", config.fault_topic.c_str());
RCLCPP_INFO(node->get_logger(), "vid : %s", config.vid.c_str());
RCLCPP_INFO(node->get_logger(), "mqtt_ip : %s", config.mqtt_ip.c_str());
RCLCPP_INFO(node->get_logger(), "MQTT reconnection interval: %d ms", mqtt_reconnect_interval);
RCLCPP_INFO(node->get_logger(), "MqttReport node started.");

View File

@ -25,22 +25,68 @@ struct MqttConf
std::string pub_gps_topic;
int qos = 1;
long timeout_ms = 10000L;
int reconnect_interval = 5000; // 重连间隔(ms)
} g_conf;
GPS gps_mes;
// 增加连接状态标志,确保线程安全
static bool is_connected = false;
static pthread_mutex_t connect_mutex = PTHREAD_MUTEX_INITIALIZER;
// 检查连接状态
bool check_connection(MQTTClient client)
{
pthread_mutex_lock(&connect_mutex);
int state = MQTTClient_isConnected(client);
is_connected = (state == 1);
pthread_mutex_unlock(&connect_mutex);
return is_connected;
}
// 设置连接状态
void set_connected(bool connected)
{
pthread_mutex_lock(&connect_mutex);
is_connected = connected;
pthread_mutex_unlock(&connect_mutex);
}
// 重连函数
bool reconnect_client(MQTTClient *client)
{
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = g_conf.user.c_str();
conn_opts.password = g_conf.password.c_str();
int rc;
// 尝试重连最多尝试5次
for (int i = 0; i < 5; ++i)
{
std::cerr << "[MQTT] Reconnecting... (attempt " << i + 1 << "/5)" << std::endl;
rc = MQTTClient_connect(*client, &conn_opts);
if (rc == MQTTCLIENT_SUCCESS)
{
set_connected(true);
std::cout << "[MQTT] Reconnected successfully" << std::endl;
return true;
}
std::cerr << "[MQTT] Reconnect failed, rc=" << rc << ". Retrying in "
<< g_conf.reconnect_interval << "ms..." << std::endl;
usleep(g_conf.reconnect_interval * 1000);
}
return false;
}
void *mqtt_pub_gps(void *arg)
{
(void)arg;
MQTTClient client;
MQTTClient client = nullptr;
MQTTClient_deliveryToken token_d_m;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
int rc;
/* 创建客户端 */
// 初始化客户端
if ((rc = MQTTClient_create(&client,
g_conf.address.c_str(),
g_conf.client_id.c_str(),
@ -51,19 +97,15 @@ void *mqtt_pub_gps(void *arg)
return nullptr;
}
/* 连接配置 */
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = g_conf.user.c_str();
conn_opts.password = g_conf.password.c_str();
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
// 初始连接
if (!reconnect_client(&client))
{
std::cerr << "Failed to connect, rc=" << rc << std::endl;
std::cerr << "[MQTT] Failed to establish initial connection" << std::endl;
MQTTClient_destroy(&client);
return nullptr;
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.qos = g_conf.qos;
pubmsg.retained = 0;
@ -73,6 +115,41 @@ void *mqtt_pub_gps(void *arg)
while (true)
{
// 检查连接状态,断开则尝试重连
if (!check_connection(client))
{
std::cerr << "[MQTT] Connection lost. Attempting to reconnect..." << std::endl;
// 先断开旧连接
MQTTClient_disconnect(client, 1000);
// 重连失败则销毁客户端重新创建
if (!reconnect_client(&client))
{
std::cerr << "[MQTT] All reconnection attempts failed. Recreating client..." << std::endl;
MQTTClient_destroy(&client);
// 重新创建客户端
if ((rc = MQTTClient_create(&client,
g_conf.address.c_str(),
g_conf.client_id.c_str(),
MQTTCLIENT_PERSISTENCE_NONE,
nullptr)) != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to recreate client, rc=" << rc << std::endl;
usleep(g_conf.reconnect_interval * 1000);
continue;
}
// 再次尝试连接
if (!reconnect_client(&client))
{
usleep(g_conf.reconnect_interval * 1000);
continue;
}
}
}
root_st.clear();
root_st["lng"] = [&]
@ -92,27 +169,33 @@ void *mqtt_pub_gps(void *arg)
pubmsg.payload = msg_st;
pubmsg.payloadlen = static_cast<int>(json_string.size());
int rc = MQTTClient_publishMessage(client,
// 发布消息
if (check_connection(client))
{
rc = MQTTClient_publishMessage(client,
g_conf.pub_gps_topic.c_str(),
&pubmsg,
&token_d_m);
if (rc == MQTTCLIENT_SUCCESS)
{
std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic
<< "\n msg: " << json_string << std::endl;
MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms);
if (rc == MQTTCLIENT_SUCCESS)
{
std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic
<< "\n msg: " << json_string << std::endl;
MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms);
}
else
{
std::cerr << "[MQTT] publish failed, rc=" << rc << std::endl;
set_connected(false); // 发布失败标记为断开连接
}
}
else
{
std::cerr << "[MQTT] publish failed, rc=" << rc << std::endl;
}
MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms);
usleep(200000); // 200ms
}
// 清理资源
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
pthread_mutex_destroy(&connect_mutex);
return nullptr;
}
@ -171,7 +254,12 @@ bool load_config(const std::string &path)
g_conf.user = m["mqtt_user"].asString();
g_conf.password = m["mqtt_password"].asString();
g_conf.pub_gps_topic = m["pub_gps_topic"].asString();
/* 其它字段QOS 等)如有需求也可读出来 */
// 从配置读取重连间隔默认5000ms
if (m.isMember("reconnect_interval"))
{
g_conf.reconnect_interval = m["reconnect_interval"].asInt();
}
return true;
}
@ -181,11 +269,17 @@ int main(int argc, char **argv)
if (!load_config("config.json"))
return -1;
// 初始化互斥锁
pthread_mutex_init(&connect_mutex, nullptr);
pthread_t mqtt_thread;
pthread_create(&mqtt_thread, nullptr, mqtt_pub_gps, nullptr);
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<pub_gps_node>("pub_gps_node"));
rclcpp::shutdown();
// 清理资源
pthread_mutex_destroy(&connect_mutex);
return 0;
}