调整mqtt节点架构
This commit is contained in:
parent
d3246d551d
commit
180bf5df50
@ -31,10 +31,12 @@ include_directories(include ${catkin_INCLUDE_DIRS})
|
|||||||
|
|
||||||
add_executable(
|
add_executable(
|
||||||
mqtt_report_node
|
mqtt_report_node
|
||||||
src/mqtt_report_node.cpp
|
src/node/mqtt_vehicle_node.cpp
|
||||||
|
src/core/input_health_monitor.cpp
|
||||||
src/get_config.cpp
|
src/get_config.cpp
|
||||||
src/fault_codes.cpp
|
src/fault_codes.cpp
|
||||||
src/report_struct.cpp
|
src/report_struct.cpp
|
||||||
|
src/mqtt/mqtt_reporter.cpp
|
||||||
src/can/can_decoder.cpp)
|
src/can/can_decoder.cpp)
|
||||||
|
|
||||||
ament_target_dependencies(
|
ament_target_dependencies(
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "mqtt_report/vehicle_context.h"
|
#include "mqtt_report/core/vehicle_context.h"
|
||||||
#include "sweeper_interfaces/msg/can_frame.hpp"
|
#include "sweeper_interfaces/msg/can_frame.hpp"
|
||||||
|
|
||||||
namespace sweeperMsg = sweeper_interfaces::msg;
|
namespace sweeperMsg = sweeper_interfaces::msg;
|
||||||
|
|||||||
@ -0,0 +1,32 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <rclcpp/logger.hpp>
|
||||||
|
|
||||||
|
struct InputMonitor
|
||||||
|
{
|
||||||
|
bool ok = false;
|
||||||
|
bool sticky = false;
|
||||||
|
int64_t last_rx_ts = 0;
|
||||||
|
int64_t last_warn_ts = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class InputHealthMonitor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
InputHealthMonitor();
|
||||||
|
|
||||||
|
void on_identity_rx(int64_t now);
|
||||||
|
void on_gps_rx(int64_t now);
|
||||||
|
void on_can_rx(int64_t now);
|
||||||
|
|
||||||
|
void check(rclcpp::Logger logger, int64_t now, bool mqtt_connected);
|
||||||
|
|
||||||
|
private:
|
||||||
|
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms);
|
||||||
|
|
||||||
|
private:
|
||||||
|
InputMonitor identity_{false, true};
|
||||||
|
InputMonitor gps_;
|
||||||
|
InputMonitor can_;
|
||||||
|
};
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include "mqtt_report/core/vehicle_context.h"
|
||||||
|
#include "mqtt_report/get_config.h"
|
||||||
|
#include "mqtt_report/mqtt_client.hpp"
|
||||||
|
|
||||||
|
class MqttReporter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config);
|
||||||
|
|
||||||
|
void publish_info();
|
||||||
|
void publish_gps();
|
||||||
|
void publish_faults();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::string make_topic(const std::string& tpl, const std::string& vid) const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
VehicleContext& ctx_;
|
||||||
|
MQTTClientWrapper& mqtt_;
|
||||||
|
const Config& config_;
|
||||||
|
};
|
||||||
@ -0,0 +1,60 @@
|
|||||||
|
#include "mqtt_report/core/input_health_monitor.h"
|
||||||
|
|
||||||
|
#include <rclcpp/rclcpp.hpp>
|
||||||
|
|
||||||
|
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000;
|
||||||
|
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
|
||||||
|
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
|
||||||
|
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
|
||||||
|
|
||||||
|
InputHealthMonitor::InputHealthMonitor() { identity_.sticky = true; }
|
||||||
|
|
||||||
|
void InputHealthMonitor::on_identity_rx(int64_t now)
|
||||||
|
{
|
||||||
|
identity_.ok = true;
|
||||||
|
identity_.last_rx_ts = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputHealthMonitor::on_gps_rx(int64_t now)
|
||||||
|
{
|
||||||
|
gps_.ok = true;
|
||||||
|
gps_.last_rx_ts = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputHealthMonitor::on_can_rx(int64_t now)
|
||||||
|
{
|
||||||
|
can_.ok = true;
|
||||||
|
can_.last_rx_ts = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputHealthMonitor::check(rclcpp::Logger logger, int64_t now, bool mqtt_connected)
|
||||||
|
{
|
||||||
|
if (!mqtt_connected)
|
||||||
|
{
|
||||||
|
RCLCPP_WARN(logger, "MQTT connection lost.");
|
||||||
|
}
|
||||||
|
|
||||||
|
check_input("vehicle identity (/vehicle/identity)", identity_, now, IDENTITY_TIMEOUT_MS);
|
||||||
|
check_input("GPS (/rtk_message)", gps_, now, GPS_TIMEOUT_MS);
|
||||||
|
check_input("CAN (/can_data)", can_, now, CAN_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputHealthMonitor::check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
|
||||||
|
{
|
||||||
|
if (mon.sticky && mon.ok) return;
|
||||||
|
|
||||||
|
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
|
||||||
|
{
|
||||||
|
mon.ok = false;
|
||||||
|
RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "%s timeout (%ld ms without data).", name,
|
||||||
|
now - mon.last_rx_ts);
|
||||||
|
mon.last_warn_ts = now;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
|
||||||
|
{
|
||||||
|
RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "Waiting for %s...", name);
|
||||||
|
mon.last_warn_ts = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
99
src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp
Normal file
99
src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
#include "mqtt_report/mqtt/mqtt_reporter.h"
|
||||||
|
|
||||||
|
#include "mqtt_report/report_struct.h"
|
||||||
|
|
||||||
|
MqttReporter::MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config)
|
||||||
|
: ctx_(ctx), mqtt_(mqtt), config_(config)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string MqttReporter::make_topic(const std::string& tpl, const std::string& vid) const
|
||||||
|
{
|
||||||
|
std::string topic = tpl;
|
||||||
|
const std::string key = "{vid}";
|
||||||
|
auto pos = topic.find(key);
|
||||||
|
if (pos != std::string::npos)
|
||||||
|
{
|
||||||
|
topic.replace(pos, key.size(), vid);
|
||||||
|
}
|
||||||
|
return topic;
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttReporter::publish_info()
|
||||||
|
{
|
||||||
|
std::string imei, vid;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||||
|
if (!ctx_.ready) return;
|
||||||
|
imei = ctx_.imei;
|
||||||
|
vid = ctx_.vid;
|
||||||
|
ctx_.info.timestamp = getCurrentTimestampMs();
|
||||||
|
}
|
||||||
|
|
||||||
|
GeneralMsg snapshot;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||||
|
snapshot = ctx_.info;
|
||||||
|
}
|
||||||
|
|
||||||
|
ordered_json j;
|
||||||
|
j["imei"] = imei;
|
||||||
|
j["power"] = snapshot.power;
|
||||||
|
j["chargeStatus"] = snapshot.chargeStatus;
|
||||||
|
j["gear"] = snapshot.gear;
|
||||||
|
j["speed"] = snapshot.speed;
|
||||||
|
j["steeringAngle"] = snapshot.steeringAngle;
|
||||||
|
j["motorTemp"] = snapshot.motorTemp;
|
||||||
|
j["controllerTemp"] = snapshot.controllerTemp;
|
||||||
|
j["timestamp"] = snapshot.timestamp;
|
||||||
|
|
||||||
|
mqtt_.publish(make_topic(config_.info_topic_template, vid), j.dump(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttReporter::publish_gps()
|
||||||
|
{
|
||||||
|
std::string imei, vid;
|
||||||
|
GpsMsg gps;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||||
|
if (!ctx_.ready || !ctx_.gps.valid) return;
|
||||||
|
imei = ctx_.imei;
|
||||||
|
vid = ctx_.vid;
|
||||||
|
gps = ctx_.gps;
|
||||||
|
}
|
||||||
|
|
||||||
|
ordered_json j;
|
||||||
|
j["imei"] = imei;
|
||||||
|
j["lat"] = gps.lat;
|
||||||
|
j["lng"] = gps.lng;
|
||||||
|
j["course"] = gps.course;
|
||||||
|
j["mode"] = gps.mode;
|
||||||
|
j["gps_timestamp"] = gps.timestamp;
|
||||||
|
j["report_timestamp"] = getCurrentTimestampMs();
|
||||||
|
|
||||||
|
mqtt_.publish(make_topic(config_.gps_topic_template, vid), j.dump(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttReporter::publish_faults()
|
||||||
|
{
|
||||||
|
std::string vid;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||||
|
if (!ctx_.ready) return;
|
||||||
|
vid = ctx_.vid;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<int> codes;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
||||||
|
const auto& err_set = ctx_.errors.getAllErrorCodes();
|
||||||
|
codes.assign(err_set.begin(), err_set.end());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int code : codes)
|
||||||
|
{
|
||||||
|
mqtt_.publish(make_topic(config_.fault_topic_template, vid),
|
||||||
|
generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -6,10 +6,11 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "mqtt_report/can/can_decoder.h"
|
#include "mqtt_report/can/can_decoder.h"
|
||||||
|
#include "mqtt_report/core/input_health_monitor.h"
|
||||||
|
#include "mqtt_report/core/vehicle_context.h"
|
||||||
#include "mqtt_report/get_config.h"
|
#include "mqtt_report/get_config.h"
|
||||||
#include "mqtt_report/mqtt_client.hpp"
|
#include "mqtt_report/mqtt/mqtt_reporter.h"
|
||||||
#include "mqtt_report/report_struct.h"
|
#include "mqtt_report/report_struct.h"
|
||||||
#include "mqtt_report/vehicle_context.h"
|
|
||||||
#include "sweeper_interfaces/msg/can_frame.hpp"
|
#include "sweeper_interfaces/msg/can_frame.hpp"
|
||||||
#include "sweeper_interfaces/msg/rtk.hpp"
|
#include "sweeper_interfaces/msg/rtk.hpp"
|
||||||
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
|
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
|
||||||
@ -19,99 +20,45 @@ using sweeper_interfaces::msg::VehicleIdentity;
|
|||||||
|
|
||||||
int mqtt_reconnect_interval = 5000; // ms
|
int mqtt_reconnect_interval = 5000; // ms
|
||||||
|
|
||||||
static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; // 每 10s 警告一次
|
|
||||||
static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000;
|
|
||||||
static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000;
|
|
||||||
static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000;
|
|
||||||
|
|
||||||
struct InputMonitor
|
|
||||||
{
|
|
||||||
bool ok = false;
|
|
||||||
bool sticky = false; // 是否一次成功就永久 OK
|
|
||||||
int64_t last_rx_ts = 0;
|
|
||||||
int64_t last_warn_ts = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
// ===================== JSON 打包 =====================
|
|
||||||
static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei)
|
|
||||||
{
|
|
||||||
ordered_json j;
|
|
||||||
j["imei"] = imei;
|
|
||||||
j["power"] = msg.power;
|
|
||||||
j["chargeStatus"] = msg.chargeStatus;
|
|
||||||
j["gear"] = msg.gear;
|
|
||||||
j["speed"] = msg.speed;
|
|
||||||
j["steeringAngle"] = msg.steeringAngle;
|
|
||||||
j["motorTemp"] = msg.motorTemp;
|
|
||||||
j["controllerTemp"] = msg.controllerTemp;
|
|
||||||
j["timestamp"] = msg.timestamp;
|
|
||||||
return j.dump(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
static std::string pack_gps_to_json(const GpsMsg& gps, const std::string& imei)
|
|
||||||
{
|
|
||||||
ordered_json j;
|
|
||||||
j["imei"] = imei;
|
|
||||||
j["lat"] = gps.lat;
|
|
||||||
j["lng"] = gps.lng;
|
|
||||||
j["course"] = gps.course;
|
|
||||||
j["mode"] = gps.mode;
|
|
||||||
j["gps_timestamp"] = gps.timestamp;
|
|
||||||
j["report_timestamp"] = getCurrentTimestampMs();
|
|
||||||
return j.dump(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
static std::string make_topic(const std::string& tpl, const std::string& vid)
|
|
||||||
{
|
|
||||||
std::string topic = tpl;
|
|
||||||
const std::string key = "{vid}";
|
|
||||||
auto pos = topic.find(key);
|
|
||||||
if (pos != std::string::npos)
|
|
||||||
{
|
|
||||||
topic.replace(pos, key.size(), vid);
|
|
||||||
}
|
|
||||||
return topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===================== Node =====================
|
// ===================== Node =====================
|
||||||
class CanDataSubscriber : public rclcpp::Node
|
class VehicleReportNode : public rclcpp::Node
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit CanDataSubscriber(const Config& cfg)
|
explicit VehicleReportNode(const Config& cfg)
|
||||||
: Node("can_data_subscriber"),
|
: Node("can_data_subscriber"),
|
||||||
config_(cfg),
|
config_(cfg),
|
||||||
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
|
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
|
||||||
cfg.mqtt_password, mqtt_reconnect_interval),
|
cfg.mqtt_password, mqtt_reconnect_interval),
|
||||||
can_decoder_(ctx_)
|
can_decoder_(ctx_),
|
||||||
|
reporter_(ctx_, mqtt_client_, config_)
|
||||||
{
|
{
|
||||||
// CAN
|
// CAN
|
||||||
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
|
subscription_ = this->create_subscription<sweeperMsg::CanFrame>(
|
||||||
"can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1));
|
"can_data", 10, std::bind(&VehicleReportNode::topic_callback, this, std::placeholders::_1));
|
||||||
|
|
||||||
// Identity
|
// Identity
|
||||||
identity_sub_ = this->create_subscription<VehicleIdentity>(
|
identity_sub_ = this->create_subscription<VehicleIdentity>(
|
||||||
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
|
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
|
||||||
std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1));
|
std::bind(&VehicleReportNode::identity_callback, this, std::placeholders::_1));
|
||||||
|
|
||||||
// rtk
|
// rtk
|
||||||
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
|
rtk_sub_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
|
||||||
"rtk_message", 10, std::bind(&CanDataSubscriber::rtk_callback, this, std::placeholders::_1));
|
"rtk_message", 10, std::bind(&VehicleReportNode::rtk_callback, this, std::placeholders::_1));
|
||||||
|
|
||||||
// Timers
|
// Timers
|
||||||
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
|
info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_info(); });
|
||||||
std::bind(&CanDataSubscriber::info_timer_callback, this));
|
|
||||||
|
|
||||||
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200),
|
gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200), [this]() { reporter_.publish_gps(); });
|
||||||
std::bind(&CanDataSubscriber::gps_timer_callback, this));
|
|
||||||
|
|
||||||
fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500),
|
fault_timer_ =
|
||||||
std::bind(&CanDataSubscriber::fault_timer_callback, this));
|
this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_faults(); });
|
||||||
|
|
||||||
connection_check_timer_ = this->create_wall_timer(
|
connection_check_timer_ = this->create_wall_timer(
|
||||||
std::chrono::seconds(10), std::bind(&CanDataSubscriber::check_connection_callback, this));
|
std::chrono::seconds(10),
|
||||||
|
[this]() { health_.check(get_logger(), getCurrentTimestampMs(), mqtt_client_.isConnected()); });
|
||||||
}
|
}
|
||||||
|
|
||||||
~CanDataSubscriber() override
|
~VehicleReportNode() override
|
||||||
{
|
{
|
||||||
if (info_timer_) info_timer_->cancel();
|
if (info_timer_) info_timer_->cancel();
|
||||||
if (gps_timer_) gps_timer_->cancel();
|
if (gps_timer_) gps_timer_->cancel();
|
||||||
@ -128,15 +75,11 @@ class CanDataSubscriber : public rclcpp::Node
|
|||||||
ctx_.vid = msg->vid;
|
ctx_.vid = msg->vid;
|
||||||
ctx_.ready = msg->ready;
|
ctx_.ready = msg->ready;
|
||||||
|
|
||||||
|
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", msg->imei.c_str(), msg->vid.c_str());
|
||||||
|
|
||||||
int64_t now = getCurrentTimestampMs();
|
int64_t now = getCurrentTimestampMs();
|
||||||
|
|
||||||
if (!identity_mon_.ok)
|
health_.on_identity_rx(now);
|
||||||
{
|
|
||||||
RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", ctx_.imei.c_str(), ctx_.vid.c_str());
|
|
||||||
}
|
|
||||||
|
|
||||||
identity_mon_.ok = true;
|
|
||||||
identity_mon_.last_rx_ts = now;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool snapshot_identity(std::string& imei, std::string& vid)
|
bool snapshot_identity(std::string& imei, std::string& vid)
|
||||||
@ -148,99 +91,6 @@ class CanDataSubscriber : public rclcpp::Node
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===================== timers =====================
|
|
||||||
void info_timer_callback()
|
|
||||||
{
|
|
||||||
std::string imei, vid;
|
|
||||||
if (!snapshot_identity(imei, vid)) return;
|
|
||||||
|
|
||||||
const std::string topic = make_topic(config_.info_topic_template, vid);
|
|
||||||
|
|
||||||
GeneralMsg snapshot;
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
|
||||||
ctx_.info.timestamp = getCurrentTimestampMs();
|
|
||||||
snapshot = ctx_.info;
|
|
||||||
}
|
|
||||||
|
|
||||||
mqtt_client_.publish(topic, pack_general_info_to_json(snapshot, imei), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
void gps_timer_callback()
|
|
||||||
{
|
|
||||||
std::string imei, vid;
|
|
||||||
if (!snapshot_identity(imei, vid)) return;
|
|
||||||
|
|
||||||
GpsMsg snapshot;
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
|
||||||
if (!ctx_.gps.valid) return;
|
|
||||||
snapshot = ctx_.gps;
|
|
||||||
}
|
|
||||||
|
|
||||||
const std::string topic = make_topic(config_.gps_topic_template, vid);
|
|
||||||
|
|
||||||
mqtt_client_.publish(topic, pack_gps_to_json(snapshot, imei), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
void fault_timer_callback()
|
|
||||||
{
|
|
||||||
std::string imei, vid;
|
|
||||||
if (!snapshot_identity(imei, vid)) return;
|
|
||||||
|
|
||||||
const std::string topic = make_topic(config_.fault_topic_template, vid);
|
|
||||||
|
|
||||||
std::vector<int> codes;
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(ctx_.mtx);
|
|
||||||
const auto& err_set = ctx_.errors.getAllErrorCodes();
|
|
||||||
codes.assign(err_set.begin(), err_set.end());
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int code : codes)
|
|
||||||
{
|
|
||||||
mqtt_client_.publish(topic, generateFaultJson(code, vid, getCurrentTimestampMs()), 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void check_connection_callback()
|
|
||||||
{
|
|
||||||
int64_t now = getCurrentTimestampMs();
|
|
||||||
|
|
||||||
if (!mqtt_client_.isConnected())
|
|
||||||
{
|
|
||||||
RCLCPP_WARN(get_logger(), "MQTT connection lost.");
|
|
||||||
}
|
|
||||||
|
|
||||||
check_input("vehicle identity (/vehicle/identity)", identity_mon_, now, IDENTITY_TIMEOUT_MS);
|
|
||||||
|
|
||||||
check_input("GPS (/rtk_message)", gps_mon_, now, GPS_TIMEOUT_MS);
|
|
||||||
|
|
||||||
check_input("CAN (/can_data)", can_mon_, now, CAN_TIMEOUT_MS);
|
|
||||||
}
|
|
||||||
|
|
||||||
void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms)
|
|
||||||
{
|
|
||||||
// sticky 输入:一旦 OK,不再 timeout
|
|
||||||
if (mon.sticky && mon.ok) return;
|
|
||||||
|
|
||||||
// 从 OK → 不 OK(流断了)
|
|
||||||
if (mon.ok && now - mon.last_rx_ts > timeout_ms)
|
|
||||||
{
|
|
||||||
mon.ok = false;
|
|
||||||
RCLCPP_WARN(get_logger(), "%s timeout (%ld ms without data).", name, now - mon.last_rx_ts);
|
|
||||||
mon.last_warn_ts = now;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 一直不 OK → 低频 WARN
|
|
||||||
if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS)
|
|
||||||
{
|
|
||||||
RCLCPP_WARN(get_logger(), "Waiting for %s...", name);
|
|
||||||
mon.last_warn_ts = now;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ========== CAN ==========
|
// ========== CAN ==========
|
||||||
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
|
void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg)
|
||||||
{
|
{
|
||||||
@ -248,13 +98,7 @@ class CanDataSubscriber : public rclcpp::Node
|
|||||||
|
|
||||||
int64_t now = getCurrentTimestampMs();
|
int64_t now = getCurrentTimestampMs();
|
||||||
|
|
||||||
if (!can_mon_.ok)
|
health_.on_can_rx(now);
|
||||||
{
|
|
||||||
RCLCPP_INFO(get_logger(), "CAN data stream active (/can_data).");
|
|
||||||
}
|
|
||||||
|
|
||||||
can_mon_.ok = true;
|
|
||||||
can_mon_.last_rx_ts = now;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========== RTK ==========
|
// ========== RTK ==========
|
||||||
@ -280,13 +124,7 @@ class CanDataSubscriber : public rclcpp::Node
|
|||||||
|
|
||||||
int64_t now = ctx_.gps.timestamp;
|
int64_t now = ctx_.gps.timestamp;
|
||||||
|
|
||||||
if (!gps_mon_.ok)
|
health_.on_gps_rx(now);
|
||||||
{
|
|
||||||
RCLCPP_INFO(get_logger(), "GPS data stream active (/rtk_message).");
|
|
||||||
}
|
|
||||||
|
|
||||||
gps_mon_.ok = true;
|
|
||||||
gps_mon_.last_rx_ts = now;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -309,9 +147,9 @@ class CanDataSubscriber : public rclcpp::Node
|
|||||||
rclcpp::TimerBase::SharedPtr fault_timer_;
|
rclcpp::TimerBase::SharedPtr fault_timer_;
|
||||||
rclcpp::TimerBase::SharedPtr connection_check_timer_;
|
rclcpp::TimerBase::SharedPtr connection_check_timer_;
|
||||||
|
|
||||||
InputMonitor identity_mon_{false, true};
|
InputHealthMonitor health_;
|
||||||
InputMonitor gps_mon_;
|
|
||||||
InputMonitor can_mon_;
|
MqttReporter reporter_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// ===================== main =====================
|
// ===================== main =====================
|
||||||
@ -349,7 +187,7 @@ int main(int argc, char** argv)
|
|||||||
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
|
RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(),
|
||||||
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
|
config.info_topic_template.c_str(), config.fault_topic_template.c_str());
|
||||||
|
|
||||||
auto node = std::make_shared<CanDataSubscriber>(config);
|
auto node = std::make_shared<VehicleReportNode>(config);
|
||||||
|
|
||||||
rclcpp::spin(node);
|
rclcpp::spin(node);
|
||||||
rclcpp::shutdown();
|
rclcpp::shutdown();
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user