调整部分报文和打印

This commit is contained in:
cxh 2026-01-29 16:34:05 +08:00
parent 62e8c71410
commit 9f9f311ab4
2 changed files with 48 additions and 51 deletions

View File

@ -64,8 +64,8 @@ static void send_heartbeat()
// ===== 新增:把 radio 塞进心跳 ===== // ===== 新增:把 radio 塞进心跳 =====
hb["radio"] = { hb["radio"] = {
{"state", ri.state}, {"rat", ri.rat}, {"pci", ri.pci}, {"band", ri.band}, {"arfcn", ri.arfcn}, {"state", ri.state}, {"rat", ri.rat}, {"pci", ri.pci}, {"band", ri.band},
{"rsrp", ri.rsrp}, {"rsrq", ri.rsrq}, {"sinr", ri.sinr}, {"raw", ri.raw}, {"arfcn", ri.arfcn}, {"rsrp", ri.rsrp}, {"rsrq", ri.rsrq}, {"sinr", ri.sinr},
}; };
// 发布心跳 // 发布心跳

View File

@ -12,6 +12,7 @@
#include <websocketpp/config/asio_no_tls_client.hpp> #include <websocketpp/config/asio_no_tls_client.hpp>
#include "httplib.h" #include "httplib.h"
#include "logger.hpp"
using json = nlohmann::json; using json = nlohmann::json;
using ws_client = websocketpp::client<websocketpp::config::asio_client>; using ws_client = websocketpp::client<websocketpp::config::asio_client>;
@ -23,12 +24,16 @@ constexpr int kHttpTimeoutSec = 30;
static bool looks_binary_content_type(const std::string& ct) static bool looks_binary_content_type(const std::string& ct)
{ {
// 你可以按需扩展
if (ct.find("video/") == 0) return true; if (ct.find("video/") == 0) return true;
if (ct.find("application/octet-stream") != std::string::npos) return true; if (ct.find("application/octet-stream") != std::string::npos) return true;
if (ct.find("image/") == 0) return true; if (ct.find("image/") == 0) return true;
return false; return false;
} }
// 小工具:拼 log 字符串
static inline std::string kv(const std::string& k, const std::string& v) { return k + "=" + v; }
static inline std::string kv(const std::string& k, int v) { return k + "=" + std::to_string(v); }
} // namespace } // namespace
TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port) TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port)
@ -46,7 +51,7 @@ void TunnelClient::stop()
{ {
running_.store(false); running_.store(false);
ws_client* cli = client_; // 拷贝一份,避免 race ws_client* cli = client_; // copy to avoid race
if (cli) if (cli)
{ {
cli->get_io_service().post( cli->get_io_service().post(
@ -66,11 +71,12 @@ void TunnelClient::stop()
if (th_.joinable()) th_.join(); if (th_.joinable()) th_.join();
} }
// =========================== 发送工具(线程安全) =========================== // =========================== send helpers (thread-safe) ===========================
void TunnelClient::send_text_safe(const std::string& s) void TunnelClient::send_text_safe(const std::string& s)
{ {
if (!client_) return; if (!client_) return;
client_->get_io_service().post( client_->get_io_service().post(
[this, s]() [this, s]()
{ {
@ -78,7 +84,7 @@ void TunnelClient::send_text_safe(const std::string& s)
client_->send(hdl_, s, websocketpp::frame::opcode::text, ec); client_->send(hdl_, s, websocketpp::frame::opcode::text, ec);
if (ec) if (ec)
{ {
std::printf("[Tunnel] send_text failed: %s\n", ec.message().c_str()); LOG_ERROR(std::string("[Tunnel] send_text failed: ") + ec.message());
} }
}); });
} }
@ -86,7 +92,8 @@ void TunnelClient::send_text_safe(const std::string& s)
void TunnelClient::send_binary_safe(const void* data, size_t len) void TunnelClient::send_binary_safe(const void* data, size_t len)
{ {
if (!client_) return; if (!client_) return;
std::string payload(reinterpret_cast<const char*>(data), len); // 拷贝一份避免原buffer生命周期问题
std::string payload(reinterpret_cast<const char*>(data), len); // copy to avoid buffer lifetime issue
client_->get_io_service().post( client_->get_io_service().post(
[this, payload]() [this, payload]()
{ {
@ -94,12 +101,12 @@ void TunnelClient::send_binary_safe(const void* data, size_t len)
client_->send(hdl_, payload, websocketpp::frame::opcode::binary, ec); client_->send(hdl_, payload, websocketpp::frame::opcode::binary, ec);
if (ec) if (ec)
{ {
std::printf("[Tunnel] send_binary failed: %s\n", ec.message().c_str()); LOG_ERROR(std::string("[Tunnel] send_binary failed: ") + ec.message());
} }
}); });
} }
// =========================== 核心:处理本地 HTTP 并回传 =========================== // =========================== core: proxy local HTTP and reply ===========================
void TunnelClient::handle_request_and_reply(const json& req) void TunnelClient::handle_request_and_reply(const json& req)
{ {
@ -108,7 +115,7 @@ void TunnelClient::handle_request_and_reply(const json& req)
const std::string path = req.value("path", "/"); const std::string path = req.value("path", "/");
const std::string body = req.value("body", ""); const std::string body = req.value("body", "");
std::printf("[Tunnel] req_id=%s %s %s\n", req_id.c_str(), method.c_str(), path.c_str()); LOG_INFO("[Tunnel] " + kv("req_id", req_id) + " " + method + " " + path);
httplib::Client cli("127.0.0.1", local_port_); httplib::Client cli("127.0.0.1", local_port_);
cli.set_connection_timeout(kHttpTimeoutSec, 0); cli.set_connection_timeout(kHttpTimeoutSec, 0);
@ -116,17 +123,15 @@ void TunnelClient::handle_request_and_reply(const json& req)
cli.set_write_timeout(kHttpTimeoutSec, 0); cli.set_write_timeout(kHttpTimeoutSec, 0);
cli.set_keep_alive(true); cli.set_keep_alive(true);
// 这里只实现 GET / POST你可以补 PUT/DELETE
if (method == "GET") if (method == "GET")
{ {
// 关键:用流式接收响应体,避免 res->body 把大文件吃进内存
bool started_header = false; bool started_header = false;
int status_code = 0; int status_code = 0;
std::string content_type = "application/octet-stream"; std::string content_type = "application/octet-stream";
auto ok = cli.Get( auto ok = cli.Get(
path.c_str(), path.c_str(),
// on_response: 拿到 header 时回一个 header frame // on_response: got headers
[&](const httplib::Response& res) [&](const httplib::Response& res)
{ {
status_code = res.status; status_code = res.status;
@ -142,20 +147,24 @@ void TunnelClient::handle_request_and_reply(const json& req)
send_text_safe(hdr.dump()); send_text_safe(hdr.dump());
started_header = true; started_header = true;
std::printf("[Tunnel] local status=%d ct=%s\n", status_code, content_type.c_str()); LOG_INFO("[Tunnel] local " + kv("status", status_code) + " " + kv("ct", content_type));
return true; // continue return true; // continue
}, },
// on_data: 每块数据都用 binary frame 发给服务器 // on_data: stream body in binary frames
[&](const char* data, size_t data_length) [&](const char* data, size_t data_length)
{ {
if (!started_header) if (!started_header)
{ {
// 极少情况:没走到 on_response按 httplib 实现一般不会发生)
json hdr = { json hdr = {
{"type", "header"}, {"req_id", req_id}, {"status", 200}, {"content_type", content_type}}; {"type", "header"},
{"req_id", req_id},
{"status", 200},
{"content_type", content_type},
};
send_text_safe(hdr.dump()); send_text_safe(hdr.dump());
started_header = true; started_header = true;
} }
send_binary_safe(data, data_length); send_binary_safe(data, data_length);
return true; // continue return true; // continue
}); });
@ -164,26 +173,24 @@ void TunnelClient::handle_request_and_reply(const json& req)
{ {
json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}}; json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}};
send_text_safe(err.dump()); send_text_safe(err.dump());
std::printf("[Tunnel] local http error (GET)\n"); LOG_ERROR("[Tunnel] local http error (GET) " + kv("req_id", req_id));
return; return;
} }
// 结束帧
json end = {{"type", "end"}, {"req_id", req_id}}; json end = {{"type", "end"}, {"req_id", req_id}};
send_text_safe(end.dump()); send_text_safe(end.dump());
std::printf("[Tunnel] done req_id=%s\n", req_id.c_str()); LOG_INFO("[Tunnel] done " + kv("req_id", req_id));
return; return;
} }
if (method == "POST") if (method == "POST")
{ {
// POST 多数是小 JSON先简单按文本处理
auto res = cli.Post(path.c_str(), body, "application/json"); auto res = cli.Post(path.c_str(), body, "application/json");
if (!res) if (!res)
{ {
json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}}; json err = {{"type", "json"}, {"req_id", req_id}, {"status", 500}, {"body", "local http error"}};
send_text_safe(err.dump()); send_text_safe(err.dump());
std::printf("[Tunnel] local http error (POST)\n"); LOG_ERROR("[Tunnel] local http error (POST) " + kv("req_id", req_id));
return; return;
} }
@ -195,12 +202,13 @@ void TunnelClient::handle_request_and_reply(const json& req)
{"type", "json"}, {"req_id", req_id}, {"status", res->status}, {"content_type", ct}, {"body", res->body}, {"type", "json"}, {"req_id", req_id}, {"status", res->status}, {"content_type", ct}, {"body", res->body},
}; };
send_text_safe(out.dump()); send_text_safe(out.dump());
std::printf("[Tunnel] POST done req_id=%s status=%d\n", req_id.c_str(), res->status); LOG_INFO("[Tunnel] POST done " + kv("req_id", req_id) + " " + kv("status", res->status));
return; return;
} }
json out = {{"type", "json"}, {"req_id", req_id}, {"status", 405}, {"body", "unsupported method"}}; json out = {{"type", "json"}, {"req_id", req_id}, {"status", 405}, {"body", "unsupported method"}};
send_text_safe(out.dump()); send_text_safe(out.dump());
LOG_WARN("[Tunnel] unsupported method " + kv("req_id", req_id) + " " + kv("method", method));
} }
// =========================== run loop =========================== // =========================== run loop ===========================
@ -209,34 +217,27 @@ void TunnelClient::run_loop()
{ {
while (running_.load()) while (running_.load())
{ {
// 每次循环都创建一个新的 client避免 websocketpp 状态复用崩溃
ws_client c; ws_client c;
c.clear_access_channels(websocketpp::log::alevel::all); c.clear_access_channels(websocketpp::log::alevel::all);
c.clear_error_channels(websocketpp::log::elevel::all); c.clear_error_channels(websocketpp::log::elevel::all);
c.init_asio(); c.init_asio();
// (可选但推荐)让 run() 不会因为没有 work 而提前退出
c.start_perpetual(); c.start_perpetual();
// 让 stop() 能安全拿到当前 client // expose current client to stop()
{
// 如果你不想引入 mutex这段也可以不加但建议加
client_ = &c; client_ = &c;
}
c.set_open_handler( c.set_open_handler(
[this](websocketpp::connection_hdl h) [this](websocketpp::connection_hdl h)
{ {
hdl_ = h; hdl_ = h;
std::printf("[Tunnel] Connected to server\n"); LOG_INFO("[Tunnel] Connected to server");
}); });
// IMPORTANT: do not print reconnect related logs
c.set_close_handler( c.set_close_handler(
[this, &c](websocketpp::connection_hdl) [this, &c](websocketpp::connection_hdl)
{ {
std::printf("[Tunnel] Disconnected from server\n");
// 让 run() 退出,进入下一轮重连
c.stop_perpetual(); c.stop_perpetual();
c.stop(); c.stop();
}); });
@ -244,8 +245,6 @@ void TunnelClient::run_loop()
c.set_fail_handler( c.set_fail_handler(
[this, &c](websocketpp::connection_hdl) [this, &c](websocketpp::connection_hdl)
{ {
std::printf("[Tunnel] Connect failed\n");
// 让 run() 退出,进入下一轮重连
c.stop_perpetual(); c.stop_perpetual();
c.stop(); c.stop();
}); });
@ -257,12 +256,12 @@ void TunnelClient::run_loop()
{ {
if (msg->get_opcode() != websocketpp::frame::opcode::text) if (msg->get_opcode() != websocketpp::frame::opcode::text)
{ {
std::printf("[Tunnel] ignore non-text msg from server\n"); LOG_WARN("[Tunnel] ignore non-text msg from server");
return; return;
} }
json req = json::parse(msg->get_payload()); json req = json::parse(msg->get_payload());
// 你原来的做法:起线程处理本地 HTTPOK
std::thread( std::thread(
[this, req]() [this, req]()
{ {
@ -272,22 +271,22 @@ void TunnelClient::run_loop()
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
std::printf("[Tunnel] handler exception: %s\n", e.what()); LOG_ERROR(std::string("[Tunnel] handler exception: ") + e.what());
} }
catch (...) catch (...)
{ {
std::printf("[Tunnel] handler unknown exception\n"); LOG_ERROR("[Tunnel] handler unknown exception");
} }
}) })
.detach(); .detach();
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
std::printf("[Tunnel] JSON parse error: %s\n", e.what()); LOG_ERROR(std::string("[Tunnel] JSON parse error: ") + e.what());
} }
catch (...) catch (...)
{ {
std::printf("[Tunnel] JSON parse unknown error\n"); LOG_ERROR("[Tunnel] JSON parse unknown error");
} }
}); });
@ -295,37 +294,35 @@ void TunnelClient::run_loop()
auto conn = c.get_connection(ws_url_, ec); auto conn = c.get_connection(ws_url_, ec);
if (ec) if (ec)
{ {
std::printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str());
client_ = nullptr; client_ = nullptr;
// 这里别直接 return继续重试 // do not print reconnect logs; just sleep and retry
goto retry_sleep; std::this_thread::sleep_for(std::chrono::seconds(2));
continue;
} }
c.connect(conn); c.connect(conn);
try try
{ {
// 阻塞运行,直到 fail/close 里 stop(),或 stop() 主动 close
c.run(); c.run();
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
std::printf("[Tunnel] run exception: %s\n", e.what()); LOG_ERROR(std::string("[Tunnel] run exception: ") + e.what());
} }
catch (...) catch (...)
{ {
std::printf("[Tunnel] run unknown exception\n"); LOG_ERROR("[Tunnel] run unknown exception");
} }
client_ = nullptr; client_ = nullptr;
if (!running_.load()) break; if (!running_.load()) break;
retry_sleep: // do not print reconnect logs
std::printf("[Tunnel] reconnecting in 2s...\n");
std::this_thread::sleep_for(std::chrono::seconds(2)); std::this_thread::sleep_for(std::chrono::seconds(2));
} }
client_ = nullptr; client_ = nullptr;
std::printf("[Tunnel] Loop exit\n"); LOG_INFO("[Tunnel] Loop exit");
} }