Compare commits

...

2 Commits

Author SHA1 Message Date
lyq
180bf5df50 调整mqtt节点架构 2026-01-15 09:02:54 +08:00
lyq
d3246d551d 调整mqtt_node架构 2026-01-15 09:01:56 +08:00
9 changed files with 415 additions and 359 deletions

View File

@ -31,10 +31,12 @@ include_directories(include ${catkin_INCLUDE_DIRS})
add_executable( add_executable(
mqtt_report_node mqtt_report_node
src/mqtt_report_node.cpp src/node/mqtt_vehicle_node.cpp
src/core/input_health_monitor.cpp
src/get_config.cpp src/get_config.cpp
src/fault_codes.cpp src/fault_codes.cpp
src/report_struct.cpp src/report_struct.cpp
src/mqtt/mqtt_reporter.cpp
src/can/can_decoder.cpp) src/can/can_decoder.cpp)
ament_target_dependencies( ament_target_dependencies(

View File

@ -2,7 +2,7 @@
#pragma once #pragma once
#include "mqtt_report/vehicle_context.h" #include "mqtt_report/core/vehicle_context.h"
#include "sweeper_interfaces/msg/can_frame.hpp" #include "sweeper_interfaces/msg/can_frame.hpp"
namespace sweeperMsg = sweeper_interfaces::msg; namespace sweeperMsg = sweeper_interfaces::msg;

View File

@ -0,0 +1,32 @@
#pragma once
#include <cstdint>
#include <rclcpp/logger.hpp>
struct InputMonitor
{
bool ok = false;
bool sticky = false;
int64_t last_rx_ts = 0;
int64_t last_warn_ts = 0;
};
class InputHealthMonitor
{
public:
InputHealthMonitor();
void on_identity_rx(int64_t now);
void on_gps_rx(int64_t now);
void on_can_rx(int64_t now);
void check(rclcpp::Logger logger, int64_t now, bool mqtt_connected);
private:
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms);
private:
InputMonitor identity_{false, true};
InputMonitor gps_;
InputMonitor can_;
};

View File

@ -0,0 +1,25 @@
#pragma once
#include <string>
#include "mqtt_report/core/vehicle_context.h"
#include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt_client.hpp"
class MqttReporter
{
public:
MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config);
void publish_info();
void publish_gps();
void publish_faults();
private:
std::string make_topic(const std::string& tpl, const std::string& vid) const;
private:
VehicleContext& ctx_;
MQTTClientWrapper& mqtt_;
const Config& config_;
};

View File

@ -0,0 +1,60 @@
#include "mqtt_report/core/input_health_monitor.h"
#include <rclcpp/rclcpp.hpp>
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000;
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
InputHealthMonitor::InputHealthMonitor() { identity_.sticky = true; }
void InputHealthMonitor::on_identity_rx(int64_t now)
{
identity_.ok = true;
identity_.last_rx_ts = now;
}
void InputHealthMonitor::on_gps_rx(int64_t now)
{
gps_.ok = true;
gps_.last_rx_ts = now;
}
void InputHealthMonitor::on_can_rx(int64_t now)
{
can_.ok = true;
can_.last_rx_ts = now;
}
void InputHealthMonitor::check(rclcpp::Logger logger, int64_t now, bool mqtt_connected)
{
if (!mqtt_connected)
{
RCLCPP_WARN(logger, "MQTT connection lost.");
}
check_input("vehicle identity (/vehicle/identity)", identity_, now, IDENTITY_TIMEOUT_MS);
check_input("GPS (/rtk_message)", gps_, now, GPS_TIMEOUT_MS);
check_input("CAN (/can_data)", can_, now, CAN_TIMEOUT_MS);
}
void InputHealthMonitor::check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
{
if (mon.sticky && mon.ok) return;
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
{
mon.ok = false;
RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "%s timeout (%ld ms without data).", name,
now - mon.last_rx_ts);
mon.last_warn_ts = now;
return;
}
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
{
RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "Waiting for %s...", name);
mon.last_warn_ts = now;
}
}

View File

