first commit

This commit is contained in:
cxh 2025-09-08 10:59:08 +08:00
parent 4771b97c80
commit 0c217a5d6a
8 changed files with 606 additions and 0 deletions

40
include/logger.hpp Normal file
View File

@ -0,0 +1,40 @@
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <ctime>
#include <chrono>
#include <iomanip>
#include <sstream>
enum class LogLevel
{
INFO,
WARN,
ERROR
};
class Logger
{
public:
static void set_log_to_file(const std::string &filename);
static void log(LogLevel level, const std::string &msg);
private:
static std::ofstream log_file;
static std::string current_log_filename;
static std::string get_time_string();
static void rotate_logs();
static size_t get_file_size(const std::string &filename);
static constexpr size_t MAX_LOG_FILE_SIZE = 10 * 1024 * 1024; // 10MB
static constexpr int MAX_LOG_BACKUP_COUNT = 5;
};
// 简化宏
#define LOG_INFO(msg) Logger::log(LogLevel::INFO, msg)
#define LOG_WARN(msg) Logger::log(LogLevel::WARN, msg)
#define LOG_ERROR(msg) Logger::log(LogLevel::ERROR, msg)

51
include/mqtt_client.hpp Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#include <string>
#include <functional>
#include <memory>
#include <atomic>
#include <mutex>
#include <mqtt/async_client.h>
#include "mqtt_config.hpp"
class MQTTClient : public virtual mqtt::callback
{
public:
using ConnectCallback = std::function<void()>;
using DisconnectCallback = std::function<void()>;
using MessageCallback = std::function<void(const std::string &topic, const std::string &message)>;
explicit MQTTClient(const MQTTConfig &config);
~MQTTClient();
void setConnectCallback(ConnectCallback cb);
void setDisconnectCallback(DisconnectCallback cb);
void setMessageCallback(MessageCallback cb);
void connect();
void disconnect();
void publish(const std::string &topic, const std::string &payload, int qos = -1);
void subscribe(const std::string &topic, int qos = -1);
void switchServer(const MQTTConfig &newConfig);
bool isConnected() const;
private:
void initializeClient();
void startReconnect();
// mqtt::callback 实现
void connection_lost(const std::string &cause) override;
void message_arrived(mqtt::const_message_ptr msg) override;
void delivery_complete(mqtt::delivery_token_ptr token) override {}
MQTTConfig config_;
std::shared_ptr<mqtt::async_client> client_;
std::atomic<bool> connected_;
std::atomic<bool> reconnect_active_;
std::mutex mutex_;
ConnectCallback on_connect_;
DisconnectCallback on_disconnect_;
MessageCallback on_message_;
};

View File

@ -0,0 +1,12 @@
#pragma once
#include "mqtt_config.hpp"
#include "logger.hpp"
#include "mqtt_client.hpp"
// 启动 MQTT 客户端(内部自动连接、订阅、发布等)
void mqtt_client_thread_func();
// 外部可访问的 MQTT 客户端指针
extern std::shared_ptr<MQTTClient> mqtt_client;
extern std::atomic<bool> mqtt_restart_required;

55
include/mqtt_config.hpp Normal file
View File

@ -0,0 +1,55 @@
#pragma once
#include <string>
#include <map>
#include <fstream>
#include <iostream>
#include <nlohmann/json.hpp>
#include <unistd.h>
#include <limits.h>
#include <stdexcept>
#include "logger.hpp"
using ordered_json = nlohmann::ordered_json;
struct MQTTTopics
{
std::string uplink_1;
std::string uplink_2;
std::string downlink;
};
struct MQTTPorts
{
int config_port;
int uplink_1_port;
int uplink_2_port;
int downlink_port;
};
struct MQTTConfig
{
std::string device_id;
std::string vin;
std::string server_ip;
int server_port;
std::string client_id;
std::string username;
std::string password;
int qos;
int keep_alive;
bool clean_session;
MQTTTopics topics;
MQTTPorts ports;
static MQTTConfig load_from_file(const std::string &filepath);
// 用完整 JSON 对象更新原始配置中已有字段
bool update_mqtt_config(const ordered_json &new_values,
const std::string &filepath = "config.json");
};
std::string get_executable_dir();
std::string get_executable_dir_file_path(const std::string &filename);
// 全局配置变量
extern MQTTConfig g_mqtt_config;

