From 43b8299e6a287618837b6875cf195eab1572746e Mon Sep 17 00:00:00 2001 From: lyq Date: Wed, 14 Jan 2026 16:44:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=AE=9A=E4=BD=8D=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E4=B8=8A=E6=8A=A5=E8=8A=82=E7=82=B9=E8=87=B3mqtt?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json | 5 +- .../include/mqtt_report/can/can_decoder.h | 36 +- .../include/mqtt_report/fault_codes.h | 42 +- .../include/mqtt_report/get_config.h | 5 +- .../include/mqtt_report/report_struct.h | 20 +- .../include/mqtt_report/vehicle_context.h | 38 ++ .../mqtt_report/src/can/can_decoder.cpp | 391 +++++++++--------- .../mqtt_report/src/fault_codes.cpp | 16 +- .../mqtt_report/src/get_config.cpp | 3 + .../mqtt_report/src/mqtt_report_node.cpp | 254 ++++++++---- .../mqtt_report/src/report_struct.cpp | 7 +- 11 files changed, 480 insertions(+), 337 deletions(-) create mode 100644 src/communication/mqtt_report/include/mqtt_report/vehicle_context.h diff --git a/config.json b/config.json index a93ced4..28b174f 100644 --- a/config.json +++ b/config.json @@ -4,8 +4,9 @@ "external_net_port": 19683, "username": "zxwl", "password": "zxwl1234@", - "info_topic": "/zxwl/vehicle/{vid}/info", - "fault_topic": "/zxwl/vehicle/{vid}/fault", + "info_topic": "/zxwl/sweeper/{vid}/info", + "fault_topic": "/zxwl/sweeper/{vid}/fault", + "gps_topic": "/zxwl/sweeper/{vid}/gps", "pub_gps_topic": "/zxwl/sweeper/V060003/gps", "remote_topic": "/zxwl/sweeper/V060003/ctrl", "upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload", diff --git a/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h b/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h index a4d427d..62b8cd9 100644 --- a/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h +++ b/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h @@ -1,22 +1,38 @@ +// can_decoder.h + #pragma once -#include +#include "mqtt_report/vehicle_context.h" #include "sweeper_interfaces/msg/can_frame.hpp" -#include "mqtt_report/report_struct.h" namespace sweeperMsg = sweeper_interfaces::msg; class CanDecoder { -public: - CanDecoder(GeneralMsg &info, - std::mutex &info_mutex, - std::mutex &fault_mutex); + public: + explicit CanDecoder(VehicleContext& ctx); void handle(const sweeperMsg::CanFrame::SharedPtr msg); -private: - GeneralMsg &info_report_; - std::mutex &info_mutex_; - std::mutex &fault_mutex_; + private: + using CanHandler = std::function; + + // ===== BMS ===== + void handle_bms_100(const sweeperMsg::CanFrame& msg); + void handle_bms_101(const sweeperMsg::CanFrame& msg); + + // ===== MCU ===== + void handle_mcu_status_1(const sweeperMsg::CanFrame& msg); // 0x0CF11E05 + void handle_mcu_status_2(const sweeperMsg::CanFrame& msg); // 0x0CF11F05 + + // ===== EPS ===== + void handle_eps_status(const sweeperMsg::CanFrame& msg); // 0x401 + + // ===== VCU ===== + void handle_vcu_fault_12(const sweeperMsg::CanFrame& msg); // 0x216 + void handle_vcu_fault_34(const sweeperMsg::CanFrame& msg); // 0x217 + + private: + VehicleContext& ctx_; + std::unordered_map handlers_; }; diff --git a/src/communication/mqtt_report/include/mqtt_report/fault_codes.h b/src/communication/mqtt_report/include/mqtt_report/fault_codes.h index 29cf4e6..86b39c1 100644 --- a/src/communication/mqtt_report/include/mqtt_report/fault_codes.h +++ b/src/communication/mqtt_report/include/mqtt_report/fault_codes.h @@ -1,33 +1,28 @@ +// fault_codes.h + #pragma once -#include -#include #include #include -#include "get_config.h" +#include +#include struct FaultInfo { - std::string subsystem; // 故障设备 - std::string error_name; // 故障名称 - std::string description; // 中文描述 - int level; // 故障等级,0-3 + std::string subsystem; // 故障设备 + std::string error_name; // 故障名称 + std::string description; // 中文描述 + int level; // 故障等级,0-3 }; class ErrorCodeSet { -public: + public: // 添加错误码(自动去重) - void addErrorCode(int code) - { - errors.insert(code); - } + void addErrorCode(int code) { errors.insert(code); } // 删除错误码 - void removeErrorCode(int code) - { - errors.erase(code); - } + void removeErrorCode(int code) { errors.erase(code); } // 打印所有当前错误码 void printErrors() const @@ -41,25 +36,18 @@ public: } // 获取所有当前错误码 - const std::set &getAllErrorCodes() const - { - return errors; - } + const std::set& getAllErrorCodes() const { return errors; } // 判断某个错误码是否存在 - bool hasError(int code) const - { - return errors.find(code) != errors.end(); - } + bool hasError(int code) const { return errors.find(code) != errors.end(); } -private: + private: std::set errors; }; // 故障字典声明(在 .cpp 实现里定义) extern std::unordered_map faultMap; extern const uint8_t epsErrorCode[18]; -extern ErrorCodeSet vehicle_error_code; -std::string generateFaultJson(int code, const std::string &vin, int64_t timestamp); +std::string generateFaultJson(int code, const std::string& vin, int64_t timestamp); bool updateFaultLevel(int code, int newLevel); \ No newline at end of file diff --git a/src/communication/mqtt_report/include/mqtt_report/get_config.h b/src/communication/mqtt_report/include/mqtt_report/get_config.h index 99e4981..b0d4ee5 100644 --- a/src/communication/mqtt_report/include/mqtt_report/get_config.h +++ b/src/communication/mqtt_report/include/mqtt_report/get_config.h @@ -1,3 +1,5 @@ +// get_config.h + #pragma once #include @@ -22,7 +24,8 @@ struct Config std::string mqtt_username; std::string mqtt_password; - std::string info_topic_template; // e.g. /zxwl/vehicle/{vid}/info + std::string info_topic_template; // e.g. /zxwl/vehicle/{vid}/info + std::string gps_topic_template; std::string fault_topic_template; // e.g. /zxwl/vehicle/{vid}/fault }; diff --git a/src/communication/mqtt_report/include/mqtt_report/report_struct.h b/src/communication/mqtt_report/include/mqtt_report/report_struct.h index a10c143..163277b 100644 --- a/src/communication/mqtt_report/include/mqtt_report/report_struct.h +++ b/src/communication/mqtt_report/include/mqtt_report/report_struct.h @@ -1,22 +1,24 @@ +// report_struct.h + #pragma once +#include #include #include #include -#include int64_t getCurrentTimestampMs(); struct GeneralMsg { - float power = 0.0f; // 电量 % - bool chargeStatus = false; // 是否在充电 - int gear = 0; // 档位 - float speed = 0.0f; // 当前速度 km/h - float steeringAngle = 0.0f; // 当前角度 - int motorTemp = 0; // 电机温度 °C - int controllerTemp = 0; // 控制器温度 °C - int64_t timestamp = 0; // 时间戳 + float power = 0.0f; // 电量 % + bool chargeStatus = false; // 是否在充电 + int gear = 0; // 档位 + float speed = 0.0f; // 当前速度 km/h + float steeringAngle = 0.0f; // 当前角度 + int motorTemp = 0; // 电机温度 °C + int controllerTemp = 0; // 控制器温度 °C + int64_t timestamp = 0; // 时间戳 }; struct BMSFault diff --git a/src/communication/mqtt_report/include/mqtt_report/vehicle_context.h b/src/communication/mqtt_report/include/mqtt_report/vehicle_context.h new file mode 100644 index 0000000..7fd21fa --- /dev/null +++ b/src/communication/mqtt_report/include/mqtt_report/vehicle_context.h @@ -0,0 +1,38 @@ +// vehicle_context.h + +#pragma once + +#include +#include + +#include "mqtt_report/fault_codes.h" +#include "mqtt_report/report_struct.h" + +struct GpsMsg +{ + double lat = 0.0; + double lng = 0.0; + double course = 0.0; + int mode = 0; + int64_t timestamp = 0; + bool valid = false; +}; + +struct VehicleContext +{ + std::mutex mtx; + + // identity + std::string imei; + std::string vid; + bool ready = false; + + // realtime info + GeneralMsg info; + + // gps state + GpsMsg gps; + + // fault state + ErrorCodeSet errors; +}; diff --git a/src/communication/mqtt_report/src/can/can_decoder.cpp b/src/communication/mqtt_report/src/can/can_decoder.cpp index 20cad4c..5a85bd5 100644 --- a/src/communication/mqtt_report/src/can/can_decoder.cpp +++ b/src/communication/mqtt_report/src/can/can_decoder.cpp @@ -1,231 +1,208 @@ +// can/can_decoder.cpp + #include "mqtt_report/can/can_decoder.h" + +#include +#include + #include "mqtt_report/fault_codes.h" -#include -#include - -CanDecoder::CanDecoder(GeneralMsg &info, - std::mutex &info_mutex, - std::mutex &fault_mutex) - : info_report_(info), - info_mutex_(info_mutex), - fault_mutex_(fault_mutex) +// ===================== ctor ===================== +CanDecoder::CanDecoder(VehicleContext& ctx) : ctx_(ctx) { + // BMS + handlers_[0x100] = [this](const auto& m) { handle_bms_100(m); }; + handlers_[0x101] = [this](const auto& m) { handle_bms_101(m); }; + + // MCU + handlers_[0x0CF11E05] = [this](const auto& m) { handle_mcu_status_1(m); }; + handlers_[0x0CF11F05] = [this](const auto& m) { handle_mcu_status_2(m); }; + + // EPS + handlers_[0x401] = [this](const auto& m) { handle_eps_status(m); }; + + // VCU + handlers_[0x216] = [this](const auto& m) { handle_vcu_fault_12(m); }; + handlers_[0x217] = [this](const auto& m) { handle_vcu_fault_34(m); }; } +// ===================== dispatch ===================== void CanDecoder::handle(const sweeperMsg::CanFrame::SharedPtr msg) { - switch (msg->id) + auto it = handlers_.find(msg->id); + if (it != handlers_.end()) it->second(*msg); +} + +// ===================== BMS ===================== +void CanDecoder::handle_bms_100(const sweeperMsg::CanFrame& msg) +{ + uint16_t voltage_raw = (msg.data[0] << 8) | msg.data[1]; + int16_t current_raw = (msg.data[2] << 8) | msg.data[3]; + uint16_t capacity_raw = (msg.data[4] << 8) | msg.data[5]; + + float voltage = voltage_raw * 0.01f; + float current = current_raw * 0.01f; + float capacity = capacity_raw * 0.01f; + { - case 0x100: // BMS_Status 100 - { - uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1]; - int16_t current_raw = (msg->data[2] << 8) | msg->data[3]; - uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5]; - - float voltage = voltage_raw * 0.01f; - float current = current_raw * 0.01f; - float capacity = capacity_raw * 0.01f; - - { - std::lock_guard lock(info_mutex_); - info_report_.chargeStatus = (current >= 0); - } - - std::cout << std::fixed << std::setprecision(2); - std::cout << "[0x100] 总电压: " << voltage << " V, " - << "电流: " << current << " A, " - << "剩余容量: " << capacity << " Ah, " - << "是否在充电: " << ((current >= 0) ? "是" : "否") << std::endl; - break; + std::lock_guard lock(ctx_.mtx); + ctx_.info.chargeStatus = (current >= 0); } - case 0x101: // BMS_Status 101 + std::cout << std::fixed << std::setprecision(2); + std::cout << "[0x100] 总电压: " << voltage << " V, " + << "电流: " << current << " A, " + << "剩余容量: " << capacity << " Ah, " + << "是否在充电: " << ((current >= 0) ? "是" : "否") << std::endl; +} + +void CanDecoder::handle_bms_101(const sweeperMsg::CanFrame& msg) +{ + uint16_t full_capacity_raw = (msg.data[0] << 8) | msg.data[1]; + uint16_t cycle_count_raw = (msg.data[2] << 8) | msg.data[3]; + uint16_t rsoc_raw = (msg.data[4] << 8) | msg.data[5]; + + float full_capacity = full_capacity_raw * 0.01f; + uint16_t cycle_count = cycle_count_raw; + float rsoc = rsoc_raw * 1.0f; + { - uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1]; - uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3]; - uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5]; - - float full_capacity = full_capacity_raw * 0.01f; - uint16_t cycle_count = cycle_count_raw; - float rsoc = rsoc_raw * 1.0f; - - { - std::lock_guard lock(info_mutex_); - info_report_.power = rsoc; - } - - std::cout << std::fixed << std::setprecision(2); - std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, " - << "循环次数: " << cycle_count << " 次, " - << "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl; - break; + std::lock_guard lock(ctx_.mtx); + ctx_.info.power = rsoc; } - case 0x0CF11E05: // MCU_Status_1 + MCU_Fault + std::cout << std::fixed << std::setprecision(2); + std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, " + << "循环次数: " << cycle_count << " 次, " + << "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl; +} + +// ===================== MCU ===================== +void CanDecoder::handle_mcu_status_1(const sweeperMsg::CanFrame& msg) +{ + uint16_t raw_speed = (static_cast(msg.data[1]) << 8) | msg.data[0]; + float actual_speed_rpm = static_cast(raw_speed); + + float gear_ratio = 32.0f; + float wheel_radius = 0.3f; + float speed_kmh = (actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) / (1000.0f * gear_ratio); + + uint16_t mcu_fault_code = (static_cast(msg.data[7]) << 8) | msg.data[6]; + { - uint16_t raw_speed = - (static_cast(msg->data[1]) << 8) | msg->data[0]; - float actual_speed_rpm = static_cast(raw_speed); + std::lock_guard lock(ctx_.mtx); - float gear_ratio = 32.0f; - float wheel_radius = 0.3f; - float speed_kmh = - (actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) / - (1000.0f * gear_ratio); + // info + ctx_.info.speed = speed_kmh; + // faults: 先清 MCU 故障区 + for (int code = 1101; code <= 1116; ++code) ctx_.errors.removeErrorCode(code); + + for (int i = 0; i < 16; ++i) { - std::lock_guard lock(info_mutex_); - info_report_.speed = speed_kmh; - } + bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0; + int error_code = 1101 + i; - uint16_t mcu_fault_code = - (static_cast(msg->data[7]) << 8) | msg->data[6]; - - { - std::lock_guard lock(fault_mutex_); - - // 先清 MCU 故障区 - for (int code = 1101; code <= 1116; ++code) - vehicle_error_code.removeErrorCode(code); - - for (int i = 0; i < 16; ++i) + if (is_fault && faultMap.count(error_code)) { - bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0; - int error_code = 1101 + i; - - if (is_fault && faultMap.count(error_code)) - { - vehicle_error_code.addErrorCode(error_code); - updateFaultLevel(error_code, 1); - } - else - { - vehicle_error_code.removeErrorCode(error_code); - } + ctx_.errors.addErrorCode(error_code); + updateFaultLevel(error_code, 1); + } + else + { + ctx_.errors.removeErrorCode(error_code); } } - - break; - } - - case 0x0CF11F05: // MCU_Status_2 - { - int raw_ctrl_temp = static_cast(msg->data[2]); - int raw_motor_temp = static_cast(msg->data[3]); - uint8_t ctrl_status = msg->data[4]; - - int controllerTemp = raw_ctrl_temp - 40; - int motorTemp = raw_motor_temp - 30; - int feedback_gear = (ctrl_status >> 2) & 0x03; - - { - std::lock_guard lock(info_mutex_); - info_report_.controllerTemp = controllerTemp; - info_report_.motorTemp = motorTemp; - info_report_.gear = feedback_gear; - } - break; - } - - case 0x401: // EPS_Status + EPS_Fault - { - uint16_t raw_value = - static_cast((msg->data[3] << 8) | msg->data[4]); - float steeringAngle = (raw_value - 1024) / 7.0f; - - { - std::lock_guard lock(info_mutex_); - info_report_.steeringAngle = steeringAngle; - } - - { - std::lock_guard lock(fault_mutex_); - - for (int code = 1201; code <= 1218; ++code) - vehicle_error_code.removeErrorCode(code); - - uint8_t data_bytes[2] = {msg->data[2], msg->data[6]}; - for (uint8_t byte : data_bytes) - { - for (int i = 0; i < 18; ++i) - { - if (byte == epsErrorCode[i]) - { - vehicle_error_code.addErrorCode(1201 + i); - break; - } - } - } - } - break; - } - - case 0x216: // VCU_Fault_1/2 - { - uint32_t fault1_bits = - (static_cast(msg->data[0]) << 0) | - (static_cast(msg->data[1]) << 8) | - (static_cast(msg->data[2]) << 16) | - (static_cast(msg->data[3]) << 24); - - uint32_t fault2_bits = - (static_cast(msg->data[4]) << 0) | - (static_cast(msg->data[5]) << 8) | - (static_cast(msg->data[6]) << 16) | - (static_cast(msg->data[7]) << 24); - - { - std::lock_guard lock(fault_mutex_); - - for (int code = 1701; code <= 1764; ++code) - vehicle_error_code.removeErrorCode(code); - - for (uint8_t bit = 0; bit <= 31; ++bit) - if ((fault1_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1701 + bit); - - for (uint8_t bit = 0; bit <= 31; ++bit) - if ((fault2_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1733 + bit); - } - - break; - } - - case 0x217: // VCU_Fault_3/4 - { - uint32_t fault3_bits = - (static_cast(msg->data[0]) << 0) | - (static_cast(msg->data[1]) << 8) | - (static_cast(msg->data[2]) << 16) | - (static_cast(msg->data[3]) << 24); - - uint32_t fault4_bits = - (static_cast(msg->data[4]) << 0) | - (static_cast(msg->data[5]) << 8) | - (static_cast(msg->data[6]) << 16) | - (static_cast(msg->data[7]) << 24); - - { - std::lock_guard lock(fault_mutex_); - - for (int code = 1765; code <= 1814; ++code) - vehicle_error_code.removeErrorCode(code); - - for (uint8_t bit = 0; bit <= 31; ++bit) - if ((fault3_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1765 + bit); - - for (uint8_t bit = 0; bit <= 18; ++bit) - if ((fault4_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1797 + bit); - } - - break; - } - - default: - break; + } +} + +void CanDecoder::handle_mcu_status_2(const sweeperMsg::CanFrame& msg) +{ + int raw_ctrl_temp = static_cast(msg.data[2]); + int raw_motor_temp = static_cast(msg.data[3]); + uint8_t ctrl_status = msg.data[4]; + + int controllerTemp = raw_ctrl_temp - 40; + int motorTemp = raw_motor_temp - 30; + int feedback_gear = (ctrl_status >> 2) & 0x03; + + { + std::lock_guard lock(ctx_.mtx); + ctx_.info.controllerTemp = controllerTemp; + ctx_.info.motorTemp = motorTemp; + ctx_.info.gear = feedback_gear; + } +} + +// ===================== EPS ===================== +void CanDecoder::handle_eps_status(const sweeperMsg::CanFrame& msg) +{ + uint16_t raw_value = static_cast((msg.data[3] << 8) | msg.data[4]); + float steeringAngle = (raw_value - 1024) / 7.0f; + + { + std::lock_guard lock(ctx_.mtx); + + // info + ctx_.info.steeringAngle = steeringAngle; + + // faults + for (int code = 1201; code <= 1218; ++code) ctx_.errors.removeErrorCode(code); + + uint8_t data_bytes[2] = {msg.data[2], msg.data[6]}; + for (uint8_t byte : data_bytes) + { + for (int i = 0; i < 18; ++i) + { + if (byte == epsErrorCode[i]) + { + ctx_.errors.addErrorCode(1201 + i); + break; + } + } + } + } +} + +// ===================== VCU ===================== +void CanDecoder::handle_vcu_fault_12(const sweeperMsg::CanFrame& msg) +{ + uint32_t fault1_bits = (static_cast(msg.data[0]) << 0) | (static_cast(msg.data[1]) << 8) | + (static_cast(msg.data[2]) << 16) | (static_cast(msg.data[3]) << 24); + + uint32_t fault2_bits = (static_cast(msg.data[4]) << 0) | (static_cast(msg.data[5]) << 8) | + (static_cast(msg.data[6]) << 16) | (static_cast(msg.data[7]) << 24); + + { + std::lock_guard lock(ctx_.mtx); + + for (int code = 1701; code <= 1764; ++code) ctx_.errors.removeErrorCode(code); + + for (uint8_t bit = 0; bit <= 31; ++bit) + if ((fault1_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1701 + bit); + + for (uint8_t bit = 0; bit <= 31; ++bit) + if ((fault2_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1733 + bit); + } +} + +void CanDecoder::handle_vcu_fault_34(const sweeperMsg::CanFrame& msg) +{ + uint32_t fault3_bits = (static_cast(msg.data[0]) << 0) | (static_cast(msg.data[1]) << 8) | + (static_cast(msg.data[2]) << 16) | (static_cast(msg.data[3]) << 24); + + uint32_t fault4_bits = (static_cast(msg.data[4]) << 0) | (static_cast(msg.data[5]) << 8) | + (static_cast(msg.data[6]) << 16) | (static_cast(msg.data[7]) << 24); + + { + std::lock_guard lock(ctx_.mtx); + + for (int code = 1765; code <= 1814; ++code) ctx_.errors.removeErrorCode(code); + + for (uint8_t bit = 0; bit <= 31; ++bit) + if ((fault3_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1765 + bit); + + for (uint8_t bit = 0; bit <= 18; ++bit) + if ((fault4_bits >> bit) & 0x01) ctx_.errors.addErrorCode(1797 + bit); } } diff --git a/src/communication/mqtt_report/src/fault_codes.cpp b/src/communication/mqtt_report/src/fault_codes.cpp index 8f3c15d..df60a25 100644 --- a/src/communication/mqtt_report/src/fault_codes.cpp +++ b/src/communication/mqtt_report/src/fault_codes.cpp @@ -1,9 +1,16 @@ +// fault_codes.cpp + #include "mqtt_report/fault_codes.h" -ErrorCodeSet vehicle_error_code; // 整车错误码 +#include "nlohmann/json.hpp" + +using json = nlohmann::json; +using ordered_json = nlohmann::ordered_json; + +ErrorCodeSet vehicle_error_code; // 整车错误码 // 根据错误码列表,生成错误上报json -std::string generateFaultJson(int code, const std::string &vin, int64_t timestamp) +std::string generateFaultJson(int code, const std::string& vin, int64_t timestamp) { auto it = faultMap.find(code); if (it == faultMap.end()) @@ -11,7 +18,7 @@ std::string generateFaultJson(int code, const std::string &vin, int64_t timestam throw std::runtime_error("错误码不存在"); } - const FaultInfo &info = it->second; + const FaultInfo& info = it->second; ordered_json j; j["vin"] = vin; @@ -37,7 +44,8 @@ bool updateFaultLevel(int code, int newLevel) return true; } -const uint8_t epsErrorCode[18] = {0x10, 0x14, 0x12, 0x21, 0x22, 0x23, 0x24, 0x25, 0x32, 0x33, 0x35, 0x37, 0x43, 0x46, 0x51, 0x55, 0x61, 0x62}; +const uint8_t epsErrorCode[18] = {0x10, 0x14, 0x12, 0x21, 0x22, 0x23, 0x24, 0x25, 0x32, + 0x33, 0x35, 0x37, 0x43, 0x46, 0x51, 0x55, 0x61, 0x62}; std::unordered_map faultMap = { {1001, {"BMS", "BMS_TEMP_HIGH_ERR", "温度过高", 0}}, diff --git a/src/communication/mqtt_report/src/get_config.cpp b/src/communication/mqtt_report/src/get_config.cpp index c07298d..0578809 100644 --- a/src/communication/mqtt_report/src/get_config.cpp +++ b/src/communication/mqtt_report/src/get_config.cpp @@ -1,3 +1,5 @@ +// get_config.cpp + #include "mqtt_report/get_config.h" bool load_config(const std::string& path, Config& config) @@ -22,6 +24,7 @@ bool load_config(const std::string& path, Config& config) config.mqtt_password = mqtt.at("password").get(); config.info_topic_template = mqtt.at("info_topic").get(); + config.gps_topic_template = mqtt.at("gps_topic").get(); config.fault_topic_template = mqtt.at("fault_topic").get(); return true; diff --git a/src/communication/mqtt_report/src/mqtt_report_node.cpp b/src/communication/mqtt_report/src/mqtt_report_node.cpp index 77ba711..4613613 100644 --- a/src/communication/mqtt_report/src/mqtt_report_node.cpp +++ b/src/communication/mqtt_report/src/mqtt_report_node.cpp @@ -1,21 +1,17 @@ // mqtt_report_node.cpp -#include -#include #include -#include -#include -#include #include #include #include #include "mqtt_report/can/can_decoder.h" -#include "mqtt_report/fault_codes.h" #include "mqtt_report/get_config.h" #include "mqtt_report/mqtt_client.hpp" #include "mqtt_report/report_struct.h" +#include "mqtt_report/vehicle_context.h" #include "sweeper_interfaces/msg/can_frame.hpp" +#include "sweeper_interfaces/msg/rtk.hpp" #include "sweeper_interfaces/msg/vehicle_identity.hpp" namespace sweeperMsg = sweeper_interfaces::msg; @@ -23,6 +19,19 @@ using sweeper_interfaces::msg::VehicleIdentity; int mqtt_reconnect_interval = 5000; // ms +static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; // 每 10s 警告一次 +static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000; +static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000; +static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000; + +struct InputMonitor +{ + bool ok = false; + bool sticky = false; // 是否一次成功就永久 OK + int64_t last_rx_ts = 0; + int64_t last_warn_ts = 0; +}; + // ===================== JSON 打包 ===================== static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei) { @@ -39,6 +48,19 @@ static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::s return j.dump(-1); } +static std::string pack_gps_to_json(const GpsMsg& gps, const std::string& imei) +{ + ordered_json j; + j["imei"] = imei; + j["lat"] = gps.lat; + j["lng"] = gps.lng; + j["course"] = gps.course; + j["mode"] = gps.mode; + j["gps_timestamp"] = gps.timestamp; + j["report_timestamp"] = getCurrentTimestampMs(); + return j.dump(-1); +} + static std::string make_topic(const std::string& tpl, const std::string& vid) { std::string topic = tpl; @@ -60,21 +82,28 @@ class CanDataSubscriber : public rclcpp::Node config_(cfg), mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username, cfg.mqtt_password, mqtt_reconnect_interval), - can_decoder_(info_report_, info_mutex_, fault_mutex_) + can_decoder_(ctx_) { // CAN subscription_ = this->create_subscription( "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); - // Identity(关键) + // Identity identity_sub_ = this->create_subscription( "/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(), std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1)); + // rtk + rtk_sub_ = this->create_subscription( + "rtk_message", 10, std::bind(&CanDataSubscriber::rtk_callback, this, std::placeholders::_1)); + // Timers info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), std::bind(&CanDataSubscriber::info_timer_callback, this)); + gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200), + std::bind(&CanDataSubscriber::gps_timer_callback, this)); + fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), std::bind(&CanDataSubscriber::fault_timer_callback, this)); @@ -85,67 +114,77 @@ class CanDataSubscriber : public rclcpp::Node ~CanDataSubscriber() override { if (info_timer_) info_timer_->cancel(); + if (gps_timer_) gps_timer_->cancel(); if (fault_timer_) fault_timer_->cancel(); if (connection_check_timer_) connection_check_timer_->cancel(); } private: // ===================== identity ===================== - bool snapshot_identity(std::string& imei, std::string& vid) - { - std::lock_guard lock(identity_mutex_); - if (!vehicle_ready_) return false; - - imei = vehicle_imei_; - vid = vehicle_vid_; - return true; - } - void identity_callback(const VehicleIdentity::SharedPtr msg) { - std::lock_guard lock(identity_mutex_); - vehicle_imei_ = msg->imei; - vehicle_vid_ = msg->vid; - vehicle_ready_ = msg->ready; + std::lock_guard lock(ctx_.mtx); + ctx_.imei = msg->imei; + ctx_.vid = msg->vid; + ctx_.ready = msg->ready; - RCLCPP_INFO(this->get_logger(), "Vehicle identity received: IMEI=%s, VID=%s", vehicle_imei_.c_str(), - vehicle_vid_.c_str()); + int64_t now = getCurrentTimestampMs(); + + if (!identity_mon_.ok) + { + RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", ctx_.imei.c_str(), ctx_.vid.c_str()); + } + + identity_mon_.ok = true; + identity_mon_.last_rx_ts = now; + } + + bool snapshot_identity(std::string& imei, std::string& vid) + { + std::lock_guard lock(ctx_.mtx); + if (!ctx_.ready) return false; + imei = ctx_.imei; + vid = ctx_.vid; + return true; } // ===================== timers ===================== void info_timer_callback() { - if (!rclcpp::ok()) return; - std::string imei, vid; - if (!snapshot_identity(imei, vid)) - { - RCLCPP_WARN_THROTTLE(this->get_logger(), *this->get_clock(), 5000, - "Vehicle identity not ready, skip MQTT publish"); - return; - } + if (!snapshot_identity(imei, vid)) return; const std::string topic = make_topic(config_.info_topic_template, vid); GeneralMsg snapshot; { - std::lock_guard lock(info_mutex_); - info_report_.timestamp = getCurrentTimestampMs(); - snapshot = info_report_; + std::lock_guard lock(ctx_.mtx); + ctx_.info.timestamp = getCurrentTimestampMs(); + snapshot = ctx_.info; } - const std::string payload = pack_general_info_to_json(snapshot, imei); + mqtt_client_.publish(topic, pack_general_info_to_json(snapshot, imei), 1); + } - if (!mqtt_client_.publish(topic, payload, 1)) + void gps_timer_callback() + { + std::string imei, vid; + if (!snapshot_identity(imei, vid)) return; + + GpsMsg snapshot; { - RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT"); + std::lock_guard lock(ctx_.mtx); + if (!ctx_.gps.valid) return; + snapshot = ctx_.gps; } + + const std::string topic = make_topic(config_.gps_topic_template, vid); + + mqtt_client_.publish(topic, pack_gps_to_json(snapshot, imei), 1); } void fault_timer_callback() { - if (!rclcpp::ok()) return; - std::string imei, vid; if (!snapshot_identity(imei, vid)) return; @@ -153,61 +192,126 @@ class CanDataSubscriber : public rclcpp::Node std::vector codes; { - std::lock_guard lock(fault_mutex_); - const auto& err_set = vehicle_error_code.getAllErrorCodes(); + std::lock_guard lock(ctx_.mtx); + const auto& err_set = ctx_.errors.getAllErrorCodes(); codes.assign(err_set.begin(), err_set.end()); } for (int code : codes) { - const std::string payload = generateFaultJson(code, vid, getCurrentTimestampMs()); - - if (!mqtt_client_.publish(topic, payload, 0)) - { - RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code); - } + mqtt_client_.publish(topic, generateFaultJson(code, vid, getCurrentTimestampMs()), 0); } } void check_connection_callback() { + int64_t now = getCurrentTimestampMs(); + if (!mqtt_client_.isConnected()) { - RCLCPP_WARN(this->get_logger(), "MQTT connection lost."); + RCLCPP_WARN(get_logger(), "MQTT connection lost."); + } + + check_input("vehicle identity (/vehicle/identity)", identity_mon_, now, IDENTITY_TIMEOUT_MS); + + check_input("GPS (/rtk_message)", gps_mon_, now, GPS_TIMEOUT_MS); + + check_input("CAN (/can_data)", can_mon_, now, CAN_TIMEOUT_MS); + } + + void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms) + { + // sticky 输入:一旦 OK,不再 timeout + if (mon.sticky && mon.ok) return; + + // 从 OK → 不 OK(流断了) + if (mon.ok && now - mon.last_rx_ts > timeout_ms) + { + mon.ok = false; + RCLCPP_WARN(get_logger(), "%s timeout (%ld ms without data).", name, now - mon.last_rx_ts); + mon.last_warn_ts = now; + return; + } + + // 一直不 OK → 低频 WARN + if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS) + { + RCLCPP_WARN(get_logger(), "Waiting for %s...", name); + mon.last_warn_ts = now; } } - // ===================== CAN handler ===================== - void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) { can_decoder_.handle(msg); } + // ========== CAN ========== + void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) + { + can_decoder_.handle(msg); + + int64_t now = getCurrentTimestampMs(); + + if (!can_mon_.ok) + { + RCLCPP_INFO(get_logger(), "CAN data stream active (/can_data)."); + } + + can_mon_.ok = true; + can_mon_.last_rx_ts = now; + } + + // ========== RTK ========== + void rtk_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg) + { + std::lock_guard lock(ctx_.mtx); + + ctx_.gps.valid = true; + + ctx_.gps.lat = msg->lat; + ctx_.gps.lng = msg->lon; + ctx_.gps.course = msg->head; + + if (msg->p_quality == 5) + ctx_.gps.mode = 2; + else if (msg->p_quality == 4) + ctx_.gps.mode = 3; + else + ctx_.gps.mode = msg->p_quality; + + ctx_.gps.timestamp = getCurrentTimestampMs(); + ctx_.gps.valid = true; + + int64_t now = ctx_.gps.timestamp; + + if (!gps_mon_.ok) + { + RCLCPP_INFO(get_logger(), "GPS data stream active (/rtk_message)."); + } + + gps_mon_.ok = true; + gps_mon_.last_rx_ts = now; + } private: - // ROS - rclcpp::Subscription::SharedPtr subscription_; - rclcpp::Subscription::SharedPtr identity_sub_; - - rclcpp::TimerBase::SharedPtr info_timer_; - rclcpp::TimerBase::SharedPtr fault_timer_; - rclcpp::TimerBase::SharedPtr connection_check_timer_; + // CAN decoder + VehicleContext ctx_; + CanDecoder can_decoder_; Config config_; // MQTT MQTTClientWrapper mqtt_client_; - // Data - GeneralMsg info_report_; - std::mutex info_mutex_; + // ROS + rclcpp::Subscription::SharedPtr subscription_; + rclcpp::Subscription::SharedPtr identity_sub_; + rclcpp::Subscription::SharedPtr rtk_sub_; - // CAN decoder - CanDecoder can_decoder_; + rclcpp::TimerBase::SharedPtr info_timer_; + rclcpp::TimerBase::SharedPtr gps_timer_; + rclcpp::TimerBase::SharedPtr fault_timer_; + rclcpp::TimerBase::SharedPtr connection_check_timer_; - std::mutex fault_mutex_; - - // Identity cache - std::mutex identity_mutex_; - std::string vehicle_imei_; - std::string vehicle_vid_; - bool vehicle_ready_ = false; + InputMonitor identity_mon_{false, true}; + InputMonitor gps_mon_; + InputMonitor can_mon_; }; // ===================== main ===================== @@ -229,18 +333,20 @@ int main(int argc, char** argv) Config config; + auto logger = rclcpp::get_logger("main"); + if (!load_config(config_path, config)) { - RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load config: %s", config_path.c_str()); + RCLCPP_ERROR(logger, "Failed to load config: %s", config_path.c_str()); return 1; } if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000; - RCLCPP_INFO(rclcpp::get_logger("main"), "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), - config.mqtt_port, config.mqtt_username.empty() ? "" : config.mqtt_username.c_str()); + RCLCPP_INFO(logger, "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), config.mqtt_port, + config.mqtt_username.empty() ? "" : config.mqtt_username.c_str()); - RCLCPP_INFO(rclcpp::get_logger("main"), "Topic templates: info='%s', fault='%s'", + RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(), config.info_topic_template.c_str(), config.fault_topic_template.c_str()); auto node = std::make_shared(config); diff --git a/src/communication/mqtt_report/src/report_struct.cpp b/src/communication/mqtt_report/src/report_struct.cpp index 8be7b0f..ecd854c 100644 --- a/src/communication/mqtt_report/src/report_struct.cpp +++ b/src/communication/mqtt_report/src/report_struct.cpp @@ -1,10 +1,11 @@ +// report_struct.cpp + #include "mqtt_report/report_struct.h" + #include int64_t getCurrentTimestampMs() { using namespace std::chrono; - return duration_cast( - system_clock::now().time_since_epoch()) - .count(); + return duration_cast(system_clock::now().time_since_epoch()).count(); }