@ -0,0 +1,99 @@
#include "mqtt_report/mqtt/mqtt_reporter.h"
#include "mqtt_report/report_struct.h"
MqttReporter::MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config)
: ctx_(ctx), mqtt_(mqtt), config_(config)
{
}
std::string MqttReporter::make_topic(const std::string& tpl, const std::string& vid) const
{
std::string topic = tpl;
const std::string key = "{vid}";
auto pos = topic.find(key);
if (pos != std::string::npos)
{
topic.replace(pos, key.size(), vid);
}
return topic;
}
void MqttReporter::publish_info()
{
std::string imei, vid;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.ready) return;
imei = ctx_.imei;
vid = ctx_.vid;
ctx_.info.timestamp = getCurrentTimestampMs();
}
GeneralMsg snapshot;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
snapshot = ctx_.info;
}
ordered_json j;
j["imei"] = imei;
j["power"] = snapshot.power;
j["chargeStatus"] = snapshot.chargeStatus;
j["gear"] = snapshot.gear;
j["speed"] = snapshot.speed;
j["steeringAngle"] = snapshot.steeringAngle;
j["motorTemp"] = snapshot.motorTemp;
j["controllerTemp"] = snapshot.controllerTemp;
j["timestamp"] = snapshot.timestamp;
mqtt_.publish(make_topic(config_.info_topic_template, vid), j.dump(), 1);
}
void MqttReporter::publish_gps()
{
std::string imei, vid;
GpsMsg gps;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.ready || !ctx_.gps.valid) return;
imei = ctx_.imei;
vid = ctx_.vid;
gps = ctx_.gps;
}
ordered_json j;
j["imei"] = imei;
j["lat"] = gps.lat;
j["lng"] = gps.lng;
j["course"] = gps.course;
j["mode"] = gps.mode;
j["gps_timestamp"] = gps.timestamp;
j["report_timestamp"] = getCurrentTimestampMs();
mqtt_.publish(make_topic(config_.gps_topic_template, vid), j.dump(), 1);
}
void MqttReporter::publish_faults()
{
std::string vid;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.ready) return;
vid = ctx_.vid;
}
std::vector<int> codes;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
const auto& err_set = ctx_.errors.getAllErrorCodes();
codes.assign(err_set.begin(), err_set.end());
}
for (int code : codes)
{
mqtt_.publish(make_topic(config_.fault_topic_template, vid),
generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
}
}

View File

