kunlang_video/src/mqtt_client.cpp
2025-09-10 11:19:40 +08:00

196 lines
5.0 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "mqtt_client.hpp"
#include <atomic>
#include <chrono>
#include <exception>
#include <future>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
// Run flag declared here and defined outside this section. connect() refuses
// to start, and stops waiting for the broker, once this flag reads false.
extern std::atomic<bool> g_running;
/**
 * @brief Builds a client from the given configuration.
 *
 * Stores a copy of @p config, marks the client as not connected and creates
 * the underlying Paho client object. No network connection is attempted here;
 * call connect() for that.
 *
 * @param config Broker address, credentials and session options to use.
 */
MQTTClient::MQTTClient(const MQTTConfig &config)
    : config_(config),
      connected_(false)
{
    this->initializeClient();
}
/**
 * @brief Disconnects from the broker before the object goes away.
 *
 * disconnect() acquires a mutex and invokes the user-supplied disconnect
 * callback, either of which may throw. A destructor is implicitly noexcept,
 * so an escaping exception would terminate the process; anything thrown is
 * therefore contained and logged here.
 */
MQTTClient::~MQTTClient()
{
    try
    {
        disconnect();
    }
    catch (const std::exception &e)
    {
        LOG_ERROR("[MQTT] Exception while destroying client: " + std::string(e.what()));
    }
    catch (...)
    {
        LOG_ERROR("[MQTT] Unknown exception while destroying client");
    }
}
/**
 * @brief (Re)creates the underlying Paho async client from config_.
 *
 * The broker URI has the form "tcp://<server_ip>:<server_port>". Any client
 * previously held by client_ is released when the new one is assigned. This
 * object is registered as the callback receiver of the new client.
 */
void MQTTClient::initializeClient()
{
    std::string broker_uri("tcp://");
    broker_uri += config_.server_ip;
    broker_uri += ":";
    broker_uri += std::to_string(config_.server_port);

    client_ = std::make_shared<mqtt::async_client>(broker_uri, config_.client_id);
    client_->set_callback(*this);
}
void MQTTClient::connect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (connected_ || !g_running) // 添加g_running检查
return;
try
{
auto connOpts = mqtt::connect_options_builder()
.clean_session(config_.clean_session)
.automatic_reconnect(true)
.keep_alive_interval(std::chrono::seconds(config_.keep_alive))
.user_name(config_.username)
.password(config_.password)
.finalize();
// 使用wait_for而不是wait设置超时时间
auto connect_future = client_->connect(connOpts);
// 等待最多2秒钟期间检查退出标志
for (int i = 0; i < 20 && g_running; i++)
{
if (connect_future->wait_for(std::chrono::milliseconds(100)) == std::future_status::ready)
{
connected_ = true;
break;
}
}
}
catch (const mqtt::exception &e)
{
connected_ = false;
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
}
}
/**
 * @brief Disconnects from the broker, waiting at most 1 second.
 *
 * Does nothing when not connected. If the broker does not acknowledge the
 * disconnect within the timeout, the client object is stopped and released;
 * connect() or switchServer() must rebuild it before it is used again.
 *
 * The disconnect callback, if set, is invoked while mutex_ is still held, so
 * it must not call back into methods of this object that lock mutex_.
 * Failures reported by the Paho library are caught and logged.
 */
void MQTTClient::disconnect()
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (!connected_)
        return;

    try
    {
        if (client_)
        {
            auto disconnect_token = client_->disconnect();

            // mqtt::token::wait_for returns false when the timeout expires.
            // Bound the wait to 1 second rather than blocking indefinitely.
            if (!disconnect_token->wait_for(std::chrono::seconds(1)))
            {
                LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection");
                // Force the teardown by dropping the client object.
                client_->stop_consuming();
                client_.reset();
            }
        }
    }
    catch (const mqtt::exception &e)
    {
        LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
    }

    connected_ = false;
    if (on_disconnect_)
        on_disconnect_();
}
void MQTTClient::force_disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
// 直接停止客户端,不等待正常断开
client_->stop_consuming();
client_.reset();
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what()));
}
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
// 如果 topic 不包含 "heartbeat",才打印日志
if (topic.find("heartbeat") == std::string::npos)
{
LOG_INFO("[MQTT] Published message to topic: " + topic);
}
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
}
void MQTTClient::subscribe(const std::string &topic, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->subscribe(topic, qos)->wait();
LOG_INFO("[MQTT] Subscribed to topic: " + topic);
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what()));
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
}
/**
 * @brief Drops the current connection and connects using a new configuration.
 *
 * @param newConfig Configuration that replaces the current one.
 *
 * disconnect() and connect() each lock mutex_ themselves. std::mutex is not
 * recursive, so holding it across those calls would deadlock the calling
 * thread. The lock is therefore taken only around the configuration swap and
 * the client rebuild.
 */
void MQTTClient::switchServer(const MQTTConfig &newConfig)
{
    disconnect();

    {
        std::lock_guard<std::mutex> lock(mutex_);
        config_ = newConfig;
        initializeClient();
    }

    connect();
}
/**
 * @brief Reports the connection state tracked by this object.
 * @return The current value of connected_.
 */
bool MQTTClient::isConnected() const
{
    const bool is_up = connected_;
    return is_up;
}
/**
 * @brief Sets the handler that connected() invokes.
 * @param cb Handler to store; replaces any previously set handler.
 */
void MQTTClient::setConnectCallback(ConnectCallback cb)
{
    // cb is a by-value sink parameter: move it instead of copying it again.
    on_connect_ = std::move(cb);
}
/**
 * @brief Sets the handler invoked when the connection goes down.
 *
 * The handler is called from disconnect(), force_disconnect(),
 * connection_lost(), and the failure paths of publish() and subscribe().
 *
 * @param cb Handler to store; replaces any previously set handler.
 */
void MQTTClient::setDisconnectCallback(DisconnectCallback cb)
{
    // cb is a by-value sink parameter: move it instead of copying it again.
    on_disconnect_ = std::move(cb);
}
/**
 * @brief Sets the handler that message_arrived() invokes.
 *
 * The handler receives the topic and the payload string of each message.
 *
 * @param cb Handler to store; replaces any previously set handler.
 */
void MQTTClient::setMessageCallback(MessageCallback cb)
{
    // cb is a by-value sink parameter: move it instead of copying it again.
    on_message_ = std::move(cb);
}
/**
 * @brief Handles loss of the broker connection.
 *
 * Marks the connection as down, logs the cause and invokes the disconnect
 * callback when one is set.
 *
 * @param cause Description of why the connection was lost.
 */
void MQTTClient::connection_lost(const std::string &cause)
{
    connected_ = false;
    LOG_WARN("[MQTT] Connection lost: " + cause);

    if (!on_disconnect_)
    {
        return;
    }
    on_disconnect_();
}
/**
 * @brief Handles a successful connection to the broker.
 *
 * Marks the connection as up, logs the cause and invokes the connect
 * callback when one is set.
 *
 * @param cause Description supplied with the connection event.
 */
void MQTTClient::connected(const std::string &cause)
{
    connected_ = true;
    LOG_INFO("[MQTT] Reconnected: " + cause);

    if (!on_connect_)
    {
        return;
    }
    on_connect_();
}
/**
 * @brief Forwards an incoming message to the message callback.
 *
 * When no message callback is set the message is dropped.
 *
 * @param msg Received message; its topic and payload string are passed on.
 */
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
    if (!on_message_)
    {
        return;
    }
    on_message_(msg->get_topic(), msg->get_payload_str());
}