This commit is contained in:
lyq 2026-01-06 10:18:25 +08:00
parent f445d3718b
commit 0195ddb3f8
7 changed files with 555 additions and 417 deletions

View File

@ -20,6 +20,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Route.msg" "msg/Route.msg"
"msg/Fu.msg" "msg/Fu.msg"
"msg/Task.msg" "msg/Task.msg"
"msg/VehicleIdentity.msg"
DEPENDENCIES std_msgs DEPENDENCIES std_msgs
) )

View File

@ -0,0 +1,3 @@
string imei
string vid
bool ready

View File

@ -1,25 +1,54 @@
cmake_minimum_required(VERSION 3.8) cmake_minimum_required(VERSION 3.8)
project(vehicle_params) project(vehicle_params)
# =========================
#
# =========================
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic) add_compile_options(-Wall -Wextra -Wpedantic)
endif() endif()
# find dependencies # =========================
#
# =========================
find_package(ament_cmake REQUIRED) find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED) find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED) find_package(sweeper_interfaces REQUIRED) #
if(BUILD_TESTING) # =========================
find_package(ament_lint_auto REQUIRED) # include
# the following line skips the linter which checks for copyrights # =========================
# comment the line when a copyright and license is added to all source files include_directories(
set(ament_cmake_copyright_FOUND TRUE) include
# the following line skips cpplint (only works in a git repo) )
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
# =========================
#
# =========================
add_executable(vehicle_params_node
src/vehicle_params_node.cpp
)
# =========================
#
# =========================
ament_target_dependencies(vehicle_params_node
rclcpp
sweeper_interfaces #
)
# =========================
#
# =========================
install(TARGETS vehicle_params_node
DESTINATION lib/${PROJECT_NAME}
)
install(DIRECTORY include/
DESTINATION include/
)
# =========================
#
# =========================
ament_package() ament_package()

View File

@ -1,17 +1,27 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd"
schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>vehicle_params</name> <name>vehicle_params</name>
<version>0.0.0</version> <version>0.1.0</version>
<description>TODO: Package description</description>
<maintainer email="zxwl@44.com">nvidia</maintainer>
<license>TODO: License declaration</license>
<description>
Central vehicle identity provider node.
Fetches IMEI / VID from B-side service and publishes them as ROS2 messages.
</description>
<maintainer email="zxwl@44.com">nvidia</maintainer>
<license>Apache-2.0</license>
<!-- build system -->
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>
<!-- runtime deps -->
<depend>rclcpp</depend> <depend>rclcpp</depend>
<depend>std_msgs</depend> <depend>sweeper_interfaces</depend> <!-- ⭐ 新增 -->
<!-- tests -->
<test_depend>ament_lint_auto</test_depend> <test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend> <test_depend>ament_lint_common</test_depend>

View File

