// rtsp_manager.cpp #include "rtmp_manager.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include "logger.hpp" // ---------- 小工具 ---------- static bool device_exists(const std::string &path) { struct stat st; return (stat(path.c_str(), &st) == 0); } static std::string get_ip_address(const std::string &ifname) { struct ifaddrs *ifaddr, *ifa; char ip[INET_ADDRSTRLEN] = {0}; if (getifaddrs(&ifaddr) == -1) { perror("getifaddrs"); return ""; } for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { if (ifa->ifa_addr == nullptr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { void *addr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; if (inet_ntop(AF_INET, addr, ip, sizeof(ip))) { freeifaddrs(ifaddr); return ip; } } } freeifaddrs(ifaddr); return ""; // 没找到 } static inline std::string stream_type_suffix(StreamType type) { return (type == StreamType::MAIN) ? "_main" : "_sub"; } // ---------- 静态成员 ---------- std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; RTMPManager::StreamCallback RTMPManager::status_callback = nullptr; // ---------- 内部工具 ---------- std::string RTMPManager::make_stream_key(const std::string &cam_name, StreamType type) { return cam_name + stream_type_suffix(type); } void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized."); } void RTMPManager::set_status_callback(StreamCallback cb) { status_callback = std::move(cb); } void RTMPManager::update_status(const std::string &key, const StreamStatus &status) { { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it != streams.end()) it->second->status = status; } if (status_callback) status_callback(key, status); } // 决定 app:MAIN -> record(录像),SUB -> live(按需) static inline std::string pick_app(StreamType type) { return (type == StreamType::MAIN) ? "record" : "live"; } GstElement *RTMPManager::create_pipeline(const Camera &cam, StreamType type) { int width = cam.width, height = cam.height, fps = cam.fps, bitrate = cam.bitrate; if (type == StreamType::SUB) { bitrate = std::max(300000, bitrate / 2); } const std::string stream_name = cam.name + stream_type_suffix(type); const std::string app = pick_app(type); // NV12 -> mpph264enc -> h264parse -> flvmux(+静音音轨) -> rtmpsink std::string pipeline_str = "v4l2src device=" + cam.device + " ! video/x-raw,width=" + std::to_string(width) + ",height=" + std::to_string(height) + ",framerate=" + std::to_string(fps) + "/1 " "! videoconvert ! video/x-raw,format=NV12 " "! mpph264enc bps=" + std::to_string(bitrate) + " gop=" + std::to_string(fps) + " " "! h264parse " "! flvmux name=mux streamable=true " "audiotestsrc wave=silence ! audioconvert ! audioresample ! voaacenc ! aacparse ! mux. " "mux. ! rtmpsink name=rtmp_sink location=\"rtmp://127.0.0.1/" + app + "/" + stream_name + " live=1\" sync=false"; LOG_INFO("[RTMP] Creating pipeline for '" + stream_name + "' -> app '" + app + "': " + pipeline_str); GError *error = nullptr; GstElement *pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { LOG_ERROR("[RTMP] Pipeline parse error: " + std::string(error->message)); g_error_free(error); return nullptr; } return pipeline; } // 摄像头在配置中的索引(用于 reply.loc) static int get_camera_index(const std::string &name) { for (size_t i = 0; i < g_app_config.cameras.size(); ++i) if (g_app_config.cameras[i].name == name) return static_cast(i); return -1; } // ---------- 对外接口:启动/停止 ---------- RTMPManager::StreamResultInfo RTMPManager::start_camera(const Camera &cam, StreamType type) { StreamResultInfo res; res.loc = get_camera_index(cam.name); res.url = get_stream_url(cam.name, type); if (res.loc < 0 || res.loc >= static_cast(g_app_config.cameras.size())) { res.result = 1; res.reason = "Invalid channel index"; return res; } // 主码流(record)允许设备缺失时也进入“重试守护”,所以这里只在 SUB 时严格检查 if (type == StreamType::SUB && !device_exists(cam.device)) { res.result = 1; res.reason = "Device not found: " + cam.device; return res; } const std::string key = make_stream_key(cam.name, type); std::unique_ptr ctx; std::future status_future; { std::lock_guard lock(streams_mutex); auto it = streams.find(key); // 若旧线程仍在运行,先安全停止并移除 if (it != streams.end()) { if (it->second->running.load()) { LOG_WARN("[RTMP] Restarting existing stream: " + key); it->second->running.store(false); if (it->second->thread.joinable()) it->second->thread.join(); } streams.erase(it); } ctx = std::make_unique(); ctx->running.store(true); status_future = ctx->start_promise.get_future(); StreamContext *ctx_ptr = ctx.get(); ctx->thread = std::thread( [cam, type, ctx_ptr]() { try { RTMPManager::stream_loop(cam, type, ctx_ptr); } catch (const std::exception &e) { LOG_ERROR("[RTMP] Exception in stream thread: " + std::string(e.what())); } catch (...) { LOG_ERROR("[RTMP] Unknown exception in stream thread"); } }); streams.emplace(key, std::move(ctx)); } // 等待首帧或错误,最多 7 秒 if (status_future.wait_for(std::chrono::seconds(7)) == std::future_status::ready) { const StreamStatus s = status_future.get(); res.result = s.running ? 0 : 1; res.reason = s.running ? "Started OK" : (s.last_error.empty() ? "Failed to start" : s.last_error); } else { res.result = 1; res.reason = "Timeout waiting for stream"; } return res; } RTMPManager::StreamResultInfo RTMPManager::stop_camera(const std::string &cam_name, StreamType type) { StreamResultInfo res; res.loc = get_camera_index(cam_name); res.url = get_stream_url(cam_name, type); if (res.loc < 0 || res.loc >= static_cast(g_app_config.cameras.size())) { res.result = 1; res.reason = "Invalid channel index"; return res; } std::unique_ptr ctx; std::string key = make_stream_key(cam_name, type); { std::lock_guard lock(streams_mutex); auto it = streams.find(key); if (it == streams.end()) { res.result = 0; res.reason = "Already stopped (no active stream)"; return res; } ctx = std::move(it->second); streams.erase(it); } bool was_running = ctx->running.load(); ctx->running.store(false); if (ctx->thread.joinable()) ctx->thread.join(); if (was_running) { res.result = 0; res.reason = "Stopped manually"; } else { res.result = 1; res.reason = "Not running (stream never started)"; } return res; } // ---------- 核心:推流循环(带重试策略) ---------- void RTMPManager::stream_loop(Camera cam, StreamType type, StreamContext *ctx) { const std::string key = make_stream_key(cam.name, type); auto try_set_start = [&](const StreamStatus &st) { bool expected = false; if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true)) { try { ctx->start_promise.set_value(st); } catch (...) { } } }; // MAIN(record)= 永久自动重试;SUB(live)= 限次重试 if (type == StreamType::MAIN) { int backoff_ms = RETRY_BASE_DELAY_MS; // 可指数退避 while (ctx->running.load()) { const bool ok = do_stream_once(cam, type, ctx); if (!ok && ctx->running.load()) { LOG_WARN("[RTMP] MAIN(record) stream lost, retry in " + std::to_string(backoff_ms) + "ms: " + cam.name); std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms)); backoff_ms = std::min(backoff_ms * 2, 60000); // up to 60s } else { backoff_ms = RETRY_BASE_DELAY_MS; // 成功后重置退避 } } } else // SUB / live { int retries = 0; while (ctx->running.load()) { const bool ok = do_stream_once(cam, type, ctx); if (ok) break; // 正常退出(被 stop 或 EOS) retries++; if (retries >= LIVE_MAX_RETRIES || !ctx->running.load()) { LOG_ERROR("[RTMP] SUB(live) stream failed after retries, giving up: " + cam.name); break; } LOG_WARN("[RTMP] SUB(live) retry " + std::to_string(retries) + "/" + std::to_string(LIVE_MAX_RETRIES) + " in " + std::to_string(RETRY_BASE_DELAY_MS) + "ms: " + cam.name); std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_BASE_DELAY_MS)); } } ctx->status.running = false; } // 单次推流:创建 pipeline -> PLAYING -> loop 直到 ERROR/EOS/stop bool RTMPManager::do_stream_once(const Camera &cam, StreamType type, StreamContext *ctx) { const std::string key = make_stream_key(cam.name, type); auto try_set_start = [&](const StreamStatus &st) { bool expected = false; if (ctx && ctx->start_promise_set.compare_exchange_strong(expected, true)) { try { ctx->start_promise.set_value(st); } catch (...) { } } }; RTMPManager::StreamStatus status; status.running = false; status.last_result = StreamResult::UNKNOWN; status.last_error.clear(); GstElement *pipeline = create_pipeline(cam, type); if (!pipeline) { status.last_result = StreamResult::PIPELINE_ERROR; status.last_error = "Failed to create pipeline"; update_status(key, status); try_set_start(status); return false; } GstBus *bus = gst_element_get_bus(pipeline); gst_element_set_state(pipeline, GST_STATE_PLAYING); bool started_reported = false; auto t0 = std::chrono::steady_clock::now(); const int first_frame_timeout_s = 5; const int connect_guard_ms = 2000; bool need_restart = false; bool ok = false; while (true) { if (!ctx->running.load()) { status.running = false; status.last_result = StreamResult::UNKNOWN; status.last_error = "Stream stopped manually"; update_status(key, status); try_set_start(status); break; } // 首帧/连通性超时保护 if (!started_reported) { auto elapsed_s = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0).count(); if (elapsed_s > first_frame_timeout_s) { status.running = false; status.last_result = StreamResult::TIMEOUT; status.last_error = "No frames/connection established within timeout"; update_status(key, status); try_set_start(status); need_restart = true; break; } } GstMessage *msg = gst_bus_timed_pop_filtered(bus, 200 * GST_MSECOND, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS | GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_STREAM_STATUS)); if (!msg) { // 过了 connect_guard_ms 且 rtmpsink 进入 PLAYING,认为启动成功 if (!started_reported) { auto elapsed_ms = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0) .count(); if (elapsed_ms >= connect_guard_ms) { GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "rtmp_sink"); if (sink) { GstState cur, pending; if (gst_element_get_state(sink, &cur, &pending, 0) == GST_STATE_CHANGE_SUCCESS && cur == GST_STATE_PLAYING) { status.running = true; ctx->status.running = true; status.last_result = StreamResult::OK; status.last_error.clear(); update_status(key, status); try_set_start(status); started_reported = true; LOG_INFO("[RTMP] Guard-start OK for '" + cam.name + "'"); } gst_object_unref(sink); } } } continue; } switch (GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_STREAM_STATUS: { GstStreamStatusType type_ss; GstElement *owner = nullptr; gst_message_parse_stream_status(msg, &type_ss, &owner); if (!started_reported && owner) { const gchar *oname = GST_OBJECT_NAME(owner); if (oname && std::strcmp(oname, "rtmp_sink") == 0) { if (type_ss == GST_STREAM_STATUS_TYPE_CREATE || type_ss == GST_STREAM_STATUS_TYPE_ENTER || type_ss == GST_STREAM_STATUS_TYPE_START) { status.running = true; ctx->status.running = true; status.last_result = StreamResult::OK; status.last_error.clear(); update_status(key, status); try_set_start(status); started_reported = true; LOG_INFO(std::string("[RTMP] Connected (STREAM_STATUS) for '") + cam.name + "'"); } } } break; } case GST_MESSAGE_STATE_CHANGED: // 可用于扩展日志;不以 pipeline 的 PLAYING 作为成功判据 break; case GST_MESSAGE_ERROR: { GError *err = nullptr; gchar *dbg = nullptr; gst_message_parse_error(msg, &err, &dbg); status.running = false; status.last_result = StreamResult::CONNECTION_FAIL; status.last_error = err ? err->message : "GStreamer error"; LOG_ERROR("[RTMP] Stream error from '" + cam.name + "': " + status.last_error); ctx->status.running = false; need_restart = true; if (err) g_error_free(err); if (dbg) g_free(dbg); update_status(key, status); try_set_start(status); break; } case GST_MESSAGE_EOS: { status.running = false; status.last_result = StreamResult::EOS_RECEIVED; status.last_error = "EOS received"; update_status(key, status); try_set_start(status); need_restart = true; break; } default: break; } gst_message_unref(msg); if (need_restart) break; if (started_reported) { // 正常运行中,继续监听 } } // 收尾 gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); ok = started_reported && !need_restart; // 若在 stop 前一直运行且未触发重启则认为 ok return ok; } // ---------- 其他工具 ---------- void RTMPManager::stop_all() { std::vector> ctxs; { std::lock_guard lock(streams_mutex); for (auto &kv : streams) kv.second->running.store(false); for (auto &kv : streams) ctxs.push_back(std::move(kv.second)); streams.clear(); } for (auto &ctx : ctxs) if (ctx->thread.joinable()) ctx->thread.join(); LOG_INFO("[RTMP] stop_all completed."); } bool RTMPManager::is_streaming(const std::string &cam_name, StreamType type) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_stream_key(cam_name, type)); return it != streams.end() && it->second->running.load(); } bool RTMPManager::is_any_streaming() { std::lock_guard lock(streams_mutex); for (auto &kv : streams) if (kv.second->running.load()) return true; return false; } std::string RTMPManager::get_stream_url(const std::string &cam_name, StreamType type) { // 你可以把这里的网卡名替换成你的默认上行网卡 std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; const std::string app = pick_app(type); const std::string stream_name = make_stream_key(cam_name, type); // 返回 WebRTC 拉流地址(WHEP) return "http://" + ip + ":1985/rtc/v1/whep/?app=" + app + "&stream=" + stream_name; } // 批量处理 std::vector RTMPManager::process_push_request(const VideoPushRequest &req) { std::vector results; std::vector> futures; for (const auto &item : req.data) { StreamType type = (item.streamType == 0) ? StreamType::MAIN : StreamType::SUB; for (int ch : item.channels) { if (ch < 0 || ch >= static_cast(g_app_config.cameras.size())) continue; const auto &cam = g_app_config.cameras[ch]; if (item.switchVal == 0) { futures.emplace_back( std::async(std::launch::async, [&, cam, type]() { return start_camera(cam, type); })); } else { futures.emplace_back( std::async(std::launch::async, [&, cam, type]() { return stop_camera(cam.name, type); })); } } } for (auto &f : futures) { try { results.push_back(f.get()); } catch (const std::exception &e) { StreamResultInfo err; err.result = 1; err.reason = std::string("Exception: ") + e.what(); results.push_back(err); } } return results; } void RTMPManager::start_all_record_streams() { LOG_INFO("[RTMP] Auto-starting all record (MAIN) streams..."); for (const auto &cam : g_app_config.cameras) { // 检查是否已经存在 if (is_streaming(cam.name, StreamType::MAIN)) { LOG_INFO("[RTMP] Record stream already running: " + cam.name); continue; } auto res = start_camera(cam, StreamType::MAIN); if (res.result == 0) { LOG_INFO("[RTMP] Record stream started for " + cam.name + ": " + res.reason); } else { LOG_WARN("[RTMP] Failed to start record stream for " + cam.name + ": " + res.reason); } } LOG_INFO("[RTMP] Auto-start for record streams done."); }