拆分节点逻辑与业务逻辑

This commit is contained in:
lyq 2026-01-13 16:14:55 +08:00
parent 3568ef7ff9
commit cc09a445de
10 changed files with 533 additions and 515 deletions

4
.clang-format Normal file
View File

@ -0,0 +1,4 @@
BasedOnStyle: Google
IndentWidth: 4
ColumnLimit: 120
BreakBeforeBraces: Allman

21
.cmake-format.yaml Normal file
View File

@ -0,0 +1,21 @@
# 一行不要太短,避免过度折行
line_width: 120
# CMake 社区默认习惯
tab_size: 2
# 只有参数很多时才换行
max_pargs_hwrap: 4
# 括号跟最后一个参数同一行(关键)
dangle_parens: false
# if/while/function 后不要加空格
separate_ctrl_name_with_space: false
# 风格统一
command_case: lower
keyword_case: upper
# 不做奇怪的 markup
enable_markup: false

View File

@ -16,7 +16,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options(-w) #
add_compile_options(-w) #
endif()
# find dependencies
@ -27,45 +27,36 @@ find_package(radio_ctrl REQUIRED)
find_package(std_msgs REQUIRED)
find_package(ament_index_cpp REQUIRED)
include_directories(
include
${catkin_INCLUDE_DIRS}
)
include_directories(include ${catkin_INCLUDE_DIRS})
add_executable(mqtt_report_node
src/fault_codes.cpp
add_executable(
mqtt_report_node
src/mqtt_report_node.cpp
src/get_config.cpp
src/mqtt_report.cpp
)
src/fault_codes.cpp
src/report_struct.cpp
src/can/can_decoder.cpp)
ament_target_dependencies(mqtt_report_node ament_index_cpp rclcpp sweeper_interfaces std_msgs)
ament_target_dependencies(
mqtt_report_node
ament_index_cpp
rclcpp
sweeper_interfaces
std_msgs)
# radio_ctrl
target_include_directories(mqtt_report_node
PRIVATE ${radio_ctrl_INCLUDE_DIRS}
)
target_include_directories(mqtt_report_node PRIVATE ${radio_ctrl_INCLUDE_DIRS})
# radio_ctrl
target_link_libraries(mqtt_report_node
${radio_ctrl_LIBRARIES}
)
target_link_libraries(mqtt_report_node ${radio_ctrl_LIBRARIES})
if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64)
target_link_libraries(
mqtt_report_node
${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a
)
target_link_libraries(mqtt_report_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a)
else()
target_link_libraries(
mqtt_report_node
${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a
)
target_link_libraries(mqtt_report_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a)
endif()
install(TARGETS
mqtt_report_node
DESTINATION lib/${PROJECT_NAME}
)
install(TARGETS mqtt_report_node DESTINATION lib/${PROJECT_NAME})
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)

View File

@ -0,0 +1,22 @@
#pragma once
#include <mutex>
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "mqtt_report/report_struct.h"
namespace sweeperMsg = sweeper_interfaces::msg;
class CanDecoder
{
public:
CanDecoder(GeneralMsg &info,
std::mutex &info_mutex,
std::mutex &fault_mutex);
void handle(const sweeperMsg::CanFrame::SharedPtr msg);
private:
GeneralMsg &info_report_;
std::mutex &info_mutex_;
std::mutex &fault_mutex_;
};

View File

@ -5,11 +5,7 @@
#include <vector>
#include <chrono>
int64_t getCurrentTimestampMs()
{
using namespace std::chrono;
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
int64_t getCurrentTimestampMs();
struct GeneralMsg
{

View File

@ -0,0 +1,231 @@
#include "mqtt_report/can/can_decoder.h"
#include "mqtt_report/fault_codes.h"
#include <iostream>
#include <iomanip>
CanDecoder::CanDecoder(GeneralMsg &info,
std::mutex &info_mutex,
std::mutex &fault_mutex)
: info_report_(info),
info_mutex_(info_mutex),
fault_mutex_(fault_mutex)
{
}
void CanDecoder::handle(const sweeperMsg::CanFrame::SharedPtr msg)
{
switch (msg->id)
{
case 0x100: // BMS_Status 100
{
uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1];
int16_t current_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5];
float voltage = voltage_raw * 0.01f;
float current = current_raw * 0.01f;
float capacity = capacity_raw * 0.01f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.chargeStatus = (current >= 0);
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x100] 总电压: " << voltage << " V, "
<< "电流: " << current << " A, "
<< "剩余容量: " << capacity << " Ah, "
<< "是否在充电: " << ((current >= 0) ? "" : "") << std::endl;
break;
}
case 0x101: // BMS_Status 101
{
uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1];
uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5];
float full_capacity = full_capacity_raw * 0.01f;
uint16_t cycle_count = cycle_count_raw;
float rsoc = rsoc_raw * 1.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.power = rsoc;
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, "
<< "循环次数: " << cycle_count << " 次, "
<< "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl;
break;
}
case 0x0CF11E05: // MCU_Status_1 + MCU_Fault
{
uint16_t raw_speed =
(static_cast<uint16_t>(msg->data[1]) << 8) | msg->data[0];
float actual_speed_rpm = static_cast<float>(raw_speed);
float gear_ratio = 32.0f;
float wheel_radius = 0.3f;
float speed_kmh =
(actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) /
(1000.0f * gear_ratio);
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.speed = speed_kmh;
}
uint16_t mcu_fault_code =
(static_cast<uint16_t>(msg->data[7]) << 8) | msg->data[6];
{
std::lock_guard<std::mutex> lock(fault_mutex_);
// 先清 MCU 故障区
for (int code = 1101; code <= 1116; ++code)
vehicle_error_code.removeErrorCode(code);
for (int i = 0; i < 16; ++i)
{
bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0;
int error_code = 1101 + i;
if (is_fault && faultMap.count(error_code))
{
vehicle_error_code.addErrorCode(error_code);
updateFaultLevel(error_code, 1);
}
else
{
vehicle_error_code.removeErrorCode(error_code);
}
}
}
break;
}
case 0x0CF11F05: // MCU_Status_2
{
int raw_ctrl_temp = static_cast<int>(msg->data[2]);
int raw_motor_temp = static_cast<int>(msg->data[3]);
uint8_t ctrl_status = msg->data[4];
int controllerTemp = raw_ctrl_temp - 40;
int motorTemp = raw_motor_temp - 30;
int feedback_gear = (ctrl_status >> 2) & 0x03;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.controllerTemp = controllerTemp;
info_report_.motorTemp = motorTemp;
info_report_.gear = feedback_gear;
}
break;
}
case 0x401: // EPS_Status + EPS_Fault
{
uint16_t raw_value =
static_cast<uint16_t>((msg->data[3] << 8) | msg->data[4]);
float steeringAngle = (raw_value - 1024) / 7.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.steeringAngle = steeringAngle;
}
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1201; code <= 1218; ++code)
vehicle_error_code.removeErrorCode(code);
uint8_t data_bytes[2] = {msg->data[2], msg->data[6]};
for (uint8_t byte : data_bytes)
{
for (int i = 0; i < 18; ++i)
{
if (byte == epsErrorCode[i])
{
vehicle_error_code.addErrorCode(1201 + i);
break;
}
}
}
}
break;
}
case 0x216: // VCU_Fault_1/2
{
uint32_t fault1_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault2_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1701; code <= 1764; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault1_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1701 + bit);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault2_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1733 + bit);
}
break;
}
case 0x217: // VCU_Fault_3/4
{
uint32_t fault3_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault4_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1765; code <= 1814; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault3_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1765 + bit);
for (uint8_t bit = 0; bit <= 18; ++bit)
if ((fault4_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1797 + bit);
}
break;
}
default:
break;
}
}

