Compare commits
2 Commits
43b8299e6a
...
180bf5df50
| Author | SHA1 | Date | |
|---|---|---|---|
| 180bf5df50 | |||
| d3246d551d |
@ -31,10 +31,12 @@ include_directories(include ${catkin_INCLUDE_DIRS})
|
||||
|
||||
add_executable(
|
||||
mqtt_report_node
|
||||
src/mqtt_report_node.cpp
|
||||
src/node/mqtt_vehicle_node.cpp
|
||||
src/core/input_health_monitor.cpp
|
||||
src/get_config.cpp
|
||||
src/fault_codes.cpp
|
||||
src/report_struct.cpp
|
||||
src/mqtt/mqtt_reporter.cpp
|
||||
src/can/can_decoder.cpp)
|
||||
|
||||
ament_target_dependencies(
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mqtt_report/vehicle_context.h"
|
||||
#include "mqtt_report/core/vehicle_context.h"
|
||||
#include "sweeper_interfaces/msg/can_frame.hpp"
|
||||
|
||||
namespace sweeperMsg = sweeper_interfaces::msg;
|
||||
|
||||
@ -0,0 +1,32 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <rclcpp/logger.hpp>
|
||||
|
||||
struct InputMonitor
|
||||
{
|
||||
bool ok = false;
|
||||
bool sticky = false;
|
||||
int64_t last_rx_ts = 0;
|
||||
int64_t last_warn_ts = 0;
|
||||
};
|
||||
|
||||
class InputHealthMonitor
|
||||
{
|
||||
public:
|
||||
InputHealthMonitor();
|
||||
|
||||
void on_identity_rx(int64_t now);
|
||||
void on_gps_rx(int64_t now);
|
||||
void on_can_rx(int64_t now);
|
||||
|
||||
void check(rclcpp::Logger logger, int64_t now, bool mqtt_connected);
|
||||
|
||||
private:
|
||||
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms);
|
||||
|
||||
private:
|
||||
InputMonitor identity_{false, true};
|
||||
InputMonitor gps_;
|
||||
InputMonitor can_;
|
||||
};
|
||||
@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "mqtt_report/core/vehicle_context.h"
|
||||
#include "mqtt_report/get_config.h"
|
||||
#include "mqtt_report/mqtt_client.hpp"
|
||||
|
||||
class MqttReporter
|
||||
{
|
||||
public:
|
||||
MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config);
|
||||
|
||||
void publish_info();
|
||||
void publish_gps();
|
||||
void publish_faults();
|
||||
|
||||
private:
|
||||
std::string make_topic(const std::string& tpl, const std::string& vid) const;
|
||||
|
||||
private:
|
||||
VehicleContext& ctx_;
|
||||
MQTTClientWrapper& mqtt_;
|
||||
const Config& config_;
|
||||
};
|
||||
@ -0,0 +1,60 @@
|
||||
#include "mqtt_report/core/input_health_monitor.h"
|
||||
|
||||
#include <rclcpp/rclcpp.hpp>
|
||||
|
||||
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000;
|
||||
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
|
||||
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
|
||||
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
|
||||
|
||||
InputHealthMonitor::InputHealthMonitor() { identity_.sticky = true; }
|
||||
|
||||
void InputHealthMonitor::on_identity_rx(int64_t now)
|
||||
{
|
||||
identity_.ok = true;
|
||||
identity_.last_rx_ts = now;
|
||||
}
|
||||
|
||||
void InputHealthMonitor::on_gps_rx(int64_t now)
|
||||
{
|
||||
gps_.ok = true;
|
||||
gps_.last_rx_ts = now;
|
||||
}
|
||||
|
||||
void InputHealthMonitor::on_can_rx(int64_t now)
|
||||
{
|
||||
can_.ok = true;
|
||||
can_.last_rx_ts = now;
|
||||
}
|
||||
|
||||
void InputHealthMonitor::check(rclcpp::Logger logger, int64_t now, bool mqtt_connected)
|
||||
{
|
||||
if (!mqtt_connected)
|
||||
{
|
||||
RCLCPP_WARN(logger, "MQTT connection lost.");
|
||||
}
|
||||
|
||||
check_input("vehicle identity (/vehicle/identity)", identity_, now, IDENTITY_TIMEOUT_MS);
|
||||
check_input("GPS (/rtk_message)", gps_, now, GPS_TIMEOUT_MS);
|
||||
check_input("CAN (/can_data)", can_, now, CAN_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
void InputHealthMonitor::check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
|
||||
{
|
||||
if (mon.sticky && mon.ok) return;
|
||||
|
||||
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
|
||||
{
|
||||
mon.ok = false;
|
||||
RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "%s timeout (%ld ms without data).", name,
|
||||
now - mon.last_rx_ts);
|
||||
mon.last_warn_ts = now;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
|
||||
{
|
||||
RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "Waiting for %s...", name);
|
||||
mon.last_warn_ts = now;
|
||||
}
|
||||
}
|
||||
99
src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp
Normal file
99
src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp
Normal file
@ -0,0 +1,99 @@
|
||||
#include "mqtt_report/mqtt/mqtt_reporter.h"
|
||||
|
||||
#include "mqtt_report/report_struct.h"
|
||||
|
||||
MqttReporter::MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config)
|
||||
: ctx_(ctx), mqtt_(mqtt), config_(config)
|
||||
{
|
||||
}
|
||||
|
||||
std::string MqttReporter::make_topic(const std::string& tpl, const std::string& vid) const
|
||||
{
|
||||
std::string topic = tpl;
|
||||
const std::string key = "{vid}";
|
||||
auto pos = topic.find(key);
|
||||
if (pos != std::string::npos)
|
||||
{
|
||||
topic.replace(pos, key.size(), vid);
|
||||
}
|
||||
return topic;
|
||||
}
|
||||
|
||||
void MqttReporter::publish_info()
|
||||
{
|
||||
std::string imei, vid;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
if (!ctx_.ready) return;
|
||||
imei = ctx_.imei;
|
||||
vid = ctx_.vid;
|
||||
ctx_.info.timestamp = getCurrentTimestampMs();
|
||||
}
|
||||
|
||||
GeneralMsg snapshot;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
snapshot = ctx_.info;
|
||||
}
|
||||
|
||||
ordered_json j;
|
||||
j["imei"] = imei;
|
||||
j["power"] = snapshot.power;
|
||||
j["chargeStatus"] = snapshot.chargeStatus;
|
||||
j["gear"] = snapshot.gear;
|
||||
j["speed"] = snapshot.speed;
|
||||
j["steeringAngle"] = snapshot.steeringAngle;
|
||||
j["motorTemp"] = snapshot.motorTemp;
|
||||
j["controllerTemp"] = snapshot.controllerTemp;
|
||||
j["timestamp"] = snapshot.timestamp;
|
||||
|
||||
mqtt_.publish(make_topic(config_.info_topic_template, vid), j.dump(), 1);
|
||||
}
|
||||
|
||||
void MqttReporter::publish_gps()
|
||||
{
|
||||
std::string imei, vid;
|
||||
GpsMsg gps;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
if (!ctx_.ready || !ctx_.gps.valid) return;
|
||||
imei = ctx_.imei;
|
||||
vid = ctx_.vid;
|
||||
gps = ctx_.gps;
|
||||
}
|
||||
|
||||
ordered_json j;
|
||||
j["imei"] = imei;
|
||||
j["lat"] = gps.lat;
|
||||
j["lng"] = gps.lng;
|
||||
j["course"] = gps.course;
|
||||
j["mode"] = gps.mode;
|
||||
j["gps_timestamp"] = gps.timestamp;
|
||||
j["report_timestamp"] = getCurrentTimestampMs();
|
||||
|
||||
mqtt_.publish(make_topic(config_.gps_topic_template, vid), j.dump(), 1);
|
||||
}
|
||||
|
||||
void MqttReporter::publish_faults()
|
||||
{
|
||||
std::string vid;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
if (!ctx_.ready) return;
|
||||
vid = ctx_.vid;
|
||||
}
|
||||
|
||||
std::vector<int> codes;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
const auto& err_set = ctx_.errors.getAllErrorCodes();
|
||||
codes.assign(err_set.begin(), err_set.end());
|
||||
}
|
||||
|
||||
for (int code : codes)
|
||||
{
|
||||
mqtt_.publish(make_topic(config_.fault_topic_template, vid),
|
||||
generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
|
||||
}
|
||||
}
|
||||
@ -1,357 +0,0 @@
|
||||
// mqtt_report_node.cpp
|
||||
|
||||
#include <chrono>
|
||||
#include <rclcpp/rclcpp.hpp>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "mqtt_report/can/can_decoder.h"
|
||||
#include "mqtt_report/get_config.h"
|
||||
#include "mqtt_report/mqtt_client.hpp"
|
||||
#include "mqtt_report/report_struct.h"
|
||||
#include "mqtt_report/vehicle_context.h"
|
||||
#include "sweeper_interfaces/msg/can_frame.hpp"
|
||||
#include "sweeper_interfaces/msg/rtk.hpp"
|
||||
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
|
||||
|
||||
namespace sweeperMsg = sweeper_interfaces::msg;
|
||||
using sweeper_interfaces::msg::VehicleIdentity;
|
||||
|
||||
int mqtt_reconnect_interval = 5000; // ms
|
||||
|
||||
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; // 每 10s 警告一次
|
||||
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
|
||||
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
|
||||
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
|
||||
|
||||
struct InputMonitor
|
||||
{
|
||||
bool ok = false;
|
||||
bool sticky = false; // 是否一次成功就永久 OK
|
||||
int64_t last_rx_ts = 0;
|
||||
int64_t last_warn_ts = 0;
|
||||
};
|
||||
|
||||
// ===================== JSON 打包 =====================
|
||||
static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei)
|
||||
{
|
||||
ordered_json j;
|
||||
j["imei"] = imei;
|
||||
j["power"] = msg.power;
|
||||
j["chargeStatus"] = msg.chargeStatus;
|
||||
j["gear"] = msg.gear;
|
||||
j["speed"] = msg.speed;
|
||||
j["steeringAngle"] = msg.steeringAngle;
|
||||
j["motorTemp"] = msg.motorTemp;
|
||||
j["controllerTemp"] = msg.controllerTemp;
|
||||
j["timestamp"] = msg.timestamp;
|
||||
return j.dump(-1);
|
||||
}
|
||||
|
||||
static std::string pack_gps_to_json(const GpsMsg& gps, const std::string& imei)
|
||||
{
|
||||
ordered_json j;
|
||||
j["imei"] = imei;
|
||||
j["lat"] = gps.lat;
|
||||
j["lng"] = gps.lng;
|
||||
j["course"] = gps.course;
|
||||
j["mode"] = gps.mode;
|
||||
j["gps_timestamp"] = gps.timestamp;
|
||||
j["report_timestamp"] = getCurrentTimestampMs();
|
||||
return j.dump(-1);
|
||||
}
|
||||
|
||||
static std::string make_topic(const std::string& tpl, const std::string& vid)
|
||||
{
|
||||
std::string topic = tpl;
|
||||
const std::string key = "{vid}";
|
||||
auto pos = topic.find(key);
|
||||
if (pos != std::string::npos)
|
||||
{
|
||||
topic.replace(pos, key.size(), vid);
|
||||
}
|
||||
return topic;
|
||||
}
|
||||
|
||||
// ===================== Node =====================
|
||||
class CanDataSubscriber : public rclcpp::Node
|
||||
{
|
||||
public:
|
||||
explicit CanDataSubscriber(const Config& cfg)
|
||||
: Node("can_data_subscriber"),
|
||||
config_(cfg),
|
||||
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
|
||||
cfg.mqtt_password, mqtt_reconnect_interval),
|
||||
can_decoder_(ctx_)
|
||||
{
|
||||
// CAN
|
||||
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
|
||||
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
|
||||
|
||||
// Identity
|
||||
identity_sub_ = this->create_subscription<VehicleIdentity>(
|
||||
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
|
||||
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
|
||||
|
||||
// rtk
|
||||
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
|
||||
"rtk_message", 10, std::bind(&CanDataSubscriber::rtk_callback, this, std::placeholders::_1));
|
||||
|
||||
// Timers
|
||||
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
|
||||
std::bind(&CanDataSubscriber::info_timer_callback, this));
|
||||
|
||||
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200),
|
||||
std::bind(&CanDataSubscriber::gps_timer_callback, this));
|
||||
|
||||
fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
|
||||
std::bind(&CanDataSubscriber::fault_timer_callback, this));
|
||||
|
||||
connection_check_timer_ = this->create_wall_timer(
|
||||
std::chrono::seconds(10), std::bind(&CanDataSubscriber::check_connection_callback, this));
|
||||
}
|
||||
|
||||
~CanDataSubscriber() override
|
||||
{
|
||||
if (info_timer_) info_timer_->cancel();
|
||||
if (gps_timer_) gps_timer_->cancel();
|
||||
if (fault_timer_) fault_timer_->cancel();
|
||||
if (connection_check_timer_) connection_check_timer_->cancel();
|
||||
}
|
||||
|
||||
private:
|
||||
// ===================== identity =====================
|
||||
void identity_callback(const VehicleIdentity::SharedPtr msg)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
ctx_.imei = msg->imei;
|
||||
ctx_.vid = msg->vid;
|
||||
ctx_.ready = msg->ready;
|
||||
|
||||
int64_t now = getCurrentTimestampMs();
|
||||
|
||||
if (!identity_mon_.ok)
|
||||
{
|
||||
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", ctx_.imei.c_str(), ctx_.vid.c_str());
|
||||
}
|
||||
|
||||
identity_mon_.ok = true;
|
||||
identity_mon_.last_rx_ts = now;
|
||||
}
|
||||
|
||||
bool snapshot_identity(std::string& imei, std::string& vid)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
if (!ctx_.ready) return false;
|
||||
imei = ctx_.imei;
|
||||
vid = ctx_.vid;
|
||||
return true;
|
||||
}
|
||||
|
||||
// ===================== timers =====================
|
||||
void info_timer_callback()
|
||||
{
|
||||
std::string imei, vid;
|
||||
if (!snapshot_identity(imei, vid)) return;
|
||||
|
||||
const std::string topic = make_topic(config_.info_topic_template, vid);
|
||||
|
||||
GeneralMsg snapshot;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
ctx_.info.timestamp = getCurrentTimestampMs();
|
||||
snapshot = ctx_.info;
|
||||
}
|
||||
|
||||
mqtt_client_.publish(topic, pack_general_info_to_json(snapshot, imei), 1);
|
||||
}
|
||||
|
||||
void gps_timer_callback()
|
||||
{
|
||||
std::string imei, vid;
|
||||
if (!snapshot_identity(imei, vid)) return;
|
||||
|
||||
GpsMsg snapshot;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
if (!ctx_.gps.valid) return;
|
||||
snapshot = ctx_.gps;
|
||||
}
|
||||
|
||||
const std::string topic = make_topic(config_.gps_topic_template, vid);
|
||||
|
||||
mqtt_client_.publish(topic, pack_gps_to_json(snapshot, imei), 1);
|
||||
}
|
||||
|
||||
void fault_timer_callback()
|
||||
{
|
||||
std::string imei, vid;
|
||||
if (!snapshot_identity(imei, vid)) return;
|
||||
|
||||
const std::string topic = make_topic(config_.fault_topic_template, vid);
|
||||
|
||||
std::vector<int> codes;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
const auto& err_set = ctx_.errors.getAllErrorCodes();
|
||||
codes.assign(err_set.begin(), err_set.end());
|
||||
}
|
||||
|
||||
for (int code : codes)
|
||||
{
|
||||
mqtt_client_.publish(topic, generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
|
||||
}
|
||||
}
|
||||
|
||||
void check_connection_callback()
|
||||
{
|
||||
int64_t now = getCurrentTimestampMs();
|
||||
|
||||
if (!mqtt_client_.isConnected())
|
||||
{
|
||||
RCLCPP_WARN(get_logger(), "MQTT connection lost.");
|
||||
}
|
||||
|
||||
check_input("vehicle identity (/vehicle/identity)", identity_mon_, now, IDENTITY_TIMEOUT_MS);
|
||||
|
||||
check_input("GPS (/rtk_message)", gps_mon_, now, GPS_TIMEOUT_MS);
|
||||
|
||||
check_input("CAN (/can_data)", can_mon_, now, CAN_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
|
||||
{
|
||||
// sticky 输入:一旦 OK,不再 timeout
|
||||
if (mon.sticky && mon.ok) return;
|
||||
|
||||
// 从 OK → 不 OK(流断了)
|
||||
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
|
||||
{
|
||||
mon.ok = false;
|
||||
RCLCPP_WARN(get_logger(), "%s timeout (%ld ms without data).", name, now - mon.last_rx_ts);
|
||||
mon.last_warn_ts = now;
|
||||
return;
|
||||
}
|
||||
|
||||
// 一直不 OK → 低频 WARN
|
||||
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
|
||||
{
|
||||
RCLCPP_WARN(get_logger(), "Waiting for %s...", name);
|
||||
mon.last_warn_ts = now;
|
||||
}
|
||||
}
|
||||
|
||||
// ========== CAN ==========
|
||||
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
|
||||
{
|
||||
can_decoder_.handle(msg);
|
||||
|
||||
int64_t now = getCurrentTimestampMs();
|
||||
|
||||
if (!can_mon_.ok)
|
||||
{
|
||||
RCLCPP_INFO(get_logger(), "CAN data stream active (/can_data).");
|
||||
}
|
||||
|
||||
can_mon_.ok = true;
|
||||
can_mon_.last_rx_ts = now;
|
||||
}
|
||||
|
||||
// ========== RTK ==========
|
||||
void rtk_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
|
||||
ctx_.gps.valid = true;
|
||||
|
||||
ctx_.gps.lat = msg->lat;
|
||||
ctx_.gps.lng = msg->lon;
|
||||
ctx_.gps.course = msg->head;
|
||||
|
||||
if (msg->p_quality == 5)
|
||||
ctx_.gps.mode = 2;
|
||||
else if (msg->p_quality == 4)
|
||||
ctx_.gps.mode = 3;
|
||||
else
|
||||
ctx_.gps.mode = msg->p_quality;
|
||||
|
||||
ctx_.gps.timestamp = getCurrentTimestampMs();
|
||||
ctx_.gps.valid = true;
|
||||
|
||||
int64_t now = ctx_.gps.timestamp;
|
||||
|
||||
if (!gps_mon_.ok)
|
||||
{
|
||||
RCLCPP_INFO(get_logger(), "GPS data stream active (/rtk_message).");
|
||||
}
|
||||
|
||||
gps_mon_.ok = true;
|
||||
gps_mon_.last_rx_ts = now;
|
||||
}
|
||||
|
||||
private:
|
||||
// CAN decoder
|
||||
VehicleContext ctx_;
|
||||
CanDecoder can_decoder_;
|
||||
|
||||
Config config_;
|
||||
|
||||
// MQTT
|
||||
MQTTClientWrapper mqtt_client_;
|
||||
|
||||
// ROS
|
||||
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
|
||||
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
|
||||
rclcpp::Subscription<sweeper_interfaces::msg::Rtk>::SharedPtr rtk_sub_;
|
||||
|
||||
rclcpp::TimerBase::SharedPtr info_timer_;
|
||||
rclcpp::TimerBase::SharedPtr gps_timer_;
|
||||
rclcpp::TimerBase::SharedPtr fault_timer_;
|
||||
rclcpp::TimerBase::SharedPtr connection_check_timer_;
|
||||
|
||||
InputMonitor identity_mon_{false, true};
|
||||
InputMonitor gps_mon_;
|
||||
InputMonitor can_mon_;
|
||||
};
|
||||
|
||||
// ===================== main =====================
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
std::string config_path = "./config.json"; // 默认配置
|
||||
|
||||
// 手动解析 --config
|
||||
for (int i = 1; i < argc; ++i)
|
||||
{
|
||||
if (std::string(argv[i]) == "--config" && i + 1 < argc)
|
||||
{
|
||||
config_path = argv[i + 1];
|
||||
++i; // 跳过参数值
|
||||
}
|
||||
}
|
||||
|
||||
rclcpp::init(argc, argv);
|
||||
|
||||
Config config;
|
||||
|
||||
auto logger = rclcpp::get_logger("main");
|
||||
|
||||
if (!load_config(config_path, config))
|
||||
{
|
||||
RCLCPP_ERROR(logger, "Failed to load config: %s", config_path.c_str());
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000;
|
||||
|
||||
RCLCPP_INFO(logger, "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), config.mqtt_port,
|
||||
config.mqtt_username.empty() ? "<anonymous>" : config.mqtt_username.c_str());
|
||||
|
||||
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
|
||||
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
|
||||
|
||||
auto node = std::make_shared<CanDataSubscriber>(config);
|
||||
|
||||
rclcpp::spin(node);
|
||||
rclcpp::shutdown();
|
||||
return 0;
|
||||
}
|
||||
195
src/communication/mqtt_report/src/node/mqtt_vehicle_node.cpp
Normal file
195
src/communication/mqtt_report/src/node/mqtt_vehicle_node.cpp
Normal file
@ -0,0 +1,195 @@
|
||||
// mqtt_report_node.cpp
|
||||
|
||||
#include <chrono>
|
||||
#include <rclcpp/rclcpp.hpp>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "mqtt_report/can/can_decoder.h"
|
||||
#include "mqtt_report/core/input_health_monitor.h"
|
||||
#include "mqtt_report/core/vehicle_context.h"
|
||||
#include "mqtt_report/get_config.h"
|
||||
#include "mqtt_report/mqtt/mqtt_reporter.h"
|
||||
#include "mqtt_report/report_struct.h"
|
||||
#include "sweeper_interfaces/msg/can_frame.hpp"
|
||||
#include "sweeper_interfaces/msg/rtk.hpp"
|
||||
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
|
||||
|
||||
namespace sweeperMsg = sweeper_interfaces::msg;
|
||||
using sweeper_interfaces::msg::VehicleIdentity;
|
||||
|
||||
int mqtt_reconnect_interval = 5000; // ms
|
||||
|
||||
// ===================== Node =====================
|
||||
class VehicleReportNode : public rclcpp::Node
|
||||
{
|
||||
public:
|
||||
explicit VehicleReportNode(const Config& cfg)
|
||||
: Node("can_data_subscriber"),
|
||||
config_(cfg),
|
||||
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
|
||||
cfg.mqtt_password, mqtt_reconnect_interval),
|
||||
can_decoder_(ctx_),
|
||||
reporter_(ctx_, mqtt_client_, config_)
|
||||
{
|
||||
// CAN
|
||||
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
|
||||
"can_data", 10, std::bind(&VehicleReportNode::topic_callback, this, std::placeholders::_1));
|
||||
|
||||
// Identity
|
||||
identity_sub_ = this->create_subscription<VehicleIdentity>(
|
||||
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
|
||||
std::bind(&VehicleReportNode::identity_callback, this, std::placeholders::_1));
|
||||
|
||||
// rtk
|
||||
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
|
||||
"rtk_message", 10, std::bind(&VehicleReportNode::rtk_callback, this, std::placeholders::_1));
|
||||
|
||||
// Timers
|
||||
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_info(); });
|
||||
|
||||
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200), [this]() { reporter_.publish_gps(); });
|
||||
|
||||
fault_timer_ =
|
||||
this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_faults(); });
|
||||
|
||||
connection_check_timer_ = this->create_wall_timer(
|
||||
std::chrono::seconds(10),
|
||||
[this]() { health_.check(get_logger(), getCurrentTimestampMs(), mqtt_client_.isConnected()); });
|
||||
}
|
||||
|
||||
~VehicleReportNode() override
|
||||
{
|
||||
if (info_timer_) info_timer_->cancel();
|
||||
if (gps_timer_) gps_timer_->cancel();
|
||||
if (fault_timer_) fault_timer_->cancel();
|
||||
if (connection_check_timer_) connection_check_timer_->cancel();
|
||||
}
|
||||
|
||||
private:
|
||||
// ===================== identity =====================
|
||||
void identity_callback(const VehicleIdentity::SharedPtr msg)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
ctx_.imei = msg->imei;
|
||||
ctx_.vid = msg->vid;
|
||||
ctx_.ready = msg->ready;
|
||||
|
||||
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", msg->imei.c_str(), msg->vid.c_str());
|
||||
|
||||
int64_t now = getCurrentTimestampMs();
|
||||
|
||||
health_.on_identity_rx(now);
|
||||
}
|
||||
|
||||
bool snapshot_identity(std::string& imei, std::string& vid)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
if (!ctx_.ready) return false;
|
||||
imei = ctx_.imei;
|
||||
vid = ctx_.vid;
|
||||
return true;
|
||||
}
|
||||
|
||||
// ========== CAN ==========
|
||||
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
|
||||
{
|
||||
can_decoder_.handle(msg);
|
||||
|
||||
int64_t now = getCurrentTimestampMs();
|
||||
|
||||
health_.on_can_rx(now);
|
||||
}
|
||||
|
||||
// ========== RTK ==========
|
||||
void rtk_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||
|
||||
ctx_.gps.valid = true;
|
||||
|
||||
ctx_.gps.lat = msg->lat;
|
||||
ctx_.gps.lng = msg->lon;
|
||||
ctx_.gps.course = msg->head;
|
||||
|
||||
if (msg->p_quality == 5)
|
||||
ctx_.gps.mode = 2;
|
||||
else if (msg->p_quality == 4)
|
||||
ctx_.gps.mode = 3;
|
||||
else
|
||||
ctx_.gps.mode = msg->p_quality;
|
||||
|
||||
ctx_.gps.timestamp = getCurrentTimestampMs();
|
||||
ctx_.gps.valid = true;
|
||||
|
||||
int64_t now = ctx_.gps.timestamp;
|
||||
|
||||
health_.on_gps_rx(now);
|
||||
}
|
||||
|
||||
private:
|
||||
// CAN decoder
|
||||
VehicleContext ctx_;
|
||||
CanDecoder can_decoder_;
|
||||
|
||||
Config config_;
|
||||
|
||||
// MQTT
|
||||
MQTTClientWrapper mqtt_client_;
|
||||
|
||||
// ROS
|
||||
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr subscription_;
|
||||
rclcpp::Subscription<VehicleIdentity>::SharedPtr identity_sub_;
|
||||
rclcpp::Subscription<sweeper_interfaces::msg::Rtk>::SharedPtr rtk_sub_;
|
||||
|
||||
rclcpp::TimerBase::SharedPtr info_timer_;
|
||||
rclcpp::TimerBase::SharedPtr gps_timer_;
|
||||
rclcpp::TimerBase::SharedPtr fault_timer_;
|
||||
rclcpp::TimerBase::SharedPtr connection_check_timer_;
|
||||
|
||||
InputHealthMonitor health_;
|
||||
|
||||
MqttReporter reporter_;
|
||||
};
|
||||
|
||||
// ===================== main =====================
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
std::string config_path = "./config.json"; // 默认配置
|
||||
|
||||
// 手动解析 --config
|
||||
for (int i = 1; i < argc; ++i)
|
||||
{
|
||||
if (std::string(argv[i]) == "--config" && i + 1 < argc)
|
||||
{
|
||||
config_path = argv[i + 1];
|
||||
++i; // 跳过参数值
|
||||
}
|
||||
}
|
||||
|
||||
rclcpp::init(argc, argv);
|
||||
|
||||
Config config;
|
||||
|
||||
auto logger = rclcpp::get_logger("main");
|
||||
|
||||
if (!load_config(config_path, config))
|
||||
{
|
||||
RCLCPP_ERROR(logger, "Failed to load config: %s", config_path.c_str());
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (mqtt_reconnect_interval <= 0) mqtt_reconnect_interval = 5000;
|
||||
|
||||
RCLCPP_INFO(logger, "Config loaded: broker=%s:%d, user=%s", config.mqtt_ip.c_str(), config.mqtt_port,
|
||||
config.mqtt_username.empty() ? "<anonymous>" : config.mqtt_username.c_str());
|
||||
|
||||
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
|
||||
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
|
||||
|
||||
auto node = std::make_shared<VehicleReportNode>(config);
|
||||
|
||||
rclcpp::spin(node);
|
||||
rclcpp::shutdown();
|
||||
return 0;
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user