修改隧道
This commit is contained in:
parent
cd82cc0692
commit
2759f86c76
@ -2,24 +2,39 @@
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <websocketpp/connection.hpp>
|
||||
|
||||
namespace websocketpp
|
||||
{
|
||||
namespace client
|
||||
{
|
||||
}
|
||||
} // namespace websocketpp
|
||||
|
||||
class TunnelClient
|
||||
{
|
||||
public:
|
||||
TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port);
|
||||
|
||||
void start();
|
||||
void stop();
|
||||
|
||||
private:
|
||||
void run_loop();
|
||||
std::string handle_local_http(const std::string& method, const std::string& path, const std::string& body);
|
||||
|
||||
// 新增:流式处理 & 安全发送
|
||||
void handle_request_and_reply(const nlohmann::json& req);
|
||||
void send_text_safe(const std::string& s);
|
||||
void send_binary_safe(const void* data, size_t len);
|
||||
|
||||
private:
|
||||
std::string vid_;
|
||||
std::string ws_url_;
|
||||
int local_port_;
|
||||
int local_port_ = 0;
|
||||
|
||||
std::atomic<bool> running_{false};
|
||||
std::thread th_;
|
||||
|
||||
// websocketpp 句柄
|
||||
websocketpp::connection_hdl hdl_;
|
||||
ws_client* client_ = nullptr; // 指向 run_loop 的 client
|
||||
};
|
||||
|
||||
@ -1,14 +1,36 @@
|
||||
#include "tunnel_client.hpp"
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdio>
|
||||
#include <mutex>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <websocketpp/client.hpp>
|
||||
#include <websocketpp/config/asio_no_tls_client.hpp>
|
||||
|
||||
#include "httplib.h"
|
||||
using json = nlohmann::json;
|
||||
|
||||
using json = nlohmann::json;
|
||||
using ws_client = websocketpp::client<websocketpp::config::asio_client>;
|
||||
|
||||
namespace
|
||||
{
|
||||
constexpr size_t kChunkSize = 64 * 1024; // 64KB
|
||||
constexpr int kHttpTimeoutSec = 30;
|
||||
|
||||
static bool looks_binary_content_type(const std::string& ct)
|
||||
{
|
||||
// 你可以按需扩展
|
||||
if (ct.find("video/") == 0) return true;
|
||||
if (ct.find("application/octet-stream") != std::string::npos) return true;
|
||||
if (ct.find("image/") == 0) return true;
|
||||
return false;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port)
|
||||
: vid_(vid), ws_url_(server_ws_url + "?vid=" + vid), local_port_(local_http_port)
|
||||
{
|
||||
@ -16,108 +38,278 @@ TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_
|
||||
|
||||
void TunnelClient::start()
|
||||
{
|
||||
running_ = true;
|
||||
running_.store(true);
|
||||
th_ = std::thread(&TunnelClient::run_loop, this);
|
||||
}
|
||||
|
||||
void TunnelClient::stop()
|
||||
{
|
||||
running_ = false;
|
||||
running_.store(false);
|
||||
if (client_)
|
||||
{
|
||||
client_->get_io_service().post(
|
||||
[this]()
|
||||
{
|
||||
try
|
||||
{
|
||||
websocketpp::lib::error_code ec;
|
||||
client_->close(hdl_, websocketpp::close::status::going_away, "stop", ec);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
});
|
||||
}
|
||||
if (th_.joinable()) th_.join();
|
||||
}
|
||||
|
||||
std::string TunnelClient::handle_local_http(const std::string& method, const std::string& path, const std::string& body)
|
||||
// =========================== 发送工具(线程安全) ===========================
|
||||
|
||||
void TunnelClient::send_text_safe(const std::string& s)
|
||||
{
|
||||
if (!client_) return;
|
||||
client_->get_io_service().post(
|
||||
[this, s]()
|
||||
{
|
||||
websocketpp::lib::error_code ec;
|
||||
client_->send(hdl_, s, websocketpp::frame::opcode::text, ec);
|
||||
if (ec)
|
||||
{
|
||||
std::printf("[Tunnel] send_text failed: %s\n", ec.message().c_str());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void TunnelClient::send_binary_safe(const void* data, size_t len)
|
||||
{
|
||||
if (!client_) return;
|
||||
std::string payload(reinterpret_cast<const char*>(data), len); // 拷贝一份,避免原buffer生命周期问题
|
||||
client_->get_io_service().post(
|
||||
[this, payload]()
|
||||
{
|
||||
websocketpp::lib::error_code ec;
|
||||
client_->send(hdl_, payload, websocketpp::frame::opcode::binary, ec);
|
||||
if (ec)
|
||||
{
|
||||
std::printf("[Tunnel] send_binary failed: %s\n", ec.message().c_str());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// =========================== 核心:处理本地 HTTP 并回传 ===========================
|
||||
|
||||
void TunnelClient::handle_request_and_reply(const json& req)
|
||||
{
|
||||
const std::string req_id = req.value("req_id", "");
|
||||
const std::string method = req.value("method", "GET");
|
||||
const std::string path = req.value("path", "/");
|
||||
const std::string body = req.value("body", "");
|
||||
|
||||
std::printf("[Tunnel] req_id=%s %s %s\n", req_id.c_str(), method.c_str(), path.c_str());
|
||||
|
||||
httplib::Client cli("127.0.0.1", local_port_);
|
||||
cli.set_read_timeout(10, 0);
|
||||
|
||||
httplib::Result res;
|
||||
cli.set_connection_timeout(kHttpTimeoutSec, 0);
|
||||
cli.set_read_timeout(kHttpTimeoutSec, 0);
|
||||
cli.set_write_timeout(kHttpTimeoutSec, 0);
|
||||
cli.set_keep_alive(true);
|
||||
|
||||
// 这里只实现 GET / POST;你可以补 PUT/DELETE
|
||||
if (method == "GET")
|
||||
{
|
||||
res = cli.Get(path.c_str());
|
||||
}
|
||||
else if (method == "POST")
|
||||
{
|
||||
res = cli.Post(path.c_str(), body, "application/json");
|
||||
}
|
||||
else
|
||||
{
|
||||
return json({{"status", 405}, {"body", "unsupported method"}}).dump();
|
||||
// 关键:用流式接收响应体,避免 res->body 把大文件吃进内存
|
||||
bool started_header = false;
|
||||
int status_code = 0;
|
||||
std::string content_type = "application/octet-stream";
|
||||
|
||||
auto ok = cli.Get(
|
||||
path.c_str(),
|
||||
// on_response: 拿到 header 时回一个 header frame
|
||||
[&](const httplib::Response& res)
|
||||
{
|
||||
status_code = res.status;
|
||||
auto it = res.headers.find("Content-Type");
|
||||
if (it != res.headers.end()) content_type = it->second;
|
||||
|
||||
json hdr = {
|
||||
{"type", "header"},
|
||||
{"req_id", req_id},
|
||||
{"status", status_code},
|
||||
{"content_type", content_type},
|
||||
};
|
||||
send_text_safe(hdr.dump());
|
||||
started_header = true;
|
||||
|
||||
std::printf("[Tunnel] local status=%d ct=%s\n", status_code, content_type.c_str());
|
||||
return true; // continue
|
||||
},
|
||||
// on_data: 每块数据都用 binary frame 发给服务器
|
||||
[&](const char* data, size_t data_length)
|
||||
{
|
||||
if (!started_header)
|
||||
{
|
||||
// 极少情况:没走到 on_response(按 httplib 实现一般不会发生)
|
||||
json hdr = {
|
||||
{"type", "header"}, {"req_id", req_id}, {"status", 200}, {"content_type", content_type}};
|
||||
send_text_safe(hdr.dump());
|
||||
started_header = true;
|
||||
}
|
||||
send_binary_safe(data, data_length);
|
||||
return true; // continue
|
||||
});
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}};
|
||||
send_text_safe(err.dump());
|
||||
std::printf("[Tunnel] local http error (GET)\n");
|
||||
return;
|
||||
}
|
||||
|
||||
// 结束帧
|
||||
json end = {{"type", "end"}, {"req_id", req_id}};
|
||||
send_text_safe(end.dump());
|
||||
std::printf("[Tunnel] done req_id=%s\n", req_id.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
if (!res)
|
||||
if (method == "POST")
|
||||
{
|
||||
return json({{"status", 500}, {"body", "local http error"}}).dump();
|
||||
// POST 多数是小 JSON,先简单按文本处理
|
||||
auto res = cli.Post(path.c_str(), body, "application/json");
|
||||
if (!res)
|
||||
{
|
||||
json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}};
|
||||
send_text_safe(err.dump());
|
||||
std::printf("[Tunnel] local http error (POST)\n");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string ct = "application/json";
|
||||
auto it = res->headers.find("Content-Type");
|
||||
if (it != res->headers.end()) ct = it->second;
|
||||
|
||||
json out = {
|
||||
{"type", "json"}, {"req_id", req_id}, {"status", res->status}, {"content_type", ct}, {"body", res->body},
|
||||
};
|
||||
send_text_safe(out.dump());
|
||||
std::printf("[Tunnel] POST done req_id=%s status=%d\n", req_id.c_str(), res->status);
|
||||
return;
|
||||
}
|
||||
|
||||
return json({{"status", res->status}, {"body", res->body}}).dump();
|
||||
json out = {{"type", "json"}, {"req_id", req_id}, {"status", 405}, {"body", "unsupported method"}};
|
||||
send_text_safe(out.dump());
|
||||
}
|
||||
|
||||
// =========================== run loop ===========================
|
||||
|
||||
void TunnelClient::run_loop()
|
||||
{
|
||||
ws_client c;
|
||||
c.clear_access_channels(websocketpp::log::alevel::all);
|
||||
c.init_asio();
|
||||
|
||||
websocketpp::connection_hdl hdl;
|
||||
client_ = &c;
|
||||
|
||||
// ----------------------- WebSocket 回调 -----------------------
|
||||
c.set_open_handler(
|
||||
[&](websocketpp::connection_hdl h)
|
||||
[this](websocketpp::connection_hdl h)
|
||||
{
|
||||
hdl = h;
|
||||
printf("[Tunnel] Connected to server\n");
|
||||
hdl_ = h;
|
||||
std::printf("[Tunnel] Connected to server\n");
|
||||
});
|
||||
|
||||
c.set_close_handler([&](websocketpp::connection_hdl) { printf("[Tunnel] Disconnected from server\n"); });
|
||||
c.set_close_handler([this](websocketpp::connection_hdl) { std::printf("[Tunnel] Disconnected from server\n"); });
|
||||
|
||||
c.set_fail_handler([this](websocketpp::connection_hdl) { std::printf("[Tunnel] Connect failed\n"); });
|
||||
|
||||
c.set_message_handler(
|
||||
[&](websocketpp::connection_hdl, ws_client::message_ptr msg)
|
||||
[this](websocketpp::connection_hdl, ws_client::message_ptr msg)
|
||||
{
|
||||
try
|
||||
{
|
||||
std::string payload = msg->get_payload();
|
||||
if (msg->get_opcode() != websocketpp::frame::opcode::text)
|
||||
{
|
||||
std::printf("[Tunnel] ignore non-text msg from server\n");
|
||||
return;
|
||||
}
|
||||
auto payload = msg->get_payload();
|
||||
json req = json::parse(payload);
|
||||
|
||||
std::string req_id = req["req_id"];
|
||||
std::string method = req["method"];
|
||||
std::string path = req["path"];
|
||||
std::string body = req.value("body", "");
|
||||
|
||||
std::string local_result = handle_local_http(method, path, body);
|
||||
|
||||
json resp;
|
||||
resp["req_id"] = req_id;
|
||||
resp["resp"] = json::parse(local_result);
|
||||
|
||||
c.send(hdl, resp.dump(), websocketpp::frame::opcode::text);
|
||||
// 同步处理会阻塞 websocket 线程:这里开线程处理,避免卡住心跳/读写
|
||||
std::thread(
|
||||
[this, req]()
|
||||
{
|
||||
try
|
||||
{
|
||||
handle_request_and_reply(req);
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
std::printf("[Tunnel] handler exception: %s\n", e.what());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::printf("[Tunnel] handler unknown exception\n");
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
std::printf("[Tunnel] JSON parse error: %s\n", e.what());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
printf("[Tunnel] JSON or HTTP error\n");
|
||||
std::printf("[Tunnel] JSON parse unknown error\n");
|
||||
}
|
||||
});
|
||||
|
||||
// ----------------------- 建立连接 -----------------------
|
||||
websocketpp::lib::error_code ec;
|
||||
auto conn = c.get_connection(ws_url_, ec);
|
||||
if (ec)
|
||||
{
|
||||
printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str());
|
||||
std::printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str());
|
||||
client_ = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
c.connect(conn);
|
||||
|
||||
// ----------------------- 主循环 -----------------------
|
||||
while (running_)
|
||||
// 用 run():稳定事件循环
|
||||
while (running_.load())
|
||||
{
|
||||
// 处理一次事件
|
||||
c.run_one();
|
||||
try
|
||||
{
|
||||
c.run();
|
||||
// run() 返回说明 stopped 了,稍等再重连
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
std::printf("[Tunnel] run exception: %s\n", e.what());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::printf("[Tunnel] run unknown exception\n");
|
||||
}
|
||||
|
||||
// 如果连接停止(断线、错误等)——退出 loop
|
||||
if (c.stopped()) break;
|
||||
if (!running_.load()) break;
|
||||
|
||||
std::printf("[Tunnel] reconnecting in 2s...\n");
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
|
||||
// 重建连接
|
||||
c.reset();
|
||||
c.init_asio();
|
||||
|
||||
websocketpp::lib::error_code ec2;
|
||||
auto conn2 = c.get_connection(ws_url_, ec2);
|
||||
if (ec2)
|
||||
{
|
||||
std::printf("[Tunnel] reconnect init failed: %s\n", ec2.message().c_str());
|
||||
continue;
|
||||
}
|
||||
c.connect(conn2);
|
||||
}
|
||||
|
||||
printf("[Tunnel] Loop exit\n");
|
||||
std::printf("[Tunnel] Loop exit\n");
|
||||
client_ = nullptr;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user