kunlang_video/src/mqtt_client_wrapper.cpp
2025-10-15 15:01:55 +08:00

179 lines
5.8 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// mqtt_client_wrapper.cpp
#include "mqtt_client_wrapper.hpp"
#include <algorithm>
#include <chrono>
#include <nlohmann/json.hpp>
#include <thread>
std::shared_ptr<MQTTClient> mqtt_client;
std::atomic<bool> mqtt_restart_required{false};
extern std::atomic<bool> g_running;
std::atomic<bool> g_streaming{false};
std::string g_dispatch_id;
std::mutex g_dispatch_id_mutex;
static void send_heartbeat()
{
if (!mqtt_client || !mqtt_client->isConnected()) return;
nlohmann::json hb_data;
hb_data["time"] = Logger::get_current_time_utc8();
hb_data["status"] = g_streaming ? 0 : 2;
{
std::lock_guard<std::mutex> lock(g_dispatch_id_mutex);
hb_data["dispatchId"] = g_streaming ? g_dispatch_id : "";
}
nlohmann::json msg;
msg["data"] = hb_data;
msg["isEnc"] = 0;
msg["type"] = 0;
mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, msg.dump());
}
// MQTT 回调
static void on_mqtt_connected()
{
LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip);
const auto &topics = g_app_config.mqtt.topics;
mqtt_client->subscribe(topics.video_down);
}
static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); }
static void on_mqtt_message_received(const std::string &topic, const std::string &message)
{
try
{
auto j = nlohmann::json::parse(message);
if (topic == g_app_config.mqtt.topics.video_down && j["type"] == "request")
{
LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size()));
LOG_INFO("[MQTT] Message content: " + j.dump(0));
std::string seqNo = j.value("seqNo", "");
auto data = j["data"];
int switch_val = data.value("switch", 0); // 0 开1 关
int streamType = data.value("streamType", 0); // 0 主1 子
StreamType type = (streamType == 0) ? StreamType::MAIN : StreamType::SUB;
auto channels = data.value("channels", std::vector<int>{});
nlohmann::json results = nlohmann::json::array();
for (int ch : channels)
{
if (ch < 0 || ch >= static_cast<int>(g_app_config.cameras.size())) continue;
Camera &cam = g_app_config.cameras[ch];
RTMPManager::StreamResultInfo res;
if (switch_val == 0)
{
if (!RTMPManager::is_streaming(cam.name, type))
{
res = RTMPManager::start_camera(cam, type);
}
else
{
res.loc = ch;
res.url = RTMPManager::get_stream_url(cam.name, type);
res.result = 0;
res.reason = "Already streaming";
}
}
else
{
if (RTMPManager::is_streaming(cam.name, type))
{
res = RTMPManager::stop_camera(cam.name, type);
}
else
{
res.loc = ch;
res.url = RTMPManager::get_stream_url(cam.name, type);
res.result = 0;
res.reason = "Not streaming";
}
}
results.push_back({{"loc", res.loc}, {"url", res.url}, {"result", res.result}, {"reason", res.reason}});
}
nlohmann::json reply;
reply["type"] = "response";
reply["seqNo"] = seqNo;
reply["data"] = results;
if (mqtt_client) mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump());
}
}
catch (const std::exception &e)
{
LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
}
}
void mqtt_client_thread_func()
{
const auto &cfg = g_app_config.mqtt;
auto heartbeat_interval = std::chrono::milliseconds(static_cast<int>(cfg.keep_alive * 0.9));
while (g_running)
{
mqtt_client = std::make_unique<MQTTClient>(cfg);
mqtt_client->setConnectCallback(on_mqtt_connected);
mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
mqtt_client->setMessageCallback(on_mqtt_message_received);
// 等待连接
while (g_running && !mqtt_client->isConnected())
{
mqtt_client->connect();
for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 主循环:心跳
while (g_running && mqtt_client->isConnected())
{
send_heartbeat();
auto sleep_time = heartbeat_interval;
while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected())
{
auto chunk = std::min(sleep_time, std::chrono::milliseconds(50));
std::this_thread::sleep_for(chunk);
sleep_time -= chunk;
}
if (!g_running && mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 清理
if (mqtt_client)
{
if (g_running)
mqtt_client->disconnect();
else
mqtt_client->force_disconnect();
mqtt_client.reset();
}
mqtt_restart_required = false;
if (!g_running) break;
// 短暂等待再重连
for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
LOG_INFO("[MQTT] Client thread exiting.");
}