View File

@ -1,480 +0,0 @@
// mqtt_report_node.cpp
#include <algorithm>
#include <bitset>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include <rclcpp/rclcpp.hpp>
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
#include "mqtt_report/fault_codes.h"
#include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt_client.hpp"
#include "mqtt_report/report_struct.h"
namespace sweeperMsg = sweeper_interfaces::msg;
using sweeper_interfaces::msg::VehicleIdentity;
Config config;
int mqtt_reconnect_interval = 5000; // ms
// ===================== JSON 打包 =====================
static std::string pack_general_info_to_json(
const GeneralMsg &msg,
const std::string &imei)
{
ordered_json j;
j["imei"] = imei;
j["power"] = msg.power;
j["chargeStatus"] = msg.chargeStatus;
j["gear"] = msg.gear;
j["speed"] = msg.speed;
j["steeringAngle"] = msg.steeringAngle;
j["motorTemp"] = msg.motorTemp;
j["controllerTemp"] = msg.controllerTemp;
j["timestamp"] = msg.timestamp;
return j.dump(-1);
}
// ===================== Node =====================
class CanDataSubscriber : public rclcpp::Node
{
public:
explicit CanDataSubscriber(const Config &cfg)
: Node("can_data_subscriber"),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port),
generate_mqtt_client_id(),
"zxwl",
"zxwl1234@",
mqtt_reconnect_interval)
{
// CAN
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data",
10,
std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
// Identity关键
identity_sub_ = this->create_subscription<VehicleIdentity>(
"/vehicle/identity",
rclcpp::QoS(1).transient_local().reliable(),
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
// Timers
info_timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::info_timer_callback, this));
fault_timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this));
connection_check_timer_ = this->create_wall_timer(
std::chrono::seconds(10),
std::bind(&CanDataSubscriber::check_connection_callback, this));
}
~CanDataSubscriber() override
{
if (info_timer_)
info_timer_->cancel();
if (fault_timer_)
fault_timer_->cancel();
if (connection_check_timer_)
connection_check_timer_->cancel();
}
private:
// ===================== identity =====================
bool snapshot_identity(std::string &imei, std::string &vid)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
if (!vehicle_ready_)
return false;
imei = vehicle_imei_;
vid = vehicle_vid_;
return true;
}
void identity_callback(const VehicleIdentity::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
vehicle_imei_ = msg->imei;
vehicle_vid_ = msg->vid;
vehicle_ready_ = msg->ready;
RCLCPP_INFO(this->get_logger(),
"Vehicle identity received: IMEI=%s, VID=%s",
vehicle_imei_.c_str(),
vehicle_vid_.c_str());
}
// ===================== timers =====================
void info_timer_callback()
{
if (!rclcpp::ok())
return;
std::string imei, vid;
if (!snapshot_identity(imei, vid))
{
RCLCPP_WARN_THROTTLE(
this->get_logger(),
*this->get_clock(),
5000,
"Vehicle identity not ready, skip MQTT publish");
return;
}
const std::string topic = "/zxwl/vehicle/" + vid + "/info";
GeneralMsg snapshot;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.timestamp = getCurrentTimestampMs();
snapshot = info_report_;
}
const std::string payload =
pack_general_info_to_json(snapshot, imei);
if (!mqtt_client_.publish(topic, payload, 1))
{
RCLCPP_WARN(this->get_logger(),
"Failed to publish info message to MQTT");
}
}
void fault_timer_callback()
{
if (!rclcpp::ok())
return;
std::string imei, vid;
if (!snapshot_identity(imei, vid))
return;
const std::string topic = "/zxwl/vehicle/" + vid + "/fault";
std::vector<int> codes;
{
std::lock_guard<std::mutex> lock(fault_mutex_);
const auto &err_set = vehicle_error_code.getAllErrorCodes();
codes.assign(err_set.begin(), err_set.end());
}
for (int code : codes)
{
const std::string payload =
generateFaultJson(code, vid, getCurrentTimestampMs());
if (!mqtt_client_.publish(topic, payload, 0))
{
RCLCPP_WARN(this->get_logger(),
"Failed to publish fault code %d to MQTT",
code);
}
}
}
void check_connection_callback()
{
if (!mqtt_client_.isConnected())
{
RCLCPP_WARN(this->get_logger(),
"MQTT connection lost.");
}
}
// ===================== CAN handler =====================
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
{
Msg_Handler(msg);
}
void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
{
switch (msg->id)
{
case 0x100: // BMS_Status 100
{
uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1];
int16_t current_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5];
float voltage = voltage_raw * 0.01f;
float current = current_raw * 0.01f;
float capacity = capacity_raw * 0.01f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.chargeStatus = (current >= 0);
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x100] 总电压: " << voltage << " V, "
<< "电流: " << current << " A, "
<< "剩余容量: " << capacity << " Ah, "
<< "是否在充电: " << ((current >= 0) ? "" : "") << std::endl;
break;
}
case 0x101: // BMS_Status 101
{
uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1];
uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5];
float full_capacity = full_capacity_raw * 0.01f;
uint16_t cycle_count = cycle_count_raw;
float rsoc = rsoc_raw * 1.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.power = rsoc;
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, "
<< "循环次数: " << cycle_count << " 次, "
<< "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl;
break;
}
case 0x0CF11E05: // MCU_Status_1 + MCU_Fault
{
uint16_t raw_speed =
(static_cast<uint16_t>(msg->data[1]) << 8) | msg->data[0];
float actual_speed_rpm = static_cast<float>(raw_speed);
float gear_ratio = 32.0f;
float wheel_radius = 0.3f;
float speed_kmh =
(actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) /
(1000.0f * gear_ratio);
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.speed = speed_kmh;
}
uint16_t mcu_fault_code =
(static_cast<uint16_t>(msg->data[7]) << 8) | msg->data[6];
{
std::lock_guard<std::mutex> lock(fault_mutex_);
// 先清 MCU 故障区
for (int code = 1101; code <= 1116; ++code)
vehicle_error_code.removeErrorCode(code);
for (int i = 0; i < 16; ++i)
{
bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0;
int error_code = 1101 + i;
if (is_fault && faultMap.count(error_code))
{
vehicle_error_code.addErrorCode(error_code);
updateFaultLevel(error_code, 1);
}
else
{
vehicle_error_code.removeErrorCode(error_code);
}
}
}
break;
}
case 0x0CF11F05: // MCU_Status_2
{
int raw_ctrl_temp = static_cast<int>(msg->data[2]);
int raw_motor_temp = static_cast<int>(msg->data[3]);
uint8_t ctrl_status = msg->data[4];
int controllerTemp = raw_ctrl_temp - 40;
int motorTemp = raw_motor_temp - 30;
int feedback_gear = (ctrl_status >> 2) & 0x03;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.controllerTemp = controllerTemp;
info_report_.motorTemp = motorTemp;
info_report_.gear = feedback_gear;
}
break;
}
case 0x401: // EPS_Status + EPS_Fault
{
uint16_t raw_value =
static_cast<uint16_t>((msg->data[3] << 8) | msg->data[4]);
float steeringAngle = (raw_value - 1024) / 7.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.steeringAngle = steeringAngle;
}
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1201; code <= 1218; ++code)
vehicle_error_code.removeErrorCode(code);
uint8_t data_bytes[2] = {msg->data[2], msg->data[6]};
for (uint8_t byte : data_bytes)
{
for (int i = 0; i < 18; ++i)
{
if (byte == epsErrorCode[i])
{
vehicle_error_code.addErrorCode(1201 + i);
break;
}
}
}
}
break;
}
case 0x216: // VCU_Fault_1/2
{
uint32_t fault1_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault2_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1701; code <= 1764; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault1_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1701 + bit);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault2_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1733 + bit);
}
break;
}
case 0x217: // VCU_Fault_3/4
{
uint32_t fault3_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault4_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1765; code <= 1814; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault3_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1765 + bit);
for (uint8_t bit = 0; bit <= 18; ++bit)
if ((fault4_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1797 + bit);
}
break;
}
default:
break;
}
}
private:
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
// MQTT
MQTTClientWrapper mqtt_client_;
// Data
GeneralMsg info_report_;
std::mutex info_mutex_;
std::mutex fault_mutex_;
// Identity cache
std::mutex identity_mutex_;
std::string vehicle_imei_;
std::string vehicle_vid_;
bool vehicle_ready_ = false;
};
// ===================== main =====================
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
if (!load_config(config))
{
RCLCPP_ERROR(rclcpp::get_logger("main"),
"Failed to load MQTT config.");
return 1;
}
if (mqtt_reconnect_interval <= 0)
mqtt_reconnect_interval = 5000;
auto node = std::make_shared<CanDataSubscriber>(config);
RCLCPP_INFO(node->get_logger(),
"mqtt_ip: %s",
config.mqtt_ip.c_str());
RCLCPP_INFO(node->get_logger(),
"info_topic: %s",
config.info_topic.c_str());
RCLCPP_INFO(node->get_logger(),
"fault_topic: %s",
config.fault_topic.c_str());
RCLCPP_INFO(node->get_logger(),
"MQTT reconnection interval: %d ms",
mqtt_reconnect_interval);
RCLCPP_INFO(node->get_logger(),
"MqttReport node started.");
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

