fix : mqtt 死锁

This commit is contained in:
lyq 2025-11-18 13:36:06 +08:00
parent 048cefc202
commit b5686a81b0
2 changed files with 119 additions and 139 deletions

View File

@ -3,179 +3,168 @@
#include "paho_mqtt_3c/MQTTClient.h" #include "paho_mqtt_3c/MQTTClient.h"
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <chrono>
#include <thread>
#include <mutex> #include <mutex>
#include <thread>
#include <atomic>
#include <chrono>
class MQTTClientWrapper class MQTTClientWrapper
{ {
public: public:
MQTTClientWrapper(const std::string &server_address, const std::string &client_id, MQTTClientWrapper(const std::string &server_uri,
const std::string &username = "", const std::string &password = "", const std::string &client_id,
int reconnect_interval_ms = 5000) // 重连间隔(毫秒) const std::string &username = "",
: server_address_(server_address), client_id_(client_id), const std::string &password = "",
username_(username), password_(password), client_(nullptr), int reconnect_interval_ms = 3000)
is_connected_(false), reconnect_interval_ms_(reconnect_interval_ms) : server_uri_(server_uri),
client_id_(client_id),
username_(username),
password_(password),
reconnect_interval_ms_(reconnect_interval_ms),
connected_(false),
stop_flag_(false)
{ {
// 初始化客户端 int rc = MQTTClient_create(&client_,
int rc = MQTTClient_create(&client_, server_address.c_str(), client_id.c_str(), server_uri_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr); client_id_.c_str(),
MQTTCLIENT_PERSISTENCE_NONE,
nullptr);
if (rc != MQTTCLIENT_SUCCESS) if (rc != MQTTCLIENT_SUCCESS)
{ {
std::cerr << "Failed to create MQTT client, return code " << rc << std::endl; std::cerr << "MQTTClient_create failed! rc=" << rc << std::endl;
client_ = nullptr; client_ = nullptr;
return;
} }
MQTTClient_setCallbacks(client_, this, conn_lost_cb, nullptr, nullptr);
reconnect_thread_ = std::thread([this]() { reconnectLoop(); });
} }
~MQTTClientWrapper() ~MQTTClientWrapper()
{ {
stop_flag_ = true;
if (reconnect_thread_.joinable())
reconnect_thread_.join();
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
if (client_) if (client_)
{ {
disconnect(); if (connected_)
MQTTClient_disconnect(client_, 2000);
MQTTClient_destroy(&client_); MQTTClient_destroy(&client_);
client_ = nullptr;
} }
} }
bool connect(int max_retries = 5) // 手动连接(可选)
bool connect()
{
std::lock_guard<std::mutex> lock(mtx_);
return doConnect();
}
// 发布(非阻塞,无 waitForCompletion
bool publish(const std::string &topic,
const std::string &payload,
int qos = 0,
bool retained = false)
{ {
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
if (!client_) if (!client_)
return false; return false;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; if (!connected_)
conn_opts.keepAliveInterval = 20; return false;
conn_opts.cleansession = 1;
conn_opts.connectTimeout = 5; MQTTClient_message msg = MQTTClient_message_initializer;
msg.payload = (void *)payload.c_str();
msg.payloadlen = payload.size();
msg.qos = qos;
msg.retained = retained ? 1 : 0;
MQTTClient_deliveryToken token;
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &msg, &token);
if (rc != MQTTCLIENT_SUCCESS)
{
connected_ = false;
return false;
}
// 非阻塞,不等待 completion
return true;
}
bool isConnected() const { return connected_; }
private:
bool doConnect()
{
if (!client_)
return false;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.connectTimeout = 5;
if (!username_.empty()) if (!username_.empty())
{ {
conn_opts.username = username_.c_str(); opts.username = username_.c_str();
conn_opts.password = password_.c_str(); opts.password = password_.c_str();
} }
int rc; int rc = MQTTClient_connect(client_, &opts);
for (int retry_count = 0; retry_count < max_retries; ++retry_count)
{
std::cout << "Connecting to MQTT (" << retry_count + 1 << "/" << max_retries << ")..." << std::endl;
rc = MQTTClient_connect(client_, &conn_opts);
if (rc == MQTTCLIENT_SUCCESS) if (rc == MQTTCLIENT_SUCCESS)
{ {
is_connected_ = true; std::cout << "[MQTT] Connected." << std::endl;
std::cout << "MQTT connected successfully" << std::endl; connected_ = true;
return true; return true;
} }
std::cerr << "Connect failed rc=" << rc << ", wait " << reconnect_interval_ms_ << " ms" << std::endl; std::cerr << "[MQTT] Connect failed rc=" << rc << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_ms_)); connected_ = false;
}
is_connected_ = false;
return false; return false;
} }
void disconnect() static void conn_lost_cb(void *context, char *cause)
{
auto *self = static_cast<MQTTClientWrapper *>(context);
std::cerr << "[MQTT] Connection lost: " << (cause ? cause : "unknown") << std::endl;
self->connected_ = false;
}
// 自动重连线程
void reconnectLoop()
{
while (!stop_flag_)
{
if (!connected_)
{ {
std::lock_guard<std::mutex> lock(mtx_); std::lock_guard<std::mutex> lock(mtx_);
if (client_ && is_connected_) doConnect();
{
int rc = MQTTClient_disconnect(client_, 10000);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to disconnect, return code " << rc << std::endl;
}
else
{
std::cout << "MQTT disconnected." << std::endl;
}
is_connected_ = false;
}
} }
// 发布消息,自动处理重连 std::this_thread::sleep_for(
bool publish(const std::string &topic, const std::string &payload, int qos = 0, bool retained = false) std::chrono::milliseconds(reconnect_interval_ms_));
{
std::lock_guard<std::mutex> lock(mtx_);
if (!client_)
{
std::cerr << "MQTT client not initialized" << std::endl;
return false;
} }
// 检查连接状态,断开则尝试重连
if (!is_connected_ || !check_actual_connection())
{
std::cerr << "MQTT connection lost, trying to reconnect..." << std::endl;
if (!connect()) // 使用默认重试次数重连
{
std::cerr << "Reconnection failed, cannot publish message" << std::endl;
return false;
}
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
pubmsg.payload = const_cast<char *>(payload.c_str());
pubmsg.payloadlen = payload.length();
pubmsg.qos = qos;
pubmsg.retained = retained ? 1 : 0;
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to publish message, return code " << rc << std::endl;
is_connected_ = false; // 发布失败标记为断开
return false;
}
// 等待发布完成
rc = MQTTClient_waitForCompletion(client_, token, 10000L); // 超时10秒
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to wait for completion, return code " << rc << std::endl;
// is_connected_ = false; // 确认失败标记为断开
return false;
}
// 可选:取消注释以打印发布信息
// std::cout << "Published to " << topic << std::endl;
return true;
}
// 检查实际连接状态通过MQTTClient API
bool is_connected()
{
std::lock_guard<std::mutex> lock(mtx_);
return check_actual_connection();
}
// 设置重连间隔(毫秒)
void set_reconnect_interval(int interval_ms)
{
std::lock_guard<std::mutex> lock(mtx_);
reconnect_interval_ms_ = interval_ms;
} }
private: private:
// 内部检查连接状态的实现 MQTTClient client_;
bool check_actual_connection() std::string server_uri_;
{
if (!client_)
return false;
int state = MQTTClient_isConnected(client_);
is_connected_ = (state == 1);
return is_connected_;
}
std::string server_address_;
std::string client_id_; std::string client_id_;
std::string username_; std::string username_;
std::string password_; std::string password_;
MQTTClient client_;
bool is_connected_; // 连接状态标记 int reconnect_interval_ms_;
int reconnect_interval_ms_; // 重连间隔(毫秒)
std::mutex mtx_; // 线程安全锁 std::atomic<bool> connected_;
std::atomic<bool> stop_flag_;
std::mutex mtx_;
std::thread reconnect_thread_;
}; };

