修改远控看门狗逻辑

This commit is contained in:
lyq 2025-12-30 11:25:32 +08:00
parent 3ce0c0b16e
commit 673a5e4adc
3 changed files with 231 additions and 102 deletions

View File

@ -1,6 +1,6 @@
[MQTT] [MQTT]
host = 192.168.4.150 host = 36.153.162.171
port = 1883 port = 61883
username = zxwl username = zxwl
password = zxwl1234@ password = zxwl1234@
vehicle_id = V090001 vehicle_id = V090001

View File

@ -81,6 +81,7 @@ public:
msg_cb_ = std::move(cb); msg_cb_ = std::move(cb);
} }
// 注意connected_cb_ 在 MQTT 网络线程中调用,禁止阻塞
void setConnectedCallback(ConnectedCallback cb) void setConnectedCallback(ConnectedCallback cb)
{ {
std::lock_guard<std::mutex> lk(mtx_); std::lock_guard<std::mutex> lk(mtx_);
@ -90,7 +91,10 @@ public:
bool subscribe(const std::string &topic, int qos = 0) bool subscribe(const std::string &topic, int qos = 0)
{ {
std::lock_guard<std::mutex> lk(mtx_); std::lock_guard<std::mutex> lk(mtx_);
if (!client_ || !connected_)
if (!client_ ||
!connected_.load(std::memory_order_acquire) ||
!ready_.load(std::memory_order_acquire))
return false; return false;
int rc = MQTTClient_subscribe(client_, topic.c_str(), qos); int rc = MQTTClient_subscribe(client_, topic.c_str(), qos);
@ -109,7 +113,10 @@ public:
int qos = 0) int qos = 0)
{ {
std::lock_guard<std::mutex> lk(mtx_); std::lock_guard<std::mutex> lk(mtx_);
if (!client_ || !connected_)
if (!client_ ||
!connected_.load(std::memory_order_acquire) ||
!ready_.load(std::memory_order_acquire))
return false; return false;
MQTTClient_message msg = MQTTClient_message_initializer; MQTTClient_message msg = MQTTClient_message_initializer;
@ -122,7 +129,8 @@ public:
if (rc != MQTTCLIENT_SUCCESS) if (rc != MQTTCLIENT_SUCCESS)
{ {
connected_ = false; connected_.store(false, std::memory_order_release);
ready_.store(false, std::memory_order_release);
return false; return false;
} }
@ -153,20 +161,27 @@ private:
int rc = MQTTClient_connect(client_, &opts); int rc = MQTTClient_connect(client_, &opts);
if (rc == MQTTCLIENT_SUCCESS) if (rc == MQTTCLIENT_SUCCESS)
{ {
connected_ = true; connected_.store(true, std::memory_order_release);
if (connected_cb_) ready_.store(false, std::memory_order_release);
connected_cb_();
// ⭐ 标记:需要在 loop 线程中触发 connected_cb_
need_callback_.store(true, std::memory_order_release);
return true; return true;
} }
connected_ = false; connected_.store(false, std::memory_order_release);
ready_.store(false, std::memory_order_release);
return false; return false;
} }
static void connLostCB(void *ctx, char *cause) static void connLostCB(void *ctx, char *cause)
{ {
auto self = static_cast<MQTTClientWrapper *>(ctx); auto self = static_cast<MQTTClientWrapper *>(ctx);
self->connected_ = false;
self->connected_.store(false, std::memory_order_release);
self->ready_.store(false, std::memory_order_release);
std::cerr << "[MQTT] lost connection: " std::cerr << "[MQTT] lost connection: "
<< (cause ? cause : "unknown") << std::endl; << (cause ? cause : "unknown") << std::endl;
} }
@ -193,19 +208,41 @@ private:
// 异步线程:负责 reconnect + yield // 异步线程:负责 reconnect + yield
void loopFunc() void loopFunc()
{ {
auto last_connect_try = std::chrono::steady_clock::now();
while (!stop_flag_) while (!stop_flag_)
{ {
// 1⃣ 未连接:按重连间隔尝试连接
if (!connected_) if (!connected_.load(std::memory_order_acquire))
{ {
doConnect(); auto now = std::chrono::steady_clock::now();
if (now - last_connect_try >= std::chrono::milliseconds(reconnect_ms_))
{
last_connect_try = now;
doConnect();
}
}
else
{
// 2⃣ 已连接:处理网络事件
MQTTClient_yield();
// 3⃣ 第一次 yield 后,才算真正 ready
if (!ready_.load(std::memory_order_acquire))
{
ready_.store(true, std::memory_order_release);
// ⭐ 在 ready 状态触发 connected callback
if (need_callback_.exchange(false))
{
if (connected_cb_)
connected_cb_();
}
}
} }
// 关键:处理 MQTT 网络事件 // ⭐ 正常网络轮询节奏
MQTTClient_yield(); std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::this_thread::sleep_for(
std::chrono::milliseconds(reconnect_ms_));
} }
} }
@ -223,4 +260,7 @@ private:
MessageCallback msg_cb_; MessageCallback msg_cb_;
ConnectedCallback connected_cb_; ConnectedCallback connected_cb_;
std::atomic<bool> ready_{false}; // 是否真正进入可用状态
std::atomic<bool> need_callback_{false};
}; };