View File

@ -0,0 +1,223 @@
// mqtt_report_node.cpp
#include <algorithm>
#include <bitset>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <vector>
#include "mqtt_report/can/can_decoder.h"
#include "mqtt_report/fault_codes.h"
#include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt_client.hpp"
#include "mqtt_report/report_struct.h"
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
namespace sweeperMsg = sweeper_interfaces::msg;
using sweeper_interfaces::msg::VehicleIdentity;
Config config;
int mqtt_reconnect_interval = 5000; // ms
// ===================== JSON 打包 =====================
static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei)
{
ordered_json j;
j["imei"] = imei;
j["power"] = msg.power;
j["chargeStatus"] = msg.chargeStatus;
j["gear"] = msg.gear;
j["speed"] = msg.speed;
j["steeringAngle"] = msg.steeringAngle;
j["motorTemp"] = msg.motorTemp;
j["controllerTemp"] = msg.controllerTemp;
j["timestamp"] = msg.timestamp;
return j.dump(-1);
}
// ===================== Node =====================
class CanDataSubscriber : public rclcpp::Node
{
public:
explicit CanDataSubscriber(const Config& cfg)
: Node("can_data_subscriber"),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), "zxwl",
"zxwl1234@", mqtt_reconnect_interval),
can_decoder_(info_report_, info_mutex_, fault_mutex_)
{
// CAN
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
// Identity关键
identity_sub_ = this->create_subscription<VehicleIdentity>(
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
// Timers
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::info_timer_callback, this));
fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this));
connection_check_timer_ = this->create_wall_timer(
std::chrono::seconds(10), std::bind(&CanDataSubscriber::check_connection_callback, this));
}
~CanDataSubscriber() override
{
if (info_timer_) info_timer_->cancel();
if (fault_timer_) fault_timer_->cancel();
if (connection_check_timer_) connection_check_timer_->cancel();
}
private:
// ===================== identity =====================
bool snapshot_identity(std::string& imei, std::string& vid)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
if (!vehicle_ready_) return false;
imei = vehicle_imei_;
vid = vehicle_vid_;
return true;
}
void identity_callback(const VehicleIdentity::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
vehicle_imei_ = msg->imei;
vehicle_vid_ = msg->vid;
vehicle_ready_ = msg->ready;
RCLCPP_INFO(this->get_logger(), "Vehicle identity received: IMEI=%s, VID=%s", vehicle_imei_.c_str(),
vehicle_vid_.c_str());
}
// ===================== timers =====================
void info_timer_callback()
{
if (!rclcpp::ok()) return;
std::string imei, vid;
if (!snapshot_identity(imei, vid))
{
RCLCPP_WARN_THROTTLE(this->get_logger(), *this->get_clock(), 5000,
"Vehicle identity not ready, skip MQTT publish");
return;
}
const std::string topic = "/zxwl/vehicle/" + vid + "/info";
GeneralMsg snapshot;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.timestamp = getCurrentTimestampMs();
snapshot = info_report_;
}
const std::string payload = pack_general_info_to_json(snapshot, imei);
if (!mqtt_client_.publish(topic, payload, 1))
{
RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT");
}
}
void fault_timer_callback()
{
if (!rclcpp::ok()) return;
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
const std::string topic = "/zxwl/vehicle/" + vid + "/fault";
std::vector<int> codes;
{
std::lock_guard<std::mutex> lock(fault_mutex_);
const auto& err_set = vehicle_error_code.getAllErrorCodes();
codes.assign(err_set.begin(), err_set.end());
}
for (int code : codes)
{
const std::string payload = generateFaultJson(code, vid, getCurrentTimestampMs());
if (!mqtt_client_.publish(topic, payload, 0))
{
RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code);
}
}
}
void check_connection_callback()
{
if (!mqtt_client_.isConnected())
{
RCLCPP_WARN(this->get_logger(), "MQTT connection lost.");
}
}
// ===================== CAN handler =====================
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) { can_decoder_.handle(msg); }
private:
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
// MQTT
MQTTClientWrapper mqtt_client_;
// Data
GeneralMsg info_report_;
std::mutex info_mutex_;
// CAN decoder
CanDecoder can_decoder_;
std::mutex fault_mutex_;
// Identity cache
std::mutex identity_mutex_;
std::string vehicle_imei_;
std::string vehicle_vid_;
bool vehicle_ready_ = false;
};
// ===================== main =====================
int main(int argc, char** argv)
{
rclcpp::init(argc, argv);
if (!load_config(config))
{
RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load MQTT config.");
return 1;
}
if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000;
auto node = std::make_shared<CanDataSubscriber>(config);
RCLCPP_INFO(node->get_logger(), "mqtt_ip: %s", config.mqtt_ip.c_str());
RCLCPP_INFO(node->get_logger(), "info_topic: %s", config.info_topic.c_str());
RCLCPP_INFO(node->get_logger(), "fault_topic: %s", config.fault_topic.c_str());
RCLCPP_INFO(node->get_logger(), "MQTT reconnection interval: %d ms", mqtt_reconnect_interval);
RCLCPP_INFO(node->get_logger(), "MqttReport node started.");
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

View File

@ -0,0 +1,10 @@
#include "mqtt_report/report_struct.h"
#include <chrono>
int64_t getCurrentTimestampMs()
{
using namespace std::chrono;
return duration_cast<milliseconds>(
system_clock::now().time_since_epoch())
.count();
}

View File

@ -19,8 +19,8 @@ if(${ENABLE_TRANSFORM})
add_definitions("-DENABLE_TRANSFORM")
endif(${ENABLE_TRANSFORM})
set(rs_driver_INCLUDE_DIRS "/home/nvidia/Downloads/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include")
set(RS_DRIVER_INCLUDE_DIRS "/home/nvidia/Downloads/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include")
set(rs_driver_INCLUDE_DIRS "/home/nvidia/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include")
set(RS_DRIVER_INCLUDE_DIRS "/home/nvidia/sweeper_200/src/perception/airy/rslidar_sdk-main/src/rs_driver/src;/usr/local/rslidar_sdk/include")
set(rs_driver_LIBRARIES "pthread;pcap")
set(RS_DRIVER_LIBRARIES "pthread;pcap")