101
src/logger.cpp Normal file
View File

@ -0,0 +1,101 @@
#include "logger.hpp"
#include <iomanip>
#include <ctime>
#include <fstream>
#include <cstdio>
std::ofstream Logger::log_file;
std::string Logger::current_log_filename;
void Logger::set_log_to_file(const std::string &filename)
{
current_log_filename = filename;
log_file.open(filename, std::ios::app);
if (!log_file.is_open())
{
std::cerr << "[Logger] Failed to open log file: " << filename << std::endl;
}
}
std::string Logger::get_time_string()
{
using namespace std::chrono;
// 获取当前时间点
auto now = system_clock::now();
auto ms = duration_cast<milliseconds>(now.time_since_epoch()) % 1000;
// 转换为 time_t
auto t = system_clock::to_time_t(now);
std::tm ltm = *std::localtime(&t);
// 拼接格式化字符串
std::ostringstream oss;
oss << std::put_time(&ltm, "%Y-%m-%d %H:%M:%S")
<< '.' << std::setw(3) << std::setfill('0') << ms.count();
return oss.str();
}
void Logger::log(LogLevel level, const std::string &msg)
{
std::string level_str;
switch (level)
{
case LogLevel::INFO:
level_str = "[INFO] ";
break;
case LogLevel::WARN:
level_str = "[WARN] ";
break;
case LogLevel::ERROR:
level_str = "[ERROR]";
break;
}
std::string full_msg = get_time_string() + " " + level_str + " " + msg;
std::cout << full_msg << std::endl;
if (log_file.is_open())
{
log_file << full_msg << std::endl;
log_file.flush();
// 检查是否需要轮转
if (get_file_size(current_log_filename) >= MAX_LOG_FILE_SIZE)
{
rotate_logs();
}
}
}
size_t Logger::get_file_size(const std::string &filename)
{
std::ifstream in(filename, std::ifstream::ate | std::ifstream::binary);
return in.is_open() ? static_cast<size_t>(in.tellg()) : 0;
}
void Logger::rotate_logs()
{
log_file.close();
// 删除最旧的日志
std::string oldest = current_log_filename + "." + std::to_string(MAX_LOG_BACKUP_COUNT);
std::remove(oldest.c_str());
// 重命名 *.4 -> *.5, ..., *.1 -> *.2
for (int i = MAX_LOG_BACKUP_COUNT - 1; i >= 1; --i)
{
std::string old_name = current_log_filename + "." + std::to_string(i);
std::string new_name = current_log_filename + "." + std::to_string(i + 1);
std::rename(old_name.c_str(), new_name.c_str());
}
// 当前日志 -> .1
std::string first_backup = current_log_filename + ".1";
std::rename(current_log_filename.c_str(), first_backup.c_str());
// 重新打开新日志
log_file.open(current_log_filename, std::ios::trunc);
}

168
src/mqtt_client.cpp Normal file
View File