View File

@ -118,22 +118,28 @@ public:
client_id.c_str()); client_id.c_str());
// ===== 看门狗 & 定时发布 ===== // ===== 看门狗 & 定时发布 =====
remote_enabled_.store(false);
last_msg_time_ = std::chrono::steady_clock::now(); last_msg_time_ = std::chrono::steady_clock::now();
// 看门狗:300ms 内没收到新 remote 指令则停控 // 看门狗:800ms 内没收到新 remote 指令则停控
watchdog_ = this->create_wall_timer( watchdog_ = this->create_wall_timer(
100ms, 100ms,
[this]() [this]()
{ {
// 未授权远控watchdog 什么都不做
if (!remote_authorized_.load(std::memory_order_acquire))
return;
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
if (remote_enabled_.load() &&
(now - last_msg_time_ > 300ms)) // 超时阈值:建议 800ms ~ 1500ms
if (now - last_msg_time_ > 800ms)
{ {
RCLCPP_WARN(this->get_logger(), // 只在第一次进入“失活”时打印日志
"[REMOTE] timeout, disable remote"); if (remote_alive_.exchange(false))
remote_enabled_.store(false); {
desired_ = {}; RCLCPP_WARN(this->get_logger(),
"[REMOTE] control timeout, enter safe-stop");
}
} }
}); });
@ -142,18 +148,35 @@ public:
20ms, // 50Hz 20ms, // 50Hz
[this]() [this]()
{ {
if (!remote_enabled_.load()) // 未授权远控:完全不发
if (!remote_authorized_.load(std::memory_order_acquire))
return; return;
sweeperMsg::McCtrl msg; sweeperMsg::McCtrl msg;
{ {
std::lock_guard<std::mutex> lock(state_mtx_); std::lock_guard<std::mutex> lock(state_mtx_);
msg.gear = desired_.gear;
msg.rpm = desired_.rpm; // 已授权但超时:安全停车态
msg.brake = desired_.brake; if (!remote_alive_.load(std::memory_order_relaxed))
msg.angle = desired_.angle; {
msg.sweep = desired_.sweep; msg.gear = desired_.gear; // 通常保持挡位
msg.angle_speed = desired_.angle_speed; msg.brake = 1;
msg.rpm = 0;
msg.angle = 0.0f;
msg.angle_speed = 120;
msg.sweep = false;
}
// 正常远控
else
{
msg.gear = desired_.gear;
msg.brake = desired_.brake;
msg.rpm = desired_.rpm;
msg.angle = desired_.angle;
msg.angle_speed = desired_.angle_speed;
msg.sweep = desired_.sweep;
}
} }
pub_->publish(msg); pub_->publish(msg);
@ -187,11 +210,22 @@ private:
void onMqttMessage(const std::string &payload) void onMqttMessage(const std::string &payload)
{ {
std::lock_guard<std::mutex> lock(state_mtx_); std::lock_guard<std::mutex> lock(state_mtx_);
// =====================================================
// 1. MQTT 消息到达:刷新活性 & 判断是否从超时中恢复
// =====================================================
bool was_dead = !remote_alive_.exchange(true, std::memory_order_relaxed);
if (was_dead)
{
RCLCPP_INFO(this->get_logger(),
"[REMOTE] control recovered");
}
last_msg_time_ = std::chrono::steady_clock::now(); last_msg_time_ = std::chrono::steady_clock::now();
RCLCPP_INFO(this->get_logger(), RCLCPP_DEBUG(this->get_logger(),
"[REMOTE] recv MQTT payload: %s", "[REMOTE] recv MQTT payload: %s",
payload.c_str()); payload.c_str());
try try
{ {
@ -199,93 +233,145 @@ private:
if (!j.contains("data")) if (!j.contains("data"))
return; return;
std::string cmd = j["data"]["command"].get<std::string>(); const auto &data = j["data"];
auto value = j["data"]["value"]; if (!data.contains("command") || !data.contains("value"))
return;
const std::string cmd = data["command"].get<std::string>();
const auto &value = data["value"];
// =====================================================
// 2. mode 指令:唯一的“授权 / 取消授权”入口
// =====================================================
if (cmd == "mode") if (cmd == "mode")
{ {
int mode = value.get<int>(); int mode = value.get<int>();
if (mode == 3) if (mode == 3)
{ {
remote_enabled_.store(true); // 进入远控授权态(锁存)
RCLCPP_INFO(this->get_logger(), "[REMOTE] enabled"); remote_authorized_.store(true, std::memory_order_release);
remote_alive_.store(true, std::memory_order_relaxed);
RCLCPP_INFO(this->get_logger(),
"[REMOTE] authorized (mode=3)");
} }
else if (mode == 0) else if (mode == 0)
{ {
remote_enabled_.store(false); // 退出远控授权态(锁存)
remote_authorized_.store(false, std::memory_order_release);
remote_alive_.store(false, std::memory_order_relaxed);
// 明确清空目标状态
desired_ = {}; desired_ = {};
RCLCPP_INFO(this->get_logger(), "[REMOTE] disabled");
RCLCPP_INFO(this->get_logger(),
"[REMOTE] deauthorized (mode=0)");
}
// mode 指令处理完直接返回
return;
}
// =====================================================
// 3. 非 mode 指令:只有在“已授权”状态下才处理
// =====================================================
if (!remote_authorized_.load(std::memory_order_acquire))
return;
// =====================================================
// 4. drive油门 / 刹车 / 转向
// =====================================================
if (cmd == "drive")
{
int throttle = 0;
int brake = 1;
int steer_raw = 0;
if (value.is_string())
{
sscanf(value.get<std::string>().c_str(),
"%d,%d,%d",
&throttle,
&brake,
&steer_raw);
}
else
{
return;
}
if (brake > 0)
{
// 刹车优先
desired_.brake = 1;
desired_.rpm = 0;
desired_.angle = 0.0f;
}
else
{
desired_.brake = 0;
// 油门映射0~65535 -> 0~6000 rpm
desired_.rpm =
(static_cast<uint32_t>(throttle) * 6000U) / 65535U;
// 转向映射0~65535 -> -50~+50 度
float target_angle =
(static_cast<float>(steer_raw) * 100.0f / 65535.0f) - 50.0f;
float delta = target_angle - current_feedback_angle_;
float angle_speed_deg = std::fabs(delta) / DELTA_T;
float motor_rpm =
angle_speed_deg * DEG_PER_SEC_TO_RPM * GEAR_RATIO;
desired_.angle_speed = std::clamp<uint16_t>(
static_cast<uint16_t>(motor_rpm),
120,
1500);
desired_.angle = target_angle;
} }
} }
else if (remote_enabled_.load()) // =====================================================
// 5. gear挡位
// =====================================================
else if (cmd == "gear")
{ {
if (cmd == "drive") int gear = value.get<int>();
switch (gear)
{ {
int t = 0, b = 1, s = 0; case 0:
sscanf(value.get<std::string>().c_str(), desired_.gear = 0; // N
"%d,%d,%d", &t, &b, &s); break;
case 1:
if (b > 0) desired_.gear = 2; // D
{ break;
desired_.brake = 1; case 2:
desired_.rpm = 0; desired_.gear = 1; // R
desired_.angle = 0; break;
} default:
else desired_.gear = 0;
{ break;
desired_.brake = 0;
desired_.rpm = (uint32_t(t) * 6000) / 65535;
// s: 0~65535 -> -50~+50
float steer = (float(s) * 100.0f / 65535.0f) - 50.0f;
float target_angle = steer;
float delta = target_angle - current_feedback_angle_;
float angle_speed_deg = std::fabs(delta) / DELTA_T;
float motor_rpm = angle_speed_deg * DEG_PER_SEC_TO_RPM * GEAR_RATIO;
uint16_t can_speed = std::clamp<uint16_t>(
static_cast<uint16_t>(motor_rpm),
120,
1500);
desired_.angle = target_angle;
desired_.angle_speed = can_speed;
}
}
else if (cmd == "gear")
{
int gear = value.get<int>();
switch (gear)
{
case 0:
desired_.gear = 0; // N
break;
case 1:
desired_.gear = 2; // D
break;
case 2:
desired_.gear = 1; // R
break;
default:
desired_.gear = 0;
break;
}
}
else if (cmd == "sweepCtrl")
{
desired_.sweep = (value.get<int>() != 0);
} }
} }
// =====================================================
// 6. sweepCtrl清扫开关
// =====================================================
else if (cmd == "sweepCtrl")
{
desired_.sweep = (value.get<int>() != 0);
}
} }
catch (...) catch (const std::exception &e)
{ {
RCLCPP_WARN(this->get_logger(), "[REMOTE] invalid JSON"); RCLCPP_WARN(this->get_logger(),
"[REMOTE] JSON parse error: %s",
e.what());
} }
} }
private: private:
std::atomic<bool> remote_enabled_{false};
RemoteCtrlState desired_; RemoteCtrlState desired_;
std::mutex state_mtx_; std::mutex state_mtx_;
@ -296,7 +382,10 @@ private:
rclcpp::TimerBase::SharedPtr timer_; rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr can_sub_; rclcpp::Subscription<sweeperMsg::CanFrame>::SharedPtr can_sub_;
std::atomic<bool> remote_authorized_{false}; // 是否允许远控(锁存)
std::atomic<bool> remote_alive_{false}; // 是否活跃(心跳)
std::chrono::steady_clock::time_point last_msg_time_; std::chrono::steady_clock::time_point last_msg_time_;
float current_feedback_angle_{0.0f}; float current_feedback_angle_{0.0f};
}; };