diff --git a/include/tunnel_client.hpp b/include/tunnel_client.hpp new file mode 100644 index 0000000..676a9fb --- /dev/null +++ b/include/tunnel_client.hpp @@ -0,0 +1,25 @@ +#pragma once +#include +#include +#include + +class TunnelClient +{ + public: + TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port); + + void start(); + void stop(); + + private: + void run_loop(); + std::string handle_local_http(const std::string& method, const std::string& path, const std::string& body); + + private: + std::string vid_; + std::string ws_url_; + int local_port_; + + std::atomic running_{false}; + std::thread th_; +}; diff --git a/src/main.cpp b/src/main.cpp index 5ba080a..7ce2b40 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -12,8 +12,10 @@ #include "record_manager.hpp" #include "rtmp_manager.hpp" #include "serial_AT.hpp" +#include "tunnel_client.hpp" std::atomic g_running(true); +TunnelClient* g_tunnel_client = nullptr; static void signal_handler(int signum) { @@ -62,6 +64,18 @@ int main() LOG_INFO("[MAIN] Starting all record streams..."); RTMPManager::start_all(); + { + std::string vid = g_app_config.vehicle_id; + int local_http_port = 2980; + + std::string tunnel_server = "ws://36.153.162.171:2088/tunnel"; + + g_tunnel_client = new TunnelClient(vid, tunnel_server, local_http_port); + + g_tunnel_client->start(); + LOG_INFO("[MAIN] TunnelClient started: %s", tunnel_server.c_str()); + } + // 启动 MQTT 线程 std::thread mqtt_thread( [] @@ -84,6 +98,14 @@ int main() // ---------- 退出清理 ---------- LOG_INFO("[MAIN] Shutdown requested. Stopping RTMP streams..."); + if (g_tunnel_client) + { + g_tunnel_client->stop(); + delete g_tunnel_client; + g_tunnel_client = nullptr; + LOG_INFO("[MAIN] TunnelClient stopped."); + } + // 停止 RecordManager 自动扫描线程 if (g_record_manager) g_record_manager->stopAutoScan(); diff --git a/src/tunnel_client.cpp b/src/tunnel_client.cpp new file mode 100644 index 0000000..a187de5 --- /dev/null +++ b/src/tunnel_client.cpp @@ -0,0 +1,117 @@ +#include "tunnel_client.hpp" + +#include +#include +#include + +#include "httplib.h" +using json = nlohmann::json; + +using ws_client = websocketpp::client; + +TunnelClient::TunnelClient(const std::string& vid, const std::string& server_ws_url, int local_http_port) + : vid_(vid), ws_url_(server_ws_url + "?vid=" + vid), local_port_(local_http_port) +{ +} + +void TunnelClient::start() +{ + running_ = true; + th_ = std::thread(&TunnelClient::run_loop, this); +} + +void TunnelClient::stop() +{ + running_ = false; + if (th_.joinable()) th_.join(); +} + +std::string TunnelClient::handle_local_http(const std::string& method, const std::string& path, const std::string& body) +{ + httplib::Client cli("127.0.0.1", local_port_); + cli.set_read_timeout(10, 0); + + httplib::Result res; + + if (method == "GET") + { + res = cli.Get(path.c_str()); + } + else if (method == "POST") + { + res = cli.Post(path.c_str(), body, "application/json"); + } + else + { + json err = {{"status", 405}, {"body", "unsupported method"}}; + return err.dump(); + } + + if (!res) + { + json err = {{"status", 500}, {"body", "local http error"}}; + return err.dump(); + } + + json resp = {{"status", res->status}, {"body", res->body}}; + return resp.dump(); +} + +void TunnelClient::run_loop() +{ + ws_client c; + c.init_asio(); + + websocketpp::connection_hdl hdl; + + c.set_open_handler( + [&](websocketpp::connection_hdl h) + { + hdl = h; + printf("[Tunnel] Connected to server\n"); + }); + + c.set_close_handler([&](websocketpp::connection_hdl) { printf("[Tunnel] Disconnected from server\n"); }); + + c.set_message_handler( + [&](websocketpp::connection_hdl, ws_client::message_ptr msg) + { + // 服务器传来的 HTTP 请求(JSON) + std::string payload = msg->get_payload(); + json req = json::parse(payload); + + std::string req_id = req["req_id"]; + std::string method = req["method"]; + std::string path = req["path"]; + std::string body = req.value("body", ""); + + // 转发给本地 HTTP 服务 + std::string result = handle_local_http(method, path, body); + + // 包装 response + json resp; + resp["req_id"] = req_id; + resp["resp"] = json::parse(result); + + // 发送回服务器 + c.send(hdl, resp.dump(), websocketpp::frame::opcode::text); + }); + + websocketpp::lib::error_code ec; + auto conn = c.get_connection(ws_url_, ec); + if (ec) + { + printf("[Tunnel] Connection init failed: %s\n", ec.message().c_str()); + return; + } + + c.connect(conn); + + while (running_) + { + c.run_once(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + printf("[Tunnel] Loop exit\n"); +}