合并定位信息上报节点至mqtt上报节点

This commit is contained in:
lyq 2026-01-14 16:44:15 +08:00
parent 977fab0f9a
commit 43b8299e6a
11 changed files with 480 additions and 337 deletions

View File

@ -4,8 +4,9 @@
"external_net_port": 19683,
"username": "zxwl",
"password": "zxwl1234@",
"info_topic": "/zxwl/vehicle/{vid}/info",
"fault_topic": "/zxwl/vehicle/{vid}/fault",
"info_topic": "/zxwl/sweeper/{vid}/info",
"fault_topic": "/zxwl/sweeper/{vid}/fault",
"gps_topic": "/zxwl/sweeper/{vid}/gps",
"pub_gps_topic": "/zxwl/sweeper/V060003/gps",
"remote_topic": "/zxwl/sweeper/V060003/ctrl",
"upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload",

View File

@ -1,22 +1,38 @@
// can_decoder.h
#pragma once
#include <mutex>
#include "mqtt_report/vehicle_context.h"
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "mqtt_report/report_struct.h"
namespace sweeperMsg = sweeper_interfaces::msg;
class CanDecoder
{
public:
CanDecoder(GeneralMsg &info,
std::mutex &info_mutex,
std::mutex &fault_mutex);
public:
explicit CanDecoder(VehicleContext& ctx);
void handle(const sweeperMsg::CanFrame::SharedPtr msg);
private:
GeneralMsg &info_report_;
std::mutex &info_mutex_;
std::mutex &fault_mutex_;
private:
using CanHandler = std::function<void(const sweeperMsg::CanFrame&)>;
// ===== BMS =====
void handle_bms_100(const sweeperMsg::CanFrame& msg);
void handle_bms_101(const sweeperMsg::CanFrame& msg);
// ===== MCU =====
void handle_mcu_status_1(const sweeperMsg::CanFrame& msg); // 0x0CF11E05
void handle_mcu_status_2(const sweeperMsg::CanFrame& msg); // 0x0CF11F05
// ===== EPS =====
void handle_eps_status(const sweeperMsg::CanFrame& msg); // 0x401
// ===== VCU =====
void handle_vcu_fault_12(const sweeperMsg::CanFrame& msg); // 0x216
void handle_vcu_fault_34(const sweeperMsg::CanFrame& msg); // 0x217
private:
VehicleContext& ctx_;
std::unordered_map<uint32_t, CanHandler> handlers_;
};

View File

@ -1,33 +1,28 @@
// fault_codes.h
#pragma once
#include <string>
#include <unordered_map>
#include <iostream>
#include <set>
#include "get_config.h"
#include <string>
#include <unordered_map>
struct FaultInfo
{
std::string subsystem; // 故障设备
std::string error_name; // 故障名称
std::string description; // 中文描述
int level; // 故障等级0-3
std::string subsystem; // 故障设备
std::string error_name; // 故障名称
std::string description; // 中文描述
int level; // 故障等级0-3
};
class ErrorCodeSet
{
public:
public:
// 添加错误码(自动去重)
void addErrorCode(int code)
{
errors.insert(code);
}
void addErrorCode(int code) { errors.insert(code); }
// 删除错误码
void removeErrorCode(int code)
{
errors.erase(code);
}
void removeErrorCode(int code) { errors.erase(code); }
// 打印所有当前错误码
void printErrors() const
@ -41,25 +36,18 @@ public:
}
// 获取所有当前错误码
const std::set<int> &getAllErrorCodes() const
{
return errors;
}
const std::set<int>& getAllErrorCodes() const { return errors; }
// 判断某个错误码是否存在
bool hasError(int code) const
{
return errors.find(code) != errors.end();
}
bool hasError(int code) const { return errors.find(code) != errors.end(); }
private:
private:
std::set<int> errors;
};
// 故障字典声明(在 .cpp 实现里定义)
extern std::unordered_map<int, FaultInfo> faultMap;
extern const uint8_t epsErrorCode[18];
extern ErrorCodeSet vehicle_error_code;
std::string generateFaultJson(int code, const std::string &vin, int64_t timestamp);
std::string generateFaultJson(int code, const std::string& vin, int64_t timestamp);
bool updateFaultLevel(int code, int newLevel);

