sweeper_video/src/mqtt_client_wrapper.cpp
2026-01-22 17:14:01 +08:00

391 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// mqtt_client_wrapper.cpp
#include "mqtt_client_wrapper.hpp"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <nlohmann/json.hpp>
#include <queue>
#include <thread>
#include "record_manager.hpp"
#include "rtmp_manager.hpp"
#include "serial_AT.hpp"
// Shared MQTT client instance. It is created and reset by mqtt_client_thread_func()
// and used by the heartbeat sender, the connection callbacks and the request handlers below.
std::shared_ptr<MQTTClient> mqtt_client;
// Run flag declared extern here (defined elsewhere); the client thread keeps looping while it is true.
extern std::atomic<bool> g_running;
// NOTE(review): not referenced in the visible part of this file — presumably a
// "live streaming active" flag consumed by other translation units; confirm.
std::atomic<bool> g_streaming{false};
// NOTE(review): not referenced in the visible part of this file — presumably the current
// dispatch identifier, with g_dispatch_id_mutex intended to guard access to it; confirm.
std::string g_dispatch_id;
std::mutex g_dispatch_id_mutex;
static void send_heartbeat()
{
if (!mqtt_client || !mqtt_client->isConnected()) return;
// 获取当前时间戳13位毫秒
auto now = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
// 获取 RTMP 状态
auto channels_info = RTMPManager::get_all_channels_status();
nlohmann::json channels = nlohmann::json::array();
int total = static_cast<int>(channels_info.size());
int running_count = 0;
for (const auto& ch : channels_info)
{
nlohmann::json item;
item["loc"] = ch.loc;
item["running"] = ch.running;
if (ch.running)
{
item["reason"] = nullptr;
running_count++;
}
else
{
item["reason"] = ch.reason.empty() ? "Unknown error" : ch.reason;
}
channels.push_back(item);
}
// ===== 新增:读取 radio 信息(线程安全,返回拷贝)=====
RadioInfo ri = get_radio_info();
nlohmann::json hb;
hb["timestamp"] = ms;
hb["status"] = (running_count == 0) ? 0 : (running_count == total ? 1 : 2);
hb["channels"] = channels;
// ===== 新增:把 radio 塞进心跳 =====
hb["radio"] = {
{"state", ri.state}, {"rat", ri.rat}, {"pci", ri.pci}, {"band", ri.band}, {"arfcn", ri.arfcn},
{"rsrp", ri.rsrp}, {"rsrq", ri.rsrq}, {"sinr", ri.sinr}, {"raw", ri.raw},
};
// 发布心跳
mqtt_client->publish(g_app_config.mqtt.topics.heartbeat_up, hb.dump(), 0);
// LOG_INFO("[MQTT] Sent video heartbeat (" + std::to_string(running_count) + "/" + std::to_string(total) +
// " running): " + hb.dump());
}
// MQTT callbacks

