first commit

This commit is contained in:
cxh 2025-09-10 10:34:36 +08:00
parent 679366fa8f
commit 7794cc3ce9
3 changed files with 60 additions and 84 deletions

View File

@ -34,7 +34,6 @@ public:
private: private:
void initializeClient(); void initializeClient();
void startReconnect();
// mqtt::callback 实现 // mqtt::callback 实现
void connection_lost(const std::string &cause) override; void connection_lost(const std::string &cause) override;
@ -45,7 +44,6 @@ private:
MQTTConfig config_; MQTTConfig config_;
std::shared_ptr<mqtt::async_client> client_; std::shared_ptr<mqtt::async_client> client_;
std::atomic<bool> connected_; std::atomic<bool> connected_;
std::atomic<bool> reconnect_active_;
std::mutex mutex_; std::mutex mutex_;
ConnectCallback on_connect_; ConnectCallback on_connect_;

View File

@ -4,7 +4,7 @@
#include <chrono> #include <chrono>
MQTTClient::MQTTClient(const MQTTConfig &config) MQTTClient::MQTTClient(const MQTTConfig &config)
: config_(config), connected_(false), reconnect_active_(false) : config_(config), connected_(false)
{ {
initializeClient(); initializeClient();
} }
@ -37,18 +37,15 @@ void MQTTClient::connect()
.password(config_.password) .password(config_.password)
.finalize(); .finalize();
auto tok = client_->connect(connOpts); client_->connect(connOpts)->wait(); // 阻塞等待连接
tok->wait(); // 阻塞直到连接成功或失败,不用 wait_for(2s)
LOG_INFO("[MQTT] Connected to broker: " + config_.server_ip);
connected_ = true; connected_ = true;
if (on_connect_) if (on_connect_)
on_connect_(); on_connect_();
} }
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
connected_ = false;
LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what())); LOG_ERROR("[MQTT] Connect failed: " + std::string(e.what()));
// startReconnect();
} }
} }
@ -61,14 +58,15 @@ void MQTTClient::disconnect()
try try
{ {
client_->disconnect()->wait(); client_->disconnect()->wait();
connected_ = false;
if (on_disconnect_)
on_disconnect_();
} }
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what())); LOG_ERROR("[MQTT] Disconnect failed: " + std::string(e.what()));
} }
connected_ = false;
if (on_disconnect_)
on_disconnect_();
} }
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos) void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
@ -88,8 +86,9 @@ void MQTTClient::publish(const std::string &topic, const std::string &payload, i
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what())); LOG_ERROR("[MQTT] Publish failed: " + std::string(e.what()));
if (!connected_) connected_ = false;
startReconnect(); if (on_disconnect_)
on_disconnect_();
} }
} }
@ -106,75 +105,21 @@ void MQTTClient::subscribe(const std::string &topic, int qos)
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what())); LOG_ERROR("[MQTT] Subscribe failed: " + std::string(e.what()));
if (!connected_) connected_ = false;
startReconnect(); if (on_disconnect_)
on_disconnect_();
} }
} }
void MQTTClient::switchServer(const MQTTConfig &newConfig) void MQTTClient::switchServer(const MQTTConfig &newConfig)
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (connected_) disconnect();
{
try
{
client_->disconnect()->wait();
}
catch (...)
{
}
connected_ = false;
}
config_ = newConfig; config_ = newConfig;
initializeClient(); initializeClient();
connect(); connect();
} }
void MQTTClient::startReconnect()
{
if (reconnect_active_)
return;
reconnect_active_ = true;
std::thread([this]
{
while (!connected_ && reconnect_active_) {
std::this_thread::sleep_for(std::chrono::seconds(2));
try {
connect();
} catch (...) {}
}
reconnect_active_ = false; })
.detach();
}
void MQTTClient::connection_lost(const std::string &cause)
{
std::lock_guard<std::mutex> lock(mutex_);
connected_ = false;
LOG_WARN("[MQTT] Connection lost: " + cause);
if (on_disconnect_)
on_disconnect_();
// startReconnect();
}
void MQTTClient::connected(const std::string &cause)
{
std::lock_guard<std::mutex> lock(mutex_);
connected_ = true;
LOG_INFO("[MQTT] Reconnected: " + cause);
if (on_connect_)
on_connect_();
}
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
if (on_message_)
{
on_message_(msg->get_topic(), msg->get_payload_str());
}
}
bool MQTTClient::isConnected() const bool MQTTClient::isConnected() const
{ {
return connected_; return connected_;
@ -183,3 +128,25 @@ bool MQTTClient::isConnected() const
void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; } void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; }
void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; } void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; }
void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; } void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; }
void MQTTClient::connection_lost(const std::string &cause)
{
connected_ = false;
LOG_WARN("[MQTT] Connection lost: " + cause);
if (on_disconnect_)
on_disconnect_();
}
void MQTTClient::connected(const std::string &cause)
{
connected_ = true;
LOG_INFO("[MQTT] Reconnected: " + cause);
if (on_connect_)
on_connect_();
}
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
if (on_message_)
on_message_(msg->get_topic(), msg->get_payload_str());
}