View File

@ -1,3 +1,5 @@
// get_config.h
#pragma once
#include <chrono>
@ -22,7 +24,8 @@ struct Config
std::string mqtt_username;
std::string mqtt_password;
std::string info_topic_template; // e.g. /zxwl/vehicle/{vid}/info
std::string info_topic_template; // e.g. /zxwl/vehicle/{vid}/info
std::string gps_topic_template;
std::string fault_topic_template; // e.g. /zxwl/vehicle/{vid}/fault
};

View File

@ -1,22 +1,24 @@
// report_struct.h
#pragma once
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>
#include <chrono>
int64_t getCurrentTimestampMs();
struct GeneralMsg
{
float power = 0.0f; // 电量 %
bool chargeStatus = false; // 是否在充电
int gear = 0; // 档位
float speed = 0.0f; // 当前速度 km/h
float steeringAngle = 0.0f; // 当前角度
int motorTemp = 0; // 电机温度 °C
int controllerTemp = 0; // 控制器温度 °C
int64_t timestamp = 0; // 时间戳
float power = 0.0f; // 电量 %
bool chargeStatus = false; // 是否在充电
int gear = 0; // 档位
float speed = 0.0f; // 当前速度 km/h
float steeringAngle = 0.0f; // 当前角度
int motorTemp = 0; // 电机温度 °C
int controllerTemp = 0; // 控制器温度 °C
int64_t timestamp = 0; // 时间戳
};
struct BMSFault

View File

@ -0,0 +1,38 @@
// vehicle_context.h
#pragma once
#include <mutex>
#include <string>
#include "mqtt_report/fault_codes.h"
#include "mqtt_report/report_struct.h"
struct GpsMsg
{
double lat = 0.0;
double lng = 0.0;
double course = 0.0;
int mode = 0;
int64_t timestamp = 0;
bool valid = false;
};
struct VehicleContext
{
std::mutex mtx;
// identity
std::string imei;
std::string vid;
bool ready = false;
// realtime info
GeneralMsg info;
// gps state
GpsMsg gps;
// fault state
ErrorCodeSet errors;
};

View File

