kunlang_video/src/mqtt_client_wrapper.cpp
2026-01-05 14:01:42 +08:00

437 lines
13 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// mqtt_client_wrapper.cpp
#include "mqtt_client_wrapper.hpp"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <nlohmann/json.hpp>
#include <queue>
#include <thread>
#include "record_manager.hpp"
#include "rtmp_manager.hpp"
std::shared_ptr<MQTTClient> mqtt_client;
extern std::atomic<bool> g_running;
std::atomic<bool> g_streaming{false};
std::string g_dispatch_id;
std::mutex g_dispatch_id_mutex;
static std::atomic<bool> g_mqtt_activated{false};
static std::string g_last_vid;
static bool try_activate_mqtt()
{
if (g_app_config.runtime_vid.empty()) return false;
if (!mqtt_client || !mqtt_client->isConnected()) return false;
// 如果已激活且 VID 未变化,什么都不做
if (g_mqtt_activated.load() && g_last_vid == g_app_config.runtime_vid) return true;
// ---------- 1. 如果是 VID 变化,先取消旧订阅 ----------
if (g_mqtt_activated.load())
{
LOG_INFO("[MQTT] VID changed, unsubscribe old topics: " + g_last_vid);
mqtt_client->unsubscribe(g_prev_topics.video_down);
mqtt_client->unsubscribe(g_prev_topics.record_query);
mqtt_client->unsubscribe(g_prev_topics.record_play);
mqtt_client->unsubscribe(g_prev_topics.vehicle_ctrl);
mqtt_client->unsubscribe(g_prev_topics.heartbeat_up);
}
// ---------- 2. 切换到新 VID ----------
g_app_config.mqtt.topics.fill_with_veh_id(g_app_config.runtime_vid);
g_prev_topics = g_app_config.mqtt.topics;
g_last_vid = g_app_config.runtime_vid;
LOG_INFO("[MQTT] Activated with VID=" + g_last_vid);
// ---------- 3. 订阅新 topic ----------
mqtt_client->subscribe(g_app_config.mqtt.topics.video_down);
mqtt_client->subscribe(g_app_config.mqtt.topics.record_query);
mqtt_client->subscribe(g_app_config.mqtt.topics.record_play);
mqtt_client->subscribe(g_app_config.mqtt.topics.vehicle_ctrl);
g_mqtt_activated.store(true);
return true;
}
static void send_heartbeat()
{
if (!mqtt_client || !mqtt_client->isConnected()) return;
// 获取当前时间戳13位毫秒
auto now = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
// 获取 RTMP 状态
auto channels_info = RTMPManager::get_all_channels_status();
nlohmann::json channels = nlohmann::json::array();
int total = static_cast<int>(channels_info.size());
int running_count = 0;
for (const auto& ch : channels_info)
{
nlohmann::json item;
item["loc"] = ch.loc;
item["running"] = ch.running;
if (ch.running)
{
item["reason"] = nullptr;
running_count++;
}
else
{
item["reason"] = ch.reason.empty() ? "Unknown error" : ch.reason;
}
channels.push_back(item);
}
nlohmann::json hb;
hb["timestamp"] = ms;
hb["status"] = (running_count == 0) ? 0 // 全部失败
: (running_count == total ? 1 : 2); // 全部正常 or 部分异常
hb["channels"] = channels;
// 发布心跳
mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump(), 0);
// LOG_INFO("[MQTT] Sent video heartbeat (" + std::to_string(running_count) + "/" + std::to_string(total) +
// " running): " + hb.dump());
}
// MQTT 回调
static void on_mqtt_connected()
{
LOG_INFO("[MQTT] Connected to broker: " + g_app_config.mqtt.server_ip);
g_mqtt_activated.store(false); // 重连后需要重新激活
}
static void on_mqtt_disconnected() { LOG_WARN("[MQTT] Disconnected from broker: " + g_app_config.mqtt.server_ip); }
static void handle_video_down_request(const nlohmann::json& req)
{
if (!req.contains("data") || !req["data"].is_object())
{
LOG_WARN("[video_down] Missing data");
return;
}
int sw = req["data"].value("switch", 0);
if (sw != 1)
{
LOG_INFO("[video_down] switch != 1, ignore");
return;
}
// 取得当前所有 RTMP 播放地址
auto channels_info = RTMPManager::get_all_channels_status();
// 构造响应
nlohmann::json resp;
resp["type"] = "response";
resp["seqNo"] = req.value("seqNo", "0");
resp["data"] = nlohmann::json::array();
int loc = 0;
for (const auto& ch : channels_info)
{
resp["data"].push_back({{"loc", loc}, {"url", ch.url}});
loc++;
}
// 发布回复
mqtt_client->publish(g_app_config.mqtt.topics.video_down, resp.dump(-1), 1);
}
static void handle_record_query_request(const nlohmann::json& req)
{
// 1. 解析字段
if (!req.contains("data") || !req["data"].is_object())
{
LOG_WARN("[record_query] missing data");
return;
}
const auto& d = req["data"];
int loc = d.value("loc", -1);
int64_t start = d.value("startTime", -1LL);
int64_t end = d.value("endTime", -1LL);
if (loc < 0 || start <= 0 || end <= 0 || start >= end)
{
LOG_WARN("[record_query] invalid parameters");
return;
}
LOG_INFO("[record_query] loc=" + std::to_string(loc) + " start=" + std::to_string(start) +
" end=" + std::to_string(end));
// 2. 计算 stream 名
// loc → stream比如
// 0 → "AHD1_main"
std::string stream = RecordManager::loc_to_stream(loc);
if (stream.empty())
{
LOG_WARN("[record_query] invalid loc: " + std::to_string(loc));
return;
}
// 3. 向 RecordManager 查询录像段
auto segs = g_record_manager->querySegments(stream, start, end);
LOG_INFO("[record_query] Found segments=" + std::to_string(segs.size()));
// 4. 构造响应 JSON
nlohmann::json resp;
resp["type"] = "response";
resp["seqNo"] = req.value("seqNo", "0");
nlohmann::json data;
data["loc"] = loc;
data["segments"] = nlohmann::json::array();
int idx = 1;
for (const auto& seg : segs)
{
nlohmann::json s;
s["index"] = idx++;
s["segmentId"] = seg.segment_id;
s["startTime"] = seg.start_ms;
s["endTime"] = seg.end_ms;
data["segments"].push_back(s);
}
resp["data"] = data;
// 5. 发送 MQTT
mqtt_client->publish(g_app_config.mqtt.topics.record_query, resp.dump(-1), 1);
}
static void handle_record_play_request(const nlohmann::json& req)
{
// 1. 解析参数
if (!req.contains("data") || !req["data"].is_object())
{
LOG_WARN("[record_play] missing data");
return;
}
const auto& d = req["data"];
int loc = d.value("loc", -1);
std::string segmentId = d.value("segmentId", "");
if (loc < 0 || segmentId.empty())
{
LOG_WARN("[record_play] invalid parameters");
return;
}
LOG_INFO("[record_play] loc=" + std::to_string(loc) + " segmentId=" + segmentId);
// 2. 根据 segmentId 获取录像段
auto seg = g_record_manager->getSegment(segmentId);
if (seg.files.empty())
{
LOG_WARN("[record_play] no files found for segmentId: " + segmentId);
return;
}
// 3. 计算总时长
int64_t duration = seg.end_ms - seg.start_ms;
// 4. 获取播放地址前缀
std::string ip = get_ip_address("enP2p33s0");
if (ip.empty()) ip = "127.0.0.1";
int http_port = g_record_manager->getHttpPort();
// 5. 构造响应
nlohmann::json resp;
resp["type"] = "response";
resp["seqNo"] = req.value("seqNo", "0");
nlohmann::json data;
data["loc"] = loc;
data["segmentId"] = segmentId;
data["startTime"] = seg.start_ms;
data["endTime"] = seg.end_ms;
data["duration"] = duration;
int index = 1;
// 6. 单文件列表
nlohmann::json files = nlohmann::json::array();
for (auto& f : seg.files)
{
nlohmann::json item;
item["index"] = index++;
item["url"] = "http://" + ip + ":" + std::to_string(http_port) + f.path;
item["startTime"] = f.start_ms;
item["endTime"] = f.end_ms;
files.push_back(item);
}
data["files"] = files;
resp["data"] = data;
// 7. 回复 MQTT
mqtt_client->publish(g_app_config.mqtt.topics.record_play, resp.dump(), 1);
}
static void handle_vehicle_ctrl_request(const nlohmann::json& req)
{
std::string cmd = req.value("command", "");
int value = req.value("value", 0);
// value 固定为 1不是 1 直接忽略
if (value != 1)
{
LOG_WARN("[vehicle_ctrl] ignore command=" + cmd + " value=" + std::to_string(value));
return;
}
if (cmd == "startCtrl")
{
LOG_INFO("[vehicle_ctrl] startCtrl → start all RTMP");
RTMPManager::start_all();
g_streaming.store(true);
return;
}
if (cmd == "stopCtrl")
{
LOG_INFO("[vehicle_ctrl] stopCtrl → stop all RTMP");
RTMPManager::stop_all();
g_streaming.store(false);
return;
}
LOG_WARN("[vehicle_ctrl] unknown command: " + cmd);
}
static void on_mqtt_message_received(const std::string& topic, const std::string& message)
{
try
{
auto j = nlohmann::json::parse(message);
LOG_INFO("[MQTT] Received message on [" + topic + "]");
LOG_INFO("[MQTT] Payload: " + j.dump(-1));
// ===============================
// 1. vehicle_ctrl放宽协议限制
// ===============================
if (topic == g_app_config.mqtt.topics.vehicle_ctrl)
{
// vehicle_ctrl 只要求有 command
if (!j.contains("command"))
{
LOG_WARN("[MQTT] vehicle_ctrl missing 'command'");
return;
}
handle_vehicle_ctrl_request(j);
return;
}
// ==========================================
// 2. 其他 topic必须是 type=request
// ==========================================
if (!j.contains("type") || j["type"] != "request")
{
// 非 request 的消息直接忽略
return;
}
// ===============================
// 3. 按 topic 分发
// ===============================
if (topic == g_app_config.mqtt.topics.video_down)
{
handle_video_down_request(j);
}
else if (topic == g_app_config.mqtt.topics.record_query)
{
handle_record_query_request(j);
}
else if (topic == g_app_config.mqtt.topics.record_play)
{
handle_record_play_request(j);
}
else
{
LOG_WARN("[MQTT] Unknown topic: " + topic);
}
}
catch (const std::exception& e)
{
LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
}
}
void mqtt_client_thread_func()
{
const auto& cfg = g_app_config.mqtt;
auto heartbeat_interval = std::chrono::milliseconds(static_cast<int>(cfg.keep_alive * 0.9));
while (g_running)
{
mqtt_client = std::make_unique<MQTTClient>(cfg);
mqtt_client->setConnectCallback(on_mqtt_connected);
mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
mqtt_client->setMessageCallback(on_mqtt_message_received);
// 等待连接
while (g_running && !mqtt_client->isConnected())
{
mqtt_client->connect();
for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 主循环:心跳
while (g_running && mqtt_client->isConnected())
{
try_activate_mqtt(); // VID 到来时触发订阅
if (g_mqtt_activated.load())
{
send_heartbeat();
}
auto sleep_time = heartbeat_interval;
while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected())
{
auto chunk = std::min(sleep_time, std::chrono::milliseconds(50));
std::this_thread::sleep_for(chunk);
sleep_time -= chunk;
}
if (!g_running && mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 清理
if (mqtt_client)
{
if (g_running)
mqtt_client->disconnect();
else
mqtt_client->force_disconnect();
mqtt_client.reset();
}
if (!g_running) break;
// 短暂等待再重连
for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
LOG_INFO("[MQTT] Client thread exiting.");
}