diff --git a/Cargo.lock b/Cargo.lock index 50f5692..aebd80f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,7 +236,7 @@ dependencies = [ [[package]] name = "l-s" -version = "0.3.4" +version = "0.4.0" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index ddf4529..c75291b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "l-s" -version = "0.3.4" +version = "0.4.0" authors = ["licsber "] edition = "2021" diff --git a/src/main.rs b/src/main.rs index 4d04172..4f6909f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use std::time::Instant; use anyhow::{Context, Result}; use clap::Parser; -use meta::{calc_xxh128, scan_dir_xxh128, DirSnapshot, FileMeta}; +use meta::{calc_xxh128_with_callback, scan_dir_xxh128, DirSnapshot, FileMeta, ProgressTracker}; fn main() -> Result<()> { let started = Instant::now(); @@ -43,8 +43,18 @@ fn process_file(path: &Path) -> Result<()> { .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".to_string()); let save_path = meta_dir.join(format!("{basename}.json")); + + // 获取文件大小 + let file_size = fs::metadata(path) + .with_context(|| format!("无法读取文件信息: {}", path.display()))? + .len(); + if !save_path.exists() { - let meta = FileMeta::from_path(path)?; + let tracker = ProgressTracker::new_single_file(file_size, &basename); + let on_bytes = tracker.bytes_callback(); + let on_iop = tracker.iop_callback(); + let meta = FileMeta::from_path_with_callback(path, on_bytes, on_iop)?; + tracker.finish("处理完成"); let json = meta.to_pretty_json()?; println!("{}", json); fs::write(&save_path, json)?; @@ -54,7 +64,14 @@ fn process_file(path: &Path) -> Result<()> { let existing = File::open(&save_path) .with_context(|| format!("无法读取历史元数据: {}", save_path.display()))?; let old_meta = FileMeta::from_reader(existing)?; - let fast_hash = calc_xxh128(path)?; + + // 使用进度条计算快速哈希 + let tracker = ProgressTracker::new_single_file(file_size, &basename); + let on_bytes = tracker.bytes_callback(); + let on_iop = tracker.iop_callback(); + let fast_hash = calc_xxh128_with_callback(path, on_bytes, on_iop)?; + tracker.finish("校验完成"); + if fast_hash == old_meta.xxh128 { println!("校验通过."); return Ok(()); @@ -62,7 +79,11 @@ fn process_file(path: &Path) -> Result<()> { println!("校验失败!"); println!("现校验文件:"); - let meta = FileMeta::from_path(path)?; + let tracker = ProgressTracker::new_single_file(file_size, &basename); + let on_bytes = tracker.bytes_callback(); + let on_iop = tracker.iop_callback(); + let meta = FileMeta::from_path_with_callback(path, on_bytes, on_iop)?; + tracker.finish("处理完成"); println!("{}", meta.to_pretty_json()?); println!("原校验文件:"); println!("{}", old_meta.to_pretty_json()?); diff --git a/src/meta/file.rs b/src/meta/file.rs index 2b64647..63f278f 100644 --- a/src/meta/file.rs +++ b/src/meta/file.rs @@ -32,10 +32,6 @@ pub struct FileMeta { } impl FileMeta { - pub fn from_path(path: &Path) -> Result { - Self::from_path_with_callback(path, |_| {}, || {}) - } - pub fn from_path_with_callback(path: &Path, mut on_bytes_read: F1, mut on_iop: F2) -> Result where F1: FnMut(u64), @@ -124,10 +120,6 @@ impl FileMeta { } } -pub fn calc_xxh128(path: &Path) -> Result { - calc_xxh128_with_callback(path, |_| {}, || {}) -} - pub fn calc_xxh128_with_callback(path: &Path, mut on_bytes_read: F1, mut on_iop: F2) -> Result where F1: FnMut(u64), diff --git a/src/meta/mod.rs b/src/meta/mod.rs index f5dc551..1fccf93 100644 --- a/src/meta/mod.rs +++ b/src/meta/mod.rs @@ -2,5 +2,6 @@ mod file; mod progress; mod tree; -pub use file::{calc_xxh128, FileMeta}; +pub use file::{calc_xxh128_with_callback, FileMeta}; +pub use progress::ProgressTracker; pub use tree::{scan_dir_xxh128, DirSnapshot}; diff --git a/src/meta/progress.rs b/src/meta/progress.rs index 1110c0b..b4761dd 100644 --- a/src/meta/progress.rs +++ b/src/meta/progress.rs @@ -2,14 +2,20 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; -use indicatif::{ProgressBar, ProgressStyle}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use crate::utils::friendly_size; /// 进度跟踪器,封装进度条和 IO 统计信息 pub struct ProgressTracker { - progress_bar: Option, + // MultiProgress 必须保持存活,否则进度条会消失 + #[allow(dead_code)] + _multi: Option, + file_progress_bar: Option, // 文件数量进度条 + current_file_bar: Option, // 当前文件进度条 bytes_read: Arc, + current_file_bytes: Arc, // 当前文件已读字节数 + current_file_size: Arc, // 当前文件总大小 iops: Arc, start_time: Instant, last_update: Arc, @@ -19,23 +25,41 @@ pub struct ProgressTracker { impl ProgressTracker { /// 创建新的进度跟踪器 pub fn new(total_files: u64, message: &str) -> Self { - let progress_bar = if total_files > 0 { - let pb = ProgressBar::new(total_files); - pb.set_style( + let (multi, file_progress_bar, current_file_bar) = if total_files > 0 { + let multi = MultiProgress::new(); + + // 文件数量进度条 + let file_pb = multi.add(ProgressBar::new(total_files)); + file_pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({percent}%) {msg}") .unwrap() .progress_chars("#>-"), ); - pb.set_message(message.to_string()); - Some(pb) + file_pb.set_message(message.to_string()); + + // 当前文件进度条(初始隐藏,通过设置长度为0来隐藏) + let current_pb = multi.add(ProgressBar::new(0)); + current_pb.set_style( + ProgressStyle::default_bar() + .template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise}") + .unwrap() + .progress_chars("=>-"), + ); + current_pb.set_length(0); // 设置为0长度来隐藏 + + (Some(multi), Some(file_pb), Some(current_pb)) } else { - None + (None, None, None) }; Self { - progress_bar, + _multi: multi, + file_progress_bar, + current_file_bar, bytes_read: Arc::new(AtomicU64::new(0)), + current_file_bytes: Arc::new(AtomicU64::new(0)), + current_file_size: Arc::new(AtomicU64::new(0)), iops: Arc::new(AtomicU64::new(0)), start_time: Instant::now(), last_update: Arc::new(AtomicU64::new(0)), @@ -43,9 +67,63 @@ impl ProgressTracker { } } + /// 为单个文件创建进度跟踪器(用于 process_file) + pub fn new_single_file(file_size: u64, file_name: &str) -> Self { + let multi = MultiProgress::new(); + + // 单个文件进度条 + let current_pb = multi.add(ProgressBar::new(file_size)); + current_pb.set_style( + ProgressStyle::default_bar() + .template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise}") + .unwrap() + .progress_chars("=>-"), + ); + current_pb.set_message(format!("处理: {}", file_name)); + + Self { + _multi: Some(multi), + file_progress_bar: None, + current_file_bar: Some(current_pb), + bytes_read: Arc::new(AtomicU64::new(0)), + current_file_bytes: Arc::new(AtomicU64::new(0)), + current_file_size: Arc::new(AtomicU64::new(file_size)), + iops: Arc::new(AtomicU64::new(0)), + start_time: Instant::now(), + last_update: Arc::new(AtomicU64::new(0)), + last_bytes: Arc::new(AtomicU64::new(0)), + } + } + + /// 开始处理新文件 + pub fn start_file(&self, file_size: u64, file_name: &str) { + if let Some(pb) = &self.current_file_bar { + pb.set_length(file_size); + pb.set_position(0); + pb.set_message(format!("处理: {}", file_name)); + self.current_file_size.store(file_size, Ordering::Relaxed); + self.current_file_bytes.store(0, Ordering::Relaxed); + } + } + + /// 完成当前文件的处理 + pub fn finish_current_file(&self) { + if let Some(pb) = &self.current_file_bar { + if let Some(_file_pb) = &self.file_progress_bar { + // 在多文件模式下,隐藏当前文件进度条(通过设置长度为0) + pb.set_length(0); + } else { + // 单文件模式下,完成进度条 + pb.finish(); + } + } + } + /// 完成一个文件的处理 pub fn finish_file(&self) { - if let Some(pb) = &self.progress_bar { + self.finish_current_file(); + + if let Some(pb) = &self.file_progress_bar { pb.inc(1); // 每 10 个文件或每 0.5 秒更新一次消息,减少开销 let files_processed = pb.position(); @@ -61,7 +139,7 @@ impl ProgressTracker { /// 更新进度条消息 fn update_message(&self) { - if let Some(pb) = &self.progress_bar { + if let Some(pb) = &self.file_progress_bar { let elapsed = self.start_time.elapsed().as_secs_f64(); let total_bytes = self.bytes_read.load(Ordering::Relaxed); let total_ops = self.iops.load(Ordering::Relaxed); @@ -70,31 +148,66 @@ impl ProgressTracker { let speed_bytes_per_sec = total_bytes as f64 / elapsed; let speed_str = friendly_size(speed_bytes_per_sec as u64); let iops = total_ops as f64 / elapsed; - pb.set_message(format!("IO速度: {}/s | IOPS: {:.0}", speed_str, iops)); + + // 计算预估剩余时间 + let files_processed = pb.position(); + let files_total = pb.length().unwrap_or(0); + let files_remaining = files_total.saturating_sub(files_processed); + let avg_time_per_file = if files_processed > 0 { + elapsed / files_processed as f64 + } else { + 0.0 + }; + let eta_seconds = avg_time_per_file * files_remaining as f64; + let eta_str = if eta_seconds > 0.0 { + format!("ETA: {:.0}s", eta_seconds) + } else { + String::new() + }; + + pb.set_message(format!("IO速度: {}/s | IOPS: {:.0} | {}", speed_str, iops, eta_str)); } } } /// 完成所有处理 pub fn finish(&self, message: &str) { - if let Some(pb) = &self.progress_bar { + if let Some(pb) = &self.file_progress_bar { pb.finish_with_message(message.to_string()); } + if let Some(pb) = &self.current_file_bar { + if self.file_progress_bar.is_none() { + // 单文件模式下完成 + pb.finish_with_message(message.to_string()); + } else { + // 多文件模式下隐藏(通过设置长度为0) + pb.set_length(0); + } + } } /// 获取字节读取回调(可 move) /// 在读取过程中定期更新消息,提供更高粒度的进度反馈 pub fn bytes_callback(&self) -> impl FnMut(u64) { let bytes_read = self.bytes_read.clone(); + let current_file_bytes = self.current_file_bytes.clone(); let last_bytes = self.last_bytes.clone(); let last_update = self.last_update.clone(); let start_time = self.start_time; - let progress_bar = self.progress_bar.clone(); + let file_progress_bar = self.file_progress_bar.clone(); + let current_file_bar = self.current_file_bar.clone(); let bytes_read_clone = self.bytes_read.clone(); let iops = self.iops.clone(); move |bytes| { bytes_read.fetch_add(bytes, Ordering::Relaxed); + current_file_bytes.fetch_add(bytes, Ordering::Relaxed); + + // 更新当前文件进度条 + if let Some(pb) = ¤t_file_bar { + let current_bytes = current_file_bytes.load(Ordering::Relaxed); + pb.set_position(current_bytes); + } // 每读取 16MB(约4次缓冲区读取)或每 200ms 更新一次消息,提供更细粒度的反馈 let total_bytes = bytes_read.load(Ordering::Relaxed); @@ -109,7 +222,7 @@ impl ProgressTracker { const UPDATE_TIME_THRESHOLD: u64 = 200; // 200ms if bytes_diff >= UPDATE_BYTES_THRESHOLD || time_diff >= UPDATE_TIME_THRESHOLD { - if let Some(pb) = &progress_bar { + if let Some(pb) = &file_progress_bar { let elapsed = start_time.elapsed().as_secs_f64(); let total_bytes = bytes_read_clone.load(Ordering::Relaxed); let total_ops = iops.load(Ordering::Relaxed); @@ -118,7 +231,24 @@ impl ProgressTracker { let speed_bytes_per_sec = total_bytes as f64 / elapsed; let speed_str = friendly_size(speed_bytes_per_sec as u64); let iops_value = total_ops as f64 / elapsed; - pb.set_message(format!("IO速度: {}/s | IOPS: {:.0}", speed_str, iops_value)); + + // 计算预估剩余时间 + let files_processed = pb.position(); + let files_total = pb.length().unwrap_or(0); + let files_remaining = files_total.saturating_sub(files_processed); + let avg_time_per_file = if files_processed > 0 { + elapsed / files_processed as f64 + } else { + 0.0 + }; + let eta_seconds = avg_time_per_file * files_remaining as f64; + let eta_str = if eta_seconds > 0.0 { + format!("ETA: {:.0}s", eta_seconds) + } else { + String::new() + }; + + pb.set_message(format!("IO速度: {}/s | IOPS: {:.0} | {}", speed_str, iops_value, eta_str)); } } diff --git a/src/meta/tree.rs b/src/meta/tree.rs index ffbe0c1..095c469 100644 --- a/src/meta/tree.rs +++ b/src/meta/tree.rs @@ -71,6 +71,12 @@ impl DirSnapshot { continue; } + // 获取文件大小并开始跟踪 + let file_size = entry.metadata() + .map(|m| m.len()) + .unwrap_or(0); + tracker.start_file(file_size, &name); + let on_bytes = tracker.bytes_callback(); let on_iop = tracker.iop_callback(); let meta = FileMeta::from_path_with_callback(&full_path, on_bytes, on_iop)?; @@ -181,6 +187,12 @@ fn walk_dir_with_progress( continue; } + // 获取文件大小并开始跟踪 + let file_size = entry.metadata() + .map(|m| m.len()) + .unwrap_or(0); + tracker.start_file(file_size, &name); + let on_bytes = tracker.bytes_callback(); let on_iop = tracker.iop_callback(); let hash = calc_xxh128_with_callback(&full_path, on_bytes, on_iop)?;