// Connect callback: logs the broker address and subscribes to every downlink topic.
// The topic names come from the configuration, so they are subscribed once per connection.
static void on_mqtt_connected()
{
    const auto& mqtt_cfg = g_app_config.mqtt;
    LOG_INFO("[MQTT] Connected to broker: " + mqtt_cfg.server_ip);

    const auto& topics = mqtt_cfg.topics;
    mqtt_client->subscribe(topics.video_down);
    mqtt_client->subscribe(topics.record_query);
    mqtt_client->subscribe(topics.record_play);
    mqtt_client->subscribe(topics.vehicle_ctrl);

    LOG_INFO("[MQTT] Subscribed all topics for VID=" + mqtt_cfg.vehicle_id);
}
// Disconnect callback: only emits a warning naming the configured broker address.
static void on_mqtt_disconnected()
{
    const auto& broker = g_app_config.mqtt.server_ip;
    LOG_WARN("[MQTT] Disconnected from broker: " + broker);
}
// Handles a video_down request.
//
// The request must carry an object "data" whose "switch" equals 1; anything else is ignored.
// The reply (type "response", published on the video_down topic with QoS 1) lists the URL of
// every channel that is currently running and echoes "seqNo" when the request had a non-null one.
static void handle_video_down_request(const nlohmann::json& req)
{
    const bool has_data = req.contains("data") && req["data"].is_object();
    if (!has_data)
    {
        LOG_WARN("[video_down] Missing data");
        return;
    }

    const int requested_switch = req["data"].value("switch", 0);
    if (requested_switch != 1)
    {
        LOG_INFO("[video_down] switch != 1, ignore");
        return;
    }

    // seqNo is optional and echoed back verbatim, whatever JSON type it has.
    nlohmann::json seq_no = nullptr;
    if (req.contains("seqNo")) seq_no = req["seqNo"];

    const auto channels_info = RTMPManager::get_all_channels_status();

    // NOTE(review): "loc" below is a running index over the channels that are running,
    // not ch.loc as used in the heartbeat — confirm this is the intended numbering.
    nlohmann::json url_list = nlohmann::json::array();
    int next_loc = 0;
    for (const auto& ch : channels_info)
    {
        if (!ch.running) continue;

        nlohmann::json entry;
        entry["loc"] = next_loc++;
        entry["url"] = ch.url;
        url_list.push_back(entry);
    }

    nlohmann::json reply;
    reply["type"] = "response";
    if (!seq_no.is_null()) reply["seqNo"] = seq_no;
    reply["data"] = url_list;

    mqtt_client->publish(g_app_config.mqtt.topics.video_down, reply.dump(-1), 1);
}
// Handles a record_query request.
//
// Expects data.loc (>= 0) and data.startTime / data.endTime (both > 0, start < end).
// Looks up the recorded segments of the stream mapped to that loc and publishes them on the
// record_query topic (QoS 1) as a "response", echoing "seqNo" when the request had a non-null one.
// Invalid requests are logged and dropped without a reply.
static void handle_record_query_request(const nlohmann::json& req)
{
    const bool has_data = req.contains("data") && req["data"].is_object();
    if (!has_data)
    {
        LOG_WARN("[record_query] missing data");
        return;
    }

    // seqNo is optional and echoed back verbatim, whatever JSON type it has.
    nlohmann::json seq_no = nullptr;
    if (req.contains("seqNo")) seq_no = req["seqNo"];

    const auto& params = req["data"];
    const int loc = params.value("loc", -1);
    const int64_t range_begin = params.value("startTime", -1LL);
    const int64_t range_end = params.value("endTime", -1LL);

    const bool range_ok = range_begin > 0 && range_end > 0 && range_begin < range_end;
    if (loc < 0 || !range_ok)
    {
        LOG_WARN("[record_query] invalid parameters");
        return;
    }

    const std::string stream = RecordManager::loc_to_stream(loc);
    if (stream.empty())
    {
        LOG_WARN("[record_query] invalid loc: " + std::to_string(loc));
        return;
    }

    const auto found = g_record_manager->querySegments(stream, range_begin, range_end);

    // Segment entries are numbered starting from 1.
    nlohmann::json segments = nlohmann::json::array();
    int ordinal = 1;
    for (const auto& seg : found)
    {
        nlohmann::json entry;
        entry["index"] = ordinal++;
        entry["segmentId"] = seg.segment_id;
        entry["startTime"] = seg.start_ms;
        entry["endTime"] = seg.end_ms;
        segments.push_back(entry);
    }

    nlohmann::json payload;
    payload["loc"] = loc;
    payload["segments"] = segments;

    nlohmann::json reply;
    reply["type"] = "response";
    if (!seq_no.is_null()) reply["seqNo"] = seq_no;
    reply["data"] = payload;

    mqtt_client->publish(g_app_config.mqtt.topics.record_query, reply.dump(-1), 1);
}
// Handles a record_play request.
//
// Expects data.loc (>= 0) and a non-empty data.segmentId. Resolves the segment through the
// record manager and publishes, on the record_play topic (QoS 1), a "response" that lists an
// HTTP URL for every file of the segment. "seqNo" is echoed when the request had a non-null one.
// Invalid requests and unknown/empty segments are logged and dropped without a reply.
static void handle_record_play_request(const nlohmann::json& req)
{
    const bool has_data = req.contains("data") && req["data"].is_object();
    if (!has_data)
    {
        LOG_WARN("[record_play] missing data");
        return;
    }

    // seqNo is optional and echoed back verbatim, whatever JSON type it has.
    nlohmann::json seq_no = nullptr;
    if (req.contains("seqNo")) seq_no = req["seqNo"];

    const auto& params = req["data"];
    const int loc = params.value("loc", -1);
    const std::string segmentId = params.value("segmentId", "");
    if (loc < 0 || segmentId.empty())
    {
        LOG_WARN("[record_play] invalid parameters");
        return;
    }

    const auto seg = g_record_manager->getSegment(segmentId);
    if (seg.files.empty())
    {
        LOG_WARN("[record_play] no files found for segmentId: " + segmentId);
        return;
    }

    // Address of the fixed interface "enP2p33s0"; fall back to loopback when it has none.
    std::string host = get_ip_address("enP2p33s0");
    if (host.empty()) host = "127.0.0.1";
    const int http_port = g_record_manager->getHttpPort();
    const std::string url_prefix = "http://" + host + ":" + std::to_string(http_port);

    // File entries are numbered starting from 1; each file path is appended to the prefix as is.
    nlohmann::json file_list = nlohmann::json::array();
    int ordinal = 1;
    for (const auto& file : seg.files)
    {
        nlohmann::json entry;
        entry["index"] = ordinal++;
        entry["url"] = url_prefix + file.path;
        entry["startTime"] = file.start_ms;
        entry["endTime"] = file.end_ms;
        file_list.push_back(entry);
    }

    const int64_t duration = seg.end_ms - seg.start_ms;

    nlohmann::json payload;
    payload["loc"] = loc;
    payload["segmentId"] = segmentId;
    payload["startTime"] = seg.start_ms;
    payload["endTime"] = seg.end_ms;
    payload["duration"] = duration;
    payload["files"] = file_list;

    nlohmann::json reply;
    reply["type"] = "response";
    if (!seq_no.is_null()) reply["seqNo"] = seq_no;
    reply["data"] = payload;

    mqtt_client->publish(g_app_config.mqtt.topics.record_play, reply.dump(), 1);
}
static void on_mqtt_message_received(const std::string& topic, const std::string& message)
{
try
{
auto j = nlohmann::json::parse(message);
// ===============================
// 1. vehicle_ctrl放宽协议限制
// ===============================
if (topic == g_app_config.mqtt.topics.vehicle_ctrl)
{
// 必须存在 data且 data 是对象
if (!j.contains("data") || !j["data"].is_object()) return;
const auto& d = j["data"];
std::string cmd = d.value("command", "");
int value = d.value("value", 0);
// 视频程序只关心 startCtrl / stopCtrl其它不打印也不处理
if (value != 1) return;
if (cmd != "startCtrl" && cmd != "stopCtrl") return;
// 视频关心的指令才打印
LOG_INFO("[MQTT] Received message on [" + topic + "]");
LOG_INFO("[MQTT] Payload: " + j.dump(-1));
// 执行
if (cmd == "startCtrl")
{
LOG_INFO("[vehicle_ctrl] startCtrl → ENABLE live");
RTMPManager::set_live_enabled_all(true);
}
else // stopCtrl
{
LOG_INFO("[vehicle_ctrl] stopCtrl → DISABLE live");
RTMPManager::set_live_enabled_all(false);
}
return;
}
// ==========================================
// 2. 其他 topic必须是 type=request
// ==========================================
if (!j.contains("type") || j["type"] != "request")
{
// 非 request 的消息直接忽略
return;
}
LOG_INFO("[MQTT] Received message on [" + topic + "]");
LOG_INFO("[MQTT] Payload: " + j.dump(-1));
// ===============================
// 3. 按 topic 分发
// ===============================
if (topic == g_app_config.mqtt.topics.video_down)
{
handle_video_down_request(j);
}
else if (topic == g_app_config.mqtt.topics.record_query)
{
handle_record_query_request(j);
}
else if (topic == g_app_config.mqtt.topics.record_play)
{
handle_record_play_request(j);
}
else
{
LOG_WARN("[MQTT] Unknown topic: " + topic);
}
}
catch (const std::exception& e)
{
LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
}
}
void mqtt_client_thread_func()
{
const auto& cfg = g_app_config.mqtt;
auto heartbeat_interval = std::chrono::milliseconds(static_cast<int>(cfg.keep_alive * 0.9));
while (g_running)
{
mqtt_client = std::make_unique<MQTTClient>(cfg);
mqtt_client->setConnectCallback(on_mqtt_connected);
mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
mqtt_client->setMessageCallback(on_mqtt_message_received);
// 等待连接
while (g_running && !mqtt_client->isConnected())
{
mqtt_client->connect();
for (int i = 0; i < 10 && g_running && !mqtt_client->isConnected(); i++)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (!g_running && !mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 主循环:心跳
while (g_running && mqtt_client->isConnected())
{
send_heartbeat();
auto sleep_time = heartbeat_interval;
while (sleep_time.count() > 0 && g_running && mqtt_client->isConnected())
{
auto chunk = std::min(sleep_time, std::chrono::milliseconds(50));
std::this_thread::sleep_for(chunk);
sleep_time -= chunk;
}
if (!g_running && mqtt_client->isConnected()) mqtt_client->force_disconnect();
}
// 清理
if (mqtt_client)
{
if (g_running)
mqtt_client->disconnect();
else
mqtt_client->force_disconnect();
mqtt_client.reset();
}
if (!g_running) break;
// 短暂等待再重连
for (int i = 0; i < 5 && g_running; i++) std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
LOG_INFO("[MQTT] Client thread exiting.");
}