View File

@ -273,12 +273,6 @@ public:
info_topic_(config.info_topic), info_topic_(config.info_topic),
fault_topic_(config.fault_topic) fault_topic_(config.fault_topic)
{ {
// 初始连接(带重试)
if (!mqtt_client_.connect(5)) // 最多重试5次
{
RCLCPP_WARN(this->get_logger(), "Initial MQTT connection failed, will retry in background");
}
subscription_ = this->create_subscription<sweeperMsg::CanFrame>( subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
@ -302,8 +296,6 @@ public:
info_timer_->cancel(); info_timer_->cancel();
fault_timer_->cancel(); fault_timer_->cancel();
connection_check_timer_->cancel(); connection_check_timer_->cancel();
// 断开MQTT连接
mqtt_client_.disconnect();
} }
private: private:
@ -346,10 +338,9 @@ private:
// 定期检查连接状态,主动重连 // 定期检查连接状态,主动重连
void check_connection_callback() void check_connection_callback()
{ {
if (!mqtt_client_.is_connected()) if (!mqtt_client_.isConnected())
{ {
RCLCPP_WARN(this->get_logger(), "MQTT connection check failed, attempting to reconnect"); RCLCPP_WARN(this->get_logger(), "MQTT connection lost.");
mqtt_client_.connect(3); // 检查时重试3次
} }
} }