修改sub节点获取vid的方式,以及topic的方式

This commit is contained in:
lyq 2026-01-15 17:05:43 +08:00
parent 19182ffe53
commit caeb6cbc2b
12 changed files with 5279 additions and 4759 deletions

View File

@ -1,13 +1,13 @@
{ {
"mqtt": { "mqtt": {
"external_net_address": "tcp://36.153.162.171", "external_net_address": "36.153.162.171",
"external_net_port": 19683, "external_net_port": 19683,
"username": "zxwl", "username": "zxwl",
"password": "zxwl1234@", "password": "zxwl1234@",
"info_topic": "/zxwl/sweeper/{vid}/info", "info_topic": "/zxwl/sweeper/{vid}/info",
"fault_topic": "/zxwl/sweeper/{vid}/fault", "fault_topic": "/zxwl/sweeper/{vid}/fault",
"gps_topic": "/zxwl/sweeper/{vid}/gps", "gps_topic": "/zxwl/sweeper/{vid}/gps",
"remote_topic": "/zxwl/sweeper/V060003/ctrl", "remote_topic": "/zxwl/sweeper/{vid}/ctrl",
"upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload", "upload_url": "https://qsc.ntiov.com:8443/api/sys/route/upload",
"download_url_pre": "http://36.153.162.171:9510/api/ccp-web/file/", "download_url_pre": "http://36.153.162.171:9510/api/ccp-web/file/",
"mqtt_topic_push_status": "/zxwl/sweeper/V060003/task/status", "mqtt_topic_push_status": "/zxwl/sweeper/V060003/task/status",

View File

@ -16,7 +16,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
endif() endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang") if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options(-w) # add_compile_options(-w) #
endif() endif()
# find dependencies # find dependencies
@ -26,37 +26,32 @@ find_package(std_msgs REQUIRED)
find_package(sweeper_interfaces REQUIRED) find_package(sweeper_interfaces REQUIRED)
find_package(CURL REQUIRED) find_package(CURL REQUIRED)
include_directories( add_executable(
include/sub
include/paho_mqtt_3c
${catkin_INCLUDE_DIRS}
)
add_executable(sub_node
src/sub_node.cpp
src/jsoncpp.cpp
src/md5.cpp
)
ament_target_dependencies(sub_node rclcpp std_msgs sweeper_interfaces CURL)
if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64)
target_link_libraries(
sub_node
${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a
)
else()
target_link_libraries(
sub_node
${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a
)
endif()
install(TARGETS
sub_node sub_node
DESTINATION lib/${PROJECT_NAME} src/main.cpp
) src/sub_node.cpp
src/mqtt_receiver.cpp
src/control_mapper.cpp
src/jsoncpp.cpp
src/md5.cpp)
target_include_directories(sub_node PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_SOURCE_DIR}/include/paho_mqtt_3c)
ament_target_dependencies(
sub_node
rclcpp
std_msgs
sweeper_interfaces
CURL)
if(CMAKE_SYSTEM_PROCESSOR MATCHES aarch64)
target_link_libraries(sub_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c-static.a)
else()
target_link_libraries(sub_node ${PROJECT_SOURCE_DIR}/lib/libpaho-mqtt3c.a)
endif()
install(TARGETS sub_node DESTINATION lib/${PROJECT_NAME})
if(BUILD_TESTING) if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED) find_package(ament_lint_auto REQUIRED)
@ -67,5 +62,5 @@ if(BUILD_TESTING)
# uncomment the line when this package is not in a git repo # uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE) #set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies() ament_lint_auto_find_test_dependencies()
endif() endif()
ament_package() ament_package()

View File

@ -0,0 +1,17 @@
#pragma once
#include "sub/control_state.hpp"
#include "sweeper_interfaces/msg/mc_ctrl.hpp"
namespace sub_node_pkg
{
int mapGearToMcCtrl(int gear_mqtt);
int mapThrottleToRpm(int throttle);
int mapSteeringToAngle(int steering_raw);
void fillMcCtrlFromCarCtrl(const CarCtrl& in, sweeper_interfaces::msg::McCtrl& out);
} // namespace sub_node_pkg

View File

@ -0,0 +1,61 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <mutex>
#include <string>
namespace sub_node_pkg
{
struct CarCtrl
{
// 0: 手动驾驶, 3: 远程驾驶
int mode = 0;
// 0N档1D档2R档这是你 MQTT 侧的定义)
int gear = 0;
// 0~65535
int throttle = 0;
int steering = 32767;
int brake = 0;
// 1: 清扫 0: 不清扫
int sweepCtrl = 0;
};
struct MqttConfig
{
// MQTT broker
std::string inter_net_address;
int inter_net_port = 0;
std::string external_net_address;
int external_net_port = 0;
std::string mqtt_user;
std::string mqtt_password;
// topic template (must contain {vid})
std::string remote_topic_template;
// runtime
std::string address;
std::string client_id;
};
struct ControlState
{
std::mutex mtx;
CarCtrl ctrl;
int get_route = 0;
// identity新增
std::string vid;
bool identity_ready = false;
std::atomic<bool> mqtt_connected{false};
std::atomic<int64_t> last_msg_ms{0};
};
} // namespace sub_node_pkg

View File

@ -0,0 +1,54 @@
#pragma once
#include <MQTTClient.h>
#include <atomic>
#include <mutex>
#include <string>
#include <thread>
#include "sub/control_state.hpp"
namespace sub_node_pkg
{
class MqttReceiver
{
public:
MqttReceiver(ControlState& state, MqttConfig cfg);
~MqttReceiver();
bool start();
void stop();
MqttReceiver(const MqttReceiver&) = delete;
MqttReceiver& operator=(const MqttReceiver&) = delete;
private:
void runLoop();
// Paho C callbacks (static -> instance)
static void onDelivered(void* context, MQTTClient_deliveryToken dt);
static int onMessageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message);
static void onConnLost(void* context, char* cause);
// instance handlers
int handleMessage(char* topicName, MQTTClient_message* message);
void sendResponse(const std::string& topic, long long seqNo, int code, const std::string& msg);
static std::string generateClientId();
private:
ControlState& state_;
MqttConfig cfg_;
std::atomic<bool> running_{false};
std::thread th_;
MQTTClient client_{nullptr};
MQTTClient_connectOptions conn_opts_ = MQTTClient_connectOptions_initializer;
// 防止回调与重连线程同时 publish
std::mutex publish_mtx_;
};
} // namespace sub_node_pkg

View File

@ -1,20 +1,30 @@
#pragma once
#include <rclcpp/rclcpp.hpp>
#include <string> #include "sub/control_state.hpp"
#ifndef __SUB_NODE_H__ #include "sweeper_interfaces/msg/mc_ctrl.hpp"
#define __SUB_NODE_H__ #include "sweeper_interfaces/msg/sub.hpp"
#include "sweeper_interfaces/msg/vehicle_identity.hpp"
struct car_ctrl namespace sub_node_pkg
{ {
// 1. 使能模式
int mode; // 0:手动驾驶, 3:远程驾驶
int gear; // 挡位 0N档1D档2R档 class SubNode : public rclcpp::Node
int throttle; // 油门 0~65535 {
int steering; // 转向 0~65535 public:
int brake; // 刹车 0~65535 SubNode(ControlState& state, const std::string& name);
int sweepCtrl; // 清扫 1:清扫 0:不清扫
private:
void timerCallback();
void identityCallback(const sweeper_interfaces::msg::VehicleIdentity::SharedPtr msg);
private:
ControlState& state_;
rclcpp::Subscription<sweeper_interfaces::msg::VehicleIdentity>::SharedPtr identity_sub_;
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<sweeper_interfaces::msg::Sub>::SharedPtr pub_gather_;
rclcpp::Publisher<sweeper_interfaces::msg::McCtrl>::SharedPtr pub_mc_;
}; };
extern car_ctrl car_ctrl_mes; } // namespace sub_node_pkg
#endif

View File

@ -0,0 +1,71 @@
#include "sub/control_mapper.hpp"
#include <algorithm>
#include <cstdint>
namespace sub_node_pkg
{
int mapGearToMcCtrl(int gear_mqtt)
{
if (gear_mqtt == 0) return 0;
if (gear_mqtt == 1) return 2;
if (gear_mqtt == 2) return 1;
return 0;
}
int mapThrottleToRpm(int throttle)
{
// 修正避免整数除法导致一直为0
// 线性映射到 0~1000
throttle = std::clamp(throttle, 0, 65535);
const double ratio = static_cast<double>(throttle) / 65535.0;
return static_cast<int>(ratio * 1000.0);
}
int mapSteeringToAngle(int raw)
{
raw = std::clamp(raw, 0, 65535);
if (raw < 32200)
{
// -40 + raw*(40/32200)
return -40 + static_cast<int>(static_cast<double>(raw) * (40.0 / 32200.0));
}
// 33200~65535 -> 0~40
if (raw > 33200)
{
const int in_w = 65535 - 33200; // 32335
const int x = raw - 33200;
return static_cast<int>(static_cast<double>(x) * (40.0 / static_cast<double>(in_w)));
}
// deadzone
return 0;
}
void fillMcCtrlFromCarCtrl(const CarCtrl& in, sweeper_interfaces::msg::McCtrl& out)
{
// 默认安全态
out.brake = 1;
out.gear = 0;
out.rpm = 0;
out.angle = 0;
out.angle_speed = 120;
out.sweep = in.sweepCtrl;
if (in.mode == 3)
{
// 允许远控:逻辑 msg.brake=0
out.brake = 0;
out.gear = mapGearToMcCtrl(in.gear);
out.rpm = mapThrottleToRpm(in.throttle);
out.angle = mapSteeringToAngle(in.steering);
out.angle_speed = 120;
}
}
} // namespace sub_node_pkg

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,92 @@
#include <fstream>
#include <iostream>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include "sub/control_state.hpp"
#include "sub/json.h"
#include "sub/mqtt_receiver.hpp"
#include "sub/sub_node.hpp"
using sub_node_pkg::ControlState;
using sub_node_pkg::MqttConfig;
using sub_node_pkg::MqttReceiver;
using sub_node_pkg::SubNode;
static bool load_config(const std::string& path, MqttConfig& cfg)
{
Json::Reader reader;
Json::Value root;
std::ifstream in(path, std::ios::binary);
if (!in.is_open())
{
std::cerr << "Failed to open config: " << path << std::endl;
return false;
}
if (!reader.parse(in, root))
{
std::cerr << "Failed to parse config json.\n";
return false;
}
const auto& mqtt = root["mqtt"];
cfg.inter_net_address = mqtt["inter_net_address"].asString();
cfg.inter_net_port = mqtt["inter_net_port"].asInt();
cfg.external_net_address = mqtt["external_net_address"].asString();
cfg.external_net_port = mqtt["external_net_port"].asInt();
cfg.mqtt_user = mqtt.get("username", "").asString();
cfg.mqtt_password = mqtt.get("password", "").asString();
// ⚠️ 注意:这里读的是 template而不是完整 topic
cfg.remote_topic_template = mqtt["remote_topic"].asString();
// 基本校验:必须包含 {vid}
if (cfg.remote_topic_template.find("{vid}") == std::string::npos)
{
std::cerr << "remote_topic must contain '{vid}' placeholder\n";
return false;
}
// Paho MQTT C 要求 tcp://
cfg.address = "tcp://" + cfg.external_net_address + ":" + std::to_string(cfg.external_net_port);
return true;
}
int main(int argc, char** argv)
{
// 1) load config
MqttConfig cfg;
if (!load_config("config.json", cfg))
{
return 1;
}
std::cout << "MQTT address: " << cfg.address << "\n";
std::cout << "MQTT topic template: " << cfg.remote_topic_template << "\n";
// 2) shared state
ControlState state;
{
std::lock_guard<std::mutex> lock(state.mtx);
state.ctrl.steering = 32767; // 你原默认
}
// 3) start mqtt receiver
MqttReceiver mqtt(state, cfg);
mqtt.start();
// 4) ROS2 spin
rclcpp::init(argc, argv);
auto node = std::make_shared<SubNode>(state, "sub_node");
rclcpp::spin(node);
rclcpp::shutdown();
// 5) stop mqtt
mqtt.stop();
return 0;
}

View File

@ -1,7 +1,9 @@
#include "md5.h" #include "sub/md5.h"
#include <string.h> #include <string.h>
#include <string>
#include <fstream> #include <fstream>
#include <string>
// 重命名自定义的 byte 类型 // 重命名自定义的 byte 类型
typedef unsigned char uchar; typedef unsigned char uchar;
@ -66,41 +68,34 @@ Rotation is separate from addition to prevent recomputation.
} }
const uchar MD5::PADDING[64] = {0x80}; const uchar MD5::PADDING[64] = {0x80};
const char MD5::HEX[16] = { const char MD5::HEX[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
'0', '1', '2', '3',
'4', '5', '6', '7',
'8', '9', 'a', 'b',
'c', 'd', 'e', 'f'};
/* Default construct. */ /* Default construct. */
MD5::MD5() MD5::MD5() { reset(); }
{
reset();
}
/* Construct a MD5 object with a input buffer. */ /* Construct a MD5 object with a input buffer. */
MD5::MD5(const void *input, size_t length) MD5::MD5(const void* input, size_t length)
{ {
reset(); reset();
update(input, length); update(input, length);
} }
/* Construct a MD5 object with a string. */ /* Construct a MD5 object with a string. */
MD5::MD5(const string &str) MD5::MD5(const string& str)
{ {
reset(); reset();
update(str); update(str);
} }
/* Construct a MD5 object with a file. */ /* Construct a MD5 object with a file. */
MD5::MD5(ifstream &in) MD5::MD5(ifstream& in)
{ {
reset(); reset();
update(in); update(in);
} }
/* Return the message-digest */ /* Return the message-digest */
const uchar *MD5::digest() const uchar* MD5::digest()
{ {
if (!_finished) if (!_finished)
{ {
@ -124,21 +119,21 @@ void MD5::reset()
} }
/* Updating the context with a input buffer. */ /* Updating the context with a input buffer. */
void MD5::update(const void *input, size_t length) void MD5::update(const void* input, size_t length)
{ {
const uchar *uinput = static_cast<const uchar *>(input); const uchar* uinput = static_cast<const uchar*>(input);
update(uinput, length); update(uinput, length);
} }
/* Updating the context with a string. */ /* Updating the context with a string. */
void MD5::update(const string &str) void MD5::update(const string& str)
{ {
const uchar *uinput = reinterpret_cast<const uchar *>(str.c_str()); const uchar* uinput = reinterpret_cast<const uchar*>(str.c_str());
update(uinput, str.length()); update(uinput, str.length());
} }
/* Updating the context with a file. */ /* Updating the context with a file. */
void MD5::update(ifstream &in) void MD5::update(ifstream& in)
{ {
if (!in) if (!in)
{ {
@ -153,7 +148,7 @@ void MD5::update(ifstream &in)
length = in.gcount(); length = in.gcount();
if (length > 0) if (length > 0)
{ {
const uchar *uinput = reinterpret_cast<const uchar *>(buffer); const uchar* uinput = reinterpret_cast<const uchar*>(buffer);
update(uinput, length); update(uinput, length);
} }
} }
@ -164,7 +159,7 @@ void MD5::update(ifstream &in)
operation, processing another message block, and updating the operation, processing another message block, and updating the
context. context.
*/ */
void MD5::update(const uchar *input, size_t length) void MD5::update(const uchar* input, size_t length)
{ {
uint32 i, index, partLen; uint32 i, index, partLen;
@ -325,7 +320,7 @@ void MD5::transform(const uchar block[64])
/* Encodes input (ulong) into output (byte). Assumes length is /* Encodes input (ulong) into output (byte). Assumes length is
a multiple of 4. a multiple of 4.
*/ */
void MD5::encode(const uint32 *input, uchar *output, size_t length) void MD5::encode(const uint32* input, uchar* output, size_t length)
{ {
for (size_t i = 0, j = 0; j < length; ++i, j += 4) for (size_t i = 0, j = 0; j < length; ++i, j += 4)
{ {
@ -339,17 +334,17 @@ void MD5::encode(const uint32 *input, uchar *output, size_t length)
/* Decodes input (byte) into output (ulong). Assumes length is /* Decodes input (byte) into output (ulong). Assumes length is
a multiple of 4. a multiple of 4.
*/ */
void MD5::decode(const uchar *input, uint32 *output, size_t length) void MD5::decode(const uchar* input, uint32* output, size_t length)
{ {
for (size_t i = 0, j = 0; j < length; ++i, j += 4) for (size_t i = 0, j = 0; j < length; ++i, j += 4)
{ {
output[i] = ((uint32)input[j]) | (((uint32)input[j + 1]) << 8) | output[i] = ((uint32)input[j]) | (((uint32)input[j + 1]) << 8) | (((uint32)input[j + 2]) << 16) |
(((uint32)input[j + 2]) << 16) | (((uint32)input[j + 3]) << 24); (((uint32)input[j + 3]) << 24);
} }
} }
/* Convert byte array to hex string. */ /* Convert byte array to hex string. */
string MD5::bytesToHexString(const uchar *input, size_t length) string MD5::bytesToHexString(const uchar* input, size_t length)
{ {
string str; string str;
str.reserve(length << 1); str.reserve(length << 1);
@ -365,7 +360,4 @@ string MD5::bytesToHexString(const uchar *input, size_t length)
} }
/* Convert digest to string value */ /* Convert digest to string value */
string MD5::toString() string MD5::toString() { return bytesToHexString(digest(), 16); }
{
return bytesToHexString(digest(), 16);
}

View File

@ -0,0 +1,373 @@
#include "sub/mqtt_receiver.hpp"
#include <chrono>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <random>
#include <sstream>
#include <thread>
#include <vector>
#include "sub/json.h"
namespace sub_node_pkg
{
static std::string expand_topic(const std::string& tpl, const std::string& vid)
{
std::string t = tpl;
auto pos = t.find("{vid}");
if (pos != std::string::npos) t.replace(pos, 5, vid);
return t;
}
static int64_t now_ms()
{
using namespace std::chrono;
return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}
MqttReceiver::MqttReceiver(ControlState& state, MqttConfig cfg) : state_(state), cfg_(std::move(cfg)) {}
MqttReceiver::~MqttReceiver() { stop(); }
std::string MqttReceiver::generateClientId()
{
// 毫秒时间戳 + 4位随机数
auto millis =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1000, 9999);
std::ostringstream oss;
oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << dis(gen);
return oss.str();
}
bool MqttReceiver::start()
{
if (running_) return true;
running_ = true;
th_ = std::thread(&MqttReceiver::runLoop, this);
return true;
}
void MqttReceiver::stop()
{
if (!running_) return;
running_ = false;
if (th_.joinable()) th_.join();
if (client_)
{
MQTTClient_disconnect(client_, 2000);
MQTTClient_destroy(&client_);
client_ = nullptr;
}
state_.mqtt_connected = false;
}
void MqttReceiver::onDelivered(void* context, MQTTClient_deliveryToken dt)
{
(void)dt;
(void)context;
// 可选:打印确认
}
void MqttReceiver::onConnLost(void* context, char* cause)
{
auto* self = static_cast<MqttReceiver*>(context);
self->state_.mqtt_connected = false;
std::cerr << "[MQTT] Connection lost, cause: " << (cause ? cause : "(null)") << std::endl;
}
int MqttReceiver::onMessageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
{
(void)topicLen;
auto* self = static_cast<MqttReceiver*>(context);
return self->handleMessage(topicName, message);
}
void MqttReceiver::sendResponse(const std::string& topic, long long seqNo, int code, const std::string& msg)
{
Json::Value response;
response["type"] = "response";
response["seqNo"] = static_cast<Json::Int64>(seqNo);
Json::Value data;
data["code"] = code;
data["msg"] = msg;
response["data"] = data;
Json::FastWriter writer;
const std::string payload = writer.write(response);
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void*)payload.data();
pubmsg.payloadlen = static_cast<int>(payload.size());
pubmsg.qos = 1;
pubmsg.retained = 0;
MQTTClient_deliveryToken token;
std::lock_guard<std::mutex> lk(publish_mtx_);
if (!client_ || MQTTClient_isConnected(client_) == 0)
{
return;
}
int rc = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "[MQTT] publish failed rc=" << rc << std::endl;
return;
}
MQTTClient_waitForCompletion(client_, token, 10000L);
}
int MqttReceiver::handleMessage(char* topicName, MQTTClient_message* message)
{
// 拷贝 payloadmessage->payload 不一定以 \0 结尾)
std::string payload;
payload.assign(static_cast<const char*>(message->payload), static_cast<size_t>(message->payloadlen));
Json::Reader reader;
Json::Value root;
if (!reader.parse(payload, root))
{
std::cerr << "[MQTT] JSON parse failed\n";
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
// type 必须是 request
if (!root.isMember("type") || root["type"].asString() != "request")
{
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
if (!root.isMember("seqNo") || !root["seqNo"].isInt64())
{
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
const long long seqNo = root["seqNo"].asInt64();
if (!root.isMember("data") || !root["data"].isObject())
{
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
const Json::Value data = root["data"];
const std::string command = data.get("command", "").asString();
// 收到控制消息,更新时间(用于“控制超时”)
state_.last_msg_ms = now_ms();
if (command == "mode" || command == "gear" || command == "sweepCtrl" || command == "gather")
{
const int value = data.get("value", 0).asInt();
std::lock_guard<std::mutex> lock(state_.mtx);
if (command == "mode")
state_.ctrl.mode = value;
else if (command == "gear")
state_.ctrl.gear = value;
else if (command == "sweepCtrl")
state_.ctrl.sweepCtrl = value;
else if (command == "gather")
state_.get_route = value;
}
else if (command == "drive")
{
// 你原协议value = "throttle,brake,steering"
const std::string driveValues = data.get("value", "").asString();
std::stringstream ss(driveValues);
std::vector<std::string> values;
std::string tok;
while (std::getline(ss, tok, ',')) values.push_back(tok);
if (values.size() >= 3)
{
int throttle = std::stoi(values[0]);
int brake = std::stoi(values[1]);
int steering = std::stoi(values[2]);
// 你原逻辑steering = 32768 - steering
steering = 32768 - steering;
std::lock_guard<std::mutex> lock(state_.mtx);
state_.ctrl.throttle = throttle;
state_.ctrl.brake = brake;
state_.ctrl.steering = steering;
}
}
// 发送 response同步避免 detached thread
std::string resp_topic;
{
std::lock_guard<std::mutex> lock(state_.mtx);
if (!state_.identity_ready)
{
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
resp_topic = expand_topic(cfg_.remote_topic_template, state_.vid);
}
sendResponse(resp_topic, seqNo, 200, "Success");
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void MqttReceiver::runLoop()
{
// 生成 client id如未提供
if (cfg_.client_id.empty()) cfg_.client_id = generateClientId();
const std::string& address = cfg_.address;
if (address.rfind("tcp://", 0) != 0)
{
std::cerr << "[MQTT] address must be 'tcp://ip:port', got: " << address << std::endl;
}
int rc = MQTTClient_create(&client_, address.c_str(), cfg_.client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "[MQTT] create failed rc=" << rc << std::endl;
state_.mqtt_connected = false;
return;
}
rc = MQTTClient_setCallbacks(client_, this, &MqttReceiver::onConnLost, &MqttReceiver::onMessageArrived,
&MqttReceiver::onDelivered);
if (rc != MQTTCLIENT_SUCCESS)
{
std::cerr << "[MQTT] setCallbacks failed rc=" << rc << std::endl;
MQTTClient_destroy(&client_);
client_ = nullptr;
state_.mqtt_connected = false;
return;
}
conn_opts_.keepAliveInterval = 20;
conn_opts_.cleansession = 1;
conn_opts_.username = cfg_.mqtt_user.c_str();
conn_opts_.password = cfg_.mqtt_password.c_str();
auto get_sub_topic = [&]() -> std::string
{
std::lock_guard<std::mutex> lock(state_.mtx);
if (!state_.identity_ready) return {};
return expand_topic(cfg_.remote_topic_template, state_.vid);
};
auto do_connect_and_sub = [&]() -> bool
{
int rc2 = MQTTClient_connect(client_, &conn_opts_);
if (rc2 != MQTTCLIENT_SUCCESS)
{
std::cerr << "[MQTT] connect failed rc=" << rc2 << "\n";
state_.mqtt_connected = false;
return false;
}
else
{
std::cout << "[MQTT] Connected to broker\n";
}
std::string topic = get_sub_topic();
if (topic.empty())
{
state_.mqtt_connected = false;
return false;
}
rc2 = MQTTClient_subscribe(client_, topic.c_str(), 1);
if (rc2 != MQTTCLIENT_SUCCESS)
{
std::cerr << "[MQTT] subscribe failed rc=" << rc2 << "\n";
state_.mqtt_connected = false;
return false;
}
state_.mqtt_connected = true;
std::cout << "[MQTT] Connected & subscribed: " << topic << "\n";
return true;
};
// 首次连接identity 可能还没 ready所以这里可能失败没关系
do_connect_and_sub();
bool last_ready = false;
std::string last_topic;
while (running_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!client_) continue;
// 1) 检测 identity ready 以及当前应订阅的 topic
bool ready = false;
std::string topic;
{
std::lock_guard<std::mutex> lock(state_.mtx);
ready = state_.identity_ready;
if (ready) topic = expand_topic(cfg_.remote_topic_template, state_.vid);
}
// 2) identity 从 false->true或 vid 变化)时,主动订阅一次
if (ready && (!last_ready || topic != last_topic))
{
std::cout << "[MQTT] Identity ready/topic changed, subscribing: " << topic << "\n";
// 已连接就直接 subscribe未连接则走 do_connect_and_sub
if (MQTTClient_isConnected(client_) != 0)
{
int rc_sub = MQTTClient_subscribe(client_, topic.c_str(), 1);
if (rc_sub == MQTTCLIENT_SUCCESS)
{
state_.mqtt_connected = true;
std::cout << "[MQTT] Subscribed: " << topic << "\n";
last_topic = topic;
}
else
{
state_.mqtt_connected = false;
std::cerr << "[MQTT] subscribe failed rc=" << rc_sub << "\n";
}
}
else
{
do_connect_and_sub();
last_topic = topic; // 记录期望值,避免频繁刷屏
}
}
last_ready = ready;
// 3) 断线重连逻辑保留
if (MQTTClient_isConnected(client_) == 0)
{
state_.mqtt_connected = false;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
do_connect_and_sub();
}
}
}
} // namespace sub_node_pkg

View File

@ -1,478 +1,76 @@
#include "MQTTClient.h" #include "sub/sub_node.hpp"
#include "rclcpp/rclcpp.hpp"
#include "sweeper_interfaces/msg/sub.hpp"
#include "sweeper_interfaces/msg/mc_ctrl.hpp"
#include "json.h"
#include "sub_node.hpp"
#include <curl/curl.h>
#include "md5.h"
#include <unistd.h>
#include <fstream>
#include <iostream>
#include <chrono> #include <chrono>
#include <ctime>
#include <string.h>
#include <time.h>
#include <random>
#include <sstream>
#include <iomanip>
#include <pthread.h>
#include <cstdlib>
using namespace std;
// 全局变量声明 #include "sub/control_mapper.hpp"
std::string command;
std::string sub_topic;
int get_route = 0;
int horn = 0;
int isDisconn = 0;
MQTTClient client; namespace sub_node_pkg
MQTTClient_deliveryToken token_d_m;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg_d_m = MQTTClient_message_initializer;
// mqtt相关
std::string mqtt_vid;
std::string mqtt_inter_net_address;
int mqtt_inter_net_port;
std::string mqtt_external_net_address;
int mqtt_external_net_port;
std::string mqtt_user;
std::string mqtt_password;
std::string mqtt_topic_remote;
// 新增:声明未定义的常量
std::string ADDRESS;
std::string CLIENTID_SUB; // 客户端ID提供默认值
std::chrono::steady_clock::time_point last_message_time = std::chrono::steady_clock::now();
constexpr auto QOS = 1;
constexpr auto TIMEOUT = 10000L;
volatile MQTTClient_deliveryToken deliveredtoken;
char sub_buff[500];
car_ctrl car_ctrl_mes;
std::string generate_mqtt_client_id()
{ {
// 获取当前时间戳(以毫秒为单位)
auto now = std::chrono::system_clock::now();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
// 生成一个 4 位随机数 SubNode::SubNode(ControlState& state, const std::string& name) : Node(name), state_(state)
std::random_device rd; {
std::mt19937 gen(rd()); RCLCPP_INFO(this->get_logger(), "%s node started.", name.c_str());
std::uniform_int_distribution<> dis(1000, 9999);
int random_num = dis(gen);
// 拼接成 client ID pub_mc_ = this->create_publisher<sweeper_interfaces::msg::McCtrl>("remote_mc_ctrl", 10);
std::ostringstream oss; pub_gather_ = this->create_publisher<sweeper_interfaces::msg::Sub>("gather", 10);
oss << "client_" << millis << "_" << std::setw(4) << std::setfill('0') << random_num; // identitytransient_local确保晚启动也能拿到
identity_sub_ = this->create_subscription<sweeper_interfaces::msg::VehicleIdentity>(
"/vehicle/identity", rclcpp::QoS(1).transient_local().reliable(),
std::bind(&SubNode::identityCallback, this, std::placeholders::_1));
return oss.str(); timer_ = this->create_wall_timer(std::chrono::milliseconds(100), std::bind(&SubNode::timerCallback, this));
} }
void sendResponse(MQTTClient client, const std::string &topic, long long seqNo, int code, std::string msg) void SubNode::timerCallback()
{ {
// 使用JSON库构建响应数据 // 读快照(避免与 MQTT 线程 data race
Json::Value responseData; CarCtrl ctrl_snapshot;
responseData["type"] = "response"; int get_route_snapshot = 0;
responseData["seqNo"] = static_cast<Json::Int64>(seqNo); bool mqtt_connected = state_.mqtt_connected.load();
const int64_t last_ms = state_.last_msg_ms.load();
Json::Value data; {
data["code"] = code; std::lock_guard<std::mutex> lock(state_.mtx);
data["msg"] = msg; ctrl_snapshot = state_.ctrl;
get_route_snapshot = state_.get_route;
}
responseData["data"] = data; // gather 发布
sweeper_interfaces::msg::Sub gather_msg;
gather_msg.get_route = get_route_snapshot;
pub_gather_->publish(gather_msg);
// 使用JSON writer生成字符串 // 生成控制
Json::FastWriter writer; sweeper_interfaces::msg::McCtrl mc;
std::string responseJson = writer.write(responseData); fillMcCtrlFromCarCtrl(ctrl_snapshot, mc);
// 发布MQTT消息 // 可选:控制超时保护(例如 500ms 内未收到控制,强制安全态)
pubmsg_d_m.payload = (void *)responseJson.c_str(); // 你原来把“500ms无消息”当掉线我这里改成“控制超时保护”语义更准确
pubmsg_d_m.payloadlen = responseJson.length(); const int64_t now =
pubmsg_d_m.qos = QOS; std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch())
pubmsg_d_m.retained = 0; .count();
constexpr int64_t CONTROL_TIMEOUT_MS = 500;
MQTTClient_publishMessage(client, topic.c_str(), &pubmsg_d_m, &token_d_m); if (!mqtt_connected || (last_ms > 0 && (now - last_ms) > CONTROL_TIMEOUT_MS))
MQTTClient_waitForCompletion(client, token_d_m, TIMEOUT); {
printf("Message with delivery token %d delivered\n", token_d_m); // 强制安全态
mc.brake = 1;
mc.gear = 0;
mc.rpm = 0;
mc.angle = 0;
mc.angle_speed = 120;
}
pub_mc_->publish(mc);
} }
// 修正将getSubTopic()函数移到使用它的mqtt_sub()函数之前 void SubNode::identityCallback(const sweeper_interfaces::msg::VehicleIdentity::SharedPtr msg)
const char *getSubTopic()
{ {
return sub_topic.c_str(); std::lock_guard<std::mutex> lock(state_.mtx);
state_.vid = msg->vid;
state_.identity_ready = msg->ready;
RCLCPP_INFO(get_logger(), "Identity: VID=%s ready=%d", msg->vid.c_str(), msg->ready);
} }
void delivered(void *context, MQTTClient_deliveryToken dt) } // namespace sub_node_pkg
{
(void)context;
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
(void)context;
(void)topicLen;
// printf("Message arrived\n");
last_message_time = std::chrono::steady_clock::now();
// printf("topic: %s\n", topicName);
// printf("message: %.*s\n", message->payloadlen, (char *)message->payload);
memset(sub_buff, 0, sizeof(sub_buff));
memcpy(&sub_buff, (char *)message->payload, message->payloadlen);
Json::Reader reader;
Json::Value root;
if (!reader.parse(sub_buff, root))
{
printf("recv json fail\n");
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
isDisconn = 0;
// std::cout << root << std::endl;
// Check type
if (!root.isMember("type") || root["type"].asString() != "request")
{
std::cout << "Message type is not 'request', ignoring.\n";
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
// Extract seqNo
if (!root.isMember("seqNo") || !root["seqNo"].isInt64())
{
std::cout << "Invalid or missing seqNo.\n";
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
long long seqNo = root["seqNo"].asInt64();
// Check data
if (!root.isMember("data") || !root["data"].isObject())
{
std::cout << "Invalid message: missing 'data' field.\n";
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
Json::Value data = root["data"];
std::string command = data["command"].asString();
int value = data["value"].asInt();
// Handle command
if (command == "mode")
car_ctrl_mes.mode = value;
else if (command == "gear")
car_ctrl_mes.gear = value;
else if (command == "drive")
{
std::string driveValues = data["value"].asString();
std::stringstream ss(driveValues);
std::vector<std::string> values;
std::string token;
while (std::getline(ss, token, ','))
{
values.push_back(token);
}
if (values.size() >= 3)
{
int throttle = std::stoi(values[0]);
int brake = std::stoi(values[1]);
int steering = std::stoi(values[2]);
steering = 32768 - steering;
car_ctrl_mes.throttle = throttle;
car_ctrl_mes.brake = brake;
car_ctrl_mes.steering = steering;
}
}
else if (command == "sweepCtrl")
car_ctrl_mes.sweepCtrl = value;
else if (command == "gather")
get_route = value;
// Generate response
// sendResponse(client, mqtt_topic_remote, seqNo, 200, "Success");
// sendResponse作为线程函数发送响应
// 创建线程发送响应
std::thread responseThread(sendResponse, client, mqtt_topic_remote, seqNo, 200, "Success");
responseThread.detach();
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
(void)context;
isDisconn = 1;
printf("\nConnection lost\n");
printf("cause: %s\n", cause);
}
void *mqtt_sub(void *arg)
{
(void)arg;
// 使用已定义的getSubTopic()函数
const char *SUB_TOPIC = getSubTopic();
int rc;
const char *username = mqtt_user.c_str();
const char *password = mqtt_password.c_str();
if ((rc = MQTTClient_create(&client, ADDRESS.c_str(), CLIENTID_SUB.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = username;
conn_opts.password = password;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", SUB_TOPIC, CLIENTID_SUB.c_str(), QOS);
if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
while (1)
{
usleep(100);
// 检查连接状态,如果连接丢失,则尝试重新连接
if (MQTTClient_isConnected(client) == 0)
{
printf("MQTT connection lost, trying to reconnect...\n");
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
isDisconn = 1;
printf("Failed to reconnect, return code %d\n", rc);
// 处理连接失败的情况,例如等待一段时间后再次尝试
}
else
{
isDisconn = 0;
printf("Reconnected to MQTT server.\n");
// 重新订阅主题
if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to resubscribe, return code %d\n", rc);
}
}
}
else
{
isDisconn = 0;
auto current_time = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(current_time - last_message_time).count() > 500)
{
isDisconn = 1;
// printf("Heartbeat timeout: No message received in 500ms.\n");
// 执行心跳超时的处理逻辑
}
}
}
if ((rc = MQTTClient_unsubscribe(client, SUB_TOPIC)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to disconnect, return code %d\n", rc);
MQTTClient_destroy(&client);
return NULL;
}
MQTTClient_destroy(&client);
return NULL;
}
class sub_node : public rclcpp::Node
{
public:
// 构造函数,有一个参数为节点名称
sub_node(std::string name) : Node(name)
{
RCLCPP_INFO(this->get_logger(), "%s节点已经启动.", name.c_str());
timer_ = this->create_wall_timer(std::chrono::milliseconds(100), std::bind(&sub_node::timer_callback, this));
// 发布控制指令消息的发布器
pub_mc = this->create_publisher<sweeper_interfaces::msg::McCtrl>("remote_mc_ctrl", 10);
pub_gather = this->create_publisher<sweeper_interfaces::msg::Sub>("gather", 10);
}
private:
void timer_callback()
{
sweeper_interfaces::msg::Sub message;
message.get_route = get_route;
pub_gather->publish(message);
sweeper_interfaces::msg::McCtrl msg;
// if (isDisconn == 0)
{
// mcu 部分
if (car_ctrl_mes.mode == 3) // 台架端远程驾驶模式
{
msg.brake = 0;
if (car_ctrl_mes.gear == 0) // N档
msg.gear = 0;
else if (car_ctrl_mes.gear == 1) // D档
msg.gear = 2;
else if (car_ctrl_mes.gear == 2) // R档
msg.gear = 1;
msg.rpm = car_ctrl_mes.throttle / 65535 * 1000;
// 转向
// 0~32200 -> -40~0 33200~65535 -> 0~40
int steering = 0;
if (car_ctrl_mes.steering < 32200)
{
const int originalWidth1 = 32200;
const int targetWidth1 = 40;
double unitLength1 = targetWidth1 / originalWidth1;
steering = -40 + static_cast<int>(car_ctrl_mes.steering * unitLength1);
}
else if (car_ctrl_mes.steering > 33200)
{
const int originalWidth2 = 32355; // 注意这里是从 33200 到 65535共计 32355 个数
const int targetWidth2 = 40;
double unitLength2 = targetWidth2 / originalWidth2;
steering = static_cast<int>((car_ctrl_mes.steering - 32200) * unitLength2);
}
else
{
steering = 0; // 朝向前方
}
msg.angle = steering;
msg.angle_speed = 120;
}
else
{
msg.brake = 1;
msg.gear = 0;
msg.rpm = 0;
// eps部分
msg.angle = 0;
msg.angle_speed = 120;
}
msg.sweep = car_ctrl_mes.sweepCtrl;
pub_mc->publish(msg);
// RCLCPP_INFO_STREAM(this->get_logger(), "Publishing ControlMsg:" << "\n 档位: " << [&]()
// {
// switch(msg.gear) {
// case 0: return "空挡";
// case 1: return "后退";
// case 2: return "前进";
// default: return "未知档位"; // 添加默认分支
// } }() << "\n 油门转速: " << msg.rpm << " rpm"
// << "\n 转向角度: " << msg.angle << "°"
// << "\n 刹车: " << (msg.brake == 0 ? "开(释放)" : "关(刹住)") << "\n 清扫: " << (msg.sweep ? "开启清扫" : "关闭清扫"));
}
}
// 声明定时器指针
rclcpp::TimerBase::SharedPtr timer_;
// 声明话题发布者指针
rclcpp::Publisher<sweeper_interfaces::msg::Sub>::SharedPtr pub_gather;
rclcpp::Publisher<sweeper_interfaces::msg::McCtrl>::SharedPtr pub_mc;
};
void init_main()
{
Json::Reader reader;
Json::Value root;
std::ifstream in("config.json", std::ios::binary);
if (!in.is_open())
{
std::cout << "read config file error" << std::endl;
return;
}
if (reader.parse(in, root))
{
mqtt_vid = root["mqtt"]["vid"].asString();
mqtt_inter_net_address = root["mqtt"]["inter_net_address"].asString();
mqtt_inter_net_port = root["mqtt"]["inter_net_port"].asInt();
mqtt_external_net_address = root["mqtt"]["external_net_address"].asString();
mqtt_external_net_port = root["mqtt"]["external_net_port"].asInt();
mqtt_user = root["mqtt"]["mqtt_user"].asString();
mqtt_password = root["mqtt"]["mqtt_password"].asString();
mqtt_topic_remote = root["mqtt"]["remote_topic"].asString();
// 添加对sub_topic的初始化
sub_topic = mqtt_topic_remote;
CLIENTID_SUB = generate_mqtt_client_id();
// ADDRESS = mqtt_inter_net_address + ":" + std::to_string(mqtt_inter_net_port);
ADDRESS = mqtt_external_net_address + ":" + std::to_string(mqtt_external_net_port);
cout << "ADDRESS: " << ADDRESS << endl;
cout << "CLIENTID_SUB: " << CLIENTID_SUB << endl;
cout << "mqtt_vid: " << mqtt_vid << endl;
}
in.close(); // 关闭文件流
}
pthread_t mqtt_sub_thread_t;
int main(int argc, char **argv)
{
init_main();
memset(&car_ctrl_mes, 0, sizeof(car_ctrl_mes));
car_ctrl_mes.steering = 32767;
pthread_create(&mqtt_sub_thread_t, NULL, mqtt_sub, NULL);
rclcpp::init(argc, argv);
/*创建对应节点的共享指针对象*/
auto node = std::make_shared<sub_node>("sub_node");
/* 运行节点,并检测退出信号*/
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}