@ -1,231 +1,208 @@
// can/can_decoder.cpp
#include "mqtt_report/can/can_decoder.h"
#include <iomanip>
#include <iostream>
#include "mqtt_report/fault_codes.h"
#include <iostream>
#include <iomanip>
CanDecoder::CanDecoder(GeneralMsg &info,
std::mutex &info_mutex,
std::mutex &fault_mutex)
: info_report_(info),
info_mutex_(info_mutex),
fault_mutex_(fault_mutex)
// ===================== ctor =====================
CanDecoder::CanDecoder(VehicleContext& ctx) : ctx_(ctx)
{
// BMS
handlers_[0x100] = [this](const auto& m) { handle_bms_100(m); };
handlers_[0x101] = [this](const auto& m) { handle_bms_101(m); };
// MCU
handlers_[0x0CF11E05] = [this](const auto& m) { handle_mcu_status_1(m); };
handlers_[0x0CF11F05] = [this](const auto& m) { handle_mcu_status_2(m); };
// EPS
handlers_[0x401] = [this](const auto& m) { handle_eps_status(m); };
// VCU
handlers_[0x216] = [this](const auto& m) { handle_vcu_fault_12(m); };
handlers_[0x217] = [this](const auto& m) { handle_vcu_fault_34(m); };
}
// ===================== dispatch =====================
void CanDecoder::handle(const sweeperMsg::CanFrame::SharedPtr msg)
{
switch (msg->id)
auto it = handlers_.find(msg->id);
if (it != handlers_.end()) it->second(*msg);
}
// ===================== BMS =====================
void CanDecoder::handle_bms_100(const sweeperMsg::CanFrame& msg)
{
uint16_t voltage_raw = (msg.data[0] << 8) | msg.data[1];
int16_t current_raw = (msg.data[2] << 8) | msg.data[3];
uint16_t capacity_raw = (msg.data[4] << 8) | msg.data[5];
float voltage = voltage_raw * 0.01f;
float current = current_raw * 0.01f;
float capacity = capacity_raw * 0.01f;
{
case 0x100: // BMS_Status 100
{
uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1];
int16_t current_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5];
float voltage = voltage_raw * 0.01f;
float current = current_raw * 0.01f;
float capacity = capacity_raw * 0.01f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.chargeStatus = (current >= 0);
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x100] 总电压: " << voltage << " V, "
<< "电流: " << current << " A, "
<< "剩余容量: " << capacity << " Ah, "
<< "是否在充电: " << ((current >= 0) ? "" : "") << std::endl;
break;
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.info.chargeStatus = (current >= 0);
}
case 0x101: // BMS_Status 101
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x100] 总电压: " << voltage << " V, "
<< "电流: " << current << " A, "
<< "剩余容量: " << capacity << " Ah, "
<< "是否在充电: " << ((current >= 0) ? "" : "") << std::endl;
}
void CanDecoder::handle_bms_101(const sweeperMsg::CanFrame& msg)
{
uint16_t full_capacity_raw = (msg.data[0] << 8) | msg.data[1];
uint16_t cycle_count_raw = (msg.data[2] << 8) | msg.data[3];
uint16_t rsoc_raw = (msg.data[4] << 8) | msg.data[5];
float full_capacity = full_capacity_raw * 0.01f;
uint16_t cycle_count = cycle_count_raw;
float rsoc = rsoc_raw * 1.0f;
{
uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1];
uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5];
float full_capacity = full_capacity_raw * 0.01f;
uint16_t cycle_count = cycle_count_raw;
float rsoc = rsoc_raw * 1.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.power = rsoc;
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, "
<< "循环次数: " << cycle_count << " 次, "
<< "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl;
break;
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.info.power = rsoc;
}
case 0x0CF11E05: // MCU_Status_1 + MCU_Fault
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, "
<< "循环次数: " << cycle_count << " 次, "
<< "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl;
}
// ===================== MCU =====================
void CanDecoder::handle_mcu_status_1(const sweeperMsg::CanFrame& msg)
{
uint16_t raw_speed = (static_cast<uint16_t>(msg.data[1]) << 8) | msg.data[0];
float actual_speed_rpm = static_cast<float>(raw_speed);
float gear_ratio = 32.0f;
float wheel_radius = 0.3f;
float speed_kmh = (actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) / (1000.0f * gear_ratio);
uint16_t mcu_fault_code = (static_cast<uint16_t>(msg.data[7]) << 8) | msg.data[6];
{
uint16_t raw_speed =
(static_cast<uint16_t>(msg->data[1]) << 8) | msg->data[0];
float actual_speed_rpm = static_cast<float>(raw_speed);
std::lock_guard<std::mutex> lock(ctx_.mtx);
float gear_ratio = 32.0f;
float wheel_radius = 0.3f;
float speed_kmh =
(actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) /
(1000.0f * gear_ratio);
// info
ctx_.info.speed = speed_kmh;
// faults: 先清 MCU 故障区
for (int code = 1101; code <= 1116; ++code) ctx_.errors.removeErrorCode(code);
for (int i = 0; i < 16; ++i)
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.speed = speed_kmh;
}
bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0;
int error_code = 1101 + i;
uint16_t mcu_fault_code =
(static_cast<uint16_t>(msg->data[7]) << 8) | msg->data[6];
{
std::lock_guard<std::mutex> lock(fault_mutex_);
// 先清 MCU 故障区
for (int code = 1101; code <= 1116; ++code)
vehicle_error_code.removeErrorCode(code);
for (int i = 0; i < 16; ++i)
if (is_fault && faultMap.count(error_code))
{
bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0;
int error_code = 1101 + i;
if (is_fault && faultMap.count(error_code))
{
vehicle_error_code.addErrorCode(error_code);
updateFaultLevel(error_code, 1);
}
else
{
vehicle_error_code.removeErrorCode(error_code);
}
ctx_.errors.addErrorCode(error_code);
updateFaultLevel(error_code, 1);
}
else
{
ctx_.errors.removeErrorCode(error_code);
}
}
break;
}
case 0x0CF11F05: // MCU_Status_2
{
int raw_ctrl_temp = static_cast<int>(msg->data[2]);
int raw_motor_temp = static_cast<int>(msg->data[3]);
uint8_t ctrl_status = msg->data[4];
int controllerTemp = raw_ctrl_temp - 40;
int motorTemp = raw_motor_temp - 30;
int feedback_gear = (ctrl_status >> 2) & 0x03;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.controllerTemp = controllerTemp;
info_report_.motorTemp = motorTemp;
info_report_.gear = feedback_gear;
}
break;
}
case 0x401: // EPS_Status + EPS_Fault
{
uint16_t raw_value =
static_cast<uint16_t>((msg->data[3] << 8) | msg->data[4]);
float steeringAngle = (raw_value - 1024) / 7.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.steeringAngle = steeringAngle;
}
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1201; code <= 1218; ++code)
vehicle_error_code.removeErrorCode(code);
uint8_t data_bytes[2] = {msg->data[2], msg->data[6]};
for (uint8_t byte : data_bytes)
{
for (int i = 0; i < 18; ++i)
{
if (byte == epsErrorCode[i])
{
vehicle_error_code.addErrorCode(1201 + i);
break;
}
}
}
}
break;
}
case 0x216: // VCU_Fault_1/2
{
uint32_t fault1_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault2_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1701; code <= 1764; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault1_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1701 + bit);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault2_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1733 + bit);
}
break;
}
case 0x217: // VCU_Fault_3/4
{
uint32_t fault3_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault4_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1765; code <= 1814; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault3_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1765 + bit);
for (uint8_t bit = 0; bit <= 18; ++bit)
if ((fault4_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1797 + bit);
}
break;
}
default:
break;
}
}
void CanDecoder::handle_mcu_status_2(const sweeperMsg::CanFrame& msg)
{
int raw_ctrl_temp = static_cast<int>(msg.data[2]);
int raw_motor_temp = static_cast<int>(msg.data[3]);
uint8_t ctrl_status = msg.data[4];
int controllerTemp = raw_ctrl_temp - 40;
int motorTemp = raw_motor_temp - 30;
int feedback_gear = (ctrl_status >> 2) & 0x03;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.info.controllerTemp = controllerTemp;
ctx_.info.motorTemp = motorTemp;
ctx_.info.gear = feedback_gear;
}
}
// ===================== EPS =====================
void CanDecoder::handle_eps_status(const sweeperMsg::CanFrame& msg)
{
uint16_t raw_value = static_cast<uint16_t>((msg.data[3] << 8) | msg.data[4]);
float steeringAngle = (raw_value - 1024) / 7.0f;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
// info
ctx_.info.steeringAngle = steeringAngle;
// faults
for (int code = 1201; code <= 1218; ++code) ctx_.errors.removeErrorCode(code);
uint8_t data_bytes[2] = {msg.data[2], msg.data[6]};
for (uint8_t byte : data_bytes)
{
for (int i = 0; i < 18; ++i)
{
if (byte == epsErrorCode[i])
{
ctx_.errors.addErrorCode(1201 + i);
break;
}
}
}
}
}
// ===================== VCU =====================
void CanDecoder::handle_vcu_fault_12(const sweeperMsg::CanFrame& msg)
{
uint32_t fault1_bits = (static_cast<uint32_t>(msg.data[0]) << 0) | (static_cast<uint32_t>(msg.data[1]) << 8) |
(static_cast<uint32_t>(msg.data[2]) << 16) | (static_cast<uint32_t>(msg.data[3]) << 24);
uint32_t fault2_bits = (static_cast<uint32_t>(msg.data[4]) << 0) | (static_cast<uint32_t>(msg.data[5]) << 8) |
(static_cast<uint32_t>(msg.data[6]) << 16) | (static_cast<uint32_t>(msg.data[7]) << 24);
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
for (int code = 1701; code <= 1764; ++code) ctx_.errors.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault1_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1701 + bit);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault2_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1733 + bit);
}
}
void CanDecoder::handle_vcu_fault_34(const sweeperMsg::CanFrame& msg)
{
uint32_t fault3_bits = (static_cast<uint32_t>(msg.data[0]) << 0) | (static_cast<uint32_t>(msg.data[1]) << 8) |
(static_cast<uint32_t>(msg.data[2]) << 16) | (static_cast<uint32_t>(msg.data[3]) << 24);
uint32_t fault4_bits = (static_cast<uint32_t>(msg.data[4]) << 0) | (static_cast<uint32_t>(msg.data[5]) << 8) |
(static_cast<uint32_t>(msg.data[6]) << 16) | (static_cast<uint32_t>(msg.data[7]) << 24);
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
for (int code = 1765; code <= 1814; ++code) ctx_.errors.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault3_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1765 + bit);
for (uint8_t bit = 0; bit <= 18; ++bit)
if ((fault4_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1797 + bit);
}
}

