kunlang_mqtt_client/src/tcp/tcp_thread/uplink_notify_client.cpp
2025-07-14 16:55:55 +08:00

145 lines
4.8 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "tcp_client.hpp"
#include "logger.hpp"
#include <memory>
static std::unique_ptr<TcpClient> uplink_notify_client;
static std::atomic<bool> heartbeat_running{false};
static std::atomic<bool> heartbeat_ack_received{false}; // 是否收到应答
static std::thread heartbeat_thread;
static void heartbeat_task()
{
while (heartbeat_running.load())
{
if (uplink_notify_client && uplink_notify_client->is_connected())
{
heartbeat_ack_received = false;
// 构造并发送心跳包
FullPacket heartPacket = ProtocolCodec::create_heartbeat_packet(g_mqtt_config.vin);
std::vector<uint8_t> raw = ProtocolCodec::encode_full_packet(heartPacket);
std::string payload(reinterpret_cast<const char *>(raw.data()), raw.size());
uplink_notify_client->send_data(payload);
LOG_INFO("[uplink_notify] Sent heartbeat.");
// 等待10秒期间判断是否收到回应
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
if (!heartbeat_running.load())
return;
}
if (!heartbeat_ack_received.load())
{
LOG_WARN("[uplink_notify] Heartbeat ACK not received within 10 seconds.");
}
}
else
{
LOG_WARN("[uplink_notify] Not connected, stop sending heartbeat.");
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
LOG_INFO("[uplink_notify] Heartbeat thread exited.");
}
// 接收处理逻辑
static void handle_uplink_notify_message(const std::string &data)
{
LOG_INFO("[uplink_notify] Received data, length = " + std::to_string(data.size()) + " bytes");
std::vector<uint8_t> bytes(data.begin(), data.end());
auto pkt_opt = ProtocolCodec::decode_full_packet(bytes);
if (!pkt_opt.has_value())
{
LOG_WARN("[uplink_notify] Invalid packet received.");
return;
}
const FullPacket &pkt = pkt_opt.value();
if (pkt.command_id == 0x07 && pkt.response_flag == 0x01 && pkt.data_length == 0)
{
heartbeat_ack_received = true;
LOG_INFO("[uplink_notify] Received heartbeat ACK.");
}
else if (pkt.command_id == 0xD4 && pkt.response_flag == 0xfe && pkt.data_length > 0) // 车辆通知信息
{
LOG_INFO("[uplink_notify] Received vehicle notify packet.");
auto data_opt = ProtocolCodec::parse_vehicle_message(pkt.data_unit);
if (!data_opt)
{
LOG_ERROR("[uplink_notify] Failed to parse vehicle data packet.");
std::vector<uint8_t> raw_reply = ProtocolCodec::make_ack_response(pkt, false);
std::string str_reply(reinterpret_cast<const char *>(raw_reply.data()), raw_reply.size());
uplink_notify_client->send_data(str_reply); // 解析失败回复
return;
}
else
{
std::vector<uint8_t> raw_reply = ProtocolCodec::make_ack_response(pkt, true);
std::string str_reply(reinterpret_cast<const char *>(raw_reply.data()), raw_reply.size());
uplink_notify_client->send_data(str_reply); // 解析成功回复
}
const auto &data_msg = *data_opt;
LOG_INFO("[uplink_notify] JSON Payload = " + data_msg.json_payload);
if (mqtt_client && mqtt_client->isConnected())
{
mqtt_client->publish(g_mqtt_config.topics.uplink_2, data_msg.json_payload, g_mqtt_config.qos);
}
else
{
LOG_WARN("[uplink_notify] MQTT not connected, failed to publish vehicle notify.");
}
}
else
{
LOG_ERROR("[uplink_notify] Unknown command flag.");
}
}
// 状态变化处理逻辑
static void handle_uplink_notify_status(bool connected)
{
LOG_INFO("[uplink_notify] status: " + std::string(connected ? "connected" : "disconnected"));
if (connected)
{
if (!heartbeat_running.load())
{
heartbeat_running = true;
heartbeat_thread = std::thread(heartbeat_task);
LOG_INFO("[uplink_notify] Heartbeat thread started.");
}
}
else
{
if (heartbeat_running.load())
{
heartbeat_running = false;
if (heartbeat_thread.joinable())
{
heartbeat_thread.join();
LOG_INFO("[uplink_notify] Heartbeat thread stopped.");
}
}
}
}
void start_uplink_notify_client(const std::string &ip, int port)
{
uplink_notify_client = std::make_unique<TcpClient>("uplink_notify", ip, port);
uplink_notify_client->set_receive_callback(handle_uplink_notify_message);
uplink_notify_client->set_status_callback(handle_uplink_notify_status);
uplink_notify_client->start();
}