// rtmp_manager.cpp #include "rtmp_manager.hpp" #include #include #include #include #include #include #include #include std::atomic RTMPManager::g_live_enabled{false}; bool RTMPManager::g_record_enabled = true; // ======================================================= // 工具函数 // ======================================================= static bool device_exists(const std::string& path) { struct stat st; return stat(path.c_str(), &st) == 0; } // 获取指定网卡 IPv4 std::string get_ip_address(const std::string& ifname) { struct ifaddrs* ifaddr = nullptr; if (getifaddrs(&ifaddr) != 0) return ""; char ip[INET_ADDRSTRLEN] = {0}; for (auto* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { if (!ifa->ifa_addr) continue; if (ifa->ifa_addr->sa_family == AF_INET && ifname == ifa->ifa_name) { auto* addr = &reinterpret_cast(ifa->ifa_addr)->sin_addr; inet_ntop(AF_INET, addr, ip, sizeof(ip)); break; } } freeifaddrs(ifaddr); return ip; } // ======================================================= // 静态成员 // ======================================================= std::unordered_map> RTMPManager::streams; std::mutex RTMPManager::streams_mutex; // ======================================================= // 初始化 // ======================================================= void RTMPManager::init() { gst_init(nullptr, nullptr); LOG_INFO("[RTMP] GStreamer initialized"); } std::string RTMPManager::make_key(const std::string& name) { return name + "_main"; } // ======================================================= // 创建推流管线(稳定版) // ======================================================= GstElement* RTMPManager::create_pipeline(const Camera& cam) { const std::string stream = cam.name + "_main"; const std::string app = "camera"; const std::string live_rtmp = "rtmp://36.153.162.171:19435/" + app + "/" + stream + "?vhost=live"; const std::string record_rtmp = "rtmp://127.0.0.1:2935/" + app + "/" + stream + "?vhost=record"; std::string pipeline_str = "v4l2src device=" + cam.device + " io-mode=dmabuf " "! video/x-raw,format=NV12,width=1920,height=1080,framerate=" + std::to_string(cam.fps) + "/1 " "! videoscale " "! video/x-raw,width=" + std::to_string(cam.width) + ",height=" + std::to_string(cam.height) + " " "! queue max-size-buffers=2 leaky=downstream " "! mpph264enc rc-mode=cbr bps=" + std::to_string(cam.bitrate) + " gop=" + std::to_string(cam.fps) + " header-mode=each-idr profile=baseline " "! h264parse config-interval=1 " "! tee name=t " // ===== record:可开关 ===== "t. ! queue max-size-buffers=8 leaky=downstream " "! valve name=record_valve drop=true " // ★★ 新增 "! queue max-size-buffers=8 leaky=downstream " "! flvmux name=rec_mux streamable=true " "! rtmpsink name=rec_sink location=\"" + record_rtmp + "\" sync=false async=false " // ===== live:可随意开关 ===== "t. ! queue max-size-buffers=8 leaky=downstream " "! valve name=live_valve drop=true " "! queue max-size-buffers=8 leaky=downstream " "! flvmux name=live_mux streamable=true " "! rtmpsink name=live_sink location=\"" + live_rtmp + "\" sync=false async=false "; GError* err = nullptr; GstElement* pipeline = gst_parse_launch(pipeline_str.c_str(), &err); if (err) { LOG_ERROR("[RTMP] Pipeline creation failed: " + std::string(err->message)); g_error_free(err); return nullptr; } return pipeline; } // ======================================================= // 主推流线程 // ======================================================= void RTMPManager::stream_loop(Camera cam, StreamContext* ctx) { const std::string key = make_key(cam.name); while (ctx->thread_running) { // 1. 设备存在性 if (!device_exists(cam.device)) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Device not found: " + cam.device; } LOG_WARN("[RTMP] " + key + " - device not found"); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 2. 创建 pipeline GstElement* pipeline = create_pipeline(cam); if (!pipeline) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Pipeline creation failed"; } std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 2.1 获取 live_valve(用于 MQTT 控制) GstElement* live_valve = gst_bin_get_by_name(GST_BIN(pipeline), "live_valve"); if (!live_valve) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "live_valve not found"; } LOG_ERROR("[RTMP] " + key + " - live_valve not found"); gst_object_unref(pipeline); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 保存到 ctx { std::lock_guard lk(ctx->status_mutex); if (ctx->live_valve) gst_object_unref(ctx->live_valve); ctx->live_valve = live_valve; // ⚠️ live_valve 引用交给 ctx 管理 } // 2.x 获取 record_valve GstElement* record_valve = gst_bin_get_by_name(GST_BIN(pipeline), "record_valve"); if (!record_valve) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "record_valve not found"; } LOG_ERROR("[RTMP] " + key + " - record_valve not found"); gst_object_unref(pipeline); std::this_thread::sleep_for(std::chrono::seconds(3)); continue; } // 保存到 ctx { std::lock_guard lk(ctx->status_mutex); if (ctx->record_valve) gst_object_unref(ctx->record_valve); ctx->record_valve = record_valve; } g_object_set(G_OBJECT(ctx->record_valve), "drop", g_record_enabled ? FALSE : TRUE, nullptr); g_object_set(G_OBJECT(ctx->live_valve), "drop", g_live_enabled.load() ? FALSE : TRUE, nullptr); GstBus* bus = gst_element_get_bus(pipeline); LOG_INFO("[RTMP] Starting stream: " + key); gst_element_set_state(pipeline, GST_STATE_PLAYING); // 3. 等待进入 PLAYING GstState state = GST_STATE_NULL; if (gst_element_get_state(pipeline, &state, nullptr, 5 * GST_SECOND) == GST_STATE_CHANGE_SUCCESS && state == GST_STATE_PLAYING) { std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; ctx->status.last_error.clear(); LOG_INFO("[RTMP] " + key + " confirmed PLAYING"); } else { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "Failed to reach PLAYING"; LOG_ERROR("[RTMP] " + key + " failed to PLAY"); goto cleanup; } // 4. 正常运行:只监听 ERROR / EOS while (ctx->thread_running) { GstMessage* msg = gst_bus_timed_pop_filtered( bus, 300 * GST_MSECOND, static_cast(GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); if (!msg) { // 心跳:pipeline 没死就认为在跑 std::lock_guard lk(ctx->status_mutex); ctx->status.running = true; continue; } if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { GError* err = nullptr; gst_message_parse_error(msg, &err, nullptr); { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = err ? err->message : "Unknown GStreamer error"; } LOG_ERROR("[RTMP] " + key + " ERROR: " + (err ? err->message : "unknown")); if (err) g_error_free(err); gst_message_unref(msg); break; } if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; ctx->status.last_error = "EOS"; } LOG_WARN("[RTMP] " + key + " EOS"); gst_message_unref(msg); break; } gst_message_unref(msg); } cleanup: { std::lock_guard lk(ctx->status_mutex); if (ctx->live_valve) { gst_object_unref(ctx->live_valve); ctx->live_valve = nullptr; } if (ctx->record_valve) { gst_object_unref(ctx->record_valve); ctx->record_valve = nullptr; } } gst_element_set_state(pipeline, GST_STATE_NULL); if (bus) gst_object_unref(bus); gst_object_unref(pipeline); if (ctx->thread_running) { LOG_WARN("[RTMP] Restarting " + key + " in 3s..."); std::this_thread::sleep_for(std::chrono::seconds(3)); } } { std::lock_guard lk(ctx->status_mutex); ctx->status.running = false; } LOG_INFO("[RTMP] Stream thread exited for " + key); } // ======================================================= // 启停管理 // ======================================================= void RTMPManager::start_all() { LOG_INFO("[RTMP] Starting enabled streams..."); std::lock_guard lock(streams_mutex); int delay_ms = 0; for (const auto& cam : g_app_config.cameras) { if (!cam.enabled) { LOG_INFO("[RTMP] Skip disabled camera: " + cam.name); continue; } const auto key = make_key(cam.name); if (streams.count(key)) continue; auto ctx = std::make_unique(); ctx->thread_running.store(true); ctx->thread = std::thread( [cam, ptr = ctx.get(), delay_ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); stream_loop(cam, ptr); }); streams.emplace(key, std::move(ctx)); delay_ms += 200; } } void RTMPManager::stop_all() { std::lock_guard lock(streams_mutex); for (auto& kv : streams) { kv.second->thread_running.store(false); std::lock_guard lk(kv.second->status_mutex); if (kv.second->live_valve) { gst_object_unref(kv.second->live_valve); kv.second->live_valve = nullptr; } if (kv.second->record_valve) { gst_object_unref(kv.second->record_valve); kv.second->record_valve = nullptr; } } for (auto& kv : streams) if (kv.second->thread.joinable()) kv.second->thread.join(); streams.clear(); } // ======================================================= // 状态 & URL // ======================================================= bool RTMPManager::is_streaming(const std::string& cam_name) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); if (it == streams.end()) return false; std::lock_guard lk(it->second->status_mutex); return it->second->status.running; } std::string RTMPManager::get_stream_url(const std::string& cam_name) { std::string ip = get_ip_address("enP2p33s0"); if (ip.empty()) ip = "192.168.4.194"; return "http://" + ip + ":11985/rtc/v1/whep/?app=camera&stream=" + cam_name + "_main&vhost=live"; } // ======================================================= // 汇总状态 // ======================================================= std::vector RTMPManager::get_all_channels_status() { std::vector result; std::lock_guard lock(streams_mutex); for (size_t i = 0; i < g_app_config.cameras.size(); ++i) { const auto& cam = g_app_config.cameras[i]; ChannelInfo ch{}; ch.loc = static_cast(i); if (!cam.enabled) { ch.running = false; ch.reason = "Disabled by config"; result.push_back(ch); continue; } auto it = streams.find(make_key(cam.name)); if (it != streams.end()) { std::lock_guard lk(it->second->status_mutex); ch.running = it->second->status.running; ch.reason = it->second->status.last_error; if (ch.running) ch.url = get_stream_url(cam.name); } else { ch.running = false; ch.reason = "Not started"; } result.push_back(ch); } return result; } void RTMPManager::set_live_enabled_all(bool enable) { std::lock_guard lock(streams_mutex); g_live_enabled.store(enable); for (auto& kv : streams) { auto* ctx = kv.second.get(); std::lock_guard lk(ctx->status_mutex); if (!ctx->live_valve) continue; // enable=true → drop=false(放行 live) g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); } LOG_INFO(std::string("[RTMP] Live ") + (enable ? "ENABLED" : "DISABLED") + " for all streams"); } void RTMPManager::set_live_enabled(const std::string& cam_name, bool enable) { std::lock_guard lock(streams_mutex); auto it = streams.find(make_key(cam_name)); if (it == streams.end()) return; auto* ctx = it->second.get(); std::lock_guard lk(ctx->status_mutex); if (!ctx->live_valve) return; g_object_set(G_OBJECT(ctx->live_valve), "drop", enable ? FALSE : TRUE, nullptr); LOG_INFO("[RTMP] Live " + std::string(enable ? "ENABLED" : "DISABLED") + " for " + cam_name); }