From 673a5e4adcc943121713b238365de360a7a0e777 Mon Sep 17 00:00:00 2001 From: lyq Date: Tue, 30 Dec 2025 11:25:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=BF=9C=E6=8E=A7=E7=9C=8B?= =?UTF-8?q?=E9=97=A8=E7=8B=97=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/remote_ctrl/config/config.ini | 6 +- .../include/remote_ctrl/mqtt_client.hpp | 72 +++-- src/remote_ctrl/src/remote_ctrl_node.cpp | 255 ++++++++++++------ 3 files changed, 231 insertions(+), 102 deletions(-) diff --git a/src/remote_ctrl/config/config.ini b/src/remote_ctrl/config/config.ini index eb216eb..589a9df 100644 --- a/src/remote_ctrl/config/config.ini +++ b/src/remote_ctrl/config/config.ini @@ -1,6 +1,6 @@ [MQTT] -host = 192.168.4.150 -port = 1883 +host = 36.153.162.171 +port = 61883 username = zxwl password = zxwl1234@ -vehicle_id = V090001 \ No newline at end of file +vehicle_id = V090001 diff --git a/src/remote_ctrl/include/remote_ctrl/mqtt_client.hpp b/src/remote_ctrl/include/remote_ctrl/mqtt_client.hpp index 2fedefb..e0ee9b0 100644 --- a/src/remote_ctrl/include/remote_ctrl/mqtt_client.hpp +++ b/src/remote_ctrl/include/remote_ctrl/mqtt_client.hpp @@ -81,6 +81,7 @@ public: msg_cb_ = std::move(cb); } + // 注意:connected_cb_ 在 MQTT 网络线程中调用,禁止阻塞 void setConnectedCallback(ConnectedCallback cb) { std::lock_guard lk(mtx_); @@ -90,7 +91,10 @@ public: bool subscribe(const std::string &topic, int qos = 0) { std::lock_guard lk(mtx_); - if (!client_ || !connected_) + + if (!client_ || + !connected_.load(std::memory_order_acquire) || + !ready_.load(std::memory_order_acquire)) return false; int rc = MQTTClient_subscribe(client_, topic.c_str(), qos); @@ -109,7 +113,10 @@ public: int qos = 0) { std::lock_guard lk(mtx_); - if (!client_ || !connected_) + + if (!client_ || + !connected_.load(std::memory_order_acquire) || + !ready_.load(std::memory_order_acquire)) return false; MQTTClient_message msg = MQTTClient_message_initializer; @@ -122,7 +129,8 @@ public: if (rc != MQTTCLIENT_SUCCESS) { - connected_ = false; + connected_.store(false, std::memory_order_release); + ready_.store(false, std::memory_order_release); return false; } @@ -153,20 +161,27 @@ private: int rc = MQTTClient_connect(client_, &opts); if (rc == MQTTCLIENT_SUCCESS) { - connected_ = true; - if (connected_cb_) - connected_cb_(); + connected_.store(true, std::memory_order_release); + ready_.store(false, std::memory_order_release); + + // ⭐ 标记:需要在 loop 线程中触发 connected_cb_ + need_callback_.store(true, std::memory_order_release); + return true; } - connected_ = false; + connected_.store(false, std::memory_order_release); + ready_.store(false, std::memory_order_release); return false; } static void connLostCB(void *ctx, char *cause) { auto self = static_cast(ctx); - self->connected_ = false; + + self->connected_.store(false, std::memory_order_release); + self->ready_.store(false, std::memory_order_release); + std::cerr << "[MQTT] lost connection: " << (cause ? cause : "unknown") << std::endl; } @@ -193,19 +208,41 @@ private: // 异步线程:负责 reconnect + yield void loopFunc() { + auto last_connect_try = std::chrono::steady_clock::now(); + while (!stop_flag_) { - - if (!connected_) + // 1️⃣ 未连接:按重连间隔尝试连接 + if (!connected_.load(std::memory_order_acquire)) { - doConnect(); + auto now = std::chrono::steady_clock::now(); + if (now - last_connect_try >= std::chrono::milliseconds(reconnect_ms_)) + { + last_connect_try = now; + doConnect(); + } + } + else + { + // 2️⃣ 已连接:处理网络事件 + MQTTClient_yield(); + + // 3️⃣ 第一次 yield 后,才算真正 ready + if (!ready_.load(std::memory_order_acquire)) + { + ready_.store(true, std::memory_order_release); + + // ⭐ 在 ready 状态触发 connected callback + if (need_callback_.exchange(false)) + { + if (connected_cb_) + connected_cb_(); + } + } } - // 关键:处理 MQTT 网络事件 - MQTTClient_yield(); - - std::this_thread::sleep_for( - std::chrono::milliseconds(reconnect_ms_)); + // ⭐ 正常网络轮询节奏 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } } @@ -223,4 +260,7 @@ private: MessageCallback msg_cb_; ConnectedCallback connected_cb_; + + std::atomic ready_{false}; // 是否真正进入可用状态 + std::atomic need_callback_{false}; }; diff --git a/src/remote_ctrl/src/remote_ctrl_node.cpp b/src/remote_ctrl/src/remote_ctrl_node.cpp index 87164bf..b92c1a8 100644 --- a/src/remote_ctrl/src/remote_ctrl_node.cpp +++ b/src/remote_ctrl/src/remote_ctrl_node.cpp @@ -118,22 +118,28 @@ public: client_id.c_str()); // ===== 看门狗 & 定时发布 ===== - remote_enabled_.store(false); last_msg_time_ = std::chrono::steady_clock::now(); - // 看门狗:300ms 内没收到新 remote 指令则停控 + // 看门狗:800ms 内没收到新 remote 指令则停控 watchdog_ = this->create_wall_timer( 100ms, [this]() { + // 未授权远控,watchdog 什么都不做 + if (!remote_authorized_.load(std::memory_order_acquire)) + return; + auto now = std::chrono::steady_clock::now(); - if (remote_enabled_.load() && - (now - last_msg_time_ > 300ms)) + + // 超时阈值:建议 800ms ~ 1500ms + if (now - last_msg_time_ > 800ms) { - RCLCPP_WARN(this->get_logger(), - "[REMOTE] timeout, disable remote"); - remote_enabled_.store(false); - desired_ = {}; + // 只在第一次进入“失活”时打印日志 + if (remote_alive_.exchange(false)) + { + RCLCPP_WARN(this->get_logger(), + "[REMOTE] control timeout, enter safe-stop"); + } } }); @@ -142,18 +148,35 @@ public: 20ms, // 50Hz [this]() { - if (!remote_enabled_.load()) + // 未授权远控:完全不发 + if (!remote_authorized_.load(std::memory_order_acquire)) return; sweeperMsg::McCtrl msg; + { std::lock_guard lock(state_mtx_); - msg.gear = desired_.gear; - msg.rpm = desired_.rpm; - msg.brake = desired_.brake; - msg.angle = desired_.angle; - msg.sweep = desired_.sweep; - msg.angle_speed = desired_.angle_speed; + + // 已授权但超时:安全停车态 + if (!remote_alive_.load(std::memory_order_relaxed)) + { + msg.gear = desired_.gear; // 通常保持挡位 + msg.brake = 1; + msg.rpm = 0; + msg.angle = 0.0f; + msg.angle_speed = 120; + msg.sweep = false; + } + // 正常远控 + else + { + msg.gear = desired_.gear; + msg.brake = desired_.brake; + msg.rpm = desired_.rpm; + msg.angle = desired_.angle; + msg.angle_speed = desired_.angle_speed; + msg.sweep = desired_.sweep; + } } pub_->publish(msg); @@ -187,11 +210,22 @@ private: void onMqttMessage(const std::string &payload) { std::lock_guard lock(state_mtx_); + + // ===================================================== + // 1. MQTT 消息到达:刷新活性 & 判断是否从超时中恢复 + // ===================================================== + bool was_dead = !remote_alive_.exchange(true, std::memory_order_relaxed); + if (was_dead) + { + RCLCPP_INFO(this->get_logger(), + "[REMOTE] control recovered"); + } + last_msg_time_ = std::chrono::steady_clock::now(); - RCLCPP_INFO(this->get_logger(), - "[REMOTE] recv MQTT payload: %s", - payload.c_str()); + RCLCPP_DEBUG(this->get_logger(), + "[REMOTE] recv MQTT payload: %s", + payload.c_str()); try { @@ -199,93 +233,145 @@ private: if (!j.contains("data")) return; - std::string cmd = j["data"]["command"].get(); - auto value = j["data"]["value"]; + const auto &data = j["data"]; + if (!data.contains("command") || !data.contains("value")) + return; + const std::string cmd = data["command"].get(); + const auto &value = data["value"]; + + // ===================================================== + // 2. mode 指令:唯一的“授权 / 取消授权”入口 + // ===================================================== if (cmd == "mode") { int mode = value.get(); + if (mode == 3) { - remote_enabled_.store(true); - RCLCPP_INFO(this->get_logger(), "[REMOTE] enabled"); + // 进入远控授权态(锁存) + remote_authorized_.store(true, std::memory_order_release); + remote_alive_.store(true, std::memory_order_relaxed); + + RCLCPP_INFO(this->get_logger(), + "[REMOTE] authorized (mode=3)"); } else if (mode == 0) { - remote_enabled_.store(false); + // 退出远控授权态(锁存) + remote_authorized_.store(false, std::memory_order_release); + remote_alive_.store(false, std::memory_order_relaxed); + + // 明确清空目标状态 desired_ = {}; - RCLCPP_INFO(this->get_logger(), "[REMOTE] disabled"); + + RCLCPP_INFO(this->get_logger(), + "[REMOTE] deauthorized (mode=0)"); + } + + // mode 指令处理完直接返回 + return; + } + + // ===================================================== + // 3. 非 mode 指令:只有在“已授权”状态下才处理 + // ===================================================== + if (!remote_authorized_.load(std::memory_order_acquire)) + return; + + // ===================================================== + // 4. drive:油门 / 刹车 / 转向 + // ===================================================== + if (cmd == "drive") + { + int throttle = 0; + int brake = 1; + int steer_raw = 0; + + if (value.is_string()) + { + sscanf(value.get().c_str(), + "%d,%d,%d", + &throttle, + &brake, + &steer_raw); + } + else + { + return; + } + + if (brake > 0) + { + // 刹车优先 + desired_.brake = 1; + desired_.rpm = 0; + desired_.angle = 0.0f; + } + else + { + desired_.brake = 0; + + // 油门映射:0~65535 -> 0~6000 rpm + desired_.rpm = + (static_cast(throttle) * 6000U) / 65535U; + + // 转向映射:0~65535 -> -50~+50 度 + float target_angle = + (static_cast(steer_raw) * 100.0f / 65535.0f) - 50.0f; + + float delta = target_angle - current_feedback_angle_; + float angle_speed_deg = std::fabs(delta) / DELTA_T; + float motor_rpm = + angle_speed_deg * DEG_PER_SEC_TO_RPM * GEAR_RATIO; + + desired_.angle_speed = std::clamp( + static_cast(motor_rpm), + 120, + 1500); + + desired_.angle = target_angle; } } - else if (remote_enabled_.load()) + // ===================================================== + // 5. gear:挡位 + // ===================================================== + else if (cmd == "gear") { - if (cmd == "drive") + int gear = value.get(); + switch (gear) { - int t = 0, b = 1, s = 0; - sscanf(value.get().c_str(), - "%d,%d,%d", &t, &b, &s); - - if (b > 0) - { - desired_.brake = 1; - desired_.rpm = 0; - desired_.angle = 0; - } - else - { - desired_.brake = 0; - desired_.rpm = (uint32_t(t) * 6000) / 65535; - - // s: 0~65535 -> -50~+50 - float steer = (float(s) * 100.0f / 65535.0f) - 50.0f; - float target_angle = steer; - - float delta = target_angle - current_feedback_angle_; - float angle_speed_deg = std::fabs(delta) / DELTA_T; - float motor_rpm = angle_speed_deg * DEG_PER_SEC_TO_RPM * GEAR_RATIO; - - uint16_t can_speed = std::clamp( - static_cast(motor_rpm), - 120, - 1500); - - desired_.angle = target_angle; - desired_.angle_speed = can_speed; - } - } - else if (cmd == "gear") - { - int gear = value.get(); - switch (gear) - { - case 0: - desired_.gear = 0; // N - break; - case 1: - desired_.gear = 2; // D - break; - case 2: - desired_.gear = 1; // R - break; - default: - desired_.gear = 0; - break; - } - } - else if (cmd == "sweepCtrl") - { - desired_.sweep = (value.get() != 0); + case 0: + desired_.gear = 0; // N + break; + case 1: + desired_.gear = 2; // D + break; + case 2: + desired_.gear = 1; // R + break; + default: + desired_.gear = 0; + break; } } + // ===================================================== + // 6. sweepCtrl:清扫开关 + // ===================================================== + else if (cmd == "sweepCtrl") + { + desired_.sweep = (value.get() != 0); + } } - catch (...) + catch (const std::exception &e) { - RCLCPP_WARN(this->get_logger(), "[REMOTE] invalid JSON"); + RCLCPP_WARN(this->get_logger(), + "[REMOTE] JSON parse error: %s", + e.what()); } } private: - std::atomic remote_enabled_{false}; RemoteCtrlState desired_; std::mutex state_mtx_; @@ -296,7 +382,10 @@ private: rclcpp::TimerBase::SharedPtr timer_; rclcpp::Subscription::SharedPtr can_sub_; + std::atomic remote_authorized_{false}; // 是否允许远控(锁存) + std::atomic remote_alive_{false}; // 是否活跃(心跳) std::chrono::steady_clock::time_point last_msg_time_; + float current_feedback_angle_{0.0f}; };