169 lines
4.0 KiB
C++
169 lines
4.0 KiB
C++
|
|
#include "mqtt_client.hpp"

#include <chrono>
#include <iostream>
#include <thread>
#include <utility>
|
||
|
|
|
||
|
|
MQTTClient::MQTTClient(const MQTTConfig &config)
|
||
|
|
: config_(config), connected_(false), reconnect_active_(false)
|
||
|
|
{
|
||
|
|
initializeClient();
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Tears the client down by delegating to disconnect().
///
/// NOTE(review): startReconnect() launches a detached thread that captures
/// `this`, and nothing here waits for that thread or clears
/// reconnect_active_ -- confirm the reconnect loop cannot outlive the object.
MQTTClient::~MQTTClient()
{
    this->disconnect();
}
|
||
|
|
|
||
|
|
void MQTTClient::initializeClient()
|
||
|
|
{
|
||
|
|
std::string address = "tcp://" + config_.server_ip + ":" + std::to_string(config_.server_port);
|
||
|
|
client_ = std::make_shared<mqtt::async_client>(address, config_.client_id);
|
||
|
|
client_->set_callback(*this);
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::connect()
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
if (connected_)
|
||
|
|
return;
|
||
|
|
|
||
|
|
try
|
||
|
|
{
|
||
|
|
auto connOpts = mqtt::connect_options_builder()
|
||
|
|
.clean_session(config_.clean_session)
|
||
|
|
.automatic_reconnect(true)
|
||
|
|
.keep_alive_interval(std::chrono::seconds(config_.keep_alive))
|
||
|
|
.user_name(config_.username)
|
||
|
|
.password(config_.password)
|
||
|
|
.finalize();
|
||
|
|
|
||
|
|
client_->connect(connOpts)->wait();
|
||
|
|
connected_ = true;
|
||
|
|
reconnect_active_ = false;
|
||
|
|
|
||
|
|
if (on_connect_)
|
||
|
|
on_connect_();
|
||
|
|
}
|
||
|
|
catch (const mqtt::exception &e)
|
||
|
|
{
|
||
|
|
std::cerr << "[MQTTClient] Connect failed: " << e.what() << std::endl;
|
||
|
|
startReconnect();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::disconnect()
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
if (!connected_)
|
||
|
|
return;
|
||
|
|
|
||
|
|
try
|
||
|
|
{
|
||
|
|
client_->disconnect()->wait();
|
||
|
|
connected_ = false;
|
||
|
|
if (on_disconnect_)
|
||
|
|
on_disconnect_();
|
||
|
|
}
|
||
|
|
catch (const mqtt::exception &e)
|
||
|
|
{
|
||
|
|
std::cerr << "[MQTTClient] Disconnect failed: " << e.what() << std::endl;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
|
||
|
|
{
|
||
|
|
if (qos == -1)
|
||
|
|
qos = config_.qos;
|
||
|
|
|
||
|
|
try
|
||
|
|
{
|
||
|
|
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
|
||
|
|
}
|
||
|
|
catch (const mqtt::exception &e)
|
||
|
|
{
|
||
|
|
std::cerr << "[MQTTClient] Publish failed: " << e.what() << std::endl;
|
||
|
|
if (!connected_)
|
||
|
|
startReconnect();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::subscribe(const std::string &topic, int qos)
|
||
|
|
{
|
||
|
|
if (qos == -1)
|
||
|
|
qos = config_.qos;
|
||
|
|
|
||
|
|
try
|
||
|
|
{
|
||
|
|
client_->subscribe(topic, qos)->wait();
|
||
|
|
}
|
||
|
|
catch (const mqtt::exception &e)
|
||
|
|
{
|
||
|
|
std::cerr << "[MQTTClient] Subscribe failed: " << e.what() << std::endl;
|
||
|
|
if (!connected_)
|
||
|
|
startReconnect();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::switchServer(const MQTTConfig &newConfig)
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
if (connected_)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
client_->disconnect()->wait();
|
||
|
|
}
|
||
|
|
catch (...)
|
||
|
|
{
|
||
|
|
}
|
||
|
|
connected_ = false;
|
||
|
|
}
|
||
|
|
config_ = newConfig;
|
||
|
|
initializeClient();
|
||
|
|
connect();
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::startReconnect()
|
||
|
|
{
|
||
|
|
if (reconnect_active_)
|
||
|
|
return;
|
||
|
|
reconnect_active_ = true;
|
||
|
|
|
||
|
|
std::thread([this]
|
||
|
|
{
|
||
|
|
while (!connected_ && reconnect_active_) {
|
||
|
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||
|
|
try {
|
||
|
|
connect();
|
||
|
|
} catch (...) {}
|
||
|
|
}
|
||
|
|
reconnect_active_ = false; })
|
||
|
|
.detach();
|
||
|
|
}
|
||
|
|
|
||
|
|
void MQTTClient::connection_lost(const std::string &cause)
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
connected_ = false;
|
||
|
|
if (on_disconnect_)
|
||
|
|
on_disconnect_();
|
||
|
|
startReconnect();
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Forwards an incoming message to the message callback, if one is set.
///
/// The callback receives the message's topic and its payload as a string.
///
/// @param msg The received message.
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
    if (!on_message_)
    {
        return;
    }

    on_message_(msg->get_topic(), msg->get_payload_str());
}
|
||
|
|
|
||
|
|
bool MQTTClient::isConnected() const
|
||
|
|
{
|
||
|
|
return connected_;
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Sets the callback that connect() invokes after a successful connection.
///
/// connect() only calls the stored callback when it tests true.
///
/// @param cb Callback to store; taken by value and moved into place.
void MQTTClient::setConnectCallback(ConnectCallback cb)
{
    on_connect_ = std::move(cb);
}
|
||
|
|
/// Sets the callback invoked by disconnect() after a successful disconnect
/// and by connection_lost().
///
/// Both callers only invoke the stored callback when it tests true.
///
/// @param cb Callback to store; taken by value and moved into place.
void MQTTClient::setDisconnectCallback(DisconnectCallback cb)
{
    on_disconnect_ = std::move(cb);
}
|
||
|
|
/// Sets the callback that message_arrived() invokes with the topic and the
/// payload string of each incoming message.
///
/// message_arrived() only calls the stored callback when it tests true.
///
/// @param cb Callback to store; taken by value and moved into place.
void MQTTClient::setMessageCallback(MessageCallback cb)
{
    on_message_ = std::move(cb);
}
|