kunlang_video/src/mqtt_client.cpp
2025-09-10 11:24:52 +08:00

213 lines
5.5 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "mqtt_client.hpp"
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
extern std::atomic<bool> g_running;
MQTTClient::MQTTClient(const MQTTConfig &config)
: config_(config), connected_(false)
{
initializeClient();
}
MQTTClient::~MQTTClient()
{
disconnect();
}
void MQTTClient::initializeClient()
{
std::string address = "tcp://" + config_.server_ip + ":" + std::to_string(config_.server_port);
client_ = std::make_shared<mqtt::async_client>(address, config_.client_id);
client_->set_callback(*this);
}
void MQTTClient::connect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (connected_ || !g_running) // 添加g_running检查
return;
try
{
auto connOpts = mqtt::connect_options_builder()
.clean_session(config_.clean_session)
.automatic_reconnect(true)
.keep_alive_interval(std::chrono::seconds(config_.keep_alive))
.user_name(config_.username)
.password(config_.password)
.finalize();
auto connect_token = client_->connect(connOpts); // shared_ptr<mqtt::token>
for (int i = 0; i < 20 && g_running; ++i) // 最多 2 秒
{
if (connect_token->is_complete()) // 非阻塞检查
{
connected_ = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 如果循环结束仍未完成,可以手动断开
if (!connected_)
{
try
{
client_->disconnect()->wait();
}
catch (...)
{
}
}
}
catch (const mqtt::exception &e)
{
connected_ = false;
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
}
}
void MQTTClient::disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
auto disc_token = client_->disconnect(); // 返回 std::shared_ptr<mqtt::token>
// 等待最多1秒钟期间检查 g_running
int wait_loops = 10; // 10*100ms = 1秒
for (int i = 0; i < wait_loops && g_running; ++i)
{
if (disc_token->is_complete())
break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (!disc_token->is_complete())
{
LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection");
client_->stop_consuming(); // 强制停止消费消息
client_.reset(); // 释放 client 对象
}
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
}
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
void MQTTClient::force_disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
// 直接停止客户端,不等待正常断开
client_->stop_consuming();
client_.reset();
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what()));
}
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
// 如果 topic 不包含 "heartbeat",才打印日志
if (topic.find("heartbeat") == std::string::npos)
{
LOG_INFO("[MQTT] Published message to topic: " + topic);
}
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
}
void MQTTClient::subscribe(const std::string &topic, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->subscribe(topic, qos)->wait();
LOG_INFO("[MQTT] Subscribed to topic: " + topic);
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what()));
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
}
void MQTTClient::switchServer(const MQTTConfig &newConfig)
{
std::lock_guard<std::mutex> lock(mutex_);
disconnect();
config_ = newConfig;
initializeClient();
connect();
}
bool MQTTClient::isConnected() const
{
return connected_;
}
void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; }
void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; }
void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; }
void MQTTClient::connection_lost(const std::string &cause)
{
connected_ = false;
LOG_WARN("[MQTT] Connection lost: " + cause);
if (on_disconnect_)
on_disconnect_();
}
void MQTTClient::connected(const std::string &cause)
{
connected_ = true;
LOG_INFO("[MQTT] Reconnected: " + cause);
if (on_connect_)
on_connect_();
}
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
if (on_message_)
on_message_(msg->get_topic(), msg->get_payload_str());
}