1
This commit is contained in:
parent
055fe9c626
commit
ba07a6fe15
@ -1,20 +1,17 @@
|
|||||||
#include "mqtt_client.hpp"
|
#include "mqtt_client.hpp"
|
||||||
#include <iostream>
|
|
||||||
#include <thread>
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <future>
|
#include <future>
|
||||||
|
#include <iostream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
extern std::atomic<bool> g_running;
|
extern std::atomic<bool> g_running;
|
||||||
|
|
||||||
MQTTClient::MQTTClient(const MQTTConfig &config)
|
MQTTClient::MQTTClient(const MQTTConfig& config) : config_(config), connected_(false) { initializeClient(); }
|
||||||
: config_(config), connected_(false)
|
|
||||||
{
|
|
||||||
initializeClient();
|
|
||||||
}
|
|
||||||
|
|
||||||
MQTTClient::~MQTTClient()
|
MQTTClient::~MQTTClient()
|
||||||
{
|
{
|
||||||
force_disconnect(); // 析构时直接释放资源,不阻塞
|
force_disconnect(); // 析构时直接释放资源,不阻塞
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::initializeClient()
|
void MQTTClient::initializeClient()
|
||||||
@ -27,7 +24,7 @@ void MQTTClient::initializeClient()
|
|||||||
void MQTTClient::connect()
|
void MQTTClient::connect()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
if (connected_ || !g_running) // 添加g_running检查
|
if (connected_ || !g_running) // 添加g_running检查
|
||||||
return;
|
return;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -41,9 +38,9 @@ void MQTTClient::connect()
|
|||||||
.finalize();
|
.finalize();
|
||||||
|
|
||||||
// 异步连接,不阻塞
|
// 异步连接,不阻塞
|
||||||
auto tok = client_->connect(connOpts); // 返回 token
|
auto tok = client_->connect(connOpts); // 返回 token
|
||||||
}
|
}
|
||||||
catch (const mqtt::exception &e)
|
catch (const mqtt::exception& e)
|
||||||
{
|
{
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
|
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
|
||||||
@ -53,18 +50,16 @@ void MQTTClient::connect()
|
|||||||
void MQTTClient::disconnect()
|
void MQTTClient::disconnect()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
if (!connected_)
|
if (!connected_) return;
|
||||||
return;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto disc_tok = client_->disconnect(); // 异步返回 token
|
auto disc_tok = client_->disconnect(); // 异步返回 token
|
||||||
|
|
||||||
// 等待短时间,检查 g_running
|
// 等待短时间,检查 g_running
|
||||||
for (int i = 0; i < 10 && g_running; ++i)
|
for (int i = 0; i < 10 && g_running; ++i)
|
||||||
{
|
{
|
||||||
if (disc_tok->is_complete())
|
if (disc_tok->is_complete()) break;
|
||||||
break;
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,21 +70,19 @@ void MQTTClient::disconnect()
|
|||||||
client_.reset();
|
client_.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const mqtt::exception &e)
|
catch (const mqtt::exception& e)
|
||||||
{
|
{
|
||||||
LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
|
LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
|
||||||
}
|
}
|
||||||
|
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
if (on_disconnect_)
|
if (on_disconnect_) on_disconnect_();
|
||||||
on_disconnect_();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::force_disconnect()
|
void MQTTClient::force_disconnect()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
if (!connected_)
|
if (!connected_) return;
|
||||||
return;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -97,59 +90,54 @@ void MQTTClient::force_disconnect()
|
|||||||
client_->stop_consuming();
|
client_->stop_consuming();
|
||||||
client_.reset();
|
client_.reset();
|
||||||
}
|
}
|
||||||
catch (const mqtt::exception &e)
|
catch (const mqtt::exception& e)
|
||||||
{
|
{
|
||||||
LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what()));
|
LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what()));
|
||||||
}
|
}
|
||||||
|
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
if (on_disconnect_)
|
if (on_disconnect_) on_disconnect_();
|
||||||
on_disconnect_();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
|
void MQTTClient::publish(const std::string& topic, const std::string& payload, int qos)
|
||||||
{
|
{
|
||||||
if (qos == -1)
|
if (qos == -1) qos = config_.qos;
|
||||||
qos = config_.qos;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
client_->publish(topic, payload.data(), payload.size(), qos, false); // 异步,不阻塞
|
client_->publish(topic, payload.data(), payload.size(), qos, false); // 异步,不阻塞
|
||||||
// 如果 topic 不包含 "heartbeat",才打印日志
|
// 如果 topic 不包含 "heartbeat",才打印日志
|
||||||
if (topic.find("heartbeat") == std::string::npos)
|
if (topic.find("status") == std::string::npos)
|
||||||
{
|
{
|
||||||
LOG_INFO("[MQTT] Published message to topic: " + topic);
|
LOG_INFO("[MQTT] Published message to topic: " + topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const mqtt::exception &e)
|
catch (const mqtt::exception& e)
|
||||||
{
|
{
|
||||||
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
|
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
if (on_disconnect_)
|
if (on_disconnect_) on_disconnect_();
|
||||||
on_disconnect_();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::subscribe(const std::string &topic, int qos)
|
void MQTTClient::subscribe(const std::string& topic, int qos)
|
||||||
{
|
{
|
||||||
if (qos == -1)
|
if (qos == -1) qos = config_.qos;
|
||||||
qos = config_.qos;
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
client_->subscribe(topic, qos); // 异步,不阻塞
|
client_->subscribe(topic, qos); // 异步,不阻塞
|
||||||
LOG_INFO("[MQTT] Subscribed to topic: " + topic);
|
LOG_INFO("[MQTT] Subscribed to topic: " + topic);
|
||||||
}
|
}
|
||||||
catch (const mqtt::exception &e)
|
catch (const mqtt::exception& e)
|
||||||
{
|
{
|
||||||
LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what()));
|
LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what()));
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
if (on_disconnect_)
|
if (on_disconnect_) on_disconnect_();
|
||||||
on_disconnect_();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::switchServer(const MQTTConfig &newConfig)
|
void MQTTClient::switchServer(const MQTTConfig& newConfig)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
disconnect();
|
disconnect();
|
||||||
@ -158,33 +146,27 @@ void MQTTClient::switchServer(const MQTTConfig &newConfig)
|
|||||||
connect();
|
connect();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MQTTClient::isConnected() const
|
bool MQTTClient::isConnected() const { return connected_; }
|
||||||
{
|
|
||||||
return connected_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; }
|
void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; }
|
||||||
void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; }
|
void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; }
|
||||||
void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; }
|
void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; }
|
||||||
|
|
||||||
void MQTTClient::connection_lost(const std::string &cause)
|
void MQTTClient::connection_lost(const std::string& cause)
|
||||||
{
|
{
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
LOG_WARN("[MQTT] Connection lost: " + cause);
|
LOG_WARN("[MQTT] Connection lost: " + cause);
|
||||||
if (on_disconnect_)
|
if (on_disconnect_) on_disconnect_();
|
||||||
on_disconnect_();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::connected(const std::string &cause)
|
void MQTTClient::connected(const std::string& cause)
|
||||||
{
|
{
|
||||||
connected_ = true;
|
connected_ = true;
|
||||||
LOG_INFO("[MQTT] Reconnected: " + cause);
|
LOG_INFO("[MQTT] Reconnected: " + cause);
|
||||||
if (on_connect_)
|
if (on_connect_) on_connect_();
|
||||||
on_connect_();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
|
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
|
||||||
{
|
{
|
||||||
if (on_message_)
|
if (on_message_) on_message_(msg->get_topic(), msg->get_payload_str());
|
||||||
on_message_(msg->get_topic(), msg->get_payload_str());
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user