// rtmp_manager.cpp #include "rtmp_manager.hpp" #include #include #include #include #include #include #include // ======================================================= // 工具函数 // ======================================================= static bool device_exists(const std::string& path) { struct stat st; return stat(path.c_str(), &st) == 0; } // 获取指定网卡 IPv4 std::string get_ip_address(const std::string& ifname) { struct ifaddrs* ifaddr = nullptr; if (getifaddrs(&ifaddr) != 0) return ""; char ip[INET_ADDRSTRLEN] = {0}; for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { if (!ifa->ifa_addr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { auto* addr = &reinterpret_cast(ifa->ifa_addr)->sin_addr; inet_ntop(AF_INET, addr, ip, sizeof(ip)); break; } } freeifaddrs(ifaddr); return ip; } // ======================================================= // 静态成员 // ======================================================= std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; // ======================================================= // 初始化 // ======================================================= void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized"); } std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } // ======================================================= // 创建推流管线(稳定版) // ======================================================= GstElement* RTMPManager::create_pipeline(const Camera& cam) { const std::string stream = cam.name + "_main"; const std::string app = "camera"; const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; std::string pipeline_str = "v4l2src name=src device=" + cam.device + " io-mode=dmabuf " "! video/x-raw,format=NV12," "width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + ",framerate=" + std::to_string(cam.fps) + "/1 " // 🔻 队列深度压到 2 帧(≈ 60ms@30fps) "! queue max-size-buffers=2 max-size-time=0 leaky=downstream " // 🔻 编码器:baseline + CBR + 无 B 帧 "! mpph264enc " "rc-mode=cbr " "bps=" + std::to_string(cam.bitrate) + " " "gop=" + std::to_string(cam.fps) + " " "header-mode=each-idr " "profile=baseline " // SPS/PPS 每秒刷新一次(SRS/WebRTC 友好) "! h264parse config-interval=1 " // 🔻 时间戳整形但不强行等待 "! identity sync=false single-segment=true " // FLV 最小缓存模式 "! flvmux streamable=true " // sink 本身不参与同步 "! rtmpsink location=\"" + live_rtmp + "\" " "sync=false async=false"; GError* error = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &error); if (error) { LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(error->message)); g_error_free(error); return nullptr; } return pipeline; } // ======================================================= // 主推流线程 // ======================================================= void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); while (ctx->thread_running) { // 1. 设备存在性 if (!device_exists(cam.device)) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Device not found: " + cam.device; } LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 2. 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Pipeline creation failed"; } std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } GstBus* bus = gst_element_get_bus(pipeline); LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); // 3. 等待进入 PLAYING GstState state = GST_STATE_NULL; if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING) { std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; ctx->status.last_error.clear(); LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); } else { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Failed to reach PLAYING"; LOG_ERROR("[RTMP] " + key + " failed to PLAY"); goto cleanup; } // 4. 正常运行:只监听 ERROR / EOS while (ctx->thread_running) { GstMessage* msg = gst_bus_timed_pop_filtered( bus, 300 * GST_MSECOND, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); if (!msg) { // 心跳:pipeline 没死就认为在跑 std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; continue; } if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { GError* err = nullptr; gst_message_parse_error(msg, &err, nullptr); { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = err ? err->message : "Unknown GStreamer error"; } LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown")); if (err) g_error_free(err); gst_message_unref(msg); break; } if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "EOS"; } LOG_WARN("[RTMP] " + key + " EOS"); gst_message_unref(msg); break; } gst_message_unref(msg); } cleanup: gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } } { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; } LOG_INFO("[RTMP] Stream thread exited for " + key); } // ======================================================= // 启停管理 // ======================================================= void RTMPManager::start_all() { LOG_INFO("[RTMP] Starting enabled streams..."); std::lock_guard lock(streams_mutex); int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { if (!cam.enabled) { LOG_INFO("[RTMP] Skip disabled camera: " + cam.name); continue; } const auto key = make_key(cam.name); if (streams.count(key)) continue; auto ctx = std::make_unique(); ctx->thread_running.store(true); ctx->thread = std::thread( [cam, ptr = ctx.get(), delay_ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); stream_loop(cam, ptr); }); streams.emplace(key, std::move(ctx)); delay_ms += 200; } } void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); for (auto& kv : streams) kv.second->thread_running.store(false); for (auto& kv : streams) if (kv.second->thread.joinable()) kv.second->thread.join(); streams.clear(); } // ======================================================= // 状态 & URL // ======================================================= bool RTMPManager::is_streaming(const std::string& cam_name) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); if (it == streams.end()) return false; std::lock_guard lk(it->second->status_mutex); return it->second->status.running; } std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "127.0.0.1"; return "http://" + ip + ":11985/rtc/v1/whep/?app=camera&stream=" + cam_name + "_main&vhost=live"; } // ======================================================= // 汇总状态 // ======================================================= std::vector RTMPManager::get_all_channels_status() { std::vector result; std::lock_guard lock(streams_mutex); for (size_t i = 0; i < g_app_config.cameras.size(); ++i) { const auto& cam = g_app_config.cameras[i]; ChannelInfo ch{}; ch.loc = static_cast(i); if (!cam.enabled) { ch.running = false; ch.reason = "Disabled by config"; result.push_back(ch); continue; } auto it = streams.find(make_key(cam.name)); if (it != streams.end()) { std::lock_guard lk(it->second->status_mutex); ch.running = it->second->status.running; ch.reason = it->second->status.last_error; if (ch.running) ch.url = get_stream_url(cam.name); } else { ch.running = false; ch.reason = "Not started"; } result.push_back(ch); } return result; }