@ -0,0 +1,168 @@
#include "mqtt_client.hpp"
#include <iostream>
#include <thread>
#include <chrono>
MQTTClient::MQTTClient(const MQTTConfig &config)
: config_(config), connected_(false), reconnect_active_(false)
{
initializeClient();
}
MQTTClient::~MQTTClient()
{
disconnect();
}
void MQTTClient::initializeClient()
{
std::string address = "tcp://" + config_.server_ip + ":" + std::to_string(config_.server_port);
client_ = std::make_shared<mqtt::async_client>(address, config_.client_id);
client_->set_callback(*this);
}
void MQTTClient::connect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (connected_)
return;
try
{
auto connOpts = mqtt::connect_options_builder()
.clean_session(config_.clean_session)
.automatic_reconnect(true)
.keep_alive_interval(std::chrono::seconds(config_.keep_alive))
.user_name(config_.username)
.password(config_.password)
.finalize();
client_->connect(connOpts)->wait();
connected_ = true;
reconnect_active_ = false;
if (on_connect_)
on_connect_();
}
catch (const mqtt::exception &e)
{
std::cerr << "[MQTTClient] Connect failed: " << e.what() << std::endl;
startReconnect();
}
}
void MQTTClient::disconnect()
{
std::lock_guard<std::mutex> lock(mutex_);
if (!connected_)
return;
try
{
client_->disconnect()->wait();
connected_ = false;
if (on_disconnect_)
on_disconnect_();
}
catch (const mqtt::exception &e)
{
std::cerr << "[MQTTClient] Disconnect failed: " << e.what() << std::endl;
}
}
void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500));
}
catch (const mqtt::exception &e)
{
std::cerr << "[MQTTClient] Publish failed: " << e.what() << std::endl;
if (!connected_)
startReconnect();
}
}
void MQTTClient::subscribe(const std::string &topic, int qos)
{
if (qos == -1)
qos = config_.qos;
try
{
client_->subscribe(topic, qos)->wait();
}
catch (const mqtt::exception &e)
{
std::cerr << "[MQTTClient] Subscribe failed: " << e.what() << std::endl;
if (!connected_)
startReconnect();
}
}
void MQTTClient::switchServer(const MQTTConfig &newConfig)
{
std::lock_guard<std::mutex> lock(mutex_);
if (connected_)
{
try
{
client_->disconnect()->wait();
}
catch (...)
{
}
connected_ = false;
}
config_ = newConfig;
initializeClient();
connect();
}
void MQTTClient::startReconnect()
{
if (reconnect_active_)
return;
reconnect_active_ = true;
std::thread([this]
{
while (!connected_ && reconnect_active_) {
std::this_thread::sleep_for(std::chrono::seconds(2));
try {
connect();
} catch (...) {}
}
reconnect_active_ = false; })
.detach();
}
void MQTTClient::connection_lost(const std::string &cause)
{
std::lock_guard<std::mutex> lock(mutex_);
connected_ = false;
if (on_disconnect_)
on_disconnect_();
startReconnect();
}
void MQTTClient::message_arrived(mqtt::const_message_ptr msg)
{
if (on_message_)
{
on_message_(msg->get_topic(), msg->get_payload_str());
}
}
bool MQTTClient::isConnected() const
{
return connected_;
}
void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; }
void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; }
void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; }

View File

@ -0,0 +1,57 @@
#include "mqtt_client_wrapper.hpp"
std::shared_ptr<MQTTClient> mqtt_client;
std::atomic<bool> mqtt_restart_required{false};
static uint16_t broadcast_sequence = 0;
// MQTT 回调定义
static void on_mqtt_connected()
{
LOG_INFO("[MQTT] Connected to broker.");
mqtt_client->subscribe(g_mqtt_config.topics.downlink);
}
static void on_mqtt_disconnected()
{
LOG_WARN("[MQTT] Disconnected from broker.");
}
static void on_mqtt_message_received(const std::string &topic, const std::string &message)
{
LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size()));
try
{
}
catch (const std::exception &e)
{
LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what());
}
}
void mqtt_client_thread_func()
{
while (true)
{
const auto &cfg = g_mqtt_config;
mqtt_client = std::make_unique<MQTTClient>(cfg);
mqtt_client->setConnectCallback(on_mqtt_connected);
mqtt_client->setDisconnectCallback(on_mqtt_disconnected);
mqtt_client->setMessageCallback(on_mqtt_message_received);
mqtt_client->connect();
// 主线程监听重启信号
while (!mqtt_restart_required)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// 需要重启
LOG_INFO("[MQTT] Restarting client...");
mqtt_client->disconnect(); // 可加锁
mqtt_client.reset();
mqtt_restart_required = false;
}
}

