diff --git a/include/logger.hpp b/include/logger.hpp new file mode 100644 index 0000000..4bb2b48 --- /dev/null +++ b/include/logger.hpp @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +enum class LogLevel +{ + INFO, + WARN, + ERROR +}; + +class Logger +{ +public: + static void set_log_to_file(const std::string &filename); + + static void log(LogLevel level, const std::string &msg); + +private: + static std::ofstream log_file; + static std::string current_log_filename; + static std::string get_time_string(); + + static void rotate_logs(); + static size_t get_file_size(const std::string &filename); + + static constexpr size_t MAX_LOG_FILE_SIZE = 10 * 1024 * 1024; // 10MB + static constexpr int MAX_LOG_BACKUP_COUNT = 5; +}; + +// 简化宏 +#define LOG_INFO(msg) Logger::log(LogLevel::INFO, msg) +#define LOG_WARN(msg) Logger::log(LogLevel::WARN, msg) +#define LOG_ERROR(msg) Logger::log(LogLevel::ERROR, msg) diff --git a/include/mqtt_client.hpp b/include/mqtt_client.hpp new file mode 100644 index 0000000..eeeef0f --- /dev/null +++ b/include/mqtt_client.hpp @@ -0,0 +1,51 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "mqtt_config.hpp" + +class MQTTClient : public virtual mqtt::callback +{ +public: + using ConnectCallback = std::function; + using DisconnectCallback = std::function; + using MessageCallback = std::function; + + explicit MQTTClient(const MQTTConfig &config); + ~MQTTClient(); + + void setConnectCallback(ConnectCallback cb); + void setDisconnectCallback(DisconnectCallback cb); + void setMessageCallback(MessageCallback cb); + + void connect(); + void disconnect(); + void publish(const std::string &topic, const std::string &payload, int qos = -1); + void subscribe(const std::string &topic, int qos = -1); + void switchServer(const MQTTConfig &newConfig); + + bool isConnected() const; + +private: + void initializeClient(); + void startReconnect(); + + // mqtt::callback 实现 + void connection_lost(const std::string &cause) override; + void message_arrived(mqtt::const_message_ptr msg) override; + void delivery_complete(mqtt::delivery_token_ptr token) override {} + + MQTTConfig config_; + std::shared_ptr client_; + std::atomic connected_; + std::atomic reconnect_active_; + std::mutex mutex_; + + ConnectCallback on_connect_; + DisconnectCallback on_disconnect_; + MessageCallback on_message_; +}; diff --git a/include/mqtt_client_wrapper.hpp b/include/mqtt_client_wrapper.hpp new file mode 100644 index 0000000..c24a043 --- /dev/null +++ b/include/mqtt_client_wrapper.hpp @@ -0,0 +1,12 @@ +#pragma once + +#include "mqtt_config.hpp" +#include "logger.hpp" +#include "mqtt_client.hpp" + +// 启动 MQTT 客户端(内部自动连接、订阅、发布等) +void mqtt_client_thread_func(); + +// 外部可访问的 MQTT 客户端指针 +extern std::shared_ptr mqtt_client; +extern std::atomic mqtt_restart_required; \ No newline at end of file diff --git a/include/mqtt_config.hpp b/include/mqtt_config.hpp new file mode 100644 index 0000000..7a8167f --- /dev/null +++ b/include/mqtt_config.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "logger.hpp" + +using ordered_json = nlohmann::ordered_json; + +struct MQTTTopics +{ + std::string uplink_1; + std::string uplink_2; + std::string downlink; +}; + +struct MQTTPorts +{ + int config_port; + int uplink_1_port; + int uplink_2_port; + int downlink_port; +}; + +struct MQTTConfig +{ + std::string device_id; + std::string vin; + std::string server_ip; + int server_port; + std::string client_id; + std::string username; + std::string password; + int qos; + int keep_alive; + bool clean_session; + MQTTTopics topics; + MQTTPorts ports; + + static MQTTConfig load_from_file(const std::string &filepath); + // 用完整 JSON 对象更新原始配置中已有字段 + bool update_mqtt_config(const ordered_json &new_values, + const std::string &filepath = "config.json"); +}; + +std::string get_executable_dir(); +std::string get_executable_dir_file_path(const std::string &filename); + +// 全局配置变量 +extern MQTTConfig g_mqtt_config; diff --git a/src/logger.cpp b/src/logger.cpp new file mode 100644 index 0000000..acecf7c --- /dev/null +++ b/src/logger.cpp @@ -0,0 +1,101 @@ +#include "logger.hpp" +#include +#include +#include +#include + +std::ofstream Logger::log_file; +std::string Logger::current_log_filename; + +void Logger::set_log_to_file(const std::string &filename) +{ + current_log_filename = filename; + log_file.open(filename, std::ios::app); + if (!log_file.is_open()) + { + std::cerr << "[Logger] Failed to open log file: " << filename << std::endl; + } +} + +std::string Logger::get_time_string() +{ + using namespace std::chrono; + + // 获取当前时间点 + auto now = system_clock::now(); + auto ms = duration_cast(now.time_since_epoch()) % 1000; + + // 转换为 time_t(秒) + auto t = system_clock::to_time_t(now); + std::tm ltm = *std::localtime(&t); + + // 拼接格式化字符串 + std::ostringstream oss; + oss << std::put_time(<m, "%Y-%m-%d %H:%M:%S") + << '.' << std::setw(3) << std::setfill('0') << ms.count(); + + return oss.str(); +} + +void Logger::log(LogLevel level, const std::string &msg) +{ + std::string level_str; + switch (level) + { + case LogLevel::INFO: + level_str = "[INFO] "; + break; + case LogLevel::WARN: + level_str = "[WARN] "; + break; + case LogLevel::ERROR: + level_str = "[ERROR]"; + break; + } + + std::string full_msg = get_time_string() + " " + level_str + " " + msg; + + std::cout << full_msg << std::endl; + + if (log_file.is_open()) + { + log_file << full_msg << std::endl; + log_file.flush(); + + // 检查是否需要轮转 + if (get_file_size(current_log_filename) >= MAX_LOG_FILE_SIZE) + { + rotate_logs(); + } + } +} + +size_t Logger::get_file_size(const std::string &filename) +{ + std::ifstream in(filename, std::ifstream::ate | std::ifstream::binary); + return in.is_open() ? static_cast(in.tellg()) : 0; +} + +void Logger::rotate_logs() +{ + log_file.close(); + + // 删除最旧的日志 + std::string oldest = current_log_filename + "." + std::to_string(MAX_LOG_BACKUP_COUNT); + std::remove(oldest.c_str()); + + // 重命名 *.4 -> *.5, ..., *.1 -> *.2 + for (int i = MAX_LOG_BACKUP_COUNT - 1; i >= 1; --i) + { + std::string old_name = current_log_filename + "." + std::to_string(i); + std::string new_name = current_log_filename + "." + std::to_string(i + 1); + std::rename(old_name.c_str(), new_name.c_str()); + } + + // 当前日志 -> .1 + std::string first_backup = current_log_filename + ".1"; + std::rename(current_log_filename.c_str(), first_backup.c_str()); + + // 重新打开新日志 + log_file.open(current_log_filename, std::ios::trunc); +} diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp new file mode 100644 index 0000000..83ca99a --- /dev/null +++ b/src/mqtt_client.cpp @@ -0,0 +1,168 @@ +#include "mqtt_client.hpp" +#include +#include +#include + +MQTTClient::MQTTClient(const MQTTConfig &config) + : config_(config), connected_(false), reconnect_active_(false) +{ + initializeClient(); +} + +MQTTClient::~MQTTClient() +{ + disconnect(); +} + +void MQTTClient::initializeClient() +{ + std::string address = "tcp://" + config_.server_ip + ":" + std::to_string(config_.server_port); + client_ = std::make_shared(address, config_.client_id); + client_->set_callback(*this); +} + +void MQTTClient::connect() +{ + std::lock_guard lock(mutex_); + if (connected_) + return; + + try + { + auto connOpts = mqtt::connect_options_builder() + .clean_session(config_.clean_session) + .automatic_reconnect(true) + .keep_alive_interval(std::chrono::seconds(config_.keep_alive)) + .user_name(config_.username) + .password(config_.password) + .finalize(); + + client_->connect(connOpts)->wait(); + connected_ = true; + reconnect_active_ = false; + + if (on_connect_) + on_connect_(); + } + catch (const mqtt::exception &e) + { + std::cerr << "[MQTTClient] Connect failed: " << e.what() << std::endl; + startReconnect(); + } +} + +void MQTTClient::disconnect() +{ + std::lock_guard lock(mutex_); + if (!connected_) + return; + + try + { + client_->disconnect()->wait(); + connected_ = false; + if (on_disconnect_) + on_disconnect_(); + } + catch (const mqtt::exception &e) + { + std::cerr << "[MQTTClient] Disconnect failed: " << e.what() << std::endl; + } +} + +void MQTTClient::publish(const std::string &topic, const std::string &payload, int qos) +{ + if (qos == -1) + qos = config_.qos; + + try + { + client_->publish(topic, payload.data(), payload.size(), qos, false)->wait_for(std::chrono::milliseconds(500)); + } + catch (const mqtt::exception &e) + { + std::cerr << "[MQTTClient] Publish failed: " << e.what() << std::endl; + if (!connected_) + startReconnect(); + } +} + +void MQTTClient::subscribe(const std::string &topic, int qos) +{ + if (qos == -1) + qos = config_.qos; + + try + { + client_->subscribe(topic, qos)->wait(); + } + catch (const mqtt::exception &e) + { + std::cerr << "[MQTTClient] Subscribe failed: " << e.what() << std::endl; + if (!connected_) + startReconnect(); + } +} + +void MQTTClient::switchServer(const MQTTConfig &newConfig) +{ + std::lock_guard lock(mutex_); + if (connected_) + { + try + { + client_->disconnect()->wait(); + } + catch (...) + { + } + connected_ = false; + } + config_ = newConfig; + initializeClient(); + connect(); +} + +void MQTTClient::startReconnect() +{ + if (reconnect_active_) + return; + reconnect_active_ = true; + + std::thread([this] + { + while (!connected_ && reconnect_active_) { + std::this_thread::sleep_for(std::chrono::seconds(2)); + try { + connect(); + } catch (...) {} + } + reconnect_active_ = false; }) + .detach(); +} + +void MQTTClient::connection_lost(const std::string &cause) +{ + std::lock_guard lock(mutex_); + connected_ = false; + if (on_disconnect_) + on_disconnect_(); + startReconnect(); +} + +void MQTTClient::message_arrived(mqtt::const_message_ptr msg) +{ + if (on_message_) + { + on_message_(msg->get_topic(), msg->get_payload_str()); + } +} + +bool MQTTClient::isConnected() const +{ + return connected_; +} + +void MQTTClient::setConnectCallback(ConnectCallback cb) { on_connect_ = cb; } +void MQTTClient::setDisconnectCallback(DisconnectCallback cb) { on_disconnect_ = cb; } +void MQTTClient::setMessageCallback(MessageCallback cb) { on_message_ = cb; } diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp new file mode 100644 index 0000000..9640d90 --- /dev/null +++ b/src/mqtt_client_wrapper.cpp @@ -0,0 +1,57 @@ +#include "mqtt_client_wrapper.hpp" + +std::shared_ptr mqtt_client; +std::atomic mqtt_restart_required{false}; + +static uint16_t broadcast_sequence = 0; + +// MQTT 回调定义 +static void on_mqtt_connected() +{ + LOG_INFO("[MQTT] Connected to broker."); + mqtt_client->subscribe(g_mqtt_config.topics.downlink); +} + +static void on_mqtt_disconnected() +{ + LOG_WARN("[MQTT] Disconnected from broker."); +} + +static void on_mqtt_message_received(const std::string &topic, const std::string &message) +{ + LOG_INFO("[MQTT] Received message on topic [" + topic + "], len = " + std::to_string(message.size())); + + try + { + } + catch (const std::exception &e) + { + LOG_ERROR(std::string("[MQTT] Failed to process incoming JSON: ") + e.what()); + } +} + +void mqtt_client_thread_func() +{ + while (true) + { + const auto &cfg = g_mqtt_config; + mqtt_client = std::make_unique(cfg); + mqtt_client->setConnectCallback(on_mqtt_connected); + mqtt_client->setDisconnectCallback(on_mqtt_disconnected); + mqtt_client->setMessageCallback(on_mqtt_message_received); + + mqtt_client->connect(); + + // 主线程监听重启信号 + while (!mqtt_restart_required) + { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + // 需要重启 + LOG_INFO("[MQTT] Restarting client..."); + mqtt_client->disconnect(); // 可加锁 + mqtt_client.reset(); + mqtt_restart_required = false; + } +} diff --git a/src/mqtt_config.cpp b/src/mqtt_config.cpp new file mode 100644 index 0000000..6482170 --- /dev/null +++ b/src/mqtt_config.cpp @@ -0,0 +1,122 @@ +#include "mqtt_config.hpp" + +MQTTConfig g_mqtt_config; + +MQTTConfig MQTTConfig::load_from_file(const std::string &filepath) +{ + MQTTConfig cfg; + + std::ifstream ifs(filepath); + if (!ifs) + { + LOG_ERROR("[Config] Failed to open config file: " + filepath); + throw std::runtime_error("Failed to open config file: " + filepath); + } + + nlohmann::json j; + ifs >> j; + + cfg.device_id = j["device_id"]; + cfg.vin = j["vin"]; + cfg.server_ip = j["server_ip"]; + cfg.server_port = j["server_port"]; + cfg.client_id = j["client_id"]; + cfg.username = j["username"]; + cfg.password = j["password"]; + cfg.qos = j["qos"]; + cfg.keep_alive = j["keep_alive"]; + cfg.clean_session = j["clean_session"]; + + const std::string &id = cfg.device_id; + cfg.topics.uplink_1 = j["topics"]["uplink_1"].get() + id; + cfg.topics.uplink_2 = j["topics"]["uplink_2"].get() + id; + cfg.topics.downlink = j["topics"]["downlink"].get() + id; + + cfg.ports.config_port = j["ports"]["config_port"]; + cfg.ports.uplink_1_port = j["ports"]["uplink_1_port"]; + cfg.ports.uplink_2_port = j["ports"]["uplink_2_port"]; + cfg.ports.downlink_port = j["ports"]["downlink_port"]; + + LOG_INFO("[Config] Loaded config from " + filepath); + LOG_INFO(" - MQTT Server: " + cfg.server_ip + ":" + std::to_string(cfg.server_port)); + LOG_INFO(" - Client ID: " + cfg.client_id); + LOG_INFO(" - Topics: " + cfg.topics.uplink_1 + ", " + cfg.topics.uplink_2 + ", " + cfg.topics.downlink); + + return cfg; +} + +bool MQTTConfig::update_mqtt_config(const ordered_json &new_values, + const std::string &filepath) +{ + std::ifstream ifs(filepath); + if (!ifs.is_open()) + { + LOG_ERROR("[ConfigEdit] Failed to open config file: " + filepath); + return false; + } + + ordered_json j; + try + { + ifs >> j; + ifs.close(); + } + catch (const std::exception &e) + { + LOG_ERROR(std::string("[ConfigEdit] JSON parse error: ") + e.what()); + return false; + } + + for (auto it = new_values.begin(); it != new_values.end(); ++it) + { + const std::string &key = it.key(); + const auto &val = it.value(); + + if (j.contains(key)) + { + LOG_INFO("[ConfigEdit] Updating key: " + key + " -> " + val.dump()); + j[key] = val; + } + else + { + LOG_WARN("[ConfigEdit] Warning: key '" + key + "' not found in config, ignored."); + } + } + + std::ofstream ofs(filepath); + if (!ofs.is_open()) + { + LOG_ERROR("[ConfigEdit] Failed to write updated config file: " + filepath); + return false; + } + + ofs << j.dump(4); + ofs.close(); + return true; +} + +std::string get_executable_dir() +{ + char result[PATH_MAX] = {0}; + ssize_t count = readlink("/proc/self/exe", result, PATH_MAX); + if (count == -1) + { + throw std::runtime_error("Failed to read /proc/self/exe"); + } + + std::string full_path(result, count); + auto pos = full_path.find_last_of('/'); + if (pos == std::string::npos) + { + throw std::runtime_error("Failed to find executable directory"); + } + return full_path.substr(0, pos); +} + +std::string get_executable_dir_file_path(const std::string &filename) +{ + std::string dir = get_executable_dir(); + if (dir.back() != '/') + dir += '/'; + return dir + filename; +} \ No newline at end of file