@ -0,0 +1,125 @@
#include <rclcpp/rclcpp.hpp>
#include <nlohmann/json.hpp>
#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include "vehicle_params/httplib.h"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
using sweeper_interfaces::msg::VehicleIdentity;
class VehicleParamsNode : public rclcpp::Node
{
public:
VehicleParamsNode()
: Node("vehicle_params_node")
{
RCLCPP_INFO(this->get_logger(), "vehicle_params_node starting...");
// ---------- neardi ----------
this->declare_parameter<std::string>("neardi.host", "192.168.5.1");
this->declare_parameter<int>("neardi.port", 8080);
// ---------- publisher ----------
identity_pub_ = this->create_publisher<VehicleIdentity>(
"/vehicle/identity",
rclcpp::QoS(1).transient_local().reliable());
// ---------- worker ----------
worker_ = std::thread([this]()
{ fetch_from_neardi_loop(); });
}
~VehicleParamsNode()
{
running_.store(false);
if (worker_.joinable())
worker_.join();
}
private:
std::atomic<bool> running_{true};
std::thread worker_;
rclcpp::Publisher<VehicleIdentity>::SharedPtr identity_pub_;
void fetch_from_neardi_loop()
{
std::string host;
int port = 0;
this->get_parameter("neardi.host", host);
this->get_parameter("neardi.port", port);
httplib::Client cli(host, port);
cli.set_connection_timeout(3);
cli.set_read_timeout(3);
while (rclcpp::ok() && running_.load())
{
RCLCPP_INFO(this->get_logger(),
"Requesting vehicle identity from neardi (%s:%d)...",
host.c_str(), port);
auto res = cli.Post("/api/v1/device/register");
if (!res || res->status != 200)
{
RCLCPP_WARN(this->get_logger(), "neardi request failed, retrying...");
retry_sleep();
continue;
}
try
{
auto j = nlohmann::json::parse(res->body);
if (!j.value("ok", false))
{
retry_sleep();
continue;
}
auto dev = j.at("device");
std::string imei = dev.value("imei", "");
std::string vid = dev.value("vid", "");
if (imei.empty() || vid.empty())
{
retry_sleep();
continue;
}
VehicleIdentity msg;
msg.imei = imei;
msg.vid = vid;
msg.ready = true;
identity_pub_->publish(msg);
RCLCPP_INFO(this->get_logger(),
"Vehicle identity published: IMEI=%s, VID=%s",
imei.c_str(), vid.c_str());
return; // 成功一次即可
}
catch (...)
{
retry_sleep();
}
}
}
void retry_sleep()
{
std::this_thread::sleep_for(std::chrono::seconds(2));
}
};
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<VehicleParamsNode>());
rclcpp::shutdown();
return 0;
}

View File

