first commit

This commit is contained in:
cxh 2025-09-10 11:18:29 +08:00
parent a7b821a6f3
commit 21e6d97c4a
4 changed files with 85 additions and 11 deletions

View File

@ -26,6 +26,7 @@ public:
void connect(); void connect();
void disconnect(); void disconnect();
void force_disconnect();
void publish(const std::string &topic, const std::string &payload, int qos = -1); void publish(const std::string &topic, const std::string &payload, int qos = -1);
void subscribe(const std::string &topic, int qos = -1); void subscribe(const std::string &topic, int qos = -1);
void switchServer(const MQTTConfig &newConfig); void switchServer(const MQTTConfig &newConfig);

View File

@ -9,6 +9,7 @@
std::atomic<bool> g_running(true); std::atomic<bool> g_running(true);
// main.cpp - 修改signalHandler
void signalHandler(int signum) void signalHandler(int signum)
{ {
static bool already_called = false; static bool already_called = false;
@ -18,16 +19,21 @@ void signalHandler(int signum)
LOG_INFO("[MAIN] Received signal " + std::to_string(signum) + ", shutting down..."); LOG_INFO("[MAIN] Received signal " + std::to_string(signum) + ", shutting down...");
g_running = false; g_running = false;
RTSPManager::stop(); // 停止 RTSP loop
// 设置超时,防止无限等待 // 停止RTSP循环
std::this_thread::sleep_for(std::chrono::milliseconds(500)); RTSPManager::stop();
// 强制退出如果仍然卡住 // 记录停止时间
if (signum == SIGINT) auto start_time = std::chrono::steady_clock::now();
// 等待一段时间让线程退出
while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(3))
{ {
exit(1); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
LOG_INFO("[MAIN] Force exiting after waiting for threads");
exit(1);
} }
int main() int main()

View File

@ -3,6 +3,8 @@
#include <thread> #include <thread>
#include <chrono> #include <chrono>
extern std::atomic<bool> g_running;
MQTTClient::MQTTClient(const MQTTConfig &config) MQTTClient::MQTTClient(const MQTTConfig &config)
: config_(config), connected_(false) : config_(config), connected_(false)
{ {
@ -24,7 +26,7 @@ void MQTTClient::initializeClient()
void MQTTClient::connect() void MQTTClient::connect()
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (connected_) if (connected_ || !g_running) // 添加g_running检查
return; return;
try try
@ -37,7 +39,18 @@ void MQTTClient::connect()
.password(config_.password) .password(config_.password)
.finalize(); .finalize();
client_->connect(connOpts)->wait(); // 阻塞等待连接 // 使用wait_for而不是wait设置超时时间
auto connect_future = client_->connect(connOpts);
// 等待最多2秒钟期间检查退出标志
for (int i = 0; i < 20 && g_running; i++)
{
if (connect_future->wait_for(std::chrono::milliseconds(100)) == std::future_status::ready)
{
connected_ = true;
break;
}
}
} }
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
@ -54,7 +67,17 @@ void MQTTClient::disconnect()
try try
{ {
client_->disconnect()->wait(); // 使用wait_for而不是wait设置超时时间
auto disconnect_future = client_->disconnect();
// 等待最多1秒钟而不是无限等待
if (disconnect_future->wait_for(std::chrono::seconds(1)) == std::future_status::timeout)
{
LOG_WARN("[MQTT] Disconnect timed out, forcing disconnection");
// 强制断开连接
client_->stop_consuming();
client_.reset();
}
} }
catch (const mqtt::exception &e) catch (const mqtt::exception &e)
{ {
@ -66,6 +89,28 @@ void MQTTClient::disconnect()
on_disconnect_(); on_disconnect_();
} }
void MQTTClient::force_disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
// 直接停止客户端,不等待正常断开
client_->stop_consuming();
client_.reset();
}
catch (const mqtt::exception &e)
{
LOG_ERROR("[MQTT] Force disconnect failed: " + std::string(e.what()));
}
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos) void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
{ {
if (qos == -1) if (qos == -1)

View File

@ -247,13 +247,35 @@ void mqtt_client_thread_func()
} }
} }
// 需要重启或退出 // 清理资源
if (mqtt_client) if (mqtt_client)
{
// 只有在运行标志仍然为true时才尝试正常断开
if (g_running)
{ {
mqtt_client->disconnect(); mqtt_client->disconnect();
}
else
{
// 如果正在退出,直接重置客户端,不等待断开完成
mqtt_client->force_disconnect();
}
mqtt_client.reset(); mqtt_client.reset();
} }
mqtt_restart_required = false; mqtt_restart_required = false;
// 如果正在退出,跳出循环
if (!g_running)
{
break;
}
// 短暂等待后再尝试重连
for (int i = 0; i < 5 && g_running; i++)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
} }
LOG_INFO("[MQTT] Client thread exiting."); LOG_INFO("[MQTT] Client thread exiting.");
} }