diff --git a/src/communication/mqtt_report/CMakeLists.txt b/src/communication/mqtt_report/CMakeLists.txt index a5c4f50..5f242cd 100644 --- a/src/communication/mqtt_report/CMakeLists.txt +++ b/src/communication/mqtt_report/CMakeLists.txt @@ -31,10 +31,12 @@ include_directories(include ${catkin_INCLUDE_DIRS}) add_executable( mqtt_report_node - src/mqtt_report_node.cpp + src/node/mqtt_vehicle_node.cpp + src/core/input_health_monitor.cpp src/get_config.cpp src/fault_codes.cpp src/report_struct.cpp + src/mqtt/mqtt_reporter.cpp src/can/can_decoder.cpp) ament_target_dependencies( diff --git a/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h b/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h index 62b8cd9..d89e736 100644 --- a/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h +++ b/src/communication/mqtt_report/include/mqtt_report/can/can_decoder.h @@ -2,7 +2,7 @@ #pragma once -#include "mqtt_report/vehicle_context.h" +#include "mqtt_report/core/vehicle_context.h" #include "sweeper_interfaces/msg/can_frame.hpp" namespace sweeperMsg = sweeper_interfaces::msg; diff --git a/src/communication/mqtt_report/include/mqtt_report/core/input_health_monitor.h b/src/communication/mqtt_report/include/mqtt_report/core/input_health_monitor.h new file mode 100644 index 0000000..663e4da --- /dev/null +++ b/src/communication/mqtt_report/include/mqtt_report/core/input_health_monitor.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +struct InputMonitor +{ + bool ok = false; + bool sticky = false; + int64_t last_rx_ts = 0; + int64_t last_warn_ts = 0; +}; + +class InputHealthMonitor +{ + public: + InputHealthMonitor(); + + void on_identity_rx(int64_t now); + void on_gps_rx(int64_t now); + void on_can_rx(int64_t now); + + void check(rclcpp::Logger logger, int64_t now, bool mqtt_connected); + + private: + void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms); + + private: + InputMonitor identity_{false, true}; + InputMonitor gps_; + InputMonitor can_; +}; diff --git a/src/communication/mqtt_report/include/mqtt_report/mqtt/mqtt_reporter.h b/src/communication/mqtt_report/include/mqtt_report/mqtt/mqtt_reporter.h new file mode 100644 index 0000000..b6beca3 --- /dev/null +++ b/src/communication/mqtt_report/include/mqtt_report/mqtt/mqtt_reporter.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include "mqtt_report/core/vehicle_context.h" +#include "mqtt_report/get_config.h" +#include "mqtt_report/mqtt_client.hpp" + +class MqttReporter +{ + public: + MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config); + + void publish_info(); + void publish_gps(); + void publish_faults(); + + private: + std::string make_topic(const std::string& tpl, const std::string& vid) const; + + private: + VehicleContext& ctx_; + MQTTClientWrapper& mqtt_; + const Config& config_; +}; diff --git a/src/communication/mqtt_report/src/core/input_health_monitor.cpp b/src/communication/mqtt_report/src/core/input_health_monitor.cpp new file mode 100644 index 0000000..4f946ab --- /dev/null +++ b/src/communication/mqtt_report/src/core/input_health_monitor.cpp @@ -0,0 +1,60 @@ +#include "mqtt_report/core/input_health_monitor.h" + +#include + +static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; +static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000; +static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000; +static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000; + +InputHealthMonitor::InputHealthMonitor() { identity_.sticky = true; } + +void InputHealthMonitor::on_identity_rx(int64_t now) +{ + identity_.ok = true; + identity_.last_rx_ts = now; +} + +void InputHealthMonitor::on_gps_rx(int64_t now) +{ + gps_.ok = true; + gps_.last_rx_ts = now; +} + +void InputHealthMonitor::on_can_rx(int64_t now) +{ + can_.ok = true; + can_.last_rx_ts = now; +} + +void InputHealthMonitor::check(rclcpp::Logger logger, int64_t now, bool mqtt_connected) +{ + if (!mqtt_connected) + { + RCLCPP_WARN(logger, "MQTT connection lost."); + } + + check_input("vehicle identity (/vehicle/identity)", identity_, now, IDENTITY_TIMEOUT_MS); + check_input("GPS (/rtk_message)", gps_, now, GPS_TIMEOUT_MS); + check_input("CAN (/can_data)", can_, now, CAN_TIMEOUT_MS); +} + +void InputHealthMonitor::check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms) +{ + if (mon.sticky && mon.ok) return; + + if (mon.ok && now - mon.last_rx_ts > timeout_ms) + { + mon.ok = false; + RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "%s timeout (%ld ms without data).", name, + now - mon.last_rx_ts); + mon.last_warn_ts = now; + return; + } + + if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS) + { + RCLCPP_WARN(rclcpp::get_logger("input_monitor"), "Waiting for %s...", name); + mon.last_warn_ts = now; + } +} diff --git a/src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp b/src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp new file mode 100644 index 0000000..d68b0db --- /dev/null +++ b/src/communication/mqtt_report/src/mqtt/mqtt_reporter.cpp @@ -0,0 +1,99 @@ +#include "mqtt_report/mqtt/mqtt_reporter.h" + +#include "mqtt_report/report_struct.h" + +MqttReporter::MqttReporter(VehicleContext& ctx, MQTTClientWrapper& mqtt, const Config& config) + : ctx_(ctx), mqtt_(mqtt), config_(config) +{ +} + +std::string MqttReporter::make_topic(const std::string& tpl, const std::string& vid) const +{ + std::string topic = tpl; + const std::string key = "{vid}"; + auto pos = topic.find(key); + if (pos != std::string::npos) + { + topic.replace(pos, key.size(), vid); + } + return topic; +} + +void MqttReporter::publish_info() +{ + std::string imei, vid; + { + std::lock_guard lock(ctx_.mtx); + if (!ctx_.ready) return; + imei = ctx_.imei; + vid = ctx_.vid; + ctx_.info.timestamp = getCurrentTimestampMs(); + } + + GeneralMsg snapshot; + { + std::lock_guard lock(ctx_.mtx); + snapshot = ctx_.info; + } + + ordered_json j; + j["imei"] = imei; + j["power"] = snapshot.power; + j["chargeStatus"] = snapshot.chargeStatus; + j["gear"] = snapshot.gear; + j["speed"] = snapshot.speed; + j["steeringAngle"] = snapshot.steeringAngle; + j["motorTemp"] = snapshot.motorTemp; + j["controllerTemp"] = snapshot.controllerTemp; + j["timestamp"] = snapshot.timestamp; + + mqtt_.publish(make_topic(config_.info_topic_template, vid), j.dump(), 1); +} + +void MqttReporter::publish_gps() +{ + std::string imei, vid; + GpsMsg gps; + + { + std::lock_guard lock(ctx_.mtx); + if (!ctx_.ready || !ctx_.gps.valid) return; + imei = ctx_.imei; + vid = ctx_.vid; + gps = ctx_.gps; + } + + ordered_json j; + j["imei"] = imei; + j["lat"] = gps.lat; + j["lng"] = gps.lng; + j["course"] = gps.course; + j["mode"] = gps.mode; + j["gps_timestamp"] = gps.timestamp; + j["report_timestamp"] = getCurrentTimestampMs(); + + mqtt_.publish(make_topic(config_.gps_topic_template, vid), j.dump(), 1); +} + +void MqttReporter::publish_faults() +{ + std::string vid; + { + std::lock_guard lock(ctx_.mtx); + if (!ctx_.ready) return; + vid = ctx_.vid; + } + + std::vector codes; + { + std::lock_guard lock(ctx_.mtx); + const auto& err_set = ctx_.errors.getAllErrorCodes(); + codes.assign(err_set.begin(), err_set.end()); + } + + for (int code : codes) + { + mqtt_.publish(make_topic(config_.fault_topic_template, vid), + generateFaultJson(code, vid, getCurrentTimestampMs()), 0); + } +} diff --git a/src/communication/mqtt_report/src/node/mqtt_vehicle_node.cpp b/src/communication/mqtt_report/src/node/mqtt_vehicle_node.cpp index 4613613..e1dbd49 100644 --- a/src/communication/mqtt_report/src/node/mqtt_vehicle_node.cpp +++ b/src/communication/mqtt_report/src/node/mqtt_vehicle_node.cpp @@ -6,10 +6,11 @@ #include #include "mqtt_report/can/can_decoder.h" +#include "mqtt_report/core/input_health_monitor.h" +#include "mqtt_report/core/vehicle_context.h" #include "mqtt_report/get_config.h" -#include "mqtt_report/mqtt_client.hpp" +#include "mqtt_report/mqtt/mqtt_reporter.h" #include "mqtt_report/report_struct.h" -#include "mqtt_report/vehicle_context.h" #include "sweeper_interfaces/msg/can_frame.hpp" #include "sweeper_interfaces/msg/rtk.hpp" #include "sweeper_interfaces/msg/vehicle_identity.hpp" @@ -19,99 +20,45 @@ using sweeper_interfaces::msg::VehicleIdentity; int mqtt_reconnect_interval = 5000; // ms -static constexpr int64_t WARN_INTERVAL_MS = 10 * 1000; // 每 10s 警告一次 -static constexpr int64_t IDENTITY_TIMEOUT_MS = 15 * 1000; -static constexpr int64_t GPS_TIMEOUT_MS = 5 * 1000; -static constexpr int64_t CAN_TIMEOUT_MS = 3 * 1000; - -struct InputMonitor -{ - bool ok = false; - bool sticky = false; // 是否一次成功就永久 OK - int64_t last_rx_ts = 0; - int64_t last_warn_ts = 0; -}; - -// ===================== JSON 打包 ===================== -static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::string& imei) -{ - ordered_json j; - j["imei"] = imei; - j["power"] = msg.power; - j["chargeStatus"] = msg.chargeStatus; - j["gear"] = msg.gear; - j["speed"] = msg.speed; - j["steeringAngle"] = msg.steeringAngle; - j["motorTemp"] = msg.motorTemp; - j["controllerTemp"] = msg.controllerTemp; - j["timestamp"] = msg.timestamp; - return j.dump(-1); -} - -static std::string pack_gps_to_json(const GpsMsg& gps, const std::string& imei) -{ - ordered_json j; - j["imei"] = imei; - j["lat"] = gps.lat; - j["lng"] = gps.lng; - j["course"] = gps.course; - j["mode"] = gps.mode; - j["gps_timestamp"] = gps.timestamp; - j["report_timestamp"] = getCurrentTimestampMs(); - return j.dump(-1); -} - -static std::string make_topic(const std::string& tpl, const std::string& vid) -{ - std::string topic = tpl; - const std::string key = "{vid}"; - auto pos = topic.find(key); - if (pos != std::string::npos) - { - topic.replace(pos, key.size(), vid); - } - return topic; -} - // ===================== Node ===================== -class CanDataSubscriber : public rclcpp::Node +class VehicleReportNode : public rclcpp::Node { public: - explicit CanDataSubscriber(const Config& cfg) + explicit VehicleReportNode(const Config& cfg) : Node("can_data_subscriber"), config_(cfg), mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username, cfg.mqtt_password, mqtt_reconnect_interval), - can_decoder_(ctx_) + can_decoder_(ctx_), + reporter_(ctx_, mqtt_client_, config_) { // CAN subscription_ = this->create_subscription( - "can_data", 10, std::bind(&CanDataSubscriber::topic_callback, this, std::placeholders::_1)); + "can_data", 10, std::bind(&VehicleReportNode::topic_callback, this, std::placeholders::_1)); // Identity identity_sub_ = this->create_subscription( "/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(), - std::bind(&CanDataSubscriber::identity_callback, this, std::placeholders::_1)); + std::bind(&VehicleReportNode::identity_callback, this, std::placeholders::_1)); // rtk rtk_sub_ = this->create_subscription( - "rtk_message", 10, std::bind(&CanDataSubscriber::rtk_callback, this, std::placeholders::_1)); + "rtk_message", 10, std::bind(&VehicleReportNode::rtk_callback, this, std::placeholders::_1)); // Timers - info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), - std::bind(&CanDataSubscriber::info_timer_callback, this)); + info_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_info(); }); - gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200), - std::bind(&CanDataSubscriber::gps_timer_callback, this)); + gps_timer_ = this->create_wall_timer(std::chrono::milliseconds(200), [this]() { reporter_.publish_gps(); }); - fault_timer_ = this->create_wall_timer(std::chrono::milliseconds(500), - std::bind(&CanDataSubscriber::fault_timer_callback, this)); + fault_timer_ = + this->create_wall_timer(std::chrono::milliseconds(500), [this]() { reporter_.publish_faults(); }); connection_check_timer_ = this->create_wall_timer( - std::chrono::seconds(10), std::bind(&CanDataSubscriber::check_connection_callback, this)); + std::chrono::seconds(10), + [this]() { health_.check(get_logger(), getCurrentTimestampMs(), mqtt_client_.isConnected()); }); } - ~CanDataSubscriber() override + ~VehicleReportNode() override { if (info_timer_) info_timer_->cancel(); if (gps_timer_) gps_timer_->cancel(); @@ -128,15 +75,11 @@ class CanDataSubscriber : public rclcpp::Node ctx_.vid = msg->vid; ctx_.ready = msg->ready; + RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", msg->imei.c_str(), msg->vid.c_str()); + int64_t now = getCurrentTimestampMs(); - if (!identity_mon_.ok) - { - RCLCPP_INFO(get_logger(), "Vehicle identity ready: IMEI=%s, VID=%s", ctx_.imei.c_str(), ctx_.vid.c_str()); - } - - identity_mon_.ok = true; - identity_mon_.last_rx_ts = now; + health_.on_identity_rx(now); } bool snapshot_identity(std::string& imei, std::string& vid) @@ -148,99 +91,6 @@ class CanDataSubscriber : public rclcpp::Node return true; } - // ===================== timers ===================== - void info_timer_callback() - { - std::string imei, vid; - if (!snapshot_identity(imei, vid)) return; - - const std::string topic = make_topic(config_.info_topic_template, vid); - - GeneralMsg snapshot; - { - std::lock_guard lock(ctx_.mtx); - ctx_.info.timestamp = getCurrentTimestampMs(); - snapshot = ctx_.info; - } - - mqtt_client_.publish(topic, pack_general_info_to_json(snapshot, imei), 1); - } - - void gps_timer_callback() - { - std::string imei, vid; - if (!snapshot_identity(imei, vid)) return; - - GpsMsg snapshot; - { - std::lock_guard lock(ctx_.mtx); - if (!ctx_.gps.valid) return; - snapshot = ctx_.gps; - } - - const std::string topic = make_topic(config_.gps_topic_template, vid); - - mqtt_client_.publish(topic, pack_gps_to_json(snapshot, imei), 1); - } - - void fault_timer_callback() - { - std::string imei, vid; - if (!snapshot_identity(imei, vid)) return; - - const std::string topic = make_topic(config_.fault_topic_template, vid); - - std::vector codes; - { - std::lock_guard lock(ctx_.mtx); - const auto& err_set = ctx_.errors.getAllErrorCodes(); - codes.assign(err_set.begin(), err_set.end()); - } - - for (int code : codes) - { - mqtt_client_.publish(topic, generateFaultJson(code, vid, getCurrentTimestampMs()), 0); - } - } - - void check_connection_callback() - { - int64_t now = getCurrentTimestampMs(); - - if (!mqtt_client_.isConnected()) - { - RCLCPP_WARN(get_logger(), "MQTT connection lost."); - } - - check_input("vehicle identity (/vehicle/identity)", identity_mon_, now, IDENTITY_TIMEOUT_MS); - - check_input("GPS (/rtk_message)", gps_mon_, now, GPS_TIMEOUT_MS); - - check_input("CAN (/can_data)", can_mon_, now, CAN_TIMEOUT_MS); - } - - void check_input(const char* name, InputMonitor& mon, int64_t now, int64_t timeout_ms) - { - // sticky 输入:一旦 OK,不再 timeout - if (mon.sticky && mon.ok) return; - - // 从 OK → 不 OK(流断了) - if (mon.ok && now - mon.last_rx_ts > timeout_ms) - { - mon.ok = false; - RCLCPP_WARN(get_logger(), "%s timeout (%ld ms without data).", name, now - mon.last_rx_ts); - mon.last_warn_ts = now; - return; - } - - // 一直不 OK → 低频 WARN - if (!mon.ok && now - mon.last_warn_ts > WARN_INTERVAL_MS) - { - RCLCPP_WARN(get_logger(), "Waiting for %s...", name); - mon.last_warn_ts = now; - } - } - // ========== CAN ========== void topic_callback(const sweeperMsg::CanFrame::SharedPtr msg) { @@ -248,13 +98,7 @@ class CanDataSubscriber : public rclcpp::Node int64_t now = getCurrentTimestampMs(); - if (!can_mon_.ok) - { - RCLCPP_INFO(get_logger(), "CAN data stream active (/can_data)."); - } - - can_mon_.ok = true; - can_mon_.last_rx_ts = now; + health_.on_can_rx(now); } // ========== RTK ========== @@ -280,13 +124,7 @@ class CanDataSubscriber : public rclcpp::Node int64_t now = ctx_.gps.timestamp; - if (!gps_mon_.ok) - { - RCLCPP_INFO(get_logger(), "GPS data stream active (/rtk_message)."); - } - - gps_mon_.ok = true; - gps_mon_.last_rx_ts = now; + health_.on_gps_rx(now); } private: @@ -309,9 +147,9 @@ class CanDataSubscriber : public rclcpp::Node rclcpp::TimerBase::SharedPtr fault_timer_; rclcpp::TimerBase::SharedPtr connection_check_timer_; - InputMonitor identity_mon_{false, true}; - InputMonitor gps_mon_; - InputMonitor can_mon_; + InputHealthMonitor health_; + + MqttReporter reporter_; }; // ===================== main ===================== @@ -349,7 +187,7 @@ int main(int argc, char** argv) RCLCPP_INFO(logger, "Topic templates: gps='%s', info='%s', fault='%s'", config.gps_topic_template.c_str(), config.info_topic_template.c_str(), config.fault_topic_template.c_str()); - auto node = std::make_shared(config); + auto node = std::make_shared(config); rclcpp::spin(node); rclcpp::shutdown();