@ -1,342 +1,37 @@
// mqtt_report_node.cpp
#include <algorithm> #include <algorithm>
#include <bitset> #include <bitset>
#include <iostream>
#include <chrono> #include <chrono>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include <rclcpp/rclcpp.hpp> #include <rclcpp/rclcpp.hpp>
#include "sweeper_interfaces/msg/can_frame.hpp" #include "sweeper_interfaces/msg/can_frame.hpp"
#include "mqtt_report/mqtt_client.hpp" // 包含上面修改后的MQTTClientWrapper #include "sweeper_interfaces/msg/vehicle_identity.hpp"
#include "mqtt_report/get_config.h"
#include "mqtt_report/report_struct.h"
#include "mqtt_report/fault_codes.h" #include "mqtt_report/fault_codes.h"
#include "httplib.h" #include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt_client.hpp"
#include "mqtt_report/report_struct.h"
namespace sweeperMsg = sweeper_interfaces::msg; namespace sweeperMsg = sweeper_interfaces::msg;
using sweeper_interfaces::msg::VehicleIdentity;
Config config; // 清扫车配置文件 Config config;
GeneralMsg info_report; // 常规消息上报 int mqtt_reconnect_interval = 5000; // ms
int mqtt_reconnect_interval = 5000; // 重连间隔(ms)
// imei获取 // ===================== JSON 打包 =====================
std::string g_imei; static std::string pack_general_info_to_json(
std::mutex g_imei_mutex; const GeneralMsg &msg,
std::thread imei_thread_; const std::string &imei)
bool has_imei()
{
std::lock_guard<std::mutex> lock(g_imei_mutex);
return !g_imei.empty();
}
std::string get_imei()
{
std::lock_guard<std::mutex> lock(g_imei_mutex);
return g_imei;
}
void set_imei(const std::string &imei)
{
std::lock_guard<std::mutex> lock(g_imei_mutex);
g_imei = imei;
}
bool fetch_imei_from_neardi(const std::string &host, int port)
{
httplib::Client cli(host, port);
cli.set_connection_timeout(2);
cli.set_read_timeout(2);
// 只发送最核心的身份信息
nlohmann::json req;
req["vid"] = config.vid;
req["device_type"] = "orin";
req["timestamp"] = getCurrentTimestampMs();
auto res = cli.Post(
"/api/v1/device/register",
req.dump(),
"application/json");
if (!res)
{
std::cerr << "[REGISTER] HTTP request failed\n";
return false;
}
if (res->status != 200)
{
std::cerr << "[REGISTER] HTTP status = " << res->status << "\n";
return false;
}
try
{
auto j = nlohmann::json::parse(res->body);
if (!j.value("ok", false))
{
std::cerr << "[REGISTER] rejected: "
<< j.value("error", "unknown") << std::endl;
return false;
}
if (j.contains("server") && j["server"].contains("imei"))
{
std::string imei = j["server"]["imei"];
set_imei(imei);
std::cout << "[REGISTER] IMEI = " << imei << std::endl;
return true;
}
std::cerr << "[REGISTER] no imei in response\n";
}
catch (const std::exception &e)
{
std::cerr << "[REGISTER] JSON parse error: " << e.what() << "\n";
}
return false;
}
// 解析can报文做消息上报
void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
{
switch (msg->id)
{
// BMS_Status 100
case 0x100:
{
uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1]; // 总电压
int16_t current_raw = (msg->data[2] << 8) | msg->data[3]; // 电流
uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5]; // 剩余容量
float voltage = voltage_raw * 0.01f; // 10mV -> V 总电压
float current = current_raw * 0.01f; // 10mA -> A 电流
float capacity = capacity_raw * 0.01f; // 10mAh -> Ah 剩余容量
// 根据电流正负判断充放电状态
info_report.chargeStatus = (current >= 0) ? true : false;
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x100] 总电压: " << voltage << " V, "
<< "电流: " << current << " A, "
<< "剩余容量: " << capacity << " Ah, "
<< "是否在充电: " << (info_report.chargeStatus == 1 ? "" : "") << std::endl;
break;
}
// BMS_Status 101
case 0x101:
{
// 提取原始数据
uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1]; // 充满容量
uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3]; // 循环次数
uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5]; // 剩余容量百分比
// 数据换算
float full_capacity = full_capacity_raw * 0.01f; // 10mAh → Ah
uint16_t cycle_count = cycle_count_raw; // 单位:次
float rsoc = rsoc_raw * 1.0f; // 以百分比%
// 保存解析结果
info_report.power = rsoc;
// 输出打印
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, "
<< "循环次数: " << cycle_count << " 次, "
<< "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl;
break;
}
// MCU_Status_1 和 MCU_Fault基础ID0x0CF11E00叠加设备ID
case 0x0CF11E05:
{
// 解析电机实际转速data[0]低字节data[1]:高字节)
uint16_t raw_speed = (static_cast<uint16_t>(msg->data[1]) << 8) | msg->data[0];
float actual_speed_rpm = static_cast<float>(raw_speed); // 量程0-6000rpm1rpm/bit
// 转换为km/h
float gear_ratio = 32.0; // 电机速比
float wheel_radius = 0.3; // 轮胎直径(m)
info_report.speed = (actual_speed_rpm * 60 * 3.1416 * wheel_radius) / (1000 * gear_ratio);
// std::cout << "[0x0CF11E00] speed : " << info_report.speed << std::endl;
// 故障码data[6]低字节data[7]:高字节)
uint16_t mcu_fault_code = (static_cast<uint16_t>(msg->data[7]) << 8) | msg->data[6];
// 清空原有MCU故障码1101-1116对应ERR0-ERR15
for (int code = 1101; code <= 1116; ++code)
{
vehicle_error_code.removeErrorCode(code);
}
// 遍历16个故障位ERR0-ERR15
for (int i = 0; i < 16; ++i)
{
bool is_fault = (mcu_fault_code >> i) & 0x01; // 提取第i位故障状态
int error_code = 1101 + i; // 自定义故障码1101=ERR01116=ERR15
if (is_fault && faultMap.count(error_code)) // 故障存在且在映射表中
{
vehicle_error_code.addErrorCode(error_code);
updateFaultLevel(error_code, 1); // 协议未定义故障等级默认1级
}
else // 故障不存在
{
vehicle_error_code.removeErrorCode(error_code);
}
}
break;
}
// MCU_Status_2基础ID0x0CF11F00叠加设备ID
case 0x0CF11F05:
{
// 解析控制器温度data[2]偏移量40℃1℃/bit
int raw_ctrl_temp = static_cast<int>(msg->data[2]);
info_report.controllerTemp = raw_ctrl_temp - 40; // 实际温度=原始值-40
// 解析电机温度data[3]偏移量30℃1℃/bit
int raw_motor_temp = static_cast<int>(msg->data[3]);
info_report.motorTemp = raw_motor_temp - 30; // 实际温度=原始值-30
// 解析控制器状态data[4]BIT1-BIT0命令状态BIT3-BIT2反馈状态
uint8_t ctrl_status = msg->data[4];
// 反馈状态BIT3-BIT20=空挡1=前进2=后退3=保留
int feedback_gear = (ctrl_status >> 2) & 0x03;
info_report.gear = feedback_gear;
// std::cout << "[0x0CF11F00] controllerTemp : " << info_report.controllerTemp << std::endl;
// std::cout << "[0x0CF11F00] motorTemp : " << info_report.motorTemp << std::endl;
// std::cout << "[0x0CF11F00] gear : " << info_report.gear << std::endl;
break;
}
// EPS_Status and EPS_Fault
case 0x401:
{
// 第3、4字节拼接为原始角度数据
uint16_t raw_value = static_cast<uint16_t>((msg->data[3] << 8) | msg->data[4]);
info_report.steeringAngle = (raw_value - 1024) / 7.0f;
// std::cout << "[0x401] steeringAngle : " << info_report.steeringAngle << std::endl;
// 清空所有 EPS 错误码
for (int code = 1201; code <= 1218; ++code)
{
vehicle_error_code.removeErrorCode(code);
}
// 添加当前报文中的错误码(最多2个)
uint8_t data_bytes[2] = {msg->data[2], msg->data[6]};
for (uint8_t byte : data_bytes)
{
for (int i = 0; i < 18; ++i)
{
if (byte == epsErrorCode[i])
{
vehicle_error_code.addErrorCode(1201 + i);
break; // 每个 byte 最多匹配一个错误码
}
}
}
break;
}
// VCU_Fault_1和Fault_2报文解析 (ID: 0x216)
case 0x216:
{
// 提取故障1 (32位): msg->data[0] ~ msg->data[3]
uint32_t fault1_bits = (static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
// 提取故障2 (32位): msg->data[4] ~ msg->data[7]
uint32_t fault2_bits = (static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
// 清空VCU故障码1701 ~ 1764)
for (int code = 1701; code <= 1764; ++code)
{
vehicle_error_code.removeErrorCode(code);
}
// 处理故障1的bit0 ~ bit31 → 1701 ~ 1732
for (uint8_t bit = 0; bit <= 31; ++bit)
{
if ((fault1_bits >> bit) & 0x01)
{
vehicle_error_code.addErrorCode(1701 + bit);
}
}
// 处理故障2的bit0 ~ bit31 → 1733 ~ 1764
for (uint8_t bit = 0; bit <= 31; ++bit)
{
if ((fault2_bits >> bit) & 0x01)
{
vehicle_error_code.addErrorCode(1733 + bit);
}
}
break;
}
// VCU_Fault_3和Fault_4报文解析 (ID: 0x217)
case 0x217:
{
// 提取故障3 (32位): msg->data[0] ~ msg->data[3]
uint32_t fault3_bits = (static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
// 提取故障4 (32位): msg->data[4] ~ msg->data[7]
uint32_t fault4_bits = (static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
// 清空VCU故障码1765 ~ 1814
for (int code = 1765; code <= 1814; ++code)
{
vehicle_error_code.removeErrorCode(code);
}
// 处理故障3的bit0 ~ bit31 → 1765 ~ 1796
for (uint8_t bit = 0; bit <= 31; ++bit)
{
if ((fault3_bits >> bit) & 0x01)
{
vehicle_error_code.addErrorCode(1765 + bit);
}
}
// 处理故障4的bit0 ~ bit18 → 1797 ~ 1814
for (uint8_t bit = 0; bit <= 18; ++bit)
{
if ((fault4_bits >> bit) & 0x01)
{
vehicle_error_code.addErrorCode(1797 + bit);
}
}
break;
}
}
}
std::string pack_general_info_to_json(const GeneralMsg &msg)
{ {
ordered_json j; ordered_json j;
j["imei"] = imei;
if (has_imei())
j["imei"] = get_imei();
else
j["imei"] = nullptr;
j["power"] = msg.power; j["power"] = msg.power;
j["chargeStatus"] = msg.chargeStatus; j["chargeStatus"] = msg.chargeStatus;
j["gear"] = msg.gear; j["gear"] = msg.gear;
@ -345,25 +40,34 @@ std::string pack_general_info_to_json(const GeneralMsg &msg)
j["motorTemp"] = msg.motorTemp; j["motorTemp"] = msg.motorTemp;
j["controllerTemp"] = msg.controllerTemp; j["controllerTemp"] = msg.controllerTemp;
j["timestamp"] = msg.timestamp; j["timestamp"] = msg.timestamp;
return j.dump(-1);
return j.dump(4); // 输出有序 JSON 字符串
} }
// ===================== Node =====================
class CanDataSubscriber : public rclcpp::Node class CanDataSubscriber : public rclcpp::Node
{ {
public: public:
CanDataSubscriber(const Config &config) explicit CanDataSubscriber(const Config &cfg)
: Node("can_data_subscriber"), : Node("can_data_subscriber"),
mqtt_client_(config.mqtt_ip + ":" + std::to_string(config.mqtt_port), mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port),
generate_mqtt_client_id(), generate_mqtt_client_id(),
"zxwl", "zxwl1234@", "zxwl",
mqtt_reconnect_interval), // 传入重连间隔 "zxwl1234@",
info_topic_(config.info_topic), mqtt_reconnect_interval)
fault_topic_(config.fault_topic)
{ {
// CAN
subscription_ = this->create_subscription<sweeperMsg::CanFrame>( subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); "can_data",
10,
std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
// Identity关键
identity_sub_ = this->create_subscription<VehicleIdentity>(
"/vehicle/identity",
rclcpp::QoS(1).transient_local().reliable(),
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
// Timers
info_timer_ = this->create_wall_timer( info_timer_ = this->create_wall_timer(
std::chrono::milliseconds(500), std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::info_timer_callback, this)); std::bind(&CanDataSubscriber::info_timer_callback, this));
@ -372,139 +76,405 @@ public:
std::chrono::milliseconds(500), std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this)); std::bind(&CanDataSubscriber::fault_timer_callback, this));
// 添加定期连接检查定时器每10秒
connection_check_timer_ = this->create_wall_timer( connection_check_timer_ = this->create_wall_timer(
std::chrono::seconds(10), std::chrono::seconds(10),
std::bind(&CanDataSubscriber::check_connection_callback, this)); std::bind(&CanDataSubscriber::check_connection_callback, this));
imei_thread_ = std::thread([this]()
{
while (rclcpp::ok())
{
bool ok = fetch_imei_from_neardi("192.168.5.1", 8080);
if (ok && !has_imei())
{
// 首次成功
}
// 不管成功还是失败,都睡
std::this_thread::sleep_for(std::chrono::seconds(5));
} });
} }
~CanDataSubscriber() ~CanDataSubscriber() override
{ {
// 停止所有定时器,避免回调继续执行 if (info_timer_)
info_timer_->cancel(); info_timer_->cancel();
fault_timer_->cancel(); if (fault_timer_)
connection_check_timer_->cancel(); fault_timer_->cancel();
if (connection_check_timer_)
if (imei_thread_.joinable()) connection_check_timer_->cancel();
imei_thread_.join();
} }
private: private:
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) // ===================== identity =====================
bool snapshot_identity(std::string &imei, std::string &vid)
{ {
Msg_Handler(msg); std::lock_guard<std::mutex> lock(identity_mutex_);
if (!vehicle_ready_)
return false;
imei = vehicle_imei_;
vid = vehicle_vid_;
return true;
} }
void identity_callback(const VehicleIdentity::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(identity_mutex_);
vehicle_imei_ = msg->imei;
vehicle_vid_ = msg->vid;
vehicle_ready_ = msg->ready;
RCLCPP_INFO(this->get_logger(),
"Vehicle identity received: IMEI=%s, VID=%s",
vehicle_imei_.c_str(),
vehicle_vid_.c_str());
}
// ===================== timers =====================
void info_timer_callback() void info_timer_callback()
{ {
if (!rclcpp::ok()) if (!rclcpp::ok())
{
return; return;
}
// 核心规则:没有 IMEI 就不上报 std::string imei, vid;
if (!has_imei()) if (!snapshot_identity(imei, vid))
{ {
RCLCPP_WARN_THROTTLE( RCLCPP_WARN_THROTTLE(
this->get_logger(), this->get_logger(),
*this->get_clock(), *this->get_clock(),
5000, 5000,
"IMEI not ready, skip info publish"); "Vehicle identity not ready, skip MQTT publish");
return; return;
} }
info_report.timestamp = getCurrentTimestampMs(); const std::string topic = "/zxwl/vehicle/" + vid + "/info";
std::string json_msg = pack_general_info_to_json(info_report);
if (!mqtt_client_.publish(info_topic_, json_msg, 1)) GeneralMsg snapshot;
{ {
RCLCPP_WARN(this->get_logger(), "Failed to publish info message to MQTT"); std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.timestamp = getCurrentTimestampMs();
snapshot = info_report_;
}
const std::string payload =
pack_general_info_to_json(snapshot, imei);
if (!mqtt_client_.publish(topic, payload, 1))
{
RCLCPP_WARN(this->get_logger(),
"Failed to publish info message to MQTT");
} }
} }
void fault_timer_callback() void fault_timer_callback()
{ {
if (!rclcpp::ok()) if (!rclcpp::ok())
{
return; return;
std::string imei, vid;
if (!snapshot_identity(imei, vid))
return;
const std::string topic = "/zxwl/vehicle/" + vid + "/fault";
std::vector<int> codes;
{
std::lock_guard<std::mutex> lock(fault_mutex_);
const auto &err_set = vehicle_error_code.getAllErrorCodes();
codes.assign(err_set.begin(), err_set.end());
} }
// 没有 IMEI一律不上报故障 for (int code : codes)
if (!has_imei())
{ {
return; const std::string payload =
} generateFaultJson(code, vid, getCurrentTimestampMs());
for (int code : vehicle_error_code.getAllErrorCodes()) if (!mqtt_client_.publish(topic, payload, 0))
{
std::string fault_json = generateFaultJson(code, config.vid, getCurrentTimestampMs());
if (!mqtt_client_.publish(fault_topic_, fault_json, 0))
{ {
RCLCPP_WARN(this->get_logger(), "Failed to publish fault code %d to MQTT", code); RCLCPP_WARN(this->get_logger(),
"Failed to publish fault code %d to MQTT",
code);
} }
} }
} }
// 定期检查连接状态,主动重连
void check_connection_callback() void check_connection_callback()
{ {
if (!mqtt_client_.isConnected()) if (!mqtt_client_.isConnected())
{ {
RCLCPP_WARN(this->get_logger(), "MQTT connection lost."); RCLCPP_WARN(this->get_logger(),
"MQTT connection lost.");
} }
} }
// ===================== CAN handler =====================
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
{
Msg_Handler(msg);
}
void Msg_Handler(const sweeperMsg::CanFrame::SharedPtr msg)
{
switch (msg->id)
{
case 0x100: // BMS_Status 100
{
uint16_t voltage_raw = (msg->data[0] << 8) | msg->data[1];
int16_t current_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t capacity_raw = (msg->data[4] << 8) | msg->data[5];
float voltage = voltage_raw * 0.01f;
float current = current_raw * 0.01f;
float capacity = capacity_raw * 0.01f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.chargeStatus = (current >= 0);
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x100] 总电压: " << voltage << " V, "
<< "电流: " << current << " A, "
<< "剩余容量: " << capacity << " Ah, "
<< "是否在充电: " << ((current >= 0) ? "" : "") << std::endl;
break;
}
case 0x101: // BMS_Status 101
{
uint16_t full_capacity_raw = (msg->data[0] << 8) | msg->data[1];
uint16_t cycle_count_raw = (msg->data[2] << 8) | msg->data[3];
uint16_t rsoc_raw = (msg->data[4] << 8) | msg->data[5];
float full_capacity = full_capacity_raw * 0.01f;
uint16_t cycle_count = cycle_count_raw;
float rsoc = rsoc_raw * 1.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.power = rsoc;
}
std::cout << std::fixed << std::setprecision(2);
std::cout << "[0x101] 充满容量: " << full_capacity << " Ah, "
<< "循环次数: " << cycle_count << " 次, "
<< "剩余容量百分比(RSOC): " << rsoc << " %" << std::endl;
break;
}
case 0x0CF11E05: // MCU_Status_1 + MCU_Fault
{
uint16_t raw_speed =
(static_cast<uint16_t>(msg->data[1]) << 8) | msg->data[0];
float actual_speed_rpm = static_cast<float>(raw_speed);
float gear_ratio = 32.0f;
float wheel_radius = 0.3f;
float speed_kmh =
(actual_speed_rpm * 60.0f * 3.1416f * wheel_radius) /
(1000.0f * gear_ratio);
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.speed = speed_kmh;
}
uint16_t mcu_fault_code =
(static_cast<uint16_t>(msg->data[7]) << 8) | msg->data[6];
{
std::lock_guard<std::mutex> lock(fault_mutex_);
// 先清 MCU 故障区
for (int code = 1101; code <= 1116; ++code)
vehicle_error_code.removeErrorCode(code);
for (int i = 0; i < 16; ++i)
{
bool is_fault = ((mcu_fault_code >> i) & 0x01) != 0;
int error_code = 1101 + i;
if (is_fault && faultMap.count(error_code))
{
vehicle_error_code.addErrorCode(error_code);
updateFaultLevel(error_code, 1);
}
else
{
vehicle_error_code.removeErrorCode(error_code);
}
}
}
break;
}
case 0x0CF11F05: // MCU_Status_2
{
int raw_ctrl_temp = static_cast<int>(msg->data[2]);
int raw_motor_temp = static_cast<int>(msg->data[3]);
uint8_t ctrl_status = msg->data[4];
int controllerTemp = raw_ctrl_temp - 40;
int motorTemp = raw_motor_temp - 30;
int feedback_gear = (ctrl_status >> 2) & 0x03;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.controllerTemp = controllerTemp;
info_report_.motorTemp = motorTemp;
info_report_.gear = feedback_gear;
}
break;
}
case 0x401: // EPS_Status + EPS_Fault
{
uint16_t raw_value =
static_cast<uint16_t>((msg->data[3] << 8) | msg->data[4]);
float steeringAngle = (raw_value - 1024) / 7.0f;
{
std::lock_guard<std::mutex> lock(info_mutex_);
info_report_.steeringAngle = steeringAngle;
}
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1201; code <= 1218; ++code)
vehicle_error_code.removeErrorCode(code);
uint8_t data_bytes[2] = {msg->data[2], msg->data[6]};
for (uint8_t byte : data_bytes)
{
for (int i = 0; i < 18; ++i)
{
if (byte == epsErrorCode[i])
{
vehicle_error_code.addErrorCode(1201 + i);
break;
}
}
}
}
break;
}
case 0x216: // VCU_Fault_1/2
{
uint32_t fault1_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault2_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1701; code <= 1764; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault1_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1701 + bit);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault2_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1733 + bit);
}
break;
}
case 0x217: // VCU_Fault_3/4
{
uint32_t fault3_bits =
(static_cast<uint32_t>(msg->data[0]) << 0) |
(static_cast<uint32_t>(msg->data[1]) << 8) |
(static_cast<uint32_t>(msg->data[2]) << 16) |
(static_cast<uint32_t>(msg->data[3]) << 24);
uint32_t fault4_bits =
(static_cast<uint32_t>(msg->data[4]) << 0) |
(static_cast<uint32_t>(msg->data[5]) << 8) |
(static_cast<uint32_t>(msg->data[6]) << 16) |
(static_cast<uint32_t>(msg->data[7]) << 24);
{
std::lock_guard<std::mutex> lock(fault_mutex_);
for (int code = 1765; code <= 1814; ++code)
vehicle_error_code.removeErrorCode(code);
for (uint8_t bit = 0; bit <= 31; ++bit)
if ((fault3_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1765 + bit);
for (uint8_t bit = 0; bit <= 18; ++bit)
if ((fault4_bits >> bit) & 0x01)
vehicle_error_code.addErrorCode(1797 + bit);
}
break;
}
default:
break;
}
}
private:
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_; rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
MQTTClientWrapper mqtt_client_; rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
std::string info_topic_;
std::string fault_topic_;
rclcpp::TimerBase::SharedPtr info_timer_; rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_; rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_; // 新增:连接检查定时器 rclcpp::TimerBase::SharedPtr connection_check_timer_;
// MQTT
MQTTClientWrapper mqtt_client_;
// Data
GeneralMsg info_report_;
std::mutex info_mutex_;
std::mutex fault_mutex_;
// Identity cache
std::mutex identity_mutex_;
std::string vehicle_imei_;
std::string vehicle_vid_;
bool vehicle_ready_ = false;
}; };
// ===================== main =====================
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
rclcpp::init(argc, argv); rclcpp::init(argc, argv);
if (!load_config(config)) if (!load_config(config))
{ {
RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load MQTT config."); RCLCPP_ERROR(rclcpp::get_logger("main"),
"Failed to load MQTT config.");
return 1; return 1;
} }
// 确保配置中有重连间隔默认5000ms
if (mqtt_reconnect_interval <= 0) if (mqtt_reconnect_interval <= 0)
{
mqtt_reconnect_interval = 5000; mqtt_reconnect_interval = 5000;
}
auto node = std::make_shared<CanDataSubscriber>(config); auto node = std::make_shared<CanDataSubscriber>(config);
RCLCPP_INFO(node->get_logger(), "info_topic : %s", config.info_topic.c_str()); RCLCPP_INFO(node->get_logger(),
RCLCPP_INFO(node->get_logger(), "fault_topic : %s", config.fault_topic.c_str()); "mqtt_ip: %s",
RCLCPP_INFO(node->get_logger(), "vid : %s", config.vid.c_str()); config.mqtt_ip.c_str());
RCLCPP_INFO(node->get_logger(), "mqtt_ip : %s", config.mqtt_ip.c_str()); RCLCPP_INFO(node->get_logger(),
RCLCPP_INFO(node->get_logger(), "MQTT reconnection interval: %d ms", mqtt_reconnect_interval); "info_topic: %s",
config.info_topic.c_str());
RCLCPP_INFO(node->get_logger(), "MqttReport node started."); RCLCPP_INFO(node->get_logger(),
"fault_topic: %s",
config.fault_topic.c_str());
RCLCPP_INFO(node->get_logger(),
"MQTT reconnection interval: %d ms",
mqtt_reconnect_interval);
RCLCPP_INFO(node->get_logger(),
"MqttReport node started.");
rclcpp::spin(node); rclcpp::spin(node);
rclcpp::shutdown(); rclcpp::shutdown();
return 0; return 0;
} }