Compare commits

...

2 Commits

Author SHA1 Message Date
lyq
689c10bd91 修正先订阅topic再获取vid bug 2026-02-02 14:13:33 +08:00
lyq
93afcdebbb 提高代码健壮性 2026-02-02 13:59:53 +08:00
3 changed files with 354 additions and 123 deletions

View File

@ -359,13 +359,35 @@ int handleMqttMessage(char* topicName, int topicLen, MQTTClient_message* message
void try_subscribe_task_topic() void try_subscribe_task_topic()
{ {
if (!identity_ready.load()) return; if (!identity_ready.load()) {
if (mqtt_topic_sub_task.empty()) return; LOG_DEBUG("[TASK] Identity not ready, skipping subscribe");
return;
}
if (mqtt_topic_sub_task.empty()) {
LOG_DEBUG("[TASK] MQTT topic not configured, skipping subscribe");
return;
}
if (subscribed.exchange(true)) return; // 已经订阅过了 if (!mqtt_manager.isConnected()) {
LOG_DEBUG("[TASK] MQTT not connected, skipping subscribe");
subscribed.store(false); // 确保下次可以重试
return;
}
if (subscribed.load()) {
LOG_DEBUG("[TASK] Already subscribed, skipping");
return;
}
LOG_INFO("[TASK] subscribe MQTT topic: %s", mqtt_topic_sub_task.c_str()); LOG_INFO("[TASK] subscribe MQTT topic: %s", mqtt_topic_sub_task.c_str());
mqtt_manager.subscribe(mqtt_topic_sub_task, 0); bool success = mqtt_manager.subscribe(mqtt_topic_sub_task, 0);
if (success) {
subscribed.store(true);
LOG_INFO("[TASK] Subscribe successful");
} else {
LOG_ERROR("[TASK] Subscribe failed, will retry later");
subscribed.store(false); // 订阅失败,允许重试
}
} }
// 周期性上报任务状态到MQTT200ms间隔持续上报 // 周期性上报任务状态到MQTT200ms间隔持续上报
@ -420,7 +442,10 @@ class TaskManagerNode : public rclcpp::Node
vid = msg->vid; vid = msg->vid;
identity_ready.store(!vid.empty()); identity_ready.store(!vid.empty());
if (!identity_ready.load()) return; if (!identity_ready.load()) {
LOG_WARN("[TASK] Invalid VID received: %s", msg->vid.c_str());
return;
}
// 拼 topic // 拼 topic
mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl; mqtt_topic_sub_task = config.mqtt_topic_sub_task_tmpl;
@ -435,6 +460,8 @@ class TaskManagerNode : public rclcpp::Node
LOG_INFO("[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), mqtt_topic_sub_task.c_str(), LOG_INFO("[TASK] identity ready VID=%s => sub=%s pub=%s", vid.c_str(), mqtt_topic_sub_task.c_str(),
mqtt_topic_push_status.c_str()); mqtt_topic_push_status.c_str());
// 重置订阅状态,确保可以重新尝试订阅
subscribed.store(false);
try_subscribe_task_topic(); try_subscribe_task_topic();
}); });
} }
@ -521,6 +548,7 @@ int main(int argc, char** argv)
{ {
LOG_INFO("[TASK] MQTT reconnected: %s", cause); LOG_INFO("[TASK] MQTT reconnected: %s", cause);
subscribed.store(false); subscribed.store(false);
// 尝试重新订阅任务话题
try_subscribe_task_topic(); try_subscribe_task_topic();
}); });
@ -539,9 +567,28 @@ int main(int argc, char** argv)
rclcpp::executors::SingleThreadedExecutor executor; rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node); executor.add_node(node);
// 用于定期检查订阅状态的定时器
auto last_check_time = std::chrono::steady_clock::now();
const std::chrono::seconds SUBSCRIBE_CHECK_INTERVAL = std::chrono::seconds(5);
while (rclcpp::ok() && !signal_received) while (rclcpp::ok() && !signal_received)
{ {
executor.spin_some(); executor.spin_some();
// 定期检查并尝试订阅任务话题
auto now = std::chrono::steady_clock::now();
if (now - last_check_time >= SUBSCRIBE_CHECK_INTERVAL)
{
last_check_time = now;
// 如果已经准备好身份但尚未成功订阅,则尝试订阅
if (identity_ready.load() && !subscribed.load())
{
LOG_INFO("[TASK] Periodic check: identity ready but not subscribed, attempting subscribe");
try_subscribe_task_topic();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }

View File

@ -47,123 +47,249 @@ class rtk_node : public rclcpp::Node
line_string = p_gpybm; line_string = p_gpybm;
for (int i = 0; i < 3; i++) // 解析过程添加完整的错误检查
try
{ {
// 跳过前3个字段
for (int i = 0; i < 3; i++)
{
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误字段数量不足");
return;
}
line_string.erase(0, position + 1);
}
// 解析纬度
position = line_string.find(","); position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误缺少纬度字段");
return;
}
lat_s = line_string.substr(0, position);
line_string.erase(0, position + 1); line_string.erase(0, position + 1);
// 解析经度
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误缺少经度字段");
return;
}
lon_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
// 跳过1个字段
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误字段数量不足");
return;
}
line_string.erase(0, position + 1);
// 解析航向
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误缺少航向字段");
return;
}
head_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
// 跳过4个字段
for (int i = 0; i < 4; i++)
{
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误字段数量不足");
return;
}
line_string.erase(0, position + 1);
}
// 解析速度
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误缺少速度字段");
return;
}
sp_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
// 跳过4个字段
for (int i = 0; i < 4; i++)
{
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误字段数量不足");
return;
}
line_string.erase(0, position + 1);
}
// 解析定位质量
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误缺少定位质量字段");
return;
}
p_q_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
// 解析定向质量
position = line_string.find(",");
if (position == string::npos)
{
LOG_WARN("GPYBM数据格式错误缺少定向质量字段");
return;
}
h_q_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
// 验证字段内容是否有效
if (lat_s.empty() || lon_s.empty() || head_s.empty() || sp_s.empty() || p_q_s.empty() || h_q_s.empty())
{
LOG_WARN("GPYBM数据格式错误字段内容为空");
return;
}
// 检查字段是否包含合法的数字字符
auto is_valid_number = [](const string& s) -> bool
{
if (s.empty()) return false;
for (char c : s)
{
if (!isdigit(c) && c != '.' && c != '-' && c != '+')
{
return false;
}
}
return true;
};
if (!is_valid_number(lat_s) || !is_valid_number(lon_s) || !is_valid_number(head_s) ||
!is_valid_number(sp_s) || !is_valid_number(p_q_s) || !is_valid_number(h_q_s))
{
LOG_WARN("GPYBM数据格式错误字段包含非法字符");
return;
}
// 转换为数值
lat = atof(lat_s.c_str());
lon = atof(lon_s.c_str());
head = atof(head_s.c_str());
speed = atof(sp_s.c_str());
p_quality = atof(p_q_s.c_str());
h_quality = atof(h_q_s.c_str());
// 验证地理坐标范围(简单验证)
if (lat < -90.0 || lat > 90.0 || lon < -180.0 || lon > 180.0)
{
LOG_WARN("GPYBM数据错误坐标值超出范围");
lat = 0.0;
lon = 0.0;
return;
}
} }
catch (const exception& e)
position = line_string.find(",");
lat_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
position = line_string.find(",");
lon_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
position = line_string.find(",");
line_string.erase(0, position + 1);
position = line_string.find(",");
head_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
for (int i = 0; i < 4; i++)
{ {
position = line_string.find(","); LOG_ERROR("解析GPYBM数据时发生异常%s", e.what());
line_string.erase(0, position + 1); return;
} }
catch (...)
position = line_string.find(",");
sp_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
for (int i = 0; i < 4; i++)
{ {
position = line_string.find(","); LOG_ERROR("解析GPYBM数据时发生未知异常");
line_string.erase(0, position + 1); return;
} }
position = line_string.find(",");
p_q_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
position = line_string.find(",");
h_q_s = line_string.substr(0, position);
line_string.erase(0, position + 1);
lat = atof(lat_s.c_str());
lon = atof(lon_s.c_str());
head = atof(head_s.c_str());
speed = atof(sp_s.c_str());
p_quality = atof(p_q_s.c_str());
h_quality = atof(h_q_s.c_str());
} }
void timer_callback() void timer_callback()
{ {
// 读取串口传来的定位信息 try
memset(serial_buf, 0, sizeof(serial_buf));
int num = boost_serial->serial_read(serial_buf, 200);
// LOG_DEBUG("num:%d \n",num);
// LOG_DEBUG("serial_buf: ");
// for (int i = 0; i < 300; i++)
// {
// LOG_DEBUG("%c", serial_buf[i]);
// }
// LOG_DEBUG("\n");
if (c_queue.size() >= 400)
{ {
std::queue<char> empty; // 读取串口传来的定位信息
swap(empty, c_queue); memset(serial_buf, 0, sizeof(serial_buf));
} int num = boost_serial->serial_read(serial_buf, 200);
for (int i = 0; i < num; i++) if (num < 0)
{
if (serial_buf[i] == '*')
{ {
// c_queue.push(serial_buf[i]); LOG_WARN("串口读取失败:返回值为负");
memset(gps_buf, 0, sizeof(gps_buf)); return;
int j = 0; }
while (!c_queue.empty())
if (c_queue.size() >= 400)
{
std::queue<char> empty;
swap(empty, c_queue);
LOG_WARN("队列已满,已清空队列");
}
for (int i = 0; i < num; i++)
{
if (serial_buf[i] == '*')
{ {
gps_buf[j] = c_queue.front(); memset(gps_buf, 0, sizeof(gps_buf));
c_queue.pop(); size_t j = 0; // 改为 size_t 类型,避免与 sizeof 结果比较的警告
j++; while (!c_queue.empty() && j < sizeof(gps_buf) - 1)
{
gps_buf[j] = c_queue.front();
c_queue.pop();
j++;
}
gps_buf[j] = '\0'; // 确保字符串结束符
// 检查缓冲区是否足够大
if (j >= sizeof(gps_buf) - 1)
{
LOG_WARN("GPS数据过长可能导致缓冲区溢出");
std::queue<char> empty;
swap(empty, c_queue);
continue;
}
// 解析定位信息
GPYBM_to_gps(gps_buf);
// 创建消息
auto message = sweeper_interfaces::msg::Rtk();
message.lat = lat;
message.lon = lon;
message.head = head;
message.speed = speed;
message.p_quality = p_quality;
message.h_quality = h_quality;
// 日志打印
LOG_INFO("lat:'%.9lf',lon:'%.9lf',head:'%lf',speed:'%lf',p_quality:'%d',h_quality:'%d'",
message.lat, message.lon, message.head, message.speed, message.p_quality,
message.h_quality);
// 发布消息
command_publisher_->publish(message);
}
else
{
c_queue.push(serial_buf[i]);
} }
// 解析定位信息
// LOG_DEBUG("gps_buf: ");
// for (int i = 0; i < 300; i++)
// {
// LOG_DEBUG("%c", gps_buf[i]);
// }
// LOG_DEBUG("\n");
GPYBM_to_gps(gps_buf);
// 创建消息
auto message = sweeper_interfaces::msg::Rtk();
message.lat = lat;
message.lon = lon;
message.head = head;
message.speed = speed;
message.p_quality = p_quality;
message.h_quality = h_quality;
// 日志打印
LOG_INFO("lat:'%.9lf',lon:'%.9lf',head:'%lf',speed:'%lf',p_quality:'%d',h_quality:'%d'", message.lat,
message.lon, message.head, message.speed, message.p_quality, message.h_quality);
// 发布消息
command_publisher_->publish(message);
}
else
{
c_queue.push(serial_buf[i]);
} }
} }
catch (const exception& e)
{
LOG_ERROR("定时器回调函数异常:%s", e.what());
}
catch (...)
{
LOG_ERROR("定时器回调函数未知异常");
}
} }
// 声名定时器指针 // 声名定时器指针
@ -188,7 +314,7 @@ class rtk_node : public rclcpp::Node
std::queue<char> c_queue; std::queue<char> c_queue;
// 解析定位信息用到的中间变量 // 解析定位信息用到的中间变量
string lat_s, lon_s, head_s, sp_s, p_q_s, h_q_s; string lat_s, lon_s, head_s, sp_s, p_q_s, h_q_s;
int position; size_t position; // 改为 size_t 类型,匹配 string::find() 的返回类型
string line_string; string line_string;
}; };