View File

@ -1,9 +1,16 @@
// fault_codes.cpp
#include "mqtt_report/fault_codes.h"
ErrorCodeSet vehicle_error_code; // 整车错误码
#include "nlohmann/json.hpp"
using json = nlohmann::json;
using ordered_json = nlohmann::ordered_json;
ErrorCodeSet vehicle_error_code; // 整车错误码
// 根据错误码列表生成错误上报json
std::string generateFaultJson(int code, const std::string &vin, int64_t timestamp)
std::string generateFaultJson(int code, const std::string& vin, int64_t timestamp)
{
auto it = faultMap.find(code);
if (it == faultMap.end())
@ -11,7 +18,7 @@ std::string generateFaultJson(int code, const std::string &vin, int64_t timestam
throw std::runtime_error("错误码不存在");
}
const FaultInfo &info = it->second;
const FaultInfo& info = it->second;
ordered_json j;
j["vin"] = vin;
@ -37,7 +44,8 @@ bool updateFaultLevel(int code, int newLevel)
return true;
}
const uint8_t epsErrorCode[18] = {0x10, 0x14, 0x12, 0x21, 0x22, 0x23, 0x24, 0x25, 0x32, 0x33, 0x35, 0x37, 0x43, 0x46, 0x51, 0x55, 0x61, 0x62};
const uint8_t epsErrorCode[18] = {0x10, 0x14, 0x12, 0x21, 0x22, 0x23, 0x24, 0x25, 0x32,
0x33, 0x35, 0x37, 0x43, 0x46, 0x51, 0x55, 0x61, 0x62};
std::unordered_map<int, FaultInfo> faultMap = {
{1001, {"BMS", "BMS_TEMP_HIGH_ERR", "温度过高", 0}},

View File

@ -1,3 +1,5 @@
// get_config.cpp
#include "mqtt_report/get_config.h"
bool load_config(const std::string& path, Config& config)
@ -22,6 +24,7 @@ bool load_config(const std::string& path, Config& config)
config.mqtt_password = mqtt.at("password").get<std::string>();
config.info_topic_template = mqtt.at("info_topic").get<std::string>();
config.gps_topic_template = mqtt.at("gps_topic").get<std::string>();
config.fault_topic_template = mqtt.at("fault_topic").get<std::string>();
return true;

View File

@ -1,21 +1,17 @@
// mqtt_report_node.cpp
#include <algorithm>
#include <bitset>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <vector>
#include "mqtt_report/can/can_decoder.h"
#include "mqtt_report/fault_codes.h"
#include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt_client.hpp"
#include "mqtt_report/report_struct.h"
#include "mqtt_report/vehicle_context.h"
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "sweeper_interfaces/msg/rtk.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
namespace sweeperMsg = sweeper_interfaces::msg;
@ -23,6 +19,19 @@ using sweeper_interfaces::msg::VehicleIdentity;
int mqtt_reconnect_interval = 5000; // ms
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; // 每 10s 警告一次
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
struct InputMonitor
{
bool ok = false;
bool sticky = false; // 是否一次成功就永久 OK
int64_t last_rx_ts = 0;
int64_t last_warn_ts = 0;
};
// ===================== JSON 打包 =====================
static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei)
{
@ -39,6 +48,19 @@ static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::s
return j.dump(-1);
}
static std::string pack_gps_to_json(const GpsMsg& gps, const std::string& imei)
{
ordered_json j;
j["imei"] = imei;
j["lat"] = gps.lat;
j["lng"] = gps.lng;
j["course"] = gps.course;
j["mode"] = gps.mode;
j["gps_timestamp"] = gps.timestamp;
j["report_timestamp"] = getCurrentTimestampMs();
return j.dump(-1);
}
static std::string make_topic(const std::string& tpl, const std::string& vid)
{
std::string topic = tpl;
@ -60,21 +82,28 @@ class CanDataSubscriber : public rclcpp::Node
config_(cfg),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
cfg.mqtt_password, mqtt_reconnect_interval),
can_decoder_(info_report_, info_mutex_, fault_mutex_)
can_decoder_(ctx_)
{
// CAN
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
// Identity(关键)
// Identity
identity_sub_ = this->create_subscription<VehicleIdentity>(
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
// rtk
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
"rtk_message", 10, std::bind(&CanDataSubscriber::rtk_callback, this, std::placeholders::_1));
// Timers
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::info_timer_callback, this));
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200),
std::bind(&CanDataSubscriber::gps_timer_callback, this));
fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this));
@ -85,67 +114,77 @@ class CanDataSubscriber : public rclcpp::Node
~CanDataSubscriber() override
{
if (info_timer_) info_timer_->cancel();
if (gps_timer_) gps_timer_->cancel();
if (fault_timer_) fault_timer_->cancel();
if (connection_check_timer_) connection_check_timer_->cancel();
}
private:
// ===================== identity =====================
bool snapshot_identity(std::string& imei, std::string& vid)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
if (!vehicle_ready_) return false;
imei = vehicle_imei_;
vid = vehicle_vid_;
return true;
}
void identity_callback(const VehicleIdentity::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
vehicle_imei_ = msg->imei;
vehicle_vid_ = msg->vid;
vehicle_ready_ = msg->ready;
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.imei = msg->imei;
ctx_.vid = msg->vid;
ctx_.ready = msg->ready;
RCLCPP_INFO(this->get_logger(), "Vehicle identity received: IMEI=%s, VID=%s", vehicle_imei_.c_str(),
vehicle_vid_.c_str());
int64_t now = getCurrentTimestampMs();
if (!identity_mon_.ok)
{
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", ctx_.imei.c_str(), ctx_.vid.c_str());
}
identity_mon_.ok = true;
identity_mon_.last_rx_ts = now;
}
bool snapshot_identity(std::string& imei, std::string& vid)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.ready) return false;
imei = ctx_.imei;
vid = ctx_.vid;
return true;
}
// ===================== timers =====================
void info_timer_callback()
{
if (!rclcpp::ok()) return;
std::string imei, vid;
if (!snapshot_identity(imei, vid))
{
RCLCPP_WARN_THROTTLE(this->get_logger(), *this->get_clock(), 5000,
"Vehicle identity not ready, skip MQTT publish");
return;
}
if (!snapshot_identity(imei, vid)) return;
const std::string topic = make_topic(config_.info_topic_template, vid);
GeneralMsg snapshot;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.timestamp = getCurrentTimestampMs();
snapshot = info_report_;
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.info.timestamp = getCurrentTimestampMs();
snapshot = ctx_.info;
}
const std::string payload = pack_general_info_to_json(snapshot, imei);
mqtt_client_.publish(topic, pack_general_info_to_json(snapshot, imei), 1);
}
if (!mqtt_client_.publish(topic, payload, 1))
void gps_timer_callback()
{
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
GpsMsg snapshot;
{
RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT");
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.gps.valid) return;
snapshot = ctx_.gps;
}
const std::string topic = make_topic(config_.gps_topic_template, vid);
mqtt_client_.publish(topic, pack_gps_to_json(snapshot, imei), 1);
}
void fault_timer_callback()
{
if (!rclcpp::ok()) return;
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
@ -153,61 +192,126 @@ class CanDataSubscriber : public rclcpp::Node
std::vector<int> codes;
{
std::lock_guard<std::mutex> lock(fault_mutex_);
const auto& err_set = vehicle_error_code.getAllErrorCodes();
std::lock_guard<std::mutex> lock(ctx_.mtx);
const auto& err_set = ctx_.errors.getAllErrorCodes();
codes.assign(err_set.begin(), err_set.end());
}
for (int code : codes)
{
const std::string payload = generateFaultJson(code, vid, getCurrentTimestampMs());
if (!mqtt_client_.publish(topic, payload, 0))
{
RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code);
}
mqtt_client_.publish(topic, generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
}
}
void check_connection_callback()
{
int64_t now = getCurrentTimestampMs();
if (!mqtt_client_.isConnected())
{
RCLCPP_WARN(this->get_logger(), "MQTT connection lost.");
RCLCPP_WARN(get_logger(), "MQTT connection lost.");
}
check_input("vehicle identity (/vehicle/identity)", identity_mon_, now, IDENTITY_TIMEOUT_MS);
check_input("GPS (/rtk_message)", gps_mon_, now, GPS_TIMEOUT_MS);
check_input("CAN (/can_data)", can_mon_, now, CAN_TIMEOUT_MS);
}
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
{
// sticky 输入:一旦 OK不再 timeout
if (mon.sticky && mon.ok) return;
// 从 OK → 不 OK流断了
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
{
mon.ok = false;
RCLCPP_WARN(get_logger(), "%s timeout (%ld ms without data).", name, now - mon.last_rx_ts);
mon.last_warn_ts = now;
return;
}
// 一直不 OK → 低频 WARN
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
{
RCLCPP_WARN(get_logger(), "Waiting for %s...", name);
mon.last_warn_ts = now;
}
}
// ===================== CAN handler =====================
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) { can_decoder_.handle(msg); }
// ========== CAN ==========
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
{
can_decoder_.handle(msg);
int64_t now = getCurrentTimestampMs();
if (!can_mon_.ok)
{
RCLCPP_INFO(get_logger(), "CAN data stream active (/can_data).");
}
can_mon_.ok = true;
can_mon_.last_rx_ts = now;
}
// ========== RTK ==========
void rtk_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.gps.valid = true;
ctx_.gps.lat = msg->lat;
ctx_.gps.lng = msg->lon;
ctx_.gps.course = msg->head;
if (msg->p_quality == 5)
ctx_.gps.mode = 2;
else if (msg->p_quality == 4)
ctx_.gps.mode = 3;
else
ctx_.gps.mode = msg->p_quality;
ctx_.gps.timestamp = getCurrentTimestampMs();
ctx_.gps.valid = true;
int64_t now = ctx_.gps.timestamp;
if (!gps_mon_.ok)
{
RCLCPP_INFO(get_logger(), "GPS data stream active (/rtk_message).");
}
gps_mon_.ok = true;
gps_mon_.last_rx_ts = now;
}
private:
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
// CAN decoder
VehicleContext ctx_;
CanDecoder can_decoder_;
Config config_;
// MQTT
MQTTClientWrapper mqtt_client_;
// Data
GeneralMsg info_report_;
std::mutex info_mutex_;
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::Subscription<sweeper_interfaces::msg::Rtk>::SharedPtr rtk_sub_;
// CAN decoder
CanDecoder can_decoder_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr gps_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
std::mutex fault_mutex_;
// Identity cache
std::mutex identity_mutex_;
std::string vehicle_imei_;
std::string vehicle_vid_;
bool vehicle_ready_ = false;
InputMonitor identity_mon_{false, true};
InputMonitor gps_mon_;
InputMonitor can_mon_;
};
// ===================== main =====================
@ -229,18 +333,20 @@ int main(int argc, char** argv)
Config config;
auto logger = rclcpp::get_logger("main");
if (!load_config(config_path, config))
{
RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load config: %s", config_path.c_str());
RCLCPP_ERROR(logger, "Failed to load config: %s", config_path.c_str());
return 1;
}
if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000;
RCLCPP_INFO(rclcpp::get_logger("main"), "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(),
config.mqtt_port, config.mqtt_username.empty() ? "<anonymous>" : config.mqtt_username.c_str());
RCLCPP_INFO(logger, "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), config.mqtt_port,
config.mqtt_username.empty() ? "<anonymous>" : config.mqtt_username.c_str());
RCLCPP_INFO(rclcpp::get_logger("main"), "Topic templates: info='%s', fault='%s'",
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
auto node = std::make_shared<CanDataSubscriber>(config);

View File

@ -1,10 +1,11 @@
// report_struct.cpp
#include "mqtt_report/report_struct.h"
#include <chrono>
int64_t getCurrentTimestampMs()
{
using namespace std::chrono;
return duration_cast<milliseconds>(
system_clock::now().time_since_epoch())
.count();
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}