kunlang_mqtt_client/src/tcp/tcp_thread/uplink_notify_client.cpp

115 lines
3.5 KiB
C++
Raw Normal View History

2025-07-10 17:36:16 +08:00
#include "tcp_client.hpp"
#include "logger.hpp"
#include <memory>
static std::unique_ptr<TcpClient> uplink_notify_client;
static std::atomic<bool> heartbeat_running{false};
static std::atomic<bool> heartbeat_ack_received{false}; // 是否收到应答
static std::thread heartbeat_thread;
static void heartbeat_task()
{
while (heartbeat_running.load())
{
if (uplink_notify_client && uplink_notify_client->is_connected())
{
heartbeat_ack_received = false;
// 构造并发送心跳包
FullPacket heartPacket = ProtocolCodec::create_heartbeat_packet(g_mqtt_config.vin);
std::vector<uint8_t> raw = ProtocolCodec::encode_full_packet(heartPacket);
std::string payload(reinterpret_cast<const char *>(raw.data()), raw.size());
uplink_notify_client->send_data(payload);
LOG_INFO("[uplink_notify] Sent heartbeat.");
// 等待10秒期间判断是否收到回应
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
if (!heartbeat_running.load())
return;
}
if (!heartbeat_ack_received.load())
{
LOG_WARN("[uplink_notify] Heartbeat ACK not received within 10 seconds.");
}
}
else
{
LOG_WARN("[uplink_notify] Not connected, stop sending heartbeat.");
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
LOG_INFO("[uplink_notify] Heartbeat thread exited.");
}
// 接收处理逻辑
static void handle_uplink_notify_message(const std::string &data)
{
LOG_INFO("[uplink_notify] Received data, length = " + std::to_string(data.size()) + " bytes");
std::vector<uint8_t> bytes(data.begin(), data.end());
auto pkt_opt = ProtocolCodec::decode_full_packet(bytes);
if (!pkt_opt.has_value())
{
LOG_WARN("[uplink_notify] Invalid packet received.");
return;
}
const FullPacket &pkt = pkt_opt.value();
if (pkt.command_id == 0x07 && pkt.response_flag == 0x01 && pkt.data_length == 0)
{
heartbeat_ack_received = true;
LOG_INFO("[uplink_notify] Received heartbeat ACK.");
}
else
{
// TODO: 处理其他消息类型
LOG_INFO("[uplink_notify] Received command: 0x" +
std::to_string(pkt.command_id));
}
}
// 状态变化处理逻辑
static void handle_uplink_notify_status(bool connected)
{
LOG_INFO("[uplink_notify] status: " + std::string(connected ? "connected" : "disconnected"));
if (connected)
{
if (!heartbeat_running.load())
{
heartbeat_running = true;
heartbeat_thread = std::thread(heartbeat_task);
LOG_INFO("[uplink_notify] Heartbeat thread started.");
}
}
else
{
if (heartbeat_running.load())
{
heartbeat_running = false;
if (heartbeat_thread.joinable())
{
heartbeat_thread.join();
LOG_INFO("[uplink_notify] Heartbeat thread stopped.");
}
}
}
}
void start_uplink_notify_client(const std::string &ip, int port)
{
uplink_notify_client = std::make_unique<TcpClient>("uplink_notify", ip, port);
uplink_notify_client->set_receive_callback(handle_uplink_notify_message);
uplink_notify_client->set_status_callback(handle_uplink_notify_status);
uplink_notify_client->start();
}