From cc09a445de6d4d6f1446e1e770a17ca5f455391b Mon Sep 17 00:00:00 2001 From: lyq Date: Tue, 13 Jan 2026 16:14:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86=E8=8A=82=E7=82=B9=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=B8=8E=E4=B8=9A=E5=8A=A1=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .clang-format | 4 + .cmake-format.yaml | 21 + src/communication/mqtt_report/CMakeLists.txt | 47 +- .../include/mqtt_report/can/can_decoder.h | 22 + .../include/mqtt_report/report_struct.h | 6 +- .../mqtt_report/src/can/can_decoder.cpp | 231 +++++++++ .../mqtt_report/src/mqtt_report.cpp | 480 ------------------ .../mqtt_report/src/mqtt_report_node.cpp | 223 ++++++++ .../mqtt_report/src/report_struct.cpp | 10 + .../src/rs_driver/cmake/rs_driverConfig.cmake | 4 +- 10 files changed, 533 insertions(+), 515 deletions(-) create mode 100644 .clang-format create mode 100644 .cmake-format.yaml create mode 100644 src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h create mode 100644 src/communication/mqtt_report/src/can/can_decoder.cpp delete mode 100644 src/communication/mqtt_report/src/mqtt_report.cpp create mode 100644 src/communication/mqtt_report/src/mqtt_report_node.cpp create mode 100644 src/communication/mqtt_report/src/report_struct.cpp diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..42a2256 --- /dev/null +++ b/.clang-format @@ -0,0 +1,4 @@ +BasedOnStyle: Google +IndentWidth: 4 +ColumnLimit: 120 +BreakBeforeBraces: Allman diff --git a/.cmake-format.yaml b/.cmake-format.yaml new file mode 100644 index 0000000..b638188 --- /dev/null +++ b/.cmake-format.yaml @@ -0,0 +1,21 @@ +# 一行不要太短,避免过度折行 +line_width: 120 + +# CMake 社区默认习惯 +tab_size: 2 + +# 只有参数很多时才换行 +max_pargs_hwrap: 4 + +# 括号跟最后一个参数同一行(关键) +dangle_parens: false + +# if/while/function 后不要加空格 +separate_ctrl_name_with_space: false + +# 风格统一 +command_case: lower +keyword_case: upper + +# 不做奇怪的 markup +enable_markup: false diff --git a/src/communication/mqtt_report/CMakeLists.txt b/src/communication/mqtt_report/CMakeLists.txt index 899cc49..a5c4f50 100644 --- a/src/communication/mqtt_report/CMakeLists.txt +++ b/src/communication/mqtt_report/CMakeLists.txt @@ -16,7 +16,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") endif() if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - add_compile_options(-w) # 禁用所有警告 + add_compile_options(-w) # 禁用所有警告 endif() # find dependencies @@ -27,45 +27,36 @@ find_package(radio_ctrl REQUIRED) find_package(std_msgs REQUIRED) find_package(ament_index_cpp REQUIRED) -include_directories( - include - ${catkin_INCLUDE_DIRS} -) +include_directories(include ${catkin_INCLUDE_DIRS}) -add_executable(mqtt_report_node - src/fault_codes.cpp +add_executable( + mqtt_report_node + src/mqtt_report_node.cpp src/get_config.cpp - src/mqtt_report.cpp -) + src/fault_codes.cpp + src/report_struct.cpp + src/can/can_decoder.cpp) -ament_target_dependencies(mqtt_report_node ament_index_cpp rclcpp sweeper_interfaces std_msgs) +ament_target_dependencies( + mqtt_report_node + ament_index_cpp + rclcpp + sweeper_interfaces + std_msgs) # 包含 radio_ctrl 头文件 -target_include_directories(mqtt_report_node - PRIVATE ${radio_ctrl_INCLUDE_DIRS} -) +target_include_directories(mqtt_report_node PRIVATE ${radio_ctrl_INCLUDE_DIRS}) # 链接 radio_ctrl 库 -target_link_libraries(mqtt_report_node - ${radio_ctrl_LIBRARIES} -) +target_link_libraries(mqtt_report_node ${radio_ctrl_LIBRARIES}) if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64) - target_link_libraries( - mqtt_report_node - ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a - ) + target_link_libraries(mqtt_report_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a) else() - target_link_libraries( - mqtt_report_node - ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a - ) + target_link_libraries(mqtt_report_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a) endif() -install(TARGETS - mqtt_report_node - DESTINATION lib/${PROJECT_NAME} -) +install(TARGETS mqtt_report_node DESTINATION lib/${PROJECT_NAME}) if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) diff --git a/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h b/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h new file mode 100644 index 0000000..a4d427d --- /dev/null +++ b/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include "sweeper_interfaces/msg/can_frame.hpp" +#include "mqtt_report/report_struct.h" + +namespace sweeperMsg = sweeper_interfaces::msg; + +class CanDecoder +{ +public: + CanDecoder(GeneralMsg &info, + std::mutex &info_mutex, + std::mutex &fault_mutex); + + void handle(const sweeperMsg::CanFrame::SharedPtr msg); + +private: + GeneralMsg &info_report_; + std::mutex &info_mutex_; + std::mutex &fault_mutex_; +}; diff --git a/src/communication/mqtt_report/include/mqtt_report/report_struct.h b/src/communication/mqtt_report/include/mqtt_report/report_struct.h index 2f7b0e7..a10c143 100644 --- a/src/communication/mqtt_report/include/mqtt_report/report_struct.h +++ b/src/communication/mqtt_report/include/mqtt_report/report_struct.h @@ -5,11 +5,7 @@ #include #include -int64_t getCurrentTimestampMs() -{ - using namespace std::chrono; - return duration_cast(system_clock::now().time_since_epoch()).count(); -} +int64_t getCurrentTimestampMs(); struct GeneralMsg { diff --git a/src/communication/mqtt_report/src/can/can_decoder.cpp b/src/communication/mqtt_report/src/can/can_decoder.cpp new file mode 100644 index 0000000..20cad4c --- /dev/null +++ b/src/communication/mqtt_report/src/can/can_decoder.cpp @@ -0,0 +1,231 @@ +#include "mqtt_report/can/can_decoder.h" +#include "mqtt_report/fault_codes.h" + +#include +#include + +CanDecoder::CanDecoder(GeneralMsg &info, + std::mutex &info_mutex, + std::mutex &fault_mutex) + : info_report_(info), + info_mutex_(info_mutex), + fault_mutex_(fault_mutex) +{ +} + +void CanDecoder::handle(const sweeperMsg::CanFrame::SharedPtr msg) +{ + switch (msg->id) + { + case 0x100: // BMS_Status 100 + { + uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1]; + int16_t current_raw = (msg->data[2] << 8) | msg->data[3]; + uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5]; + + float voltage = voltage_raw * 0.01f; + float current = current_raw * 0.01f; + float capacity = capacity_raw * 0.01f; + + { + std::lock_guard lock(info_mutex_); + info_report_.chargeStatus = (current >= 0); + } + + std::cout << std::fixed << std::setprecision(2); + std::cout << "[0x100] 总电压: " << voltage << " V, " + << "电流: " << current << " A, " + << "剩余容量: " << capacity << " Ah, " + << "是否在充电: " << ((current >= 0) ? "是" : "否") << std::endl; + break; + } + + case 0x101: // BMS_Status 101 + { + uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1]; + uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3]; + uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5]; + + float full_capacity = full_capacity_raw * 0.01f; + uint16_t cycle_count = cycle_count_raw; + float rsoc = rsoc_raw * 1.0f; + + { + std::lock_guard lock(info_mutex_); + info_report_.power = rsoc; + } + + std::cout << std::fixed << std::setprecision(2); + std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, " + << "循环次数: " << cycle_count << " 次, " + << "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl; + break; + } + + case 0x0CF11E05: // MCU_Status_1 + MCU_Fault + { + uint16_t raw_speed = + (static_cast(msg->data[1]) << 8) | msg->data[0]; + float actual_speed_rpm = static_cast(raw_speed); + + float gear_ratio = 32.0f; + float wheel_radius = 0.3f; + float speed_kmh = + (actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) / + (1000.0f * gear_ratio); + + { + std::lock_guard lock(info_mutex_); + info_report_.speed = speed_kmh; + } + + uint16_t mcu_fault_code = + (static_cast(msg->data[7]) << 8) | msg->data[6]; + + { + std::lock_guard lock(fault_mutex_); + + // 先清 MCU 故障区 + for (int code = 1101; code <= 1116; ++code) + vehicle_error_code.removeErrorCode(code); + + for (int i = 0; i < 16; ++i) + { + bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0; + int error_code = 1101 + i; + + if (is_fault && faultMap.count(error_code)) + { + vehicle_error_code.addErrorCode(error_code); + updateFaultLevel(error_code, 1); + } + else + { + vehicle_error_code.removeErrorCode(error_code); + } + } + } + + break; + } + + case 0x0CF11F05: // MCU_Status_2 + { + int raw_ctrl_temp = static_cast(msg->data[2]); + int raw_motor_temp = static_cast(msg->data[3]); + uint8_t ctrl_status = msg->data[4]; + + int controllerTemp = raw_ctrl_temp - 40; + int motorTemp = raw_motor_temp - 30; + int feedback_gear = (ctrl_status >> 2) & 0x03; + + { + std::lock_guard lock(info_mutex_); + info_report_.controllerTemp = controllerTemp; + info_report_.motorTemp = motorTemp; + info_report_.gear = feedback_gear; + } + break; + } + + case 0x401: // EPS_Status + EPS_Fault + { + uint16_t raw_value = + static_cast((msg->data[3] << 8) | msg->data[4]); + float steeringAngle = (raw_value - 1024) / 7.0f; + + { + std::lock_guard lock(info_mutex_); + info_report_.steeringAngle = steeringAngle; + } + + { + std::lock_guard lock(fault_mutex_); + + for (int code = 1201; code <= 1218; ++code) + vehicle_error_code.removeErrorCode(code); + + uint8_t data_bytes[2] = {msg->data[2], msg->data[6]}; + for (uint8_t byte : data_bytes) + { + for (int i = 0; i < 18; ++i) + { + if (byte == epsErrorCode[i]) + { + vehicle_error_code.addErrorCode(1201 + i); + break; + } + } + } + } + break; + } + + case 0x216: // VCU_Fault_1/2 + { + uint32_t fault1_bits = + (static_cast(msg->data[0]) << 0) | + (static_cast(msg->data[1]) << 8) | + (static_cast(msg->data[2]) << 16) | + (static_cast(msg->data[3]) << 24); + + uint32_t fault2_bits = + (static_cast(msg->data[4]) << 0) | + (static_cast(msg->data[5]) << 8) | + (static_cast(msg->data[6]) << 16) | + (static_cast(msg->data[7]) << 24); + + { + std::lock_guard lock(fault_mutex_); + + for (int code = 1701; code <= 1764; ++code) + vehicle_error_code.removeErrorCode(code); + + for (uint8_t bit = 0; bit <= 31; ++bit) + if ((fault1_bits >> bit) & 0x01) + vehicle_error_code.addErrorCode(1701 + bit); + + for (uint8_t bit = 0; bit <= 31; ++bit) + if ((fault2_bits >> bit) & 0x01) + vehicle_error_code.addErrorCode(1733 + bit); + } + + break; + } + + case 0x217: // VCU_Fault_3/4 + { + uint32_t fault3_bits = + (static_cast(msg->data[0]) << 0) | + (static_cast(msg->data[1]) << 8) | + (static_cast(msg->data[2]) << 16) | + (static_cast(msg->data[3]) << 24); + + uint32_t fault4_bits = + (static_cast(msg->data[4]) << 0) | + (static_cast(msg->data[5]) << 8) | + (static_cast(msg->data[6]) << 16) | + (static_cast(msg->data[7]) << 24); + + { + std::lock_guard lock(fault_mutex_); + + for (int code = 1765; code <= 1814; ++code) + vehicle_error_code.removeErrorCode(code); + + for (uint8_t bit = 0; bit <= 31; ++bit) + if ((fault3_bits >> bit) & 0x01) + vehicle_error_code.addErrorCode(1765 + bit); + + for (uint8_t bit = 0; bit <= 18; ++bit) + if ((fault4_bits >> bit) & 0x01) + vehicle_error_code.addErrorCode(1797 + bit); + } + + break; + } + + default: + break; + } +} diff --git a/src/communication/mqtt_report/src/mqtt_report.cpp b/src/communication/mqtt_report/src/mqtt_report.cpp deleted file mode 100644 index caa1723..0000000 --- a/src/communication/mqtt_report/src/mqtt_report.cpp +++ /dev/null @@ -1,480 +0,0 @@ -// mqtt_report_node.cpp - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "sweeper_interfaces/msg/can_frame.hpp" -#include "sweeper_interfaces/msg/vehicle_identity.hpp" - -#include "mqtt_report/fault_codes.h" -#include "mqtt_report/get_config.h" -#include "mqtt_report/mqtt_client.hpp" -#include "mqtt_report/report_struct.h" - -namespace sweeperMsg = sweeper_interfaces::msg; -using sweeper_interfaces::msg::VehicleIdentity; - -Config config; -int mqtt_reconnect_interval = 5000; // ms - -// ===================== JSON 打包 ===================== -static std::string pack_general_info_to_json( - const GeneralMsg &msg, - const std::string &imei) -{ - ordered_json j; - j["imei"] = imei; - j["power"] = msg.power; - j["chargeStatus"] = msg.chargeStatus; - j["gear"] = msg.gear; - j["speed"] = msg.speed; - j["steeringAngle"] = msg.steeringAngle; - j["motorTemp"] = msg.motorTemp; - j["controllerTemp"] = msg.controllerTemp; - j["timestamp"] = msg.timestamp; - return j.dump(-1); -} - -// ===================== Node ===================== -class CanDataSubscriber : public rclcpp::Node -{ -public: - explicit CanDataSubscriber(const Config &cfg) - : Node("can_data_subscriber"), - mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), - generate_mqtt_client_id(), - "zxwl", - "zxwl1234@", - mqtt_reconnect_interval) - { - // CAN - subscription_ = this->create_subscription( - "can_data", - 10, - std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); - - // Identity(关键) - identity_sub_ = this->create_subscription( - "/vehicle/identity", - rclcpp::QoS(1).transient_local().reliable(), - std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1)); - - // Timers - info_timer_ = this->create_wall_timer( - std::chrono::milliseconds(500), - std::bind(&CanDataSubscriber::info_timer_callback, this)); - - fault_timer_ = this->create_wall_timer( - std::chrono::milliseconds(500), - std::bind(&CanDataSubscriber::fault_timer_callback, this)); - - connection_check_timer_ = this->create_wall_timer( - std::chrono::seconds(10), - std::bind(&CanDataSubscriber::check_connection_callback, this)); - } - - ~CanDataSubscriber() override - { - if (info_timer_) - info_timer_->cancel(); - if (fault_timer_) - fault_timer_->cancel(); - if (connection_check_timer_) - connection_check_timer_->cancel(); - } - -private: - // ===================== identity ===================== - bool snapshot_identity(std::string &imei, std::string &vid) - { - std::lock_guard lock(identity_mutex_); - if (!vehicle_ready_) - return false; - - imei = vehicle_imei_; - vid = vehicle_vid_; - return true; - } - - void identity_callback(const VehicleIdentity::SharedPtr msg) - { - std::lock_guard lock(identity_mutex_); - vehicle_imei_ = msg->imei; - vehicle_vid_ = msg->vid; - vehicle_ready_ = msg->ready; - - RCLCPP_INFO(this->get_logger(), - "Vehicle identity received: IMEI=%s, VID=%s", - vehicle_imei_.c_str(), - vehicle_vid_.c_str()); - } - - // ===================== timers ===================== - void info_timer_callback() - { - if (!rclcpp::ok()) - return; - - std::string imei, vid; - if (!snapshot_identity(imei, vid)) - { - RCLCPP_WARN_THROTTLE( - this->get_logger(), - *this->get_clock(), - 5000, - "Vehicle identity not ready, skip MQTT publish"); - return; - } - - const std::string topic = "/zxwl/vehicle/" + vid + "/info"; - - GeneralMsg snapshot; - { - std::lock_guard lock(info_mutex_); - info_report_.timestamp = getCurrentTimestampMs(); - snapshot = info_report_; - } - - const std::string payload = - pack_general_info_to_json(snapshot, imei); - - if (!mqtt_client_.publish(topic, payload, 1)) - { - RCLCPP_WARN(this->get_logger(), - "Failed to publish info message to MQTT"); - } - } - - void fault_timer_callback() - { - if (!rclcpp::ok()) - return; - - std::string imei, vid; - if (!snapshot_identity(imei, vid)) - return; - - const std::string topic = "/zxwl/vehicle/" + vid + "/fault"; - - std::vector codes; - { - std::lock_guard lock(fault_mutex_); - const auto &err_set = vehicle_error_code.getAllErrorCodes(); - codes.assign(err_set.begin(), err_set.end()); - } - - for (int code : codes) - { - const std::string payload = - generateFaultJson(code, vid, getCurrentTimestampMs()); - - if (!mqtt_client_.publish(topic, payload, 0)) - { - RCLCPP_WARN(this->get_logger(), - "Failed to publish fault code %d to MQTT", - code); - } - } - } - - void check_connection_callback() - { - if (!mqtt_client_.isConnected()) - { - RCLCPP_WARN(this->get_logger(), - "MQTT connection lost."); - } - } - - // ===================== CAN handler ===================== - void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) - { - Msg_Handler(msg); - } - - void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg) - { - switch (msg->id) - { - case 0x100: // BMS_Status 100 - { - uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1]; - int16_t current_raw = (msg->data[2] << 8) | msg->data[3]; - uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5]; - - float voltage = voltage_raw * 0.01f; - float current = current_raw * 0.01f; - float capacity = capacity_raw * 0.01f; - - { - std::lock_guard lock(info_mutex_); - info_report_.chargeStatus = (current >= 0); - } - - std::cout << std::fixed << std::setprecision(2); - std::cout << "[0x100] 总电压: " << voltage << " V, " - << "电流: " << current << " A, " - << "剩余容量: " << capacity << " Ah, " - << "是否在充电: " << ((current >= 0) ? "是" : "否") << std::endl; - break; - } - - case 0x101: // BMS_Status 101 - { - uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1]; - uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3]; - uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5]; - - float full_capacity = full_capacity_raw * 0.01f; - uint16_t cycle_count = cycle_count_raw; - float rsoc = rsoc_raw * 1.0f; - - { - std::lock_guard lock(info_mutex_); - info_report_.power = rsoc; - } - - std::cout << std::fixed << std::setprecision(2); - std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, " - << "循环次数: " << cycle_count << " 次, " - << "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl; - break; - } - - case 0x0CF11E05: // MCU_Status_1 + MCU_Fault - { - uint16_t raw_speed = - (static_cast(msg->data[1]) << 8) | msg->data[0]; - float actual_speed_rpm = static_cast(raw_speed); - - float gear_ratio = 32.0f; - float wheel_radius = 0.3f; - float speed_kmh = - (actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) / - (1000.0f * gear_ratio); - - { - std::lock_guard lock(info_mutex_); - info_report_.speed = speed_kmh; - } - - uint16_t mcu_fault_code = - (static_cast(msg->data[7]) << 8) | msg->data[6]; - - { - std::lock_guard lock(fault_mutex_); - - // 先清 MCU 故障区 - for (int code = 1101; code <= 1116; ++code) - vehicle_error_code.removeErrorCode(code); - - for (int i = 0; i < 16; ++i) - { - bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0; - int error_code = 1101 + i; - - if (is_fault && faultMap.count(error_code)) - { - vehicle_error_code.addErrorCode(error_code); - updateFaultLevel(error_code, 1); - } - else - { - vehicle_error_code.removeErrorCode(error_code); - } - } - } - - break; - } - - case 0x0CF11F05: // MCU_Status_2 - { - int raw_ctrl_temp = static_cast(msg->data[2]); - int raw_motor_temp = static_cast(msg->data[3]); - uint8_t ctrl_status = msg->data[4]; - - int controllerTemp = raw_ctrl_temp - 40; - int motorTemp = raw_motor_temp - 30; - int feedback_gear = (ctrl_status >> 2) & 0x03; - - { - std::lock_guard lock(info_mutex_); - info_report_.controllerTemp = controllerTemp; - info_report_.motorTemp = motorTemp; - info_report_.gear = feedback_gear; - } - break; - } - - case 0x401: // EPS_Status + EPS_Fault - { - uint16_t raw_value = - static_cast((msg->data[3] << 8) | msg->data[4]); - float steeringAngle = (raw_value - 1024) / 7.0f; - - { - std::lock_guard lock(info_mutex_); - info_report_.steeringAngle = steeringAngle; - } - - { - std::lock_guard lock(fault_mutex_); - - for (int code = 1201; code <= 1218; ++code) - vehicle_error_code.removeErrorCode(code); - - uint8_t data_bytes[2] = {msg->data[2], msg->data[6]}; - for (uint8_t byte : data_bytes) - { - for (int i = 0; i < 18; ++i) - { - if (byte == epsErrorCode[i]) - { - vehicle_error_code.addErrorCode(1201 + i); - break; - } - } - } - } - break; - } - - case 0x216: // VCU_Fault_1/2 - { - uint32_t fault1_bits = - (static_cast(msg->data[0]) << 0) | - (static_cast(msg->data[1]) << 8) | - (static_cast(msg->data[2]) << 16) | - (static_cast(msg->data[3]) << 24); - - uint32_t fault2_bits = - (static_cast(msg->data[4]) << 0) | - (static_cast(msg->data[5]) << 8) | - (static_cast(msg->data[6]) << 16) | - (static_cast(msg->data[7]) << 24); - - { - std::lock_guard lock(fault_mutex_); - - for (int code = 1701; code <= 1764; ++code) - vehicle_error_code.removeErrorCode(code); - - for (uint8_t bit = 0; bit <= 31; ++bit) - if ((fault1_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1701 + bit); - - for (uint8_t bit = 0; bit <= 31; ++bit) - if ((fault2_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1733 + bit); - } - - break; - } - - case 0x217: // VCU_Fault_3/4 - { - uint32_t fault3_bits = - (static_cast(msg->data[0]) << 0) | - (static_cast(msg->data[1]) << 8) | - (static_cast(msg->data[2]) << 16) | - (static_cast(msg->data[3]) << 24); - - uint32_t fault4_bits = - (static_cast(msg->data[4]) << 0) | - (static_cast(msg->data[5]) << 8) | - (static_cast(msg->data[6]) << 16) | - (static_cast(msg->data[7]) << 24); - - { - std::lock_guard lock(fault_mutex_); - - for (int code = 1765; code <= 1814; ++code) - vehicle_error_code.removeErrorCode(code); - - for (uint8_t bit = 0; bit <= 31; ++bit) - if ((fault3_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1765 + bit); - - for (uint8_t bit = 0; bit <= 18; ++bit) - if ((fault4_bits >> bit) & 0x01) - vehicle_error_code.addErrorCode(1797 + bit); - } - - break; - } - - default: - break; - } - } - -private: - // ROS - rclcpp::Subscription::SharedPtr subscription_; - rclcpp::Subscription::SharedPtr identity_sub_; - - rclcpp::TimerBase::SharedPtr info_timer_; - rclcpp::TimerBase::SharedPtr fault_timer_; - rclcpp::TimerBase::SharedPtr connection_check_timer_; - - // MQTT - MQTTClientWrapper mqtt_client_; - - // Data - GeneralMsg info_report_; - std::mutex info_mutex_; - - std::mutex fault_mutex_; - - // Identity cache - std::mutex identity_mutex_; - std::string vehicle_imei_; - std::string vehicle_vid_; - bool vehicle_ready_ = false; -}; - -// ===================== main ===================== -int main(int argc, char **argv) -{ - rclcpp::init(argc, argv); - - if (!load_config(config)) - { - RCLCPP_ERROR(rclcpp::get_logger("main"), - "Failed to load MQTT config."); - return 1; - } - - if (mqtt_reconnect_interval <= 0) - mqtt_reconnect_interval = 5000; - - auto node = std::make_shared(config); - - RCLCPP_INFO(node->get_logger(), - "mqtt_ip: %s", - config.mqtt_ip.c_str()); - RCLCPP_INFO(node->get_logger(), - "info_topic: %s", - config.info_topic.c_str()); - RCLCPP_INFO(node->get_logger(), - "fault_topic: %s", - config.fault_topic.c_str()); - RCLCPP_INFO(node->get_logger(), - "MQTT reconnection interval: %d ms", - mqtt_reconnect_interval); - RCLCPP_INFO(node->get_logger(), - "MqttReport node started."); - - rclcpp::spin(node); - rclcpp::shutdown(); - return 0; -} diff --git a/src/communication/mqtt_report/src/mqtt_report_node.cpp b/src/communication/mqtt_report/src/mqtt_report_node.cpp new file mode 100644 index 0000000..f43e3d6 --- /dev/null +++ b/src/communication/mqtt_report/src/mqtt_report_node.cpp @@ -0,0 +1,223 @@ +// mqtt_report_node.cpp + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mqtt_report/can/can_decoder.h" +#include "mqtt_report/fault_codes.h" +#include "mqtt_report/get_config.h" +#include "mqtt_report/mqtt_client.hpp" +#include "mqtt_report/report_struct.h" +#include "sweeper_interfaces/msg/can_frame.hpp" +#include "sweeper_interfaces/msg/vehicle_identity.hpp" + +namespace sweeperMsg = sweeper_interfaces::msg; +using sweeper_interfaces::msg::VehicleIdentity; + +Config config; +int mqtt_reconnect_interval = 5000; // ms + +// ===================== JSON 打包 ===================== +static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei) +{ + ordered_json j; + j["imei"] = imei; + j["power"] = msg.power; + j["chargeStatus"] = msg.chargeStatus; + j["gear"] = msg.gear; + j["speed"] = msg.speed; + j["steeringAngle"] = msg.steeringAngle; + j["motorTemp"] = msg.motorTemp; + j["controllerTemp"] = msg.controllerTemp; + j["timestamp"] = msg.timestamp; + return j.dump(-1); +} + +// ===================== Node ===================== +class CanDataSubscriber : public rclcpp::Node +{ + public: + explicit CanDataSubscriber(const Config& cfg) + : Node("can_data_subscriber"), + mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), "zxwl", + "zxwl1234@", mqtt_reconnect_interval), + can_decoder_(info_report_, info_mutex_, fault_mutex_) + { + // CAN + subscription_ = this->create_subscription( + "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); + + // Identity(关键) + identity_sub_ = this->create_subscription( + "/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(), + std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1)); + + // Timers + info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), + std::bind(&CanDataSubscriber::info_timer_callback, this)); + + fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), + std::bind(&CanDataSubscriber::fault_timer_callback, this)); + + connection_check_timer_ = this->create_wall_timer( + std::chrono::seconds(10), std::bind(&CanDataSubscriber::check_connection_callback, this)); + } + + ~CanDataSubscriber() override + { + if (info_timer_) info_timer_->cancel(); + if (fault_timer_) fault_timer_->cancel(); + if (connection_check_timer_) connection_check_timer_->cancel(); + } + + private: + // ===================== identity ===================== + bool snapshot_identity(std::string& imei, std::string& vid) + { + std::lock_guard lock(identity_mutex_); + if (!vehicle_ready_) return false; + + imei = vehicle_imei_; + vid = vehicle_vid_; + return true; + } + + void identity_callback(const VehicleIdentity::SharedPtr msg) + { + std::lock_guard lock(identity_mutex_); + vehicle_imei_ = msg->imei; + vehicle_vid_ = msg->vid; + vehicle_ready_ = msg->ready; + + RCLCPP_INFO(this->get_logger(), "Vehicle identity received: IMEI=%s, VID=%s", vehicle_imei_.c_str(), + vehicle_vid_.c_str()); + } + + // ===================== timers ===================== + void info_timer_callback() + { + if (!rclcpp::ok()) return; + + std::string imei, vid; + if (!snapshot_identity(imei, vid)) + { + RCLCPP_WARN_THROTTLE(this->get_logger(), *this->get_clock(), 5000, + "Vehicle identity not ready, skip MQTT publish"); + return; + } + + const std::string topic = "/zxwl/vehicle/" + vid + "/info"; + + GeneralMsg snapshot; + { + std::lock_guard lock(info_mutex_); + info_report_.timestamp = getCurrentTimestampMs(); + snapshot = info_report_; + } + + const std::string payload = pack_general_info_to_json(snapshot, imei); + + if (!mqtt_client_.publish(topic, payload, 1)) + { + RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT"); + } + } + + void fault_timer_callback() + { + if (!rclcpp::ok()) return; + + std::string imei, vid; + if (!snapshot_identity(imei, vid)) return; + + const std::string topic = "/zxwl/vehicle/" + vid + "/fault"; + + std::vector codes; + { + std::lock_guard lock(fault_mutex_); + const auto& err_set = vehicle_error_code.getAllErrorCodes(); + codes.assign(err_set.begin(), err_set.end()); + } + + for (int code : codes) + { + const std::string payload = generateFaultJson(code, vid, getCurrentTimestampMs()); + + if (!mqtt_client_.publish(topic, payload, 0)) + { + RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code); + } + } + } + + void check_connection_callback() + { + if (!mqtt_client_.isConnected()) + { + RCLCPP_WARN(this->get_logger(), "MQTT connection lost."); + } + } + + // ===================== CAN handler ===================== + void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) { can_decoder_.handle(msg); } + + private: + // ROS + rclcpp::Subscription::SharedPtr subscription_; + rclcpp::Subscription::SharedPtr identity_sub_; + + rclcpp::TimerBase::SharedPtr info_timer_; + rclcpp::TimerBase::SharedPtr fault_timer_; + rclcpp::TimerBase::SharedPtr connection_check_timer_; + + // MQTT + MQTTClientWrapper mqtt_client_; + + // Data + GeneralMsg info_report_; + std::mutex info_mutex_; + + // CAN decoder + CanDecoder can_decoder_; + + std::mutex fault_mutex_; + + // Identity cache + std::mutex identity_mutex_; + std::string vehicle_imei_; + std::string vehicle_vid_; + bool vehicle_ready_ = false; +}; + +// ===================== main ===================== +int main(int argc, char** argv) +{ + rclcpp::init(argc, argv); + + if (!load_config(config)) + { + RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load MQTT config."); + return 1; + } + + if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000; + + auto node = std::make_shared(config); + + RCLCPP_INFO(node->get_logger(), "mqtt_ip: %s", config.mqtt_ip.c_str()); + RCLCPP_INFO(node->get_logger(), "info_topic: %s", config.info_topic.c_str()); + RCLCPP_INFO(node->get_logger(), "fault_topic: %s", config.fault_topic.c_str()); + RCLCPP_INFO(node->get_logger(), "MQTT reconnection interval: %d ms", mqtt_reconnect_interval); + RCLCPP_INFO(node->get_logger(), "MqttReport node started."); + + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/src/communication/mqtt_report/src/report_struct.cpp b/src/communication/mqtt_report/src/report_struct.cpp new file mode 100644 index 0000000..8be7b0f --- /dev/null +++ b/src/communication/mqtt_report/src/report_struct.cpp @@ -0,0 +1,10 @@ +#include "mqtt_report/report_struct.h" +#include + +int64_t getCurrentTimestampMs() +{ + using namespace std::chrono; + return duration_cast( + system_clock::now().time_since_epoch()) + .count(); +} diff --git a/src/perception/airy/rslidar_sdk-main/src/rs_driver/cmake/rs_driverConfig.cmake b/src/perception/airy/rslidar_sdk-main/src/rs_driver/cmake/rs_driverConfig.cmake index dc19e97..91761ed 100644 --- a/src/perception/airy/rslidar_sdk-main/src/rs_driver/cmake/rs_driverConfig.cmake +++ b/src/perception/airy/rslidar_sdk-main/src/rs_driver/cmake/rs_driverConfig.cmake @@ -19,8 +19,8 @@ if(${ENABLE_TRANSFORM}) add_definitions("-DENABLE_TRANSFORM") endif(${ENABLE_TRANSFORM}) -set(rs_driver_INCLUDE_DIRS "/home/nvidia/Downloads/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include") -set(RS_DRIVER_INCLUDE_DIRS "/home/nvidia/Downloads/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include") +set(rs_driver_INCLUDE_DIRS "/home/nvidia/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include") +set(RS_DRIVER_INCLUDE_DIRS "/home/nvidia/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include") set(rs_driver_LIBRARIES "pthread;pcap") set(RS_DRIVER_LIBRARIES "pthread;pcap")