View File

@ -2,43 +2,101 @@
#include "logger/logger.h" #include "logger/logger.h"
Boost_serial::Boost_serial() { ; } Boost_serial::Boost_serial() : sp(nullptr) { ; }
Boost_serial::~Boost_serial() Boost_serial::~Boost_serial()
{ {
if (sp) if (sp)
{ {
delete sp; try
{
if (sp->is_open())
{
sp->close();
}
delete sp;
}
catch (...)
{
LOG_ERROR("关闭串口时发生异常");
}
} }
} }
void Boost_serial::init(const string port_name) void Boost_serial::init(const string port_name)
{ {
sp = new serial_port(service); try
sp->open(port_name);
if (sp->is_open())
{ {
sp->set_option(serial_port::baud_rate(115200)); sp = new serial_port(service);
sp->set_option(serial_port::flow_control(serial_port::flow_control::none)); sp->open(port_name);
sp->set_option(serial_port::parity(serial_port::parity::none));
sp->set_option(serial_port::stop_bits(serial_port::stop_bits::one)); if (sp->is_open())
sp->set_option(serial_port::character_size(8)); {
LOG_INFO("打开串口成功!"); sp->set_option(serial_port::baud_rate(115200));
sp->set_option(serial_port::flow_control(serial_port::flow_control::none));
sp->set_option(serial_port::parity(serial_port::parity::none));
sp->set_option(serial_port::stop_bits(serial_port::stop_bits::one));
sp->set_option(serial_port::character_size(8));
LOG_INFO("打开串口成功!");
}
else
{
LOG_ERROR("打开串口失败!");
delete sp;
sp = nullptr;
}
} }
else catch (const exception& e)
{ {
LOG_ERROR("打开串口失败!"); LOG_ERROR("初始化串口时发生异常:%s", e.what());
if (sp)
{
delete sp;
sp = nullptr;
}
}
catch (...)
{
LOG_ERROR("初始化串口时发生未知异常");
if (sp)
{
delete sp;
sp = nullptr;
}
} }
} }
int Boost_serial::serial_read(char buf[], int size) int Boost_serial::serial_read(char buf[], int size)
{ {
int num = read(*sp, buffer(buf, size)); if (sp == nullptr || !sp->is_open())
if (num <= 0)
{ {
LOG_WARN("没有读到数据!"); LOG_WARN("串口未初始化或未打开");
return -1;
}
try
{
int num = read(*sp, buffer(buf, size));
if (num < 0)
{
LOG_WARN("串口读取错误");
return -1;
}
else if (num == 0)
{
LOG_INFO("串口无数据可读");
return 0;
}
return num;
}
catch (const exception& e)
{
LOG_ERROR("串口读取异常:%s", e.what());
return -1;
}
catch (...)
{
LOG_ERROR("串口读取未知异常");
return -1;
} }
return num;
} }