@ -1,357 +0,0 @@
// mqtt_report_node.cpp
#include <chrono>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <vector>
#include "mqtt_report/can/can_decoder.h"
#include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt_client.hpp"
#include "mqtt_report/report_struct.h"
#include "mqtt_report/vehicle_context.h"
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "sweeper_interfaces/msg/rtk.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
namespace sweeperMsg = sweeper_interfaces::msg;
using sweeper_interfaces::msg::VehicleIdentity;
int mqtt_reconnect_interval = 5000; // ms
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; // 每 10s 警告一次
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
struct InputMonitor
{
bool ok = false;
bool sticky = false; // 是否一次成功就永久 OK
int64_t last_rx_ts = 0;
int64_t last_warn_ts = 0;
};
// ===================== JSON 打包 =====================
static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei)
{
ordered_json j;
j["imei"] = imei;
j["power"] = msg.power;
j["chargeStatus"] = msg.chargeStatus;
j["gear"] = msg.gear;
j["speed"] = msg.speed;
j["steeringAngle"] = msg.steeringAngle;
j["motorTemp"] = msg.motorTemp;
j["controllerTemp"] = msg.controllerTemp;
j["timestamp"] = msg.timestamp;
return j.dump(-1);
}
static std::string pack_gps_to_json(const GpsMsg& gps, const std::string& imei)
{
ordered_json j;
j["imei"] = imei;
j["lat"] = gps.lat;
j["lng"] = gps.lng;
j["course"] = gps.course;
j["mode"] = gps.mode;
j["gps_timestamp"] = gps.timestamp;
j["report_timestamp"] = getCurrentTimestampMs();
return j.dump(-1);
}
static std::string make_topic(const std::string& tpl, const std::string& vid)
{
std::string topic = tpl;
const std::string key = "{vid}";
auto pos = topic.find(key);
if (pos != std::string::npos)
{
topic.replace(pos, key.size(), vid);
}
return topic;
}
// ===================== Node =====================
class CanDataSubscriber : public rclcpp::Node
{
public:
explicit CanDataSubscriber(const Config& cfg)
: Node("can_data_subscriber"),
config_(cfg),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
cfg.mqtt_password, mqtt_reconnect_interval),
can_decoder_(ctx_)
{
// CAN
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
// Identity
identity_sub_ = this->create_subscription<VehicleIdentity>(
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
// rtk
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
"rtk_message", 10, std::bind(&CanDataSubscriber::rtk_callback, this, std::placeholders::_1));
// Timers
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::info_timer_callback, this));
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200),
std::bind(&CanDataSubscriber::gps_timer_callback, this));
fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
std::bind(&CanDataSubscriber::fault_timer_callback, this));
connection_check_timer_ = this->create_wall_timer(
std::chrono::seconds(10), std::bind(&CanDataSubscriber::check_connection_callback, this));
}
~CanDataSubscriber() override
{
if (info_timer_) info_timer_->cancel();
if (gps_timer_) gps_timer_->cancel();
if (fault_timer_) fault_timer_->cancel();
if (connection_check_timer_) connection_check_timer_->cancel();
}
private:
// ===================== identity =====================
void identity_callback(const VehicleIdentity::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.imei = msg->imei;
ctx_.vid = msg->vid;
ctx_.ready = msg->ready;
int64_t now = getCurrentTimestampMs();
if (!identity_mon_.ok)
{
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", ctx_.imei.c_str(), ctx_.vid.c_str());
}
identity_mon_.ok = true;
identity_mon_.last_rx_ts = now;
}
bool snapshot_identity(std::string& imei, std::string& vid)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.ready) return false;
imei = ctx_.imei;
vid = ctx_.vid;
return true;
}
// ===================== timers =====================
void info_timer_callback()
{
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
const std::string topic = make_topic(config_.info_topic_template, vid);
GeneralMsg snapshot;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.info.timestamp = getCurrentTimestampMs();
snapshot = ctx_.info;
}
mqtt_client_.publish(topic, pack_general_info_to_json(snapshot, imei), 1);
}
void gps_timer_callback()
{
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
GpsMsg snapshot;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.gps.valid) return;
snapshot = ctx_.gps;
}
const std::string topic = make_topic(config_.gps_topic_template, vid);
mqtt_client_.publish(topic, pack_gps_to_json(snapshot, imei), 1);
}
void fault_timer_callback()
{
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
const std::string topic = make_topic(config_.fault_topic_template, vid);
std::vector<int> codes;
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
const auto& err_set = ctx_.errors.getAllErrorCodes();
codes.assign(err_set.begin(), err_set.end());
}
for (int code : codes)
{
mqtt_client_.publish(topic, generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
}
}
void check_connection_callback()
{
int64_t now = getCurrentTimestampMs();
if (!mqtt_client_.isConnected())
{
RCLCPP_WARN(get_logger(), "MQTT connection lost.");
}
check_input("vehicle identity (/vehicle/identity)", identity_mon_, now, IDENTITY_TIMEOUT_MS);
check_input("GPS (/rtk_message)", gps_mon_, now, GPS_TIMEOUT_MS);
check_input("CAN (/can_data)", can_mon_, now, CAN_TIMEOUT_MS);
}
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
{
// sticky 输入:一旦 OK不再 timeout
if (mon.sticky && mon.ok) return;
// 从 OK → 不 OK流断了
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
{
mon.ok = false;
RCLCPP_WARN(get_logger(), "%s timeout (%ld ms without data).", name, now - mon.last_rx_ts);
mon.last_warn_ts = now;
return;
}
// 一直不 OK → 低频 WARN
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
{
RCLCPP_WARN(get_logger(), "Waiting for %s...", name);
mon.last_warn_ts = now;
}
}
// ========== CAN ==========
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
{
can_decoder_.handle(msg);
int64_t now = getCurrentTimestampMs();
if (!can_mon_.ok)
{
RCLCPP_INFO(get_logger(), "CAN data stream active (/can_data).");
}
can_mon_.ok = true;
can_mon_.last_rx_ts = now;
}
// ========== RTK ==========
void rtk_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.gps.valid = true;
ctx_.gps.lat = msg->lat;
ctx_.gps.lng = msg->lon;
ctx_.gps.course = msg->head;
if (msg->p_quality == 5)
ctx_.gps.mode = 2;
else if (msg->p_quality == 4)
ctx_.gps.mode = 3;
else
ctx_.gps.mode = msg->p_quality;
ctx_.gps.timestamp = getCurrentTimestampMs();
ctx_.gps.valid = true;
int64_t now = ctx_.gps.timestamp;
if (!gps_mon_.ok)
{
RCLCPP_INFO(get_logger(), "GPS data stream active (/rtk_message).");
}
gps_mon_.ok = true;
gps_mon_.last_rx_ts = now;
}
private:
// CAN decoder
VehicleContext ctx_;
CanDecoder can_decoder_;
Config config_;
// MQTT
MQTTClientWrapper mqtt_client_;
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::Subscription<sweeper_interfaces::msg::Rtk>::SharedPtr rtk_sub_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr gps_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
InputMonitor identity_mon_{false, true};
InputMonitor gps_mon_;
InputMonitor can_mon_;
};
// ===================== main =====================
int main(int argc, char** argv)
{
std::string config_path = "./config.json"; // 默认配置
// 手动解析 --config
for (int i = 1; i < argc; ++i)
{
if (std::string(argv[i]) == "--config" && i + 1 < argc)
{
config_path = argv[i + 1];
++i; // 跳过参数值
}
}
rclcpp::init(argc, argv);
Config config;
auto logger = rclcpp::get_logger("main");
if (!load_config(config_path, config))
{
RCLCPP_ERROR(logger, "Failed to load config: %s", config_path.c_str());
return 1;
}
if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000;
RCLCPP_INFO(logger, "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), config.mqtt_port,
config.mqtt_username.empty() ? "<anonymous>" : config.mqtt_username.c_str());
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
auto node = std::make_shared<CanDataSubscriber>(config);
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

View File

@ -0,0 +1,195 @@
// mqtt_report_node.cpp
#include <chrono>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <vector>
#include "mqtt_report/can/can_decoder.h"
#include "mqtt_report/core/input_health_monitor.h"
#include "mqtt_report/core/vehicle_context.h"
#include "mqtt_report/get_config.h"
#include "mqtt_report/mqtt/mqtt_reporter.h"
#include "mqtt_report/report_struct.h"
#include "sweeper_interfaces/msg/can_frame.hpp"
#include "sweeper_interfaces/msg/rtk.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
namespace sweeperMsg = sweeper_interfaces::msg;
using sweeper_interfaces::msg::VehicleIdentity;
int mqtt_reconnect_interval = 5000; // ms
// ===================== Node =====================
class VehicleReportNode : public rclcpp::Node
{
public:
explicit VehicleReportNode(const Config& cfg)
: Node("can_data_subscriber"),
config_(cfg),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
cfg.mqtt_password, mqtt_reconnect_interval),
can_decoder_(ctx_),
reporter_(ctx_, mqtt_client_, config_)
{
// CAN
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
"can_data", 10, std::bind(&VehicleReportNode::topic_callback, this, std::placeholders::_1));
// Identity
identity_sub_ = this->create_subscription<VehicleIdentity>(
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
std::bind(&VehicleReportNode::identity_callback, this, std::placeholders::_1));
// rtk
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
"rtk_message", 10, std::bind(&VehicleReportNode::rtk_callback, this, std::placeholders::_1));
// Timers
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_info(); });
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200), [this]() { reporter_.publish_gps(); });
fault_timer_ =
this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_faults(); });
connection_check_timer_ = this->create_wall_timer(
std::chrono::seconds(10),
[this]() { health_.check(get_logger(), getCurrentTimestampMs(), mqtt_client_.isConnected()); });
}
~VehicleReportNode() override
{
if (info_timer_) info_timer_->cancel();
if (gps_timer_) gps_timer_->cancel();
if (fault_timer_) fault_timer_->cancel();
if (connection_check_timer_) connection_check_timer_->cancel();
}
private:
// ===================== identity =====================
void identity_callback(const VehicleIdentity::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.imei = msg->imei;
ctx_.vid = msg->vid;
ctx_.ready = msg->ready;
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", msg->imei.c_str(), msg->vid.c_str());
int64_t now = getCurrentTimestampMs();
health_.on_identity_rx(now);
}
bool snapshot_identity(std::string& imei, std::string& vid)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
if (!ctx_.ready) return false;
imei = ctx_.imei;
vid = ctx_.vid;
return true;
}
// ========== CAN ==========
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
{
can_decoder_.handle(msg);
int64_t now = getCurrentTimestampMs();
health_.on_can_rx(now);
}
// ========== RTK ==========
void rtk_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(ctx_.mtx);
ctx_.gps.valid = true;
ctx_.gps.lat = msg->lat;
ctx_.gps.lng = msg->lon;
ctx_.gps.course = msg->head;
if (msg->p_quality == 5)
ctx_.gps.mode = 2;
else if (msg->p_quality == 4)
ctx_.gps.mode = 3;
else
ctx_.gps.mode = msg->p_quality;
ctx_.gps.timestamp = getCurrentTimestampMs();
ctx_.gps.valid = true;
int64_t now = ctx_.gps.timestamp;
health_.on_gps_rx(now);
}
private:
// CAN decoder
VehicleContext ctx_;
CanDecoder can_decoder_;
Config config_;
// MQTT
MQTTClientWrapper mqtt_client_;
// ROS
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::Subscription<sweeper_interfaces::msg::Rtk>::SharedPtr rtk_sub_;
rclcpp::TimerBase::SharedPtr info_timer_;
rclcpp::TimerBase::SharedPtr gps_timer_;
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
InputHealthMonitor health_;
MqttReporter reporter_;
};
// ===================== main =====================
int main(int argc, char** argv)
{
std::string config_path = "./config.json"; // 默认配置
// 手动解析 --config
for (int i = 1; i < argc; ++i)
{
if (std::string(argv[i]) == "--config" && i + 1 < argc)
{
config_path = argv[i + 1];
++i; // 跳过参数值
}
}
rclcpp::init(argc, argv);
Config config;
auto logger = rclcpp::get_logger("main");
if (!load_config(config_path, config))
{
RCLCPP_ERROR(logger, "Failed to load config: %s", config_path.c_str());
return 1;
}
if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000;
RCLCPP_INFO(logger, "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), config.mqtt_port,
config.mqtt_username.empty() ? "<anonymous>" : config.mqtt_username.c_str());
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
auto node = std::make_shared<VehicleReportNode>(config);
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}