From 9f9f311ab47e5c33d5cfa779bc9979a837a777b5 Mon Sep 17 00:00:00 2001 From: cxh Date: Thu, 29 Jan 2026 16:34:05 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=83=A8=E5=88=86=E6=8A=A5?= =?UTF-8?q?=E6=96=87=E5=92=8C=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mqtt_client_wrapper.cpp | 4 +- src/tunnel_client.cpp | 95 ++++++++++++++++++------------------- 2 files changed, 48 insertions(+), 51 deletions(-) diff --git a/src/mqtt_client_wrapper.cpp b/src/mqtt_client_wrapper.cpp index 8441459..08dd055 100644 --- a/src/mqtt_client_wrapper.cpp +++ b/src/mqtt_client_wrapper.cpp @@ -64,8 +64,8 @@ static void send_heartbeat() // ===== 新增:把 radio 塞进心跳 ===== hb["radio"] = { - {"state", ri.state}, {"rat", ri.rat}, {"pci", ri.pci}, {"band", ri.band}, {"arfcn", ri.arfcn}, - {"rsrp", ri.rsrp}, {"rsrq", ri.rsrq}, {"sinr", ri.sinr}, {"raw", ri.raw}, + {"state", ri.state}, {"rat", ri.rat}, {"pci", ri.pci}, {"band", ri.band}, + {"arfcn", ri.arfcn}, {"rsrp", ri.rsrp}, {"rsrq", ri.rsrq}, {"sinr", ri.sinr}, }; // 发布心跳 diff --git a/src/tunnel_client.cpp b/src/tunnel_client.cpp index e288b2a..5586a7e 100644 --- a/src/tunnel_client.cpp +++ b/src/tunnel_client.cpp @@ -12,6 +12,7 @@ #include #include "httplib.h" +#include "logger.hpp" using json = nlohmann::json; using ws_client = websocketpp::client; @@ -23,12 +24,16 @@ constexpr int kHttpTimeoutSec = 30; static bool looks_binary_content_type(const std::string& ct) { - // 你可以按需扩展 if (ct.find("video/") == 0) return true; if (ct.find("application/octet-stream") != std::string::npos) return true; if (ct.find("image/") == 0) return true; return false; } + +// 小工具:拼 log 字符串 +static inline std::string kv(const std::string& k, const std::string& v) { return k + "=" + v; } +static inline std::string kv(const std::string& k, int v) { return k + "=" + std::to_string(v); } + } // namespace TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port) @@ -46,7 +51,7 @@ void TunnelClient::stop() { running_.store(false); - ws_client* cli = client_; // 拷贝一份,避免 race + ws_client* cli = client_; // copy to avoid race if (cli) { cli->get_io_service().post( @@ -66,11 +71,12 @@ void TunnelClient::stop() if (th_.joinable()) th_.join(); } -// =========================== 发送工具(线程安全) =========================== +// =========================== send helpers (thread-safe) =========================== void TunnelClient::send_text_safe(const std::string& s) { if (!client_) return; + client_->get_io_service().post( [this, s]() { @@ -78,7 +84,7 @@ void TunnelClient::send_text_safe(const std::string& s) client_->send(hdl_, s, websocketpp::frame::opcode::text, ec); if (ec) { - std::printf("[Tunnel] send_text failed: %s\n", ec.message().c_str()); + LOG_ERROR(std::string("[Tunnel] send_text failed: ") + ec.message()); } }); } @@ -86,7 +92,8 @@ void TunnelClient::send_text_safe(const std::string& s) void TunnelClient::send_binary_safe(const void* data, size_t len) { if (!client_) return; - std::string payload(reinterpret_cast(data), len); // 拷贝一份,避免原buffer生命周期问题 + + std::string payload(reinterpret_cast(data), len); // copy to avoid buffer lifetime issue client_->get_io_service().post( [this, payload]() { @@ -94,12 +101,12 @@ void TunnelClient::send_binary_safe(const void* data, size_t len) client_->send(hdl_, payload, websocketpp::frame::opcode::binary, ec); if (ec) { - std::printf("[Tunnel] send_binary failed: %s\n", ec.message().c_str()); + LOG_ERROR(std::string("[Tunnel] send_binary failed: ") + ec.message()); } }); } -// =========================== 核心:处理本地 HTTP 并回传 =========================== +// =========================== core: proxy local HTTP and reply =========================== void TunnelClient::handle_request_and_reply(const json& req) { @@ -108,7 +115,7 @@ void TunnelClient::handle_request_and_reply(const json& req) const std::string path = req.value("path", "/"); const std::string body = req.value("body", ""); - std::printf("[Tunnel] req_id=%s %s %s\n", req_id.c_str(), method.c_str(), path.c_str()); + LOG_INFO("[Tunnel] " + kv("req_id", req_id) + " " + method + " " + path); httplib::Client cli("127.0.0.1", local_port_); cli.set_connection_timeout(kHttpTimeoutSec, 0); @@ -116,17 +123,15 @@ void TunnelClient::handle_request_and_reply(const json& req) cli.set_write_timeout(kHttpTimeoutSec, 0); cli.set_keep_alive(true); - // 这里只实现 GET / POST;你可以补 PUT/DELETE if (method == "GET") { - // 关键:用流式接收响应体,避免 res->body 把大文件吃进内存 bool started_header = false; int status_code = 0; std::string content_type = "application/octet-stream"; auto ok = cli.Get( path.c_str(), - // on_response: 拿到 header 时回一个 header frame + // on_response: got headers [&](const httplib::Response& res) { status_code = res.status; @@ -142,20 +147,24 @@ void TunnelClient::handle_request_and_reply(const json& req) send_text_safe(hdr.dump()); started_header = true; - std::printf("[Tunnel] local status=%d ct=%s\n", status_code, content_type.c_str()); + LOG_INFO("[Tunnel] local " + kv("status", status_code) + " " + kv("ct", content_type)); return true; // continue }, - // on_data: 每块数据都用 binary frame 发给服务器 + // on_data: stream body in binary frames [&](const char* data, size_t data_length) { if (!started_header) { - // 极少情况:没走到 on_response(按 httplib 实现一般不会发生) json hdr = { - {"type", "header"}, {"req_id", req_id}, {"status", 200}, {"content_type", content_type}}; + {"type", "header"}, + {"req_id", req_id}, + {"status", 200}, + {"content_type", content_type}, + }; send_text_safe(hdr.dump()); started_header = true; } + send_binary_safe(data, data_length); return true; // continue }); @@ -164,26 +173,24 @@ void TunnelClient::handle_request_and_reply(const json& req) { json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}}; send_text_safe(err.dump()); - std::printf("[Tunnel] local http error (GET)\n"); + LOG_ERROR("[Tunnel] local http error (GET) " + kv("req_id", req_id)); return; } - // 结束帧 json end = {{"type", "end"}, {"req_id", req_id}}; send_text_safe(end.dump()); - std::printf("[Tunnel] done req_id=%s\n", req_id.c_str()); + LOG_INFO("[Tunnel] done " + kv("req_id", req_id)); return; } if (method == "POST") { - // POST 多数是小 JSON,先简单按文本处理 auto res = cli.Post(path.c_str(), body, "application/json"); if (!res) { json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}}; send_text_safe(err.dump()); - std::printf("[Tunnel] local http error (POST)\n"); + LOG_ERROR("[Tunnel] local http error (POST) " + kv("req_id", req_id)); return; } @@ -195,12 +202,13 @@ void TunnelClient::handle_request_and_reply(const json& req) {"type", "json"}, {"req_id", req_id}, {"status", res->status}, {"content_type", ct}, {"body", res->body}, }; send_text_safe(out.dump()); - std::printf("[Tunnel] POST done req_id=%s status=%d\n", req_id.c_str(), res->status); + LOG_INFO("[Tunnel] POST done " + kv("req_id", req_id) + " " + kv("status", res->status)); return; } json out = {{"type", "json"}, {"req_id", req_id}, {"status", 405}, {"body", "unsupported method"}}; send_text_safe(out.dump()); + LOG_WARN("[Tunnel] unsupported method " + kv("req_id", req_id) + " " + kv("method", method)); } // =========================== run loop =========================== @@ -209,34 +217,27 @@ void TunnelClient::run_loop() { while (running_.load()) { - // 每次循环都创建一个新的 client,避免 websocketpp 状态复用崩溃 ws_client c; c.clear_access_channels(websocketpp::log::alevel::all); c.clear_error_channels(websocketpp::log::elevel::all); c.init_asio(); - - // (可选但推荐)让 run() 不会因为没有 work 而提前退出 c.start_perpetual(); - // 让 stop() 能安全拿到当前 client - { - // 如果你不想引入 mutex,这段也可以不加,但建议加 - client_ = &c; - } + // expose current client to stop() + client_ = &c; c.set_open_handler( [this](websocketpp::connection_hdl h) { hdl_ = h; - std::printf("[Tunnel] Connected to server\n"); + LOG_INFO("[Tunnel] Connected to server"); }); + // IMPORTANT: do not print reconnect related logs c.set_close_handler( [this, &c](websocketpp::connection_hdl) { - std::printf("[Tunnel] Disconnected from server\n"); - // 让 run() 退出,进入下一轮重连 c.stop_perpetual(); c.stop(); }); @@ -244,8 +245,6 @@ void TunnelClient::run_loop() c.set_fail_handler( [this, &c](websocketpp::connection_hdl) { - std::printf("[Tunnel] Connect failed\n"); - // 让 run() 退出,进入下一轮重连 c.stop_perpetual(); c.stop(); }); @@ -257,12 +256,12 @@ void TunnelClient::run_loop() { if (msg->get_opcode() != websocketpp::frame::opcode::text) { - std::printf("[Tunnel] ignore non-text msg from server\n"); + LOG_WARN("[Tunnel] ignore non-text msg from server"); return; } + json req = json::parse(msg->get_payload()); - // 你原来的做法:起线程处理本地 HTTP,OK std::thread( [this, req]() { @@ -272,22 +271,22 @@ void TunnelClient::run_loop() } catch (const std::exception& e) { - std::printf("[Tunnel] handler exception: %s\n", e.what()); + LOG_ERROR(std::string("[Tunnel] handler exception: ") + e.what()); } catch (...) { - std::printf("[Tunnel] handler unknown exception\n"); + LOG_ERROR("[Tunnel] handler unknown exception"); } }) .detach(); } catch (const std::exception& e) { - std::printf("[Tunnel] JSON parse error: %s\n", e.what()); + LOG_ERROR(std::string("[Tunnel] JSON parse error: ") + e.what()); } catch (...) { - std::printf("[Tunnel] JSON parse unknown error\n"); + LOG_ERROR("[Tunnel] JSON parse unknown error"); } }); @@ -295,37 +294,35 @@ void TunnelClient::run_loop() auto conn = c.get_connection(ws_url_, ec); if (ec) { - std::printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str()); client_ = nullptr; - // 这里别直接 return,继续重试 - goto retry_sleep; + // do not print reconnect logs; just sleep and retry + std::this_thread::sleep_for(std::chrono::seconds(2)); + continue; } c.connect(conn); try { - // 阻塞运行,直到 fail/close 里 stop(),或 stop() 主动 close c.run(); } catch (const std::exception& e) { - std::printf("[Tunnel] run exception: %s\n", e.what()); + LOG_ERROR(std::string("[Tunnel] run exception: ") + e.what()); } catch (...) { - std::printf("[Tunnel] run unknown exception\n"); + LOG_ERROR("[Tunnel] run unknown exception"); } client_ = nullptr; if (!running_.load()) break; - retry_sleep: - std::printf("[Tunnel] reconnecting in 2s...\n"); + // do not print reconnect logs std::this_thread::sleep_for(std::chrono::seconds(2)); } client_ = nullptr; - std::printf("[Tunnel] Loop exit\n"); + LOG_INFO("[Tunnel] Loop exit"); }