122
src/mqtt_config.cpp Normal file
View File

@ -0,0 +1,122 @@
#include "mqtt_config.hpp"
MQTTConfig g_mqtt_config;
MQTTConfig MQTTConfig::load_from_file(const std::string &filepath)
{
MQTTConfig cfg;
std::ifstream ifs(filepath);
if (!ifs)
{
LOG_ERROR("[Config] Failed to open config file: " + filepath);
throw std::runtime_error("Failed to open config file: " + filepath);
}
nlohmann::json j;
ifs >> j;
cfg.device_id = j["device_id"];
cfg.vin = j["vin"];
cfg.server_ip = j["server_ip"];
cfg.server_port = j["server_port"];
cfg.client_id = j["client_id"];
cfg.username = j["username"];
cfg.password = j["password"];
cfg.qos = j["qos"];
cfg.keep_alive = j["keep_alive"];
cfg.clean_session = j["clean_session"];
const std::string &id = cfg.device_id;
cfg.topics.uplink_1 = j["topics"]["uplink_1"].get<std::string>() + id;
cfg.topics.uplink_2 = j["topics"]["uplink_2"].get<std::string>() + id;
cfg.topics.downlink = j["topics"]["downlink"].get<std::string>() + id;
cfg.ports.config_port = j["ports"]["config_port"];
cfg.ports.uplink_1_port = j["ports"]["uplink_1_port"];
cfg.ports.uplink_2_port = j["ports"]["uplink_2_port"];
cfg.ports.downlink_port = j["ports"]["downlink_port"];
LOG_INFO("[Config] Loaded config from " + filepath);
LOG_INFO(" - MQTT Server: " + cfg.server_ip + ":" + std::to_string(cfg.server_port));
LOG_INFO(" - Client ID: " + cfg.client_id);
LOG_INFO(" - Topics: " + cfg.topics.uplink_1 + ", " + cfg.topics.uplink_2 + ", " + cfg.topics.downlink);
return cfg;
}
bool MQTTConfig::update_mqtt_config(const ordered_json &new_values,
const std::string &filepath)
{
std::ifstream ifs(filepath);
if (!ifs.is_open())
{
LOG_ERROR("[ConfigEdit] Failed to open config file: " + filepath);
return false;
}
ordered_json j;
try
{
ifs >> j;
ifs.close();
}
catch (const std::exception &e)
{
LOG_ERROR(std::string("[ConfigEdit] JSON parse error: ") + e.what());
return false;
}
for (auto it = new_values.begin(); it != new_values.end(); ++it)
{
const std::string &key = it.key();
const auto &val = it.value();
if (j.contains(key))
{
LOG_INFO("[ConfigEdit] Updating key: " + key + " -> " + val.dump());
j[key] = val;
}
else
{
LOG_WARN("[ConfigEdit] Warning: key '" + key + "' not found in config, ignored.");
}
}
std::ofstream ofs(filepath);
if (!ofs.is_open())
{
LOG_ERROR("[ConfigEdit] Failed to write updated config file: " + filepath);
return false;
}
ofs << j.dump(4);
ofs.close();
return true;
}
std::string get_executable_dir()
{
char result[PATH_MAX] = {0};
ssize_t count = readlink("/proc/self/exe", result, PATH_MAX);
if (count == -1)
{
throw std::runtime_error("Failed to read /proc/self/exe");
}
std::string full_path(result, count);
auto pos = full_path.find_last_of('/');
if (pos == std::string::npos)
{
throw std::runtime_error("Failed to find executable directory");
}
return full_path.substr(0, pos);
}
std::string get_executable_dir_file_path(const std::string &filename)
{
std::string dir = get_executable_dir();
if (dir.back() != '/')
dir += '/';
return dir + filename;
}