kunlang_video/src/record_manager.cpp
2025-12-02 15:12:36 +08:00

668 lines
19 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "record_manager.hpp"
#include <sys/statvfs.h>
#include <algorithm>
#include <cstdio>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <optional>
#include <system_error>
#include <vector>
#include "logger.hpp"
namespace
{
/// Scans every file of every stream in @p index_ and reports the smallest start_ms.
///
/// A negative running value doubles as the "nothing recorded yet" marker, so the
/// function yields std::nullopt whenever the final value is negative (which includes
/// the case of an empty index).
std::optional<int64_t> getEarliestRecordMsInternal(
    const std::unordered_map<std::string, std::vector<RecordFileInfo>>& index_)
{
    int64_t best = -1;
    for (const auto& entry : index_)
    {
        const auto& files = entry.second;
        for (const auto& file : files)
        {
            const bool unset = best < 0;
            if (unset || file.start_ms < best)
            {
                best = file.start_ms;
            }
        }
    }
    if (best >= 0)
    {
        return best;
    }
    return std::nullopt;
}
}  // namespace
// Global RecordManager handle; starts out as nullptr.
std::shared_ptr<RecordManager> g_record_manager = nullptr;
// Global run flag declared here and defined elsewhere; the auto-scan thread polls it
// together with its own running_ flag to decide when to stop.
extern std::atomic<bool> g_running;
// File-wide shorthand for the standard filesystem library.
namespace fs = std::filesystem;
//
// Constructor: loads the SRS configuration right away.
//
// When the configuration is parsed successfully the extracted values are logged and the
// periodic scan thread is started, using dvr_duration_sec_ as the scan interval.
// When parsing fails an error is logged and no scan thread is started.
//
RecordManager::RecordManager(const std::string& srs_record_cfg_path) : srs_record_cfg_path_(srs_record_cfg_path)
{
    LOG_INFO("[RecordManager] Loading SRS config: " + srs_record_cfg_path_);

    if (const bool loaded = loadSrsConfig(); loaded)
    {
        LOG_INFO("[RecordManager] SRS config loaded successfully.");
        LOG_INFO("record_dir = " + record_dir_);
        LOG_INFO("dvr_duration_sec = " + std::to_string(dvr_duration_sec_));
        LOG_INFO("http_port = " + std::to_string(http_port_));

        // The scan period follows the configured DVR segment duration.
        startAutoScan(dvr_duration_sec_);
    }
    else
    {
        LOG_ERROR("[RecordManager] Failed to load SRS config.");
    }
}
// Destructor: asks the auto-scan thread to stop and waits for it, so the thread
// never outlives the object it captured.
RecordManager::~RecordManager()
{
    this->stopAutoScan();
}
//
// Parses the SRS configuration file and extracts record_dir_, dvr_duration_sec_ and http_port_.
//
// Recognised directives (matched line by line, comment lines starting with '#' are skipped):
//   - "listen <port>;" inside an "http_server { ... }" block -> http_port_
//   - a line containing "dvr_path"                           -> text from the first '/' up to ';'
//   - a line containing "dvr_duration"                       -> dvr_duration_sec_
//
// record_dir_ becomes the part of dvr_path that precedes the "[stream]" placeholder.
//
// Returns false when the file cannot be opened, when no dvr_path is found, or when the
// dvr_path does not contain "[stream]"; returns true otherwise.
//
bool RecordManager::loadSrsConfig()
{
    std::ifstream ifs(srs_record_cfg_path_);
    if (!ifs.is_open())
    {
        LOG_ERROR("[RecordManager] Cannot open SRS config file: " + srs_record_cfg_path_);
        return false;
    }

    static const char* const kBlank = " \t\r\n";

    std::string line;
    std::string dvr_path;
    bool in_http_server = false;

    while (std::getline(ifs, line))
    {
        // Trim BOTH ends. Trailing blanks or a '\r' from CRLF files would otherwise keep
        // the exact comparison with "}" below from ever matching.
        const auto first = line.find_first_not_of(kBlank);
        if (first == std::string::npos) continue;  // blank line
        const auto last = line.find_last_not_of(kBlank);
        line = line.substr(first, last - first + 1);

        if (line[0] == '#') continue;

        // ---------- entering the http_server block ----------
        if (line.find("http_server") != std::string::npos && line.find("{") != std::string::npos)
        {
            in_http_server = true;
            continue;
        }
        // ---------- leaving the http_server block ----------
        if (in_http_server && line == "}")
        {
            in_http_server = false;
            continue;
        }
        if (in_http_server)
        {
            if (line.rfind("listen", 0) == 0)
            {
                int port = 0;
                if (sscanf(line.c_str(), "listen %d;", &port) == 1)
                {
                    http_port_ = port;
                }
            }
        }

        // Parse dvr_path: keep everything from the first '/' up to (not including) ';'.
        if (line.find("dvr_path") != std::string::npos)
        {
            const auto pos = line.find('/');
            const auto semicolon = line.find(';');
            // Require ';' to come after '/', otherwise "semicolon - pos" would wrap around
            // and the whole rest of the line would be taken as the path.
            if (pos != std::string::npos && semicolon != std::string::npos && semicolon > pos)
                dvr_path = line.substr(pos, semicolon - pos);
        }
        // Parse dvr_duration (seconds).
        else if (line.find("dvr_duration") != std::string::npos)
        {
            int sec = 0;
            if (sscanf(line.c_str(), "dvr_duration %d", &sec) == 1) dvr_duration_sec_ = sec;
        }
    }

    if (dvr_path.empty())
    {
        LOG_ERROR("[RecordManager] dvr_path not found in config.");
        return false;
    }

    // Example: /sata/record/[stream]/[2006]-[01]-[02]/...  ->  record_dir_ = "/sata/record/"
    const auto pos = dvr_path.find("[stream]");
    if (pos == std::string::npos)
    {
        LOG_ERROR("[RecordManager] Cannot extract record_dir from dvr_path: " + dvr_path);
        return false;
    }
    record_dir_ = dvr_path.substr(0, pos);
    return true;
}
// Deletes every "<record_dir_>/<stream>/<YYYY-MM-DD>" directory whose date is more than
// retention_days_ days in the past.
//
// All filesystem calls use the std::error_code overloads: this function runs on the
// auto-scan thread, where an escaping std::filesystem exception would terminate the
// process. Directories that cannot be read or removed are skipped (removal failures are
// logged).
void RecordManager::removeExpiredDays()
{
    namespace fs = std::filesystem;

    const time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    const fs::directory_iterator end;

    std::error_code streamEc;
    for (fs::directory_iterator streamIt(record_dir_, streamEc); !streamEc && streamIt != end;
         streamIt.increment(streamEc))
    {
        std::error_code typeEc;
        if (!streamIt->is_directory(typeEc)) continue;

        // Collect first, delete afterwards, so the directory being iterated is not
        // modified underneath its own iterator.
        std::vector<fs::path> expired;

        std::error_code dayEc;
        for (fs::directory_iterator dayIt(streamIt->path(), dayEc); !dayEc && dayIt != end; dayIt.increment(dayEc))
        {
            std::error_code dayTypeEc;
            if (!dayIt->is_directory(dayTypeEc)) continue;

            const std::string dayName = dayIt->path().filename().string();  // e.g. 2025-11-14
            std::tm tm{};
            if (sscanf(dayName.c_str(), "%d-%d-%d", &tm.tm_year, &tm.tm_mon, &tm.tm_mday) != 3) continue;
            tm.tm_year -= 1900;
            tm.tm_mon -= 1;

            const time_t dayTime = mktime(&tm);
            // A date mktime cannot represent must not be mistaken for a very old one.
            if (dayTime == static_cast<time_t>(-1)) continue;

            const double daysOld = difftime(now, dayTime) / 86400.0;
            if (daysOld > retention_days_) expired.push_back(dayIt->path());
        }

        for (const auto& dayPath : expired)
        {
            LOG_WARN("[RecordManager] Removing expired day: " + dayPath.string());
            std::error_code rmEc;
            fs::remove_all(dayPath, rmEc);
            if (rmEc)
            {
                LOG_WARN("[RecordManager] Failed to remove expired day: " + dayPath.string() + " (" +
                         rmEc.message() + ")");
            }
        }
    }
}
// Frees disk space by repeatedly deleting the oldest recorded hour (across all streams)
// until the usage of the filesystem holding record_dir_ drops below usage_threshold_.
//
// Expected layout: <record_dir_>/<stream>/<YYYY-MM-DD>/<hour>/...
//
// Each round:
//   1) re-check disk usage and stop when it is below the threshold;
//   2) find the oldest YYYY-MM-DD folder name over all streams;
//   3) inside that day, find the numerically smallest hour folder over all streams;
//   4) if the day has no hour folder anywhere, delete the day folders themselves;
//   5) otherwise delete that hour in every stream and drop day folders left empty.
//
// The loop ends as soon as a round removes nothing, so it cannot spin forever when a
// directory cannot be deleted. Hour folders are deleted through the paths actually found
// on disk (a folder named "7" is handled the same as "07"). All filesystem calls use
// std::error_code overloads so nothing is thrown on the scan thread.
void RecordManager::removeOldestHoursUntilSafe()
{
    namespace fs = std::filesystem;
    const fs::directory_iterator end;

    while (true)
    {
        // 1) Check disk usage.
        struct statvfs vfs{};
        if (statvfs(record_dir_.c_str(), &vfs) != 0) return;
        if (vfs.f_blocks == 0) return;  // no meaningful ratio; never delete data on a NaN
        const double used = 1.0 - static_cast<double>(vfs.f_bavail) / static_cast<double>(vfs.f_blocks);
        if (used < usage_threshold_) break;

        // Snapshot of the stream directories, reused by the steps below.
        std::vector<fs::path> streamDirs;
        {
            std::error_code ec;
            for (fs::directory_iterator it(record_dir_, ec); !ec && it != end; it.increment(ec))
            {
                std::error_code typeEc;
                if (it->is_directory(typeEc)) streamDirs.push_back(it->path());
            }
        }

        // 2) Find the oldest day name (YYYY-MM-DD) over all streams.
        std::string oldestDayName;
        time_t oldestTime = 0;
        bool foundDay = false;
        for (const auto& streamDir : streamDirs)
        {
            std::error_code ec;
            for (fs::directory_iterator it(streamDir, ec); !ec && it != end; it.increment(ec))
            {
                std::error_code typeEc;
                if (!it->is_directory(typeEc)) continue;

                const std::string dayName = it->path().filename().string();
                std::tm tm{};
                if (sscanf(dayName.c_str(), "%d-%d-%d", &tm.tm_year, &tm.tm_mon, &tm.tm_mday) != 3) continue;
                tm.tm_year -= 1900;
                tm.tm_mon -= 1;

                const time_t t = mktime(&tm);
                if (t == static_cast<time_t>(-1)) continue;  // unrepresentable date, not "oldest"

                if (!foundDay || t < oldestTime)
                {
                    foundDay = true;
                    oldestTime = t;
                    oldestDayName = dayName;
                }
            }
        }
        if (!foundDay)
        {
            LOG_WARN("[RecordManager] No day folder found for hour cleanup.");
            break;
        }

        // 3) Within that day, collect the hour folders carrying the smallest hour value.
        int oldestHourValue = -1;
        std::vector<fs::path> oldestHourDirs;
        for (const auto& streamDir : streamDirs)
        {
            const fs::path dayDir = streamDir / oldestDayName;
            std::error_code ec;
            if (!fs::is_directory(dayDir, ec)) continue;

            for (fs::directory_iterator it(dayDir, ec); !ec && it != end; it.increment(ec))
            {
                std::error_code typeEc;
                if (!it->is_directory(typeEc)) continue;

                const std::string hourName = it->path().filename().string();
                int h = -1;
                if (sscanf(hourName.c_str(), "%d", &h) != 1) continue;

                if (oldestHourDirs.empty() || h < oldestHourValue)
                {
                    oldestHourValue = h;
                    oldestHourDirs.clear();
                    oldestHourDirs.push_back(it->path());
                }
                else if (h == oldestHourValue)
                {
                    oldestHourDirs.push_back(it->path());
                }
            }
        }

        bool progressed = false;

        if (oldestHourDirs.empty())
        {
            // 4) No hour folder anywhere for this day -> remove the day folders themselves.
            for (const auto& streamDir : streamDirs)
            {
                const fs::path dayDir = streamDir / oldestDayName;
                std::error_code ec;
                if (!fs::exists(dayDir, ec)) continue;

                LOG_WARN("[RecordManager] Day empty, remove whole day: " + dayDir.string());
                std::error_code rmEc;
                const auto removed = fs::remove_all(dayDir, rmEc);
                if (!rmEc && removed > 0) progressed = true;
            }
        }
        else
        {
            // 5) Remove "same day + same hour" in every stream.
            for (const auto& hourDir : oldestHourDirs)
            {
                LOG_WARN("[RecordManager] Removing oldest hour: " + hourDir.string());
                std::error_code rmEc;
                const auto removed = fs::remove_all(hourDir, rmEc);
                if (!rmEc && removed > 0) progressed = true;
            }
            // While here, drop day folders that became empty.
            for (const auto& streamDir : streamDirs)
            {
                const fs::path dayDir = streamDir / oldestDayName;
                std::error_code ec;
                if (!fs::is_directory(dayDir, ec)) continue;
                if (!fs::is_empty(dayDir, ec) || ec) continue;

                LOG_WARN("[RecordManager] Removing empty day: " + dayDir.string());
                std::error_code rmEc;
                const auto removed = fs::remove_all(dayDir, rmEc);
                if (!rmEc && removed > 0) progressed = true;
            }
        }

        if (!progressed)
        {
            // Nothing could be deleted this round; looping again would repeat the same
            // attempt forever.
            LOG_WARN("[RecordManager] Hour cleanup made no progress, giving up for this round.");
            break;
        }
    }
}
// Runs the two-stage storage cleanup:
//   1) remove day folders that exceeded the retention period;
//   2) if the disk usage ratio is still at or above usage_threshold_, remove the oldest
//      hours until it is not.
// Does nothing (besides logging) when the initial statvfs() on record_dir_ fails.
void RecordManager::cleanupStorage()
{
    struct statvfs fsInfo{};
    const auto usageRatio = [&fsInfo]()
    { return 1.0 - static_cast<double>(fsInfo.f_bavail) / static_cast<double>(fsInfo.f_blocks); };

    const int firstProbe = statvfs(record_dir_.c_str(), &fsInfo);
    if (firstProbe != 0)
    {
        LOG_ERROR("[RecordManager] statvfs failed for " + record_dir_);
        return;
    }
    double ratio = usageRatio();

    // Stage 1: retention-based cleanup.
    removeExpiredDays();

    // Measure again; if this probe fails the earlier reading is kept.
    const int secondProbe = statvfs(record_dir_.c_str(), &fsInfo);
    if (secondProbe == 0)
    {
        ratio = usageRatio();
    }

    // Stage 2: still too full -> hour-by-hour cleanup.
    if (!(ratio < usage_threshold_))
    {
        if (ratio >= usage_threshold_) removeOldestHoursUntilSafe();
    }
}
// Starts the background thread that periodically re-indexes the recordings and cleans
// up storage.
//
// interval_sec: target period of one scan cycle, in seconds. It is stored even when the
//               thread is already running.
//
// Each cycle: scanAll(), log the earliest recording time and the disk usage,
// cleanupStorage(), then sleep for the rest of the interval (at least 100 ms) in chunks
// of at most 200 ms so a stop request is noticed quickly. The loop ends when either
// running_ or the global g_running flag becomes false.
//
// Calling this while the thread is already marked as running is a no-op.
void RecordManager::startAutoScan(int interval_sec)
{
    scan_interval_sec_ = interval_sec;

    // exchange() performs "test and mark as running" as one atomic step, so two
    // concurrent callers cannot both get past this point and both create a thread.
    if (running_.exchange(true)) return;

    scan_thread_ = std::thread(
        [this]()
        {
            LOG_INFO("[RecordManager] Auto-scan thread started, interval = " + std::to_string(scan_interval_sec_) +
                     " seconds.");
            while (running_.load() && g_running.load())
            {
                const auto t0 = std::chrono::steady_clock::now();
                this->scanAll();

                // ======= Report the earliest recording time and the disk usage =======
                std::optional<int64_t> earliestOpt;
                {
                    std::lock_guard<std::mutex> lock(index_mutex_);
                    earliestOpt = getEarliestRecordMsInternal(index_);
                }
                if (earliestOpt.has_value())
                {
                    const std::string tReadable = RecordManager::toReadable(earliestOpt.value());
                    struct statvfs vfs{};
                    double usedPct = -1;  // stays -1 when the usage cannot be determined
                    if (statvfs(record_dir_.c_str(), &vfs) == 0 && vfs.f_blocks != 0)
                    {
                        const double used =
                            1.0 - static_cast<double>(vfs.f_bavail) / static_cast<double>(vfs.f_blocks);
                        usedPct = used * 100.0;
                    }
                    LOG_INFO("[RecordManager] Earliest record time=" + tReadable +
                             ", disk used=" + std::to_string(usedPct) + "%");
                }
                else
                {
                    LOG_INFO("[RecordManager] No video files found.");
                }

                // ======= Then run the cleanup =======
                this->cleanupStorage();
                LOG_INFO("[RecordManager] scanAll() completed.");

                // Sleep for the remainder of the interval (supports fast shutdown).
                // Computed in 64 bits so a large interval or a long cycle cannot overflow.
                const auto t1 = std::chrono::steady_clock::now();
                const int64_t used_ms = static_cast<int64_t>(
                    std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count());
                const int64_t budget_ms = static_cast<int64_t>(scan_interval_sec_) * 1000 - used_ms;
                int64_t remain = std::max<int64_t>(budget_ms, 100);
                while (remain > 0 && running_.load() && g_running.load())
                {
                    const int64_t chunk = std::min<int64_t>(remain, 200);
                    std::this_thread::sleep_for(std::chrono::milliseconds(chunk));
                    remain -= chunk;
                }
            }
            LOG_INFO("[RecordManager] Auto-scan thread stopped.");
        });
}
// Requests the auto-scan thread to stop and waits until it has finished.
// Does nothing when the running_ flag is not set.
void RecordManager::stopAutoScan()
{
    const bool active = running_;
    if (active)
    {
        running_ = false;
        if (scan_thread_.joinable())
        {
            scan_thread_.join();
        }
    }
}
//
// 扫描所有文件
//
void RecordManager::scanAll()
{
std::lock_guard<std::mutex> lock(index_mutex_);
index_.clear();
if (!fs::exists(record_dir_))
{
LOG_WARN("[RecordManager] record_dir not exist: " + record_dir_);
return;
}
for (auto const& entry : fs::recursive_directory_iterator(record_dir_))
{
if (!entry.is_regular_file()) continue;
auto p = entry.path();
if (p.extension() != ".mp4") continue;
auto info = parseFile(p);
if (info.start_ms <= 0) continue;
index_[info.stream].push_back(info);
}
for (auto& kv : index_)
{
sortStream(kv.first);
}
LOG_INFO("[RecordManager] scanAll completed. streams=" + std::to_string(index_.size()));
}
//
// Sorts the indexed files of one stream by start_ms, ascending.
//
// Does nothing when the stream is not present in index_. The lookup uses find() so an
// unknown stream name no longer creates an empty entry as a side effect.
// This function does not lock index_mutex_ itself.
//
void RecordManager::sortStream(const std::string& stream)
{
    const auto it = index_.find(stream);
    if (it == index_.end()) return;

    auto& files = it->second;
    std::sort(files.begin(), files.end(),
              [](const RecordFileInfo& a, const RecordFileInfo& b) { return a.start_ms < b.start_ms; });
}
//
// Derives the stream name and the time range of a recording from its path.
//
// Expected layout: .../<stream>/<YYYY-MM-DD>/<dir>/<HH-MM-SS>.mp4
//   - the file name supplies hour/minute/second,
//   - the grandparent directory supplies the date,
//   - the directory above that supplies the stream name.
//
// start_ms is the local-time timestamp (via mktime) in milliseconds. end_ms is start_ms
// plus the MP4 duration; when the file cannot be opened or reports a zero timescale a
// 60 s fallback is used.
//
// On any parse failure (date or file name not matching "%d-%d-%d", mktime failing, or an
// exception) the returned info has start_ms == end_ms == -1.
//
RecordFileInfo RecordManager::parseFile(const fs::path& p)
{
    RecordFileInfo info;
    info.path = p.string();
    // Start from the "invalid" state; only a fully parsed path overwrites it.
    info.start_ms = -1;
    info.end_ms = -1;
    try
    {
        const auto filename = p.filename().string();                              // 10-23-17.mp4
        const auto date_dir = p.parent_path().parent_path().filename().string();  // yyyy-mm-dd
        const auto stream_dir = p.parent_path().parent_path().parent_path().filename().string();
        info.stream = stream_dir;

        int y = 0, mon = 0, d = 0, h = 0, m = 0, s = 0;
        // Both parts must parse completely; a half-parsed name would otherwise be indexed
        // at a wrong time (e.g. midnight).
        if (sscanf(date_dir.c_str(), "%d-%d-%d", &y, &mon, &d) != 3) return info;
        if (sscanf(filename.c_str(), "%d-%d-%d", &h, &m, &s) != 3) return info;

        std::tm tm{};
        tm.tm_year = y - 1900;
        tm.tm_mon = mon - 1;
        tm.tm_mday = d;
        tm.tm_hour = h;
        tm.tm_min = m;
        tm.tm_sec = s;

        const std::time_t epochSec = std::mktime(&tm);
        if (epochSec == static_cast<std::time_t>(-1)) return info;

        const int64_t start = static_cast<int64_t>(epochSec) * 1000LL;
        info.start_ms = start;

        // Read the MP4 duration.
        bool haveDuration = false;
        MP4FileHandle hFile = MP4Read(info.path.c_str());
        if (hFile != MP4_INVALID_FILE_HANDLE)
        {
            const auto duration = MP4GetDuration(hFile);
            const auto timescale = MP4GetTimeScale(hFile);
            MP4Close(hFile);
            if (timescale > 0)  // guard the division below
            {
                const double durSec = static_cast<double>(duration) / static_cast<double>(timescale);
                info.end_ms = start + static_cast<int64_t>(durSec * 1000.0);
                haveDuration = true;
            }
        }
        if (!haveDuration)
        {
            info.end_ms = start + 60000;  // fallback
        }
    }
    catch (...)
    {
        info.start_ms = -1;
        info.end_ms = -1;
    }
    return info;
}
//
// 查询录像段
//
std::vector<RecordSegment> RecordManager::querySegments(const std::string& stream, int64_t start_ms, int64_t end_ms)
{
std::lock_guard<std::mutex> lock(index_mutex_);
std::vector<RecordSegment> result;
if (start_ms <= 0 || end_ms <= 0 || start_ms >= end_ms) return result;
if (!index_.count(stream)) return result;
const auto& files = index_[stream];
std::vector<RecordFileInfo> hits;
for (const auto& f : files)
{
bool overlap = !(f.end_ms <= start_ms || f.start_ms >= end_ms);
if (overlap) hits.push_back(f);
}
if (hits.empty()) return result;
std::sort(hits.begin(), hits.end(), [](auto& a, auto& b) { return a.start_ms < b.start_ms; });
int idx = 1;
RecordSegment seg{};
seg.index = idx;
seg.start_ms = hits[0].start_ms;
seg.end_ms = hits[0].end_ms;
seg.files.push_back(hits[0]);
for (size_t i = 1; i < hits.size(); ++i)
{
auto& prev = hits[i - 1];
auto& cur = hits[i];
bool continuous = (cur.start_ms - prev.end_ms) <= 2000;
if (continuous)
{
seg.files.push_back(cur);
seg.end_ms = cur.end_ms;
}
else
{
seg.segment_id = stream + "_" + std::to_string(seg.start_ms) + "_" + std::to_string(seg.end_ms);
result.push_back(seg);
idx++;
seg = RecordSegment{};
seg.index = idx;
seg.start_ms = cur.start_ms;
seg.end_ms = cur.end_ms;
seg.files.push_back(cur);
}
}
seg.segment_id = stream + "_" + std::to_string(seg.start_ms) + "_" + std::to_string(seg.end_ms);
result.push_back(seg);
return result;
}
//
// Rebuilds a segment from its id.
//
// The id has the form "<stream>_<start_ms>_<end_ms>". Because the stream name itself may
// contain '_', the id is split at its last two underscores. Every indexed file of that
// stream overlapping [start_ms, end_ms) is attached to the returned segment, whose index
// is always 1.
//
// Returns a default-constructed RecordSegment when the id is malformed, when the numbers
// cannot be parsed completely, when the stream is unknown, or when no file overlaps.
//
RecordSegment RecordManager::getSegment(const std::string& segmentId)
{
    // 1) Find the last '_'. Position 0 leaves no room for a stream name and would make
    //    "pos_last - 1" wrap around below.
    const size_t pos_last = segmentId.rfind('_');
    if (pos_last == std::string::npos || pos_last == 0) return {};

    // 2) Find the '_' before it.
    const size_t pos_mid = segmentId.rfind('_', pos_last - 1);
    if (pos_mid == std::string::npos) return {};

    // 3) Split into three parts.
    const std::string stream = segmentId.substr(0, pos_mid);
    const std::string startStr = segmentId.substr(pos_mid + 1, pos_last - pos_mid - 1);
    const std::string endStr = segmentId.substr(pos_last + 1);

    int64_t start_ms = 0;
    int64_t end_ms = 0;
    bool parsed = false;
    try
    {
        size_t usedStart = 0;
        size_t usedEnd = 0;
        start_ms = std::stoll(startStr, &usedStart);
        end_ms = std::stoll(endStr, &usedEnd);
        // stoll stops at the first non-digit; require that the whole text was consumed.
        parsed = (usedStart == startStr.size()) && (usedEnd == endStr.size());
    }
    catch (...)
    {
        // std::stoll reports both "not a number" and "out of range" by throwing.
        parsed = false;
    }
    if (!parsed)
    {
        LOG_ERROR("[RecordManager] segmentId parse failed: " + segmentId);
        return {};
    }

    std::lock_guard<std::mutex> lock(index_mutex_);

    // Make sure the stream exists in index_ (single lookup, never inserts).
    const auto it = index_.find(stream);
    if (it == index_.end())
    {
        LOG_WARN("[RecordManager] stream not exist in index: " + stream);
        return {};
    }

    // 4) Collect every file belonging to this segment by time range.
    RecordSegment seg;
    seg.segment_id = segmentId;
    seg.index = 1;
    seg.start_ms = start_ms;
    seg.end_ms = end_ms;
    for (const auto& f : it->second)
    {
        // Any file overlapping the segment's time range belongs to the segment.
        const bool overlap = f.end_ms > start_ms && f.start_ms < end_ms;
        if (overlap) seg.files.push_back(f);
    }
    if (seg.files.empty())
    {
        LOG_WARN("[RecordManager] No files found for segmentId: " + segmentId);
        return {};
    }
    return seg;
}
// Converts a "YYYY-MM-DD HH:MM:SS" string, interpreted through mktime(), into a
// millisecond timestamp. Returns -1 when strptime() cannot parse the text.
int64_t RecordManager::toMsTimestamp(const std::string& s)
{
    std::tm parsed{};
    if (strptime(s.c_str(), "%Y-%m-%d %H:%M:%S", &parsed) == nullptr)
    {
        return -1;
    }
    const time_t epochSec = mktime(&parsed);
    return static_cast<int64_t>(epochSec) * 1000;
}
// Formats a millisecond timestamp as local time "YYYY-MM-DD HH:MM:SS".
//
// Uses localtime_r() with a caller-owned std::tm instead of localtime(): this function
// is called from the auto-scan thread, and localtime() hands out a shared static buffer.
// Returns an empty string when the time cannot be converted or formatted.
std::string RecordManager::toReadable(int64_t ms)
{
    const time_t t = static_cast<time_t>(ms / 1000);
    std::tm local{};
    if (localtime_r(&t, &local) == nullptr) return std::string();

    char buf[32];
    const size_t len = strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &local);
    if (len == 0) return std::string();  // buffer contents are unspecified on failure
    return std::string(buf, len);
}
// Maps a location index to its stream name: 0..7 -> "AHD1_main" .. "AHD8_main".
// Any other index yields an empty string.
std::string RecordManager::loc_to_stream(int loc)
{
    const bool inRange = (loc >= 0) && (loc <= 7);
    if (!inRange)
    {
        return std::string();
    }
    std::string name = "AHD";
    name += std::to_string(loc + 1);
    name += "_main";
    return name;
}