kunlang_video/src/mqtt_client.cpp

178 lines
4.3 KiB
C++
Raw Normal View History

2025-09-08 10:59:08 +08:00
#include "mqtt_client.hpp"
#include <iostream>
#include <thread>
#include <chrono>
MQTTClient::MQTTClient(const MQTTConfig &config)
: config_(config), connected_(false), reconnect_active_(false)
{
initializeClient();
}
MQTTClient::~MQTTClient()
{
disconnect();
}
void MQTTClient::initializeClient()
{
std::string address = "tcp://" + config_.server_ip + ":" + std::to_string(config_.server_port);
client_ = std::make_shared<mqtt::async_client>(address, config_.client_id);
client_->set_callback(*this);
}
void MQTTClient::connect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (connected_)
return;
try
{
auto connOpts = mqtt::connect_options_builder()
.clean_session(config_.clean_session)
.automatic_reconnect(true)
.keep_alive_interval(std::chrono::seconds(config_.keep_alive))
.user_name(config_.username)
.password(config_.password)
.finalize();
2025-09-08 16:06:50 +08:00
auto tok = client_->connect(connOpts); // 异步连接
if (!tok->wait_for(std::chrono::seconds(2))) // 设置超时
{
2025-09-08 16:40:04 +08:00
LOG_WARN("[MQTT] Connect timed out to broker: " + config_.server_ip);
2025-09-08 16:06:50 +08:00
startReconnect();
return;
}
2025-09-08 10:59:08 +08:00
connected_ = true;
reconnect_active_ = false;
if (on_connect_)
on_connect_();
}
catch (const mqtt::exception &e)
{
2025-09-08 16:40:04 +08:00
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
2025-09-08 10:59:08 +08:00
startReconnect();
}
}
void MQTTClient::disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
client_->disconnect()->wait();
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
catch (const mqtt::exception &e)
{
2025-09-08 16:40:04 +08:00
LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
2025-09-08 10:59:08 +08:00
}
}
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
2025-09-08 15:59:15 +08:00
LOG_INFO("[MQTTClient] Published message to topic: " + topic);
2025-09-08 10:59:08 +08:00
}
catch (const mqtt::exception &e)
{
2025-09-08 16:40:04 +08:00
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
2025-09-08 10:59:08 +08:00
if (!connected_)
startReconnect();
}
}
void MQTTClient::subscribe(const std::string &topic, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->subscribe(topic, qos)->wait();
2025-09-08 15:59:15 +08:00
LOG_INFO("[MQTTClient] Subscribed to topic: " + topic);
2025-09-08 10:59:08 +08:00
}
catch (const mqtt::exception &e)
{
2025-09-08 15:59:15 +08:00
LOG_ERROR("[MQTTClient] Subscribe failed: " + std::string(e.what()));
2025-09-08 10:59:08 +08:00
if (!connected_)
startReconnect();
}
}
void MQTTClient::switchServer(const MQTTConfig &newConfig)
{
std::lock_guard<std::mutex> lock(mutex_);
if (connected_)
{
try
{
client_->disconnect()->wait();
}
catch (...)
{
}
connected_ = false;
}
config_ = newConfig;
initializeClient();
connect();
}
void MQTTClient::startReconnect()
{
if (reconnect_active_)
return;
reconnect_active_ = true;
std::thread([this]
{
while (!connected_ && reconnect_active_) {
std::this_thread::sleep_for(std::chrono::seconds(2));
try {
connect();
} catch (...) {}
}
reconnect_active_ = false; })
.detach();
}
void MQTTClient::connection_lost(const std::string &cause)
{
std::lock_guard<std::mutex> lock(mutex_);
connected_ = false;
if (on_disconnect_)
on_disconnect_();
startReconnect();
}
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
if (on_message_)
{
on_message_(msg->get_topic(), msg->get_payload_str());
}
}
bool MQTTClient::isConnected() const
{
return connected_;
}
void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; }
void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; }
void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; }