From 8f5821824a13c29aca845ed0cfb63502f135da1a Mon Sep 17 00:00:00 2001 From: cxh Date: Tue, 6 Jan 2026 09:38:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=BD=91=E7=BB=9C=E8=B4=A8?= =?UTF-8?q?=E9=87=8F=E7=9B=91=E6=B5=8B=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/serial_AT.hpp | 9 ++ include/serial_port.h | 54 ++++++++++++ src/main.cpp | 4 + src/rtmp_manager.cpp | 113 +++++++++++++++++++----- src/serial_AT.cpp | 162 ++++++++++++++++++++++++++++++++++ src/serial_port.cpp | 197 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 519 insertions(+), 20 deletions(-) create mode 100644 include/serial_AT.hpp create mode 100644 include/serial_port.h create mode 100644 src/serial_AT.cpp create mode 100644 src/serial_port.cpp diff --git a/include/serial_AT.hpp b/include/serial_AT.hpp new file mode 100644 index 0000000..c23a1bb --- /dev/null +++ b/include/serial_AT.hpp @@ -0,0 +1,9 @@ +#pragma once + +#include "serial_port.h" + +// 初始化 AT 串口(启动线程) +void init_serial_at(const std::string& device, int baudrate); + +// 停止 AT 串口(停止线程,join) +void stop_serial_at(); diff --git a/include/serial_port.h b/include/serial_port.h new file mode 100644 index 0000000..19b11cc --- /dev/null +++ b/include/serial_port.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "logger.hpp" + +class SerialPort +{ + public: + using ReceiveCallback = std::function&)>; + using ReceiveStringCallback = std::function; + + SerialPort(const std::string& id, const std::string& device, int baudrate, int retry_interval = 5); + + ~SerialPort(); + + void start(); // 启动串口(含自动重连) + void stop(); // 停止串口 + + bool is_open() const; + bool send_data(const std::vector& data); + bool send_data(const std::string& data); + + void set_receive_callback(ReceiveCallback cb); + void set_receive_callback(ReceiveStringCallback cb); + + private: + bool open_port(); + void close_port(); + void reader_loop(); + void reconnect_loop(); + bool configure_port(int fd); + + private: + std::string id_; + std::string device_; + int baudrate_; + int fd_ = -1; + + std::atomic running_{false}; + std::atomic stop_flag_{false}; + + std::thread reader_thread_; + std::thread reconnect_thread_; + std::mutex send_mutex_; + + ReceiveCallback receive_callback_; + int retry_interval_; +}; diff --git a/src/main.cpp b/src/main.cpp index 206d59d..0fa1499 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -10,6 +10,7 @@ #include "logger.hpp" #include "mqtt_client_wrapper.hpp" #include "rtmp_manager.hpp" +#include "serial_AT.hpp" std::atomic g_running(true); @@ -48,6 +49,7 @@ int main() return -1; } + init_serial_at("/dev/ttyUSB3", 115200); // ---------- GStreamer ---------- RTMPManager::init(); @@ -70,6 +72,8 @@ int main() // ---------- 退出 ---------- LOG_INFO("[MAIN] Shutdown requested. Stopping services..."); + stop_serial_at(); + RTMPManager::stop_all(); if (mqtt_thread.joinable()) mqtt_thread.join(); diff --git a/src/rtmp_manager.cpp b/src/rtmp_manager.cpp index 7cb0d34..5c3ccd3 100644 --- a/src/rtmp_manager.cpp +++ b/src/rtmp_manager.cpp @@ -63,41 +63,114 @@ void RTMPManager::init() std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } // ========== 创建推流管线 ========== +// GstElement* RTMPManager::create_pipeline(const Camera& cam) +// { +// const int width = cam.width; +// const int height = cam.height; +// const int fps = cam.fps; +// const int bitrate = cam.bitrate; + +// // MediaMTX 中的 stream key +// const std::string stream_name = cam.name; + +// // RTMP 推送到 MediaMTX +// // mediamtx.yml 中 paths 会自动创建 +// const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; + +// /* +// * Pipeline 说明: +// * v4l2src -> mpph264enc -> h264parse -> flvmux -> rtmpsink +// * +// * - 不使用 tee(降低死锁概率) +// * - 不引入音频(MediaMTX 不强制要求) +// * - 纯视频、纯 RTMP、纯 TCP +// */ +// std::string pipeline_str = "v4l2src name=src device=" + cam.device + +// " ! video/x-raw,format=NV12,width=" + std::to_string(width) + +// ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + +// "/1 " +// " ! mpph264enc bps=" + +// std::to_string(bitrate) + " gop=" + std::to_string(fps) + +// " rc-mode=cbr " +// " ! h264parse name=parse " +// " ! flvmux streamable=true " +// " ! rtmpsink location=\"" + +// rtmp_url + +// "\" " +// " sync=false async=false"; + +// GError* error = nullptr; +// GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); +// if (error) +// { +// LOG_ERROR(std::string("[RTMP] Pipeline creation failed: ") + error->message); +// g_error_free(error); +// return nullptr; +// } + +// return pipeline; +// } + GstElement* RTMPManager::create_pipeline(const Camera& cam) { - const int width = cam.width; - const int height = cam.height; + const int out_width = cam.width; // 推流分辨率(如 1280 / 960 / 720) + const int out_height = cam.height; const int fps = cam.fps; const int bitrate = cam.bitrate; - // MediaMTX 中的 stream key + // MediaMTX stream key const std::string stream_name = cam.name; - - // RTMP 推送到 MediaMTX - // mediamtx.yml 中 paths 会自动创建 const std::string rtmp_url = "rtmp://127.0.0.1:1935/" + stream_name; /* - * Pipeline 说明: - * v4l2src -> mpph264enc -> h264parse -> flvmux -> rtmpsink + * A 方案 Pipeline: * - * - 不使用 tee(降低死锁概率) - * - 不引入音频(MediaMTX 不强制要求) - * - 纯视频、纯 RTMP、纯 TCP + * v4l2src (原生 1280x960) + * -> videoscale + caps(编码前缩放) + * -> queue(低延迟) + * -> mpph264enc(CBR) + * -> h264parse + * -> flvmux + * -> rtmpsink */ std::string pipeline_str = "v4l2src name=src device=" + cam.device + - " ! video/x-raw,format=NV12,width=" + std::to_string(width) + - ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + + " io-mode=dmabuf " + + // ⭐ 相机真实输出:原生 1280x960 + "! video/x-raw,format=NV12," + "width=1280,height=960," + "framerate=" + + std::to_string(fps) + "/1 " - " ! mpph264enc bps=" + - std::to_string(bitrate) + " gop=" + std::to_string(fps) + - " rc-mode=cbr " - " ! h264parse name=parse " - " ! flvmux streamable=true " - " ! rtmpsink location=\"" + + + // ⭐ 编码前缩放(关键) + "! videoscale " + "! video/x-raw," + "width=" + + std::to_string(out_width) + ",height=" + std::to_string(out_height) + + " " + + // ⭐ 低延迟队列,防止下游阻塞反噬 v4l2 + "! queue max-size-buffers=2 max-size-time=0 leaky=downstream " + + // ⭐ 硬件编码,SIM 卡友好 + "! mpph264enc " + "rc-mode=cbr " + "bps=" + + std::to_string(bitrate) + + " " + "gop=" + + std::to_string(fps) + + " " + "header-mode=each-idr " + "profile=baseline " + + "! h264parse config-interval=1 " + "! flvmux streamable=true " + "! rtmpsink location=\"" + rtmp_url + "\" " - " sync=false async=false"; + "sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); diff --git a/src/serial_AT.cpp b/src/serial_AT.cpp new file mode 100644 index 0000000..5b8dcca --- /dev/null +++ b/src/serial_AT.cpp @@ -0,0 +1,162 @@ +#include "serial_AT.hpp" + +#include +#include +#include +#include +#include + +#include "logger.hpp" +#include "serial_port.h" + +// ================== 运行控制 ================== +static std::atomic serial_at_running{false}; + +// ================== AT 任务结构 ================== +struct AtTask +{ + std::string cmd; + int interval_sec; // 周期(秒) + int max_retries; // -1 表示无限 + int sent_count; + std::chrono::steady_clock::time_point last_sent; +}; + +static std::unique_ptr serial_at; +static std::thread serial_at_sender; +static std::mutex at_tasks_mutex; + +// ================== AT 任务列表 ================== +// +// CSQ : 高频心跳(信号强度) +// QENG : 低频全量无线质量 +// +static std::vector at_tasks = { + {"AT+CSQ", 5, -1, 0, {}}, // 每 5 秒 + {"AT+QENG=\"servingcell\"", 30, -1, 0, {}} // 每 30 秒 +}; + +// ================== 发送线程 ================== +static void serial_at_send_loop() +{ + LOG_INFO("[serial_at] Sender thread started"); + + while (serial_at_running.load(std::memory_order_relaxed)) + { + if (!serial_at || !serial_at->is_open()) + { + std::this_thread::sleep_for(std::chrono::seconds(1)); + continue; + } + + auto now = std::chrono::steady_clock::now(); + + { + std::lock_guard lock(at_tasks_mutex); + + for (auto& task : at_tasks) + { + bool need_send = false; + + if (task.last_sent.time_since_epoch().count() == 0) { need_send = true; } + else + { + auto elapsed = std::chrono::duration_cast(now - task.last_sent).count(); + if (elapsed >= task.interval_sec) need_send = true; + } + + if (need_send) + { + serial_at->send_data(task.cmd + "\r\n"); + task.sent_count++; + task.last_sent = now; + } + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + + LOG_INFO("[serial_at] Sender thread exiting"); +} + +// ================== 接收处理 ================== +static void handle_serial_at_data(const std::string& data) +{ + std::istringstream iss(data); + std::string line; + + while (std::getline(iss, line)) + { + // trim + line.erase(0, line.find_first_not_of(" \t\r\n")); + line.erase(line.find_last_not_of(" \t\r\n") + 1); + + if (line.empty() || line == "OK") continue; + + // ---------- CSQ ---------- + // +CSQ: , + if (line.rfind("+CSQ:", 0) == 0) + { + int rssi = -1, ber = -1; + if (sscanf(line.c_str(), "+CSQ: %d,%d", &rssi, &ber) == 2) + { + int dbm = (rssi >= 0 && rssi <= 31) ? (-113 + rssi * 2) : -999; + + LOG_INFO("[serial_at] CSQ rssi=" + std::to_string(rssi) + " (" + std::to_string(dbm) + + " dBm)" + " ber=" + + std::to_string(ber)); + } + continue; + } + + // ---------- QENG servingcell ---------- + if (line.rfind("+QENG:", 0) == 0 && line.find("servingcell") != std::string::npos) + { + // 先只打原始信息,后续你再精解析 + LOG_INFO("[serial_at] QENG: " + line); + continue; + } + + // ---------- 其它 AT 回显 ---------- + // 默认忽略,避免日志污染 + // 如遇现场问题,可临时加 LOG_INFO 打印 + } +} + +// ================== 初始化 ================== +void init_serial_at(const std::string& device, int baudrate) +{ + if (serial_at_running.load()) + { + LOG_WARN("[serial_at] Already running"); + return; + } + + serial_at_running.store(true, std::memory_order_relaxed); + + serial_at = std::make_unique("serial_at", device, baudrate, 5); + serial_at->set_receive_callback(handle_serial_at_data); + serial_at->start(); + + serial_at_sender = std::thread(serial_at_send_loop); +} + +// ================== 停止 ================== +void stop_serial_at() +{ + if (!serial_at_running.load()) return; + + LOG_INFO("[serial_at] Stopping..."); + + serial_at_running.store(false, std::memory_order_relaxed); + + if (serial_at) serial_at->stop(); + + if (serial_at_sender.joinable()) serial_at_sender.join(); + + serial_at.reset(); + + LOG_INFO("[serial_at] Stopped cleanly"); +} diff --git a/src/serial_port.cpp b/src/serial_port.cpp new file mode 100644 index 0000000..f8c1507 --- /dev/null +++ b/src/serial_port.cpp @@ -0,0 +1,197 @@ +#include "serial_port.h" + +#include +#include +#include + +#include +#include +#include + +// --------- 波特率映射 --------- +static speed_t baud_to_speed(int baud) +{ + switch (baud) + { + case 9600: + return B9600; + case 19200: + return B19200; + case 38400: + return B38400; + case 57600: + return B57600; + case 115200: + return B115200; +#ifdef B230400 + case 230400: + return B230400; +#endif + default: + return B115200; + } +} + +SerialPort::SerialPort(const std::string& id, const std::string& device, int baudrate, int retry_interval) + : id_(id), device_(device), baudrate_(baudrate), retry_interval_(retry_interval) +{ +} + +SerialPort::~SerialPort() { stop(); } + +void SerialPort::start() +{ + stop_flag_ = false; + reconnect_thread_ = std::thread(&SerialPort::reconnect_loop, this); +} + +void SerialPort::stop() +{ + stop_flag_ = true; + running_ = false; + + // 先关闭 fd,打断 reader 的阻塞 read() + close_port(); + + if (reader_thread_.joinable()) reader_thread_.join(); + if (reconnect_thread_.joinable()) reconnect_thread_.join(); +} + +bool SerialPort::open_port() +{ + fd_ = open(device_.c_str(), O_RDWR | O_NOCTTY | O_SYNC); + if (fd_ < 0) + { + LOG_ERROR("[" + id_ + "] Failed to open " + device_ + ": " + strerror(errno)); + return false; + } + + if (!configure_port(fd_)) + { + LOG_ERROR("[" + id_ + "] Failed to configure " + device_); + close(fd_); + fd_ = -1; + return false; + } + + running_ = true; + reader_thread_ = std::thread(&SerialPort::reader_loop, this); + + LOG_INFO("[" + id_ + "] Opened serial port " + device_ + " at " + std::to_string(baudrate_) + " baud"); + return true; +} + +void SerialPort::close_port() +{ + running_ = false; + + if (fd_ >= 0) + { + close(fd_); + fd_ = -1; + } +} + +bool SerialPort::is_open() const { return fd_ >= 0; } + +bool SerialPort::send_data(const std::vector& data) +{ + if (fd_ < 0) return false; + + std::lock_guard lock(send_mutex_); + ssize_t n = write(fd_, data.data(), data.size()); + return n == static_cast(data.size()); +} + +bool SerialPort::send_data(const std::string& data) +{ + return send_data(std::vector(data.begin(), data.end())); +} + +void SerialPort::set_receive_callback(ReceiveCallback cb) { receive_callback_ = std::move(cb); } + +void SerialPort::set_receive_callback(ReceiveStringCallback cb) +{ + receive_callback_ = [cb](const std::vector& data) { cb(std::string(data.begin(), data.end())); }; +} + +void SerialPort::reader_loop() +{ + std::vector buffer(1024); + + while (running_ && !stop_flag_) + { + int n = read(fd_, buffer.data(), buffer.size()); + + if (n > 0) + { + if (receive_callback_) receive_callback_({buffer.begin(), buffer.begin() + n}); + } + else if (n < 0) + { + LOG_ERROR("[" + id_ + "] Read error: " + std::string(strerror(errno))); + break; + } + // n == 0:超时,正常情况,继续读 + } + + running_ = false; +} + +static void interruptible_sleep(std::atomic& stop_flag, int seconds) +{ + using namespace std::chrono; + for (int i = 0; i < seconds * 10 && !stop_flag.load(std::memory_order_relaxed); ++i) + std::this_thread::sleep_for(100ms); +} + +void SerialPort::reconnect_loop() +{ + int current_interval = retry_interval_; + const int max_interval = 300; + + while (!stop_flag_) + { + if (open_port()) + { + // 不 join reader,只等 stop_flag_ + while (running_ && !stop_flag_) std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + close_port(); + LOG_WARN("[" + id_ + "] Port closed, will retry"); + current_interval = retry_interval_; + } + else + { + LOG_INFO("[" + id_ + "] Connect failed, retry in " + std::to_string(current_interval) + "s"); + + interruptible_sleep(stop_flag_, current_interval); + current_interval = std::min(current_interval * 2, max_interval); + } + } +} + +bool SerialPort::configure_port(int fd) +{ + struct termios tty{}; + if (tcgetattr(fd, &tty) != 0) return false; + + speed_t spd = baud_to_speed(baudrate_); + cfsetospeed(&tty, spd); + cfsetispeed(&tty, spd); + + tty.c_cflag = (tty.c_cflag & ~CSIZE) | CS8; + tty.c_iflag &= ~IGNBRK; + tty.c_lflag = 0; + tty.c_oflag = 0; + tty.c_cc[VMIN] = 0; + tty.c_cc[VTIME] = 10; + + tty.c_iflag &= ~(IXON | IXOFF | IXANY); + tty.c_cflag |= (CLOCAL | CREAD); + tty.c_cflag &= ~(PARENB | PARODD); + tty.c_cflag &= ~CSTOPB; + tty.c_cflag &= ~CRTSCTS; + + return tcsetattr(fd, TCSANOW, &tty) == 0; +}