sweeper_200/src/pub_gps/src/pub_gps_node.cpp
2025-10-30 11:10:23 +08:00

285 lines
8.8 KiB
C++
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "MQTTClient.h"
#include "rclcpp/rclcpp.hpp"
#include "sweeper_interfaces/msg/rtk.hpp"
#include "json.h"
#include "pub_gps_node.hpp"
#include <fstream>
#include <iostream>
#include <string>
#include <chrono>
#include <ctime>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <pthread.h>
struct MqttConf
{
std::string address; // 形如 tcp://ip:port
std::string client_id; // 客户端 ID
std::string user;
std::string password;
std::string pub_gps_topic;
int qos = 1;
long timeout_ms = 10000L;
int reconnect_interval = 5000; // 重连间隔(ms)
} g_conf;
GPS gps_mes;
// 增加连接状态标志,确保线程安全
static bool is_connected = false;
static pthread_mutex_t connect_mutex = PTHREAD_MUTEX_INITIALIZER;
// 检查连接状态
bool check_connection(MQTTClient client)
{
pthread_mutex_lock(&connect_mutex);
int state = MQTTClient_isConnected(client);
is_connected = (state == 1);
pthread_mutex_unlock(&connect_mutex);
return is_connected;
}
// 设置连接状态
void set_connected(bool connected)
{
pthread_mutex_lock(&connect_mutex);
is_connected = connected;
pthread_mutex_unlock(&connect_mutex);
}
// 重连函数
bool reconnect_client(MQTTClient *client)
{
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = g_conf.user.c_str();
conn_opts.password = g_conf.password.c_str();
int rc;
// 尝试重连最多尝试5次
for (int i = 0; i < 5; ++i)
{
std::cerr << "[MQTT] Reconnecting... (attempt " << i + 1 << "/5)" << std::endl;
rc = MQTTClient_connect(*client, &conn_opts);
if (rc == MQTTCLIENT_SUCCESS)
{
set_connected(true);
std::cout << "[MQTT] Reconnected successfully" << std::endl;
return true;
}
std::cerr << "[MQTT] Reconnect failed, rc=" << rc << ". Retrying in "
<< g_conf.reconnect_interval << "ms..." << std::endl;
usleep(g_conf.reconnect_interval * 1000);
}
return false;
}
void *mqtt_pub_gps(void *arg)
{
(void)arg;
MQTTClient client = nullptr;
MQTTClient_deliveryToken token_d_m;
int rc;
// 初始化客户端
if ((rc = MQTTClient_create(&client,
g_conf.address.c_str(),
g_conf.client_id.c_str(),
MQTTCLIENT_PERSISTENCE_NONE,
nullptr)) != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to create client, rc=" << rc << std::endl;
return nullptr;
}
// 初始连接
if (!reconnect_client(&client))
{
std::cerr << "[MQTT] Failed to establish initial connection" << std::endl;
MQTTClient_destroy(&client);
return nullptr;
}
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.qos = g_conf.qos;
pubmsg.retained = 0;
Json::Value root_st;
Json::FastWriter json_writer;
unsigned char msg_st[256] = {0};
while (true)
{
// 检查连接状态,断开则尝试重连
if (!check_connection(client))
{
std::cerr << "[MQTT] Connection lost. Attempting to reconnect..." << std::endl;
// 先断开旧连接
MQTTClient_disconnect(client, 1000);
// 重连失败则销毁客户端重新创建
if (!reconnect_client(&client))
{
std::cerr << "[MQTT] All reconnection attempts failed. Recreating client..." << std::endl;
MQTTClient_destroy(&client);
// 重新创建客户端
if ((rc = MQTTClient_create(&client,
g_conf.address.c_str(),
g_conf.client_id.c_str(),
MQTTCLIENT_PERSISTENCE_NONE,
nullptr)) != MQTTCLIENT_SUCCESS)
{
std::cerr << "Failed to recreate client, rc=" << rc << std::endl;
usleep(g_conf.reconnect_interval * 1000);
continue;
}
// 再次尝试连接
if (!reconnect_client(&client))
{
usleep(g_conf.reconnect_interval * 1000);
continue;
}
}
}
root_st.clear();
root_st["lng"] = [&]
{ std::ostringstream s; s<<std::fixed<<std::setprecision(7)<<gps_mes.lng; return s.str(); }();
root_st["lat"] = [&]
{ std::ostringstream s; s<<std::fixed<<std::setprecision(7)<<gps_mes.lat; return s.str(); }();
root_st["course"] = [&]
{ std::ostringstream s; s<<std::fixed<<std::setprecision(1)<<gps_mes.course; return s.str(); }();
root_st["mode"] = gps_mes.mode;
root_st["timestamp"] =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
std::string json_string = json_writer.write(root_st);
memcpy(msg_st, json_string.c_str(), json_string.size());
pubmsg.payload = msg_st;
pubmsg.payloadlen = static_cast<int>(json_string.size());
// 发布消息
if (check_connection(client))
{
rc = MQTTClient_publishMessage(client,
g_conf.pub_gps_topic.c_str(),
&pubmsg,
&token_d_m);
if (rc == MQTTCLIENT_SUCCESS)
{
std::cout << "[MQTT] Published to topic: " << g_conf.pub_gps_topic
<< "\n msg: " << json_string << std::endl;
MQTTClient_waitForCompletion(client, token_d_m, g_conf.timeout_ms);
}
else
{
std::cerr << "[MQTT] publish failed, rc=" << rc << std::endl;
set_connected(false); // 发布失败标记为断开连接
}
}
usleep(200000); // 200ms
}
// 清理资源
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
pthread_mutex_destroy(&connect_mutex);
return nullptr;
}
class pub_gps_node : public rclcpp::Node
{
public:
explicit pub_gps_node(const std::string &name) : Node(name)
{
RCLCPP_INFO(this->get_logger(), "%s 节点已启动.", name.c_str());
gps_subscribe_ = this->create_subscription<sweeper_interfaces::msg::Rtk>(
"rtk_message", 10,
std::bind(&pub_gps_node::command_callback, this, std::placeholders::_1));
}
private:
void command_callback(const sweeper_interfaces::msg::Rtk::SharedPtr msg)
{
gps_mes.lat = msg->lat;
gps_mes.lng = msg->lon;
gps_mes.course = msg->head;
if (msg->p_quality == 5)
gps_mes.mode = 2;
else if (msg->p_quality == 4)
gps_mes.mode = 3;
else
gps_mes.mode = msg->p_quality;
}
rclcpp::Subscription<sweeper_interfaces::msg::Rtk>::SharedPtr gps_subscribe_;
};
bool load_config(const std::string &path)
{
Json::Reader reader;
Json::Value root;
std::ifstream in(path, std::ios::binary);
if (!in.is_open())
{
std::cerr << "[config] open " << path << " failed\n";
return false;
}
if (!reader.parse(in, root))
{
std::cerr << "[config] parse json failed\n";
return false;
}
const auto &m = root["mqtt"];
std::string ip = m["external_net_address"].asString(); // tcp://36.153.162.171
int port = m["external_net_port"].asInt(); // 19683
// std::string ip = m["inter_net_address"].asString(); // tcp://192.168.4.196
// int port = m["inter_net_port"].asInt(); // 11883
g_conf.address = ip + ":" + std::to_string(port);
g_conf.client_id = "sweeper_CLIENT_1532_GPS"; // 如需可改成 VID 等
g_conf.user = m["mqtt_user"].asString();
g_conf.password = m["mqtt_password"].asString();
g_conf.pub_gps_topic = m["pub_gps_topic"].asString();
// 从配置读取重连间隔默认5000ms
if (m.isMember("reconnect_interval"))
{
g_conf.reconnect_interval = m["reconnect_interval"].asInt();
}
return true;
}
int main(int argc, char **argv)
{
if (!load_config("config.json"))
return -1;
// 初始化互斥锁
pthread_mutex_init(&connect_mutex, nullptr);
pthread_t mqtt_thread;
pthread_create(&mqtt_thread, nullptr, mqtt_pub_gps, nullptr);
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<pub_gps_node>("pub_gps_node"));
rclcpp::shutdown();
// 清理资源
pthread_mutex_destroy(&connect_mutex);
return 0;
}