kunlang_video/src/mqtt_client_wrapper.cpp
2025-11-14 15:40:14 +08:00

212 lines
6.2 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// mqtt_client_wrapper.cpp
#include "mqtt_client_wrapper.hpp"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <nlohmann/json.hpp>
#include <queue>
#include <thread>
#include "record_manager.hpp"
// Process-wide MQTT client handle. It is created and reset by mqtt_client_thread_func()
// and dereferenced by the heartbeat sender and the MQTT callbacks in this file.
std::shared_ptr<MQTTClient> mqtt_client;
// Run flag defined in another translation unit; the client thread keeps looping while it is true.
extern std::atomic<bool> g_running;
// NOTE(review): not referenced in the visible part of this file — presumably a
// "streaming active" flag consumed elsewhere; confirm.
std::atomic<bool> g_streaming{false};
// NOTE(review): not referenced in the visible part of this file; g_dispatch_id_mutex
// presumably guards g_dispatch_id — confirm against the users of these globals.
std::string g_dispatch_id;
std::mutex g_dispatch_id_mutex;
static void send_heartbeat()
{
if (!mqtt_client || !mqtt_client->isConnected()) return;
// 获取当前时间戳13位毫秒
auto now = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
// 获取 RTMP 状态
auto channels_info = RTMPManager::get_all_channels_status();
nlohmann::json channels = nlohmann::json::array();
int total = static_cast<int>(channels_info.size());
int running_count = 0;
for (const auto& ch : channels_info)
{
nlohmann::json item;
item["loc"] = ch.loc;
item["running"] = ch.running;
if (ch.running)
{
item["reason"] = nullptr;
running_count++;
}
else
{
item["reason"] = ch.reason.empty() ? "Unknown error" : ch.reason;
}
channels.push_back(item);
}
nlohmann::json hb;
hb["timestamp"] = ms;
hb["status"] = (running_count == 0) ? 0 // 全部失败
: (running_count == total ? 1 : 2); // 全部正常 or 部分异常
hb["channels"] = channels;
// 发布心跳
mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump(), 0);
// LOG_INFO("[MQTT] Sent video heartbeat (" + std::to_string(running_count) + "/" + std::to_string(total) +
// " running): " + hb.dump());
}
// MQTT callbacks
/**
 * Connect callback: logs the broker address, then subscribes the global client
 * to the video_down, record_query and record_play topics from the configuration.
 */
static void on_mqtt_connected()
{
    LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip);

    const auto& topics = g_app_config.mqtt.topics;
    mqtt_client->subscribe(topics.video_down);
    mqtt_client->subscribe(topics.record_query);
    mqtt_client->subscribe(topics.record_play);
}
/** Disconnect callback: only logs a warning naming the configured broker address. */
static void on_mqtt_disconnected()
{
    LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip);
}
/**
 * Answer a request received on the video_down topic with the list of channel URLs.
 *
 * The request is ignored (with a log line) unless it has an object-valued "data"
 * member whose "switch" field equals 1 (a missing "switch" counts as 0).
 * The reply has "type" = "response", echoes the request's "seqNo" ("0" when
 * absent) and carries a "data" array of {"loc", "url"} objects, one per channel
 * returned by RTMPManager. It is published back on the same video_down topic;
 * the trailing 1 passed to publish() is presumably the QoS level — confirm.
 *
 * @param req Parsed request message.
 */
static void handle_video_down_request(const nlohmann::json& req)
{
    const bool has_payload = req.contains("data") && req["data"].is_object();
    if (!has_payload)
    {
        LOG_WARN("[video_down] Missing data");
        return;
    }

    if (req["data"].value("switch", 0) != 1)
    {
        LOG_INFO("[video_down] switch != 1, ignore");
        return;
    }

    // Current status (including playback URL) of every RTMP channel.
    auto channel_states = RTMPManager::get_all_channels_status();

    // Response envelope.
    nlohmann::json reply;
    reply["type"] = "response";
    reply["seqNo"] = req.value("seqNo", "0");

    // NOTE(review): "loc" here is the position in the returned list, whereas
    // send_heartbeat() reports ch.loc — confirm the two are meant to agree.
    nlohmann::json url_list = nlohmann::json::array();
    int position = 0;
    for (const auto& state : channel_states)
    {
        url_list.push_back({{"loc", position}, {"url", state.url}});
        ++position;
    }
    reply["data"] = url_list;

    // Send the reply.
    mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(-1), 1);
}
static void handle_record_query_request(const nlohmann::json& req) {}
static void handle_record_play_request(const nlohmann::json& req) {}
/**
 * Message callback: parses the payload as JSON and routes requests by topic.
 *
 * Messages whose "type" member is missing or not equal to "request" are logged
 * and dropped. Requests are logged (topic and compact payload) and forwarded to
 * the handler matching the configured video_down, record_query or record_play
 * topic; any other topic produces a warning. Every std::exception raised while
 * parsing or inside a handler is caught here and logged as an error.
 *
 * @param topic   Topic the message arrived on.
 * @param message Raw message payload, expected to be JSON text.
 */
static void on_mqtt_message_received(const std::string& topic, const std::string& message)
{
    try
    {
        const auto request = nlohmann::json::parse(message);

        // Only messages explicitly marked as requests are processed.
        const bool is_request = request.contains("type") && request["type"] == "request";
        if (!is_request)
        {
            LOG_INFO("[MQTT] Ignored non-request message");
            return;
        }

        LOG_INFO("[MQTT] Received request on [" + topic + "]");
        LOG_INFO("[MQTT] Payload: " + request.dump(-1));

        // Dispatch on the topic name.
        const auto& topics = g_app_config.mqtt.topics;
        if (topic == topics.video_down)
        {
            handle_video_down_request(request);
            return;
        }
        if (topic == topics.record_query)
        {
            handle_record_query_request(request);
            return;
        }
        if (topic == topics.record_play)
        {
            handle_record_play_request(request);
            return;
        }

        LOG_WARN("[MQTT] Unknown topic: " + topic);
    }
    catch (const std::exception& e)
    {
        LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
    }
}
void mqtt_client_thread_func()
{
const auto& cfg = g_app_config.mqtt;
auto heartbeat_interval = std::chrono::milliseconds(static_cast<int>(cfg.keep_alive * 0.9));
while (g_running)
{
mqtt_client = std::make_unique<MQTTClient>(cfg);
mqtt_client->setConnectCallback(on_mqtt_connected);
mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
mqtt_client->setMessageCallback(on_mqtt_message_received);
// 等待连接
while (g_running && !mqtt_client->isConnected())
{
mqtt_client->connect();
for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 主循环:心跳
while (g_running && mqtt_client->isConnected())
{
send_heartbeat();
auto sleep_time = heartbeat_interval;
while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected())
{
auto chunk = std::min(sleep_time, std::chrono::milliseconds(50));
std::this_thread::sleep_for(chunk);
sleep_time -= chunk;
}
if (!g_running && mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 清理
if (mqtt_client)
{
if (g_running)
mqtt_client->disconnect();
else
mqtt_client->force_disconnect();
mqtt_client.reset();
}
if (!g_running) break;
// 短暂等待再重连
for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
LOG_INFO("[MQTT] Client thread exiting.");
}