// kunlang_video/src/mqtt_client.cpp
#include "mqtt_client.hpp"
#include <iostream>
#include <thread>
#include <chrono>
2025-09-10 11:19:40 +08:00
#include <future>
2025-09-08 10:59:08 +08:00
2025-09-10 11:18:29 +08:00
// Process-wide run flag, defined in another translation unit. connect() and
// disconnect() read it so that their polling loops stop early once it is false.
extern std::atomic<bool> g_running;
2025-09-08 10:59:08 +08:00
// Builds a client wrapper for the given configuration.
// The configuration is copied, the wrapper starts in the "not connected"
// state, and the underlying client object is created right away; no network
// connection is attempted until connect() is called.
MQTTClient::MQTTClient(const MQTTConfig &config)
    : config_(config),
      connected_(false)
{
    initializeClient();
}
// Disconnects from the broker before the wrapper goes away.
// disconnect() only handles mqtt::exception itself, yet it also locks a mutex
// and runs the user-supplied disconnect callback. Anything escaping from
// there would leave this (implicitly noexcept) destructor and terminate the
// process, so every exception is contained here.
MQTTClient::~MQTTClient()
{
    try
    {
        disconnect();
    }
    catch (...)
    {
        // Best effort only: the object is being destroyed, nothing to recover.
    }
}
// (Re)creates the underlying async client from the current configuration.
// The broker URI has the form "tcp://<server_ip>:<server_port>". This object
// is registered as the client's callback sink.
void MQTTClient::initializeClient()
{
    const std::string port_text = std::to_string(config_.server_port);
    const std::string broker_uri = "tcp://" + config_.server_ip + ":" + port_text;

    client_ = std::make_shared<mqtt::async_client>(broker_uri, config_.client_id);
    client_->set_callback(*this);
}
void MQTTClient::connect()
{
std::lock_guard<std::mutex> lock(mutex_);
2025-09-10 11:18:29 +08:00
if (connected_ || !g_running) // 添加g_running检查
2025-09-08 10:59:08 +08:00
return;
try
{
auto connOpts = mqtt::connect_options_builder()
.clean_session(config_.clean_session)
.automatic_reconnect(true)
.keep_alive_interval(std::chrono::seconds(config_.keep_alive))
.user_name(config_.username)
.password(config_.password)
.finalize();
2025-09-10 11:24:52 +08:00
auto connect_token = client_->connect(connOpts); // shared_ptr<mqtt::token>
2025-09-10 11:18:29 +08:00
2025-09-10 11:24:52 +08:00
for (int i = 0; i < 20 && g_running; ++i) // 最多 2 秒
2025-09-10 11:18:29 +08:00
{
2025-09-10 11:24:52 +08:00
if (connect_token->is_complete()) // 非阻塞检查
2025-09-10 11:18:29 +08:00
{
connected_ = true;
break;
}
2025-09-10 11:24:52 +08:00
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 如果循环结束仍未完成,可以手动断开
if (!connected_)
{
try
{
client_->disconnect()->wait();
}
catch (...)
{
}
2025-09-10 11:18:29 +08:00
}
2025-09-08 10:59:08 +08:00
}
catch (const mqtt::exception &e)
{
2025-09-10 10:34:36 +08:00
connected_ = false;
2025-09-08 16:40:04 +08:00
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
2025-09-08 10:59:08 +08:00
}
}
void MQTTClient::disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
2025-09-10 11:24:52 +08:00
auto disc_token = client_->disconnect(); // 返回 std::shared_ptr<mqtt::token>
// 等待最多1秒钟期间检查 g_running
int wait_loops = 10; // 10*100ms = 1秒
for (int i = 0; i < wait_loops && g_running; ++i)
{
if (disc_token->is_complete())
break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
2025-09-10 11:18:29 +08:00
2025-09-10 11:24:52 +08:00
if (!disc_token->is_complete())
2025-09-10 11:18:29 +08:00
{
LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection");
2025-09-10 11:24:52 +08:00
client_->stop_consuming(); // 强制停止消费消息
client_.reset(); // 释放 client 对象
2025-09-10 11:18:29 +08:00
}
2025-09-08 10:59:08 +08:00
}
catch (const mqtt::exception &e)
{
2025-09-08 16:40:04 +08:00
LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
2025-09-08 10:59:08 +08:00
}
2025-09-10 10:34:36 +08:00
connected_ = false;
if (on_disconnect_)
on_disconnect_();
2025-09-08 10:59:08 +08:00
}
2025-09-10 11:18:29 +08:00
// Tears the connection down without waiting for a normal disconnect.
//
// Does nothing when not connected. Otherwise the client is stopped and
// released (client_ is left empty), connected_ is cleared, and the disconnect
// callback, if set, is invoked.
void MQTTClient::force_disconnect()
{
    std::lock_guard<std::mutex> guard(mutex_);
    if (!connected_)
    {
        return;
    }

    try
    {
        // Stop the client right away instead of waiting for a normal disconnect.
        client_->stop_consuming();
        client_.reset();
    }
    catch (const mqtt::exception &e)
    {
        LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what()));
    }

    connected_ = false;
    if (on_disconnect_)
    {
        on_disconnect_();
    }
}
2025-09-08 10:59:08 +08:00
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
2025-09-09 17:19:59 +08:00
// 如果 topic 不包含 "heartbeat",才打印日志
if (topic.find("heartbeat") == std::string::npos)
{
LOG_INFO("[MQTT] Published message to topic: " + topic);
}
2025-09-08 10:59:08 +08:00
}
catch (const mqtt::exception &e)
{
2025-09-08 16:40:04 +08:00
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
2025-09-10 10:34:36 +08:00
connected_ = false;
if (on_disconnect_)
on_disconnect_();
2025-09-08 10:59:08 +08:00
}
}
void MQTTClient::subscribe(const std::string &topic, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->subscribe(topic, qos)->wait();
2025-09-09 10:25:16 +08:00
LOG_INFO("[MQTT] Subscribed to topic: " + topic);
2025-09-08 10:59:08 +08:00
}
catch (const mqtt::exception &e)
{
2025-09-09 10:25:16 +08:00
LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what()));
2025-09-10 10:34:36 +08:00
connected_ = false;
if (on_disconnect_)
on_disconnect_();
2025-09-08 10:59:08 +08:00
}
}
void MQTTClient::switchServer(const MQTTConfig &newConfig)
{
std::lock_guard<std::mutex> lock(mutex_);
2025-09-10 10:34:36 +08:00
disconnect();
2025-09-08 10:59:08 +08:00
config_ = newConfig;
initializeClient();
connect();
}
2025-09-10 10:34:36 +08:00
bool MQTTClient::isConnected() const
2025-09-08 10:59:08 +08:00
{
2025-09-10 10:34:36 +08:00
return connected_;
2025-09-08 10:59:08 +08:00
}
2025-09-10 10:34:36 +08:00
// Stores the callback that connected() invokes; replaces any previous one.
void MQTTClient::setConnectCallback(ConnectCallback cb)
{
    on_connect_ = cb;
}
// Stores the callback invoked whenever the wrapper marks itself disconnected;
// replaces any previous one.
void MQTTClient::setDisconnectCallback(DisconnectCallback cb)
{
    on_disconnect_ = cb;
}
// Stores the callback that message_arrived() invokes; replaces any previous one.
void MQTTClient::setMessageCallback(MessageCallback cb)
{
    on_message_ = cb;
}
2025-09-08 10:59:08 +08:00
void MQTTClient::connection_lost(const std::string &cause)
{
connected_ = false;
2025-09-10 10:11:18 +08:00
LOG_WARN("[MQTT] Connection lost: " + cause);
2025-09-08 10:59:08 +08:00
if (on_disconnect_)
on_disconnect_();
2025-09-10 10:11:18 +08:00
}
// Client callback for an established connection: sets connected_, logs the
// cause and then invokes the connect callback if one is set.
void MQTTClient::connected(const std::string &cause)
{
    connected_ = true;
    LOG_INFO("[MQTT] Reconnected: " + cause);

    if (!on_connect_)
    {
        return;
    }
    on_connect_();
}
// Client callback for an incoming message: forwards the topic and the payload
// (as a string) to the message callback. Messages are dropped when no
// callback is set.
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
    if (!on_message_)
    {
        return;
    }
    on_message_(msg->get_topic(), msg->get_payload_str());
}