Files
l-s/src/meta/progress.rs
licsber d7da1c325f feat: 0.5.0 新增子目录增量校验功能
- 当子目录存在 meta.json 时,直接进行 xxh128 快速校验而非重新计算
- 添加详细的错误提示(校验失败/新增文件/文件缺失)
- ProgressTracker 添加 multi() 方法,支持 MultiProgress::suspend 协同输出
- 优化排序:sort_by → sort_unstable_by_key
- 版本号更新至 0.5.0
2026-04-03 21:22:12 +08:00

272 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use crate::utils::friendly_size;
/// Progress tracker that bundles the progress bars with the I/O statistics
/// (bytes read, operation count, timing) used to build their status messages.
///
/// The counters are `Arc<AtomicU64>` so that the callbacks handed out by the
/// tracker can own clones of them and keep updating them after being moved.
pub struct ProgressTracker {
    /// Container the bars are registered with; `None` when the tracker was
    /// created for zero files and therefore has no bars at all.
    multi: Option<MultiProgress>,
    /// Bar counting processed files; `None` in single-file mode.
    file_progress_bar: Option<ProgressBar>, // file-count progress bar
    /// Bar counting bytes of the file currently being processed.
    current_file_bar: Option<ProgressBar>, // current-file progress bar
    /// Total number of bytes reported through the bytes callback.
    bytes_read: Arc<AtomicU64>,
    /// Bytes read so far of the current file; reset when a new file starts.
    current_file_bytes: Arc<AtomicU64>, // bytes already read of the current file
    /// Total size of the current file, as passed in when the file starts.
    current_file_size: Arc<AtomicU64>, // total size of the current file
    /// Number of I/O operations reported through the IO-operation callback.
    iops: Arc<AtomicU64>,
    /// Instant the tracker was created; all elapsed times are relative to it.
    start_time: Instant,
    /// Milliseconds since `start_time` at which the status message was last refreshed.
    last_update: Arc<AtomicU64>,
    /// Value of `bytes_read` at the last refresh triggered by the bytes callback.
    last_bytes: Arc<AtomicU64>,
}
impl ProgressTracker {
/// 创建新的进度跟踪器
pub fn new(total_files: u64, message: &str) -> Self {
let (multi, file_progress_bar, current_file_bar) = if total_files > 0 {
let multi = MultiProgress::new();
// 文件数量进度条
let file_pb = multi.add(ProgressBar::new(total_files));
file_pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({percent}%) {msg}")
.unwrap()
.progress_chars("#>-"),
);
file_pb.set_message(message.to_string());
// 当前文件进度条初始隐藏通过设置长度为0来隐藏
let current_pb = multi.add(ProgressBar::new(0));
current_pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise}")
.unwrap()
.progress_chars("=>-"),
);
current_pb.set_length(0); // 设置为0长度来隐藏
(Some(multi), Some(file_pb), Some(current_pb))
} else {
(None, None, None)
};
Self {
multi,
file_progress_bar,
current_file_bar,
bytes_read: Arc::new(AtomicU64::new(0)),
current_file_bytes: Arc::new(AtomicU64::new(0)),
current_file_size: Arc::new(AtomicU64::new(0)),
iops: Arc::new(AtomicU64::new(0)),
start_time: Instant::now(),
last_update: Arc::new(AtomicU64::new(0)),
last_bytes: Arc::new(AtomicU64::new(0)),
}
}
/// 为单个文件创建进度跟踪器(用于 process_file
pub fn new_single_file(file_size: u64, file_name: &str) -> Self {
let multi = MultiProgress::new();
// 单个文件进度条
let current_pb = multi.add(ProgressBar::new(file_size));
current_pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise}")
.unwrap()
.progress_chars("=>-"),
);
current_pb.set_message(format!("处理: {}", file_name));
Self {
multi: Some(multi),
file_progress_bar: None,
current_file_bar: Some(current_pb),
bytes_read: Arc::new(AtomicU64::new(0)),
current_file_bytes: Arc::new(AtomicU64::new(0)),
current_file_size: Arc::new(AtomicU64::new(file_size)),
iops: Arc::new(AtomicU64::new(0)),
start_time: Instant::now(),
last_update: Arc::new(AtomicU64::new(0)),
last_bytes: Arc::new(AtomicU64::new(0)),
}
}
/// 开始处理新文件
pub fn start_file(&self, file_size: u64, file_name: &str) {
if let Some(pb) = &self.current_file_bar {
pb.set_length(file_size);
pb.set_position(0);
pb.set_message(format!("处理: {}", file_name));
self.current_file_size.store(file_size, Ordering::Relaxed);
self.current_file_bytes.store(0, Ordering::Relaxed);
}
}
/// 完成当前文件的处理
pub fn finish_current_file(&self) {
if let Some(pb) = &self.current_file_bar {
if let Some(_file_pb) = &self.file_progress_bar {
// 在多文件模式下隐藏当前文件进度条通过设置长度为0
pb.set_length(0);
} else {
// 单文件模式下,完成进度条
pb.finish();
}
}
}
/// 完成一个文件的处理
pub fn finish_file(&self) {
self.finish_current_file();
if let Some(pb) = &self.file_progress_bar {
pb.inc(1);
// 每 10 个文件或每 0.5 秒更新一次消息,减少开销
let files_processed = pb.position();
let now = self.start_time.elapsed().as_millis() as u64;
let last_update = self.last_update.load(Ordering::Relaxed);
if files_processed % 10 == 0 || now.saturating_sub(last_update) > 500 {
self.update_message();
self.last_update.store(now, Ordering::Relaxed);
}
}
}
/// 更新进度条消息
fn update_message(&self) {
if let Some(pb) = &self.file_progress_bar {
let elapsed = self.start_time.elapsed().as_secs_f64();
let total_bytes = self.bytes_read.load(Ordering::Relaxed);
let total_ops = self.iops.load(Ordering::Relaxed);
if total_bytes > 0 && elapsed > 0.0 {
let speed_bytes_per_sec = total_bytes as f64 / elapsed;
let speed_str = friendly_size(speed_bytes_per_sec as u64);
let iops = total_ops as f64 / elapsed;
// 计算预估剩余时间
let files_processed = pb.position();
let files_total = pb.length().unwrap_or(0);
let files_remaining = files_total.saturating_sub(files_processed);
let avg_time_per_file = if files_processed > 0 {
elapsed / files_processed as f64
} else {
0.0
};
let eta_seconds = avg_time_per_file * files_remaining as f64;
let eta_str = if eta_seconds > 0.0 {
format!("ETA: {:.0}s", eta_seconds)
} else {
String::new()
};
pb.set_message(format!("IO速度: {}/s | IOPS: {:.0} | {}", speed_str, iops, eta_str));
}
}
}
/// 完成所有处理
pub fn finish(&self, message: &str) {
if let Some(pb) = &self.file_progress_bar {
pb.finish_with_message(message.to_string());
}
if let Some(pb) = &self.current_file_bar {
if self.file_progress_bar.is_none() {
// 单文件模式下完成
pb.finish_with_message(message.to_string());
} else {
// 多文件模式下隐藏通过设置长度为0
pb.set_length(0);
}
}
}
/// 获取字节读取回调(可 move
/// 在读取过程中定期更新消息,提供更高粒度的进度反馈
pub fn bytes_callback(&self) -> impl FnMut(u64) {
let bytes_read = self.bytes_read.clone();
let current_file_bytes = self.current_file_bytes.clone();
let last_bytes = self.last_bytes.clone();
let last_update = self.last_update.clone();
let start_time = self.start_time;
let file_progress_bar = self.file_progress_bar.clone();
let current_file_bar = self.current_file_bar.clone();
let bytes_read_clone = self.bytes_read.clone();
let iops = self.iops.clone();
move |bytes| {
bytes_read.fetch_add(bytes, Ordering::Relaxed);
current_file_bytes.fetch_add(bytes, Ordering::Relaxed);
// 更新当前文件进度条
if let Some(pb) = &current_file_bar {
let current_bytes = current_file_bytes.load(Ordering::Relaxed);
pb.set_position(current_bytes);
}
// 每读取 16MB约4次缓冲区读取或每 200ms 更新一次消息,提供更细粒度的反馈
let total_bytes = bytes_read.load(Ordering::Relaxed);
let last_bytes_value = last_bytes.load(Ordering::Relaxed);
let now = start_time.elapsed().as_millis() as u64;
let last_update_value = last_update.load(Ordering::Relaxed);
// 检查是否需要更新:每 16MB缓冲区为4MB约4次读取或每 200ms
let bytes_diff = total_bytes.saturating_sub(last_bytes_value);
let time_diff = now.saturating_sub(last_update_value);
const UPDATE_BYTES_THRESHOLD: u64 = 16 * 1024 * 1024; // 16MB
const UPDATE_TIME_THRESHOLD: u64 = 200; // 200ms
if bytes_diff >= UPDATE_BYTES_THRESHOLD || time_diff >= UPDATE_TIME_THRESHOLD {
if let Some(pb) = &file_progress_bar {
let elapsed = start_time.elapsed().as_secs_f64();
let total_bytes = bytes_read_clone.load(Ordering::Relaxed);
let total_ops = iops.load(Ordering::Relaxed);
if total_bytes > 0 && elapsed > 0.0 {
let speed_bytes_per_sec = total_bytes as f64 / elapsed;
let speed_str = friendly_size(speed_bytes_per_sec as u64);
let iops_value = total_ops as f64 / elapsed;
// 计算预估剩余时间
let files_processed = pb.position();
let files_total = pb.length().unwrap_or(0);
let files_remaining = files_total.saturating_sub(files_processed);
let avg_time_per_file = if files_processed > 0 {
elapsed / files_processed as f64
} else {
0.0
};
let eta_seconds = avg_time_per_file * files_remaining as f64;
let eta_str = if eta_seconds > 0.0 {
format!("ETA: {:.0}s", eta_seconds)
} else {
String::new()
};
pb.set_message(format!("IO速度: {}/s | IOPS: {:.0} | {}", speed_str, iops_value, eta_str));
}
}
last_bytes.store(total_bytes, Ordering::Relaxed);
last_update.store(now, Ordering::Relaxed);
}
}
}
/// 获取 IO 操作回调(可 move
pub fn iop_callback(&self) -> impl FnMut() {
let iops = self.iops.clone();
move || {
iops.fetch_add(1, Ordering::Relaxed);
}
}
/// 获取 MultiProgress用于 `println` / `suspend` 等与进度条协同输出
pub fn multi(&self) -> Option<&MultiProgress> {
self.multi.as_ref()
}
}