From 2759f86c760193c64b091437e9f0100b7d94cc21 Mon Sep 17 00:00:00 2001 From: cxh Date: Wed, 21 Jan 2026 14:58:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=9A=A7=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/tunnel_client.hpp | 21 ++- src/tunnel_client.cpp | 290 +++++++++++++++++++++++++++++++------- 2 files changed, 259 insertions(+), 52 deletions(-) diff --git a/include/tunnel_client.hpp b/include/tunnel_client.hpp index 676a9fb..2ca6cd4 100644 --- a/include/tunnel_client.hpp +++ b/include/tunnel_client.hpp @@ -2,24 +2,39 @@ #include #include #include +#include + +namespace websocketpp +{ +namespace client +{ +} +} // namespace websocketpp class TunnelClient { public: TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port); - void start(); void stop(); private: void run_loop(); - std::string handle_local_http(const std::string& method, const std::string& path, const std::string& body); + + // 新增:流式处理 & 安全发送 + void handle_request_and_reply(const nlohmann::json& req); + void send_text_safe(const std::string& s); + void send_binary_safe(const void* data, size_t len); private: std::string vid_; std::string ws_url_; - int local_port_; + int local_port_ = 0; std::atomic running_{false}; std::thread th_; + + // websocketpp 句柄 + websocketpp::connection_hdl hdl_; + ws_client* client_ = nullptr; // 指向 run_loop 的 client }; diff --git a/src/tunnel_client.cpp b/src/tunnel_client.cpp index c2975c8..e30d697 100644 --- a/src/tunnel_client.cpp +++ b/src/tunnel_client.cpp @@ -1,14 +1,36 @@ #include "tunnel_client.hpp" +#include +#include +#include +#include #include +#include +#include +#include #include #include #include "httplib.h" -using json = nlohmann::json; +using json = nlohmann::json; using ws_client = websocketpp::client; +namespace +{ +constexpr size_t kChunkSize = 64 * 1024; // 64KB +constexpr int kHttpTimeoutSec = 30; + +static bool looks_binary_content_type(const std::string& ct) +{ + // 你可以按需扩展 + if (ct.find("video/") == 0) return true; + if (ct.find("application/octet-stream") != std::string::npos) return true; + if (ct.find("image/") == 0) return true; + return false; +} +} // namespace + TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port) : vid_(vid), ws_url_(server_ws_url + "?vid=" + vid), local_port_(local_http_port) { @@ -16,108 +38,278 @@ TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_ void TunnelClient::start() { - running_ = true; + running_.store(true); th_ = std::thread(&TunnelClient::run_loop, this); } void TunnelClient::stop() { - running_ = false; + running_.store(false); + if (client_) + { + client_->get_io_service().post( + [this]() + { + try + { + websocketpp::lib::error_code ec; + client_->close(hdl_, websocketpp::close::status::going_away, "stop", ec); + } + catch (...) + { + } + }); + } if (th_.joinable()) th_.join(); } -std::string TunnelClient::handle_local_http(const std::string& method, const std::string& path, const std::string& body) +// =========================== 发送工具(线程安全) =========================== + +void TunnelClient::send_text_safe(const std::string& s) { + if (!client_) return; + client_->get_io_service().post( + [this, s]() + { + websocketpp::lib::error_code ec; + client_->send(hdl_, s, websocketpp::frame::opcode::text, ec); + if (ec) + { + std::printf("[Tunnel] send_text failed: %s\n", ec.message().c_str()); + } + }); +} + +void TunnelClient::send_binary_safe(const void* data, size_t len) +{ + if (!client_) return; + std::string payload(reinterpret_cast(data), len); // 拷贝一份,避免原buffer生命周期问题 + client_->get_io_service().post( + [this, payload]() + { + websocketpp::lib::error_code ec; + client_->send(hdl_, payload, websocketpp::frame::opcode::binary, ec); + if (ec) + { + std::printf("[Tunnel] send_binary failed: %s\n", ec.message().c_str()); + } + }); +} + +// =========================== 核心:处理本地 HTTP 并回传 =========================== + +void TunnelClient::handle_request_and_reply(const json& req) +{ + const std::string req_id = req.value("req_id", ""); + const std::string method = req.value("method", "GET"); + const std::string path = req.value("path", "/"); + const std::string body = req.value("body", ""); + + std::printf("[Tunnel] req_id=%s %s %s\n", req_id.c_str(), method.c_str(), path.c_str()); + httplib::Client cli("127.0.0.1", local_port_); - cli.set_read_timeout(10, 0); - - httplib::Result res; + cli.set_connection_timeout(kHttpTimeoutSec, 0); + cli.set_read_timeout(kHttpTimeoutSec, 0); + cli.set_write_timeout(kHttpTimeoutSec, 0); + cli.set_keep_alive(true); + // 这里只实现 GET / POST;你可以补 PUT/DELETE if (method == "GET") { - res = cli.Get(path.c_str()); - } - else if (method == "POST") - { - res = cli.Post(path.c_str(), body, "application/json"); - } - else - { - return json({{"status", 405}, {"body", "unsupported method"}}).dump(); + // 关键:用流式接收响应体,避免 res->body 把大文件吃进内存 + bool started_header = false; + int status_code = 0; + std::string content_type = "application/octet-stream"; + + auto ok = cli.Get( + path.c_str(), + // on_response: 拿到 header 时回一个 header frame + [&](const httplib::Response& res) + { + status_code = res.status; + auto it = res.headers.find("Content-Type"); + if (it != res.headers.end()) content_type = it->second; + + json hdr = { + {"type", "header"}, + {"req_id", req_id}, + {"status", status_code}, + {"content_type", content_type}, + }; + send_text_safe(hdr.dump()); + started_header = true; + + std::printf("[Tunnel] local status=%d ct=%s\n", status_code, content_type.c_str()); + return true; // continue + }, + // on_data: 每块数据都用 binary frame 发给服务器 + [&](const char* data, size_t data_length) + { + if (!started_header) + { + // 极少情况:没走到 on_response(按 httplib 实现一般不会发生) + json hdr = { + {"type", "header"}, {"req_id", req_id}, {"status", 200}, {"content_type", content_type}}; + send_text_safe(hdr.dump()); + started_header = true; + } + send_binary_safe(data, data_length); + return true; // continue + }); + + if (!ok) + { + json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}}; + send_text_safe(err.dump()); + std::printf("[Tunnel] local http error (GET)\n"); + return; + } + + // 结束帧 + json end = {{"type", "end"}, {"req_id", req_id}}; + send_text_safe(end.dump()); + std::printf("[Tunnel] done req_id=%s\n", req_id.c_str()); + return; } - if (!res) + if (method == "POST") { - return json({{"status", 500}, {"body", "local http error"}}).dump(); + // POST 多数是小 JSON,先简单按文本处理 + auto res = cli.Post(path.c_str(), body, "application/json"); + if (!res) + { + json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}}; + send_text_safe(err.dump()); + std::printf("[Tunnel] local http error (POST)\n"); + return; + } + + std::string ct = "application/json"; + auto it = res->headers.find("Content-Type"); + if (it != res->headers.end()) ct = it->second; + + json out = { + {"type", "json"}, {"req_id", req_id}, {"status", res->status}, {"content_type", ct}, {"body", res->body}, + }; + send_text_safe(out.dump()); + std::printf("[Tunnel] POST done req_id=%s status=%d\n", req_id.c_str(), res->status); + return; } - return json({{"status", res->status}, {"body", res->body}}).dump(); + json out = {{"type", "json"}, {"req_id", req_id}, {"status", 405}, {"body", "unsupported method"}}; + send_text_safe(out.dump()); } +// =========================== run loop =========================== + void TunnelClient::run_loop() { ws_client c; c.clear_access_channels(websocketpp::log::alevel::all); c.init_asio(); - websocketpp::connection_hdl hdl; + client_ = &c; - // ----------------------- WebSocket 回调 ----------------------- c.set_open_handler( - [&](websocketpp::connection_hdl h) + [this](websocketpp::connection_hdl h) { - hdl = h; - printf("[Tunnel] Connected to server\n"); + hdl_ = h; + std::printf("[Tunnel] Connected to server\n"); }); - c.set_close_handler([&](websocketpp::connection_hdl) { printf("[Tunnel] Disconnected from server\n"); }); + c.set_close_handler([this](websocketpp::connection_hdl) { std::printf("[Tunnel] Disconnected from server\n"); }); + + c.set_fail_handler([this](websocketpp::connection_hdl) { std::printf("[Tunnel] Connect failed\n"); }); c.set_message_handler( - [&](websocketpp::connection_hdl, ws_client::message_ptr msg) + [this](websocketpp::connection_hdl, ws_client::message_ptr msg) { try { - std::string payload = msg->get_payload(); + if (msg->get_opcode() != websocketpp::frame::opcode::text) + { + std::printf("[Tunnel] ignore non-text msg from server\n"); + return; + } + auto payload = msg->get_payload(); json req = json::parse(payload); - std::string req_id = req["req_id"]; - std::string method = req["method"]; - std::string path = req["path"]; - std::string body = req.value("body", ""); - - std::string local_result = handle_local_http(method, path, body); - - json resp; - resp["req_id"] = req_id; - resp["resp"] = json::parse(local_result); - - c.send(hdl, resp.dump(), websocketpp::frame::opcode::text); + // 同步处理会阻塞 websocket 线程:这里开线程处理,避免卡住心跳/读写 + std::thread( + [this, req]() + { + try + { + handle_request_and_reply(req); + } + catch (const std::exception& e) + { + std::printf("[Tunnel] handler exception: %s\n", e.what()); + } + catch (...) + { + std::printf("[Tunnel] handler unknown exception\n"); + } + }) + .detach(); + } + catch (const std::exception& e) + { + std::printf("[Tunnel] JSON parse error: %s\n", e.what()); } catch (...) { - printf("[Tunnel] JSON or HTTP error\n"); + std::printf("[Tunnel] JSON parse unknown error\n"); } }); - // ----------------------- 建立连接 ----------------------- websocketpp::lib::error_code ec; auto conn = c.get_connection(ws_url_, ec); if (ec) { - printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str()); + std::printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str()); + client_ = nullptr; return; } + c.connect(conn); - // ----------------------- 主循环 ----------------------- - while (running_) + // 用 run():稳定事件循环 + while (running_.load()) { - // 处理一次事件 - c.run_one(); + try + { + c.run(); + // run() 返回说明 stopped 了,稍等再重连 + } + catch (const std::exception& e) + { + std::printf("[Tunnel] run exception: %s\n", e.what()); + } + catch (...) + { + std::printf("[Tunnel] run unknown exception\n"); + } - // 如果连接停止(断线、错误等)——退出 loop - if (c.stopped()) break; + if (!running_.load()) break; + + std::printf("[Tunnel] reconnecting in 2s...\n"); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // 重建连接 + c.reset(); + c.init_asio(); + + websocketpp::lib::error_code ec2; + auto conn2 = c.get_connection(ws_url_, ec2); + if (ec2) + { + std::printf("[Tunnel] reconnect init failed: %s\n", ec2.message().c_str()); + continue; + } + c.connect(conn2); } - printf("[Tunnel] Loop exit\n"); + std::printf("[Tunnel] Loop exit\n"); + client_ = nullptr; }