修改mqtt上报节点的配置获取

This commit is contained in:
lyq 2026-01-13 17:24:17 +08:00
parent cc09a445de
commit 603dc42acf
4 changed files with 66 additions and 53 deletions

View File

@ -1,16 +1,15 @@
{
"mqtt": {
"vid": "V060003",
"upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload",
"download_url_pre": "http://36.153.162.171:9510/api/ccp-web/file/",
"inter_net_address": "tcp://192.168.4.196",
"inter_net_port": 11883,
"external_net_address": "tcp://36.153.162.171",
"external_net_port": 19683,
"mqtt_user": "zxwl",
"mqtt_password": "zxwl1234@",
"username": "zxwl",
"password": "zxwl1234@",
"info_topic": "/zxwl/vehicle/{vid}/info",
"fault_topic": "/zxwl/vehicle/{vid}/fault",
"pub_gps_topic": "/zxwl/sweeper/V060003/gps",
"remote_topic": "/zxwl/sweeper/V060003/ctrl",
"upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload",
"download_url_pre": "http://36.153.162.171:9510/api/ccp-web/file/",
"mqtt_topic_push_status": "/zxwl/sweeper/V060003/task/status",
"mqtt_topic_sub_task": "/zxwl/sweeper/V060003/task"
},

View File

@ -1,12 +1,13 @@
#pragma once
#include <fstream>
#include <string>
#include <iostream>
#include <chrono>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <random>
#include <sstream>
#include <iomanip>
#include <string>
#include "ament_index_cpp/get_package_share_directory.hpp"
#include "nlohmann/json.hpp"
@ -15,12 +16,15 @@ using ordered_json = nlohmann::ordered_json;
struct Config
{
std::string vid;
std::string mqtt_ip;
int mqtt_port;
std::string info_topic;
std::string fault_topic;
int mqtt_port = 1883;
std::string mqtt_username;
std::string mqtt_password;
std::string info_topic_template; // e.g. /zxwl/vehicle/{vid}/info
std::string fault_topic_template; // e.g. /zxwl/vehicle/{vid}/fault
};
bool load_config(Config &config);
bool load_config(const std::string& path, Config& config);
std::string generate_mqtt_client_id();

View File

@ -1,44 +1,32 @@
#include "mqtt_report/get_config.h"
bool load_config(Config &config)
bool load_config(const std::string& path, Config& config)
{
try
{
// std::string config_path = ament_index_cpp::get_package_share_directory("mqtt_report") + "/config/config.json";
std::string config_path = "./config.json";
std::ifstream ifs(config_path);
std::ifstream ifs(path);
if (!ifs.is_open())
{
std::cerr << "Failed to open config file: " << config_path << std::endl;
std::cerr << "Failed to open config file: " << path << std::endl;
return false;
}
json j;
ifs >> j;
config.vid = j.at("mqtt").at("vid").get<std::string>();
config.mqtt_ip = j.at("mqtt").at("external_net_address").get<std::string>();
config.mqtt_port = j.at("mqtt").at("external_net_port").get<int>();
const auto& mqtt = j.at("mqtt");
// 替换 topic 中的 {vid} 为实际 vid 值
auto replace_vid = [&](const std::string &topic_template)
{
std::string topic = topic_template;
size_t pos = topic.find("{vid}");
if (pos != std::string::npos)
{
topic.replace(pos, 5, config.vid);
}
return topic;
};
config.mqtt_ip = mqtt.at("external_net_address").get<std::string>();
config.mqtt_port = mqtt.at("external_net_port").get<int>();
config.mqtt_username = mqtt.at("username").get<std::string>();
config.mqtt_password = mqtt.at("password").get<std::string>();
config.info_topic = replace_vid(j.at("mqtt").at("info_topic").get<std::string>());
config.fault_topic = replace_vid(j.at("mqtt").at("fault_topic").get<std::string>());
config.info_topic_template = mqtt.at("info_topic").get<std::string>();
config.fault_topic_template = mqtt.at("fault_topic").get<std::string>();
return true;
}
catch (const std::exception &e)
catch (const std::exception& e)
{
std::cerr << "Error parsing MQTT config: " << e.what() << std::endl;
return false;

View File

@ -21,7 +21,6 @@
namespace sweeperMsg = sweeper_interfaces::msg;
using sweeper_interfaces::msg::VehicleIdentity;
Config config;
int mqtt_reconnect_interval = 5000; // ms
// ===================== JSON 打包 =====================
@ -40,14 +39,27 @@ static std::string pack_general_info_to_json(const GeneralMsg& msg, const std::s
return j.dump(-1);
}
static std::string make_topic(const std::string& tpl, const std::string& vid)
{
std::string topic = tpl;
const std::string key = "{vid}";
auto pos = topic.find(key);
if (pos != std::string::npos)
{
topic.replace(pos, key.size(), vid);
}
return topic;
}
// ===================== Node =====================
class CanDataSubscriber : public rclcpp::Node
{
public:
explicit CanDataSubscriber(const Config& cfg)
: Node("can_data_subscriber"),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), "zxwl",
"zxwl1234@", mqtt_reconnect_interval),
config_(cfg),
mqtt_client_(cfg.mqtt_ip + ":" + std::to_string(cfg.mqtt_port), generate_mqtt_client_id(), cfg.mqtt_username,
cfg.mqtt_password, mqtt_reconnect_interval),
can_decoder_(info_report_, info_mutex_, fault_mutex_)
{
// CAN
@ -113,7 +125,7 @@ class CanDataSubscriber : public rclcpp::Node
return;
}
const std::string topic = "/zxwl/vehicle/" + vid + "/info";
const std::string topic = make_topic(config_.info_topic_template, vid);
GeneralMsg snapshot;
{
@ -137,7 +149,7 @@ class CanDataSubscriber : public rclcpp::Node
std::string imei, vid;
if (!snapshot_identity(imei, vid)) return;
const std::string topic = "/zxwl/vehicle/" + vid + "/fault";
const std::string topic = make_topic(config_.fault_topic_template, vid);
std::vector<int> codes;
{
@ -177,6 +189,8 @@ class CanDataSubscriber : public rclcpp::Node
rclcpp::TimerBase::SharedPtr fault_timer_;
rclcpp::TimerBase::SharedPtr connection_check_timer_;
Config config_;
// MQTT
MQTTClientWrapper mqtt_client_;
@ -199,11 +213,25 @@ class CanDataSubscriber : public rclcpp::Node
// ===================== main =====================
int main(int argc, char** argv)
{
std::string config_path = "./config.json"; // 默认配置
// 手动解析 --config
for (int i = 1; i < argc; ++i)
{
if (std::string(argv[i]) == "--config" && i + 1 < argc)
{
config_path = argv[i + 1];
++i; // 跳过参数值
}
}
rclcpp::init(argc, argv);
if (!load_config(config))
Config config;
if (!load_config(config_path, config))
{
RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load MQTT config.");
RCLCPP_ERROR(rclcpp::get_logger("main"), "Failed to load config: %s", config_path.c_str());
return 1;
}
@ -211,12 +239,6 @@ int main(int argc, char** argv)
auto node = std::make_shared<CanDataSubscriber>(config);
RCLCPP_INFO(node->get_logger(), "mqtt_ip: %s", config.mqtt_ip.c_str());
RCLCPP_INFO(node->get_logger(), "info_topic: %s", config.info_topic.c_str());
RCLCPP_INFO(node->get_logger(), "fault_topic: %s", config.fault_topic.c_str());
RCLCPP_INFO(node->get_logger(), "MQTT reconnection interval: %d ms", mqtt_reconnect_interval);
RCLCPP_INFO(node->get_logger(), "MqttReport node started.");
rclcpp::spin(node);
rclcpp::shutdown();
return 0;