View File

@ -1,12 +1,13 @@
// mqtt_client_wrapper.cpp // mqtt_client_wrapper.cpp
#include "mqtt_client_wrapper.hpp" #include "mqtt_client_wrapper.hpp"
#include <thread>
#include <chrono>
#include <nlohmann/json.hpp>
std::shared_ptr<MQTTClient> mqtt_client; std::shared_ptr<MQTTClient> mqtt_client;
std::atomic<bool> mqtt_restart_required{false}; std::atomic<bool> mqtt_restart_required{false};
extern std::atomic<bool> g_running; extern std::atomic<bool> g_running;
std::atomic<bool> g_streaming{false};
std::atomic<bool> g_streaming{false}; // 当前是否有摄像头在拉流
std::string g_dispatch_id; std::string g_dispatch_id;
std::mutex g_dispatch_id_mutex; std::mutex g_dispatch_id_mutex;
@ -16,12 +17,13 @@ static void send_heartbeat()
return; return;
nlohmann::json hb_data; nlohmann::json hb_data;
hb_data["time"] = Logger::get_current_time_utc8(); // UTC+8 hb_data["time"] = Logger::get_current_time_utc8();
// 状态: 2=没有拉流, 0=正在拉流
hb_data["status"] = g_streaming ? 0 : 2; hb_data["status"] = g_streaming ? 0 : 2;
hb_data["dispatchId"] = g_streaming ? g_dispatch_id : ""; {
std::lock_guard<std::mutex> lock(g_dispatch_id_mutex);
hb["dispatchId"] = g_streaming ? g_dispatch_id : "";
}
nlohmann::json msg; nlohmann::json msg;
msg["data"] = hb_data; msg["data"] = hb_data;
@ -29,7 +31,6 @@ static void send_heartbeat()
msg["type"] = 0; msg["type"] = 0;
mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump()); mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump());
// LOG_INFO("[MQTT] Sent heartbeat: " + msg.dump());
} }
// MQTT 回调定义 // MQTT 回调定义
@ -209,9 +210,20 @@ void mqtt_client_thread_func()
mqtt_client->setDisconnectCallback(on_mqtt_disconnected); mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
mqtt_client->setMessageCallback(on_mqtt_message_received); mqtt_client->setMessageCallback(on_mqtt_message_received);
while (g_running && !mqtt_client->isConnected())
{
try
{
mqtt_client->connect(); mqtt_client->connect();
}
catch (...)
{
}
if (!mqtt_client->isConnected())
std::this_thread::sleep_for(std::chrono::seconds(2));
}
while (!mqtt_restart_required && g_running) while (g_running && mqtt_client->isConnected())
{ {
try try
{ {
@ -219,10 +231,9 @@ void mqtt_client_thread_func()
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
LOG_ERROR(std::string("[MQTT] Heartbeat error: ") + e.what()); LOG_ERROR("[MQTT] Heartbeat error: " + std::string(e.what()));
} }
std::this_thread::sleep_for(heartbeat_interval);
std::this_thread::sleep_for(heartbeat_interval); // 固定周期
} }
// 需要重启或退出 // 需要重启或退出