feat: single file process.

This commit is contained in:
2025-12-22 05:23:04 +08:00
parent fe1f883b0d
commit 6943ac8ef5
7 changed files with 187 additions and 31 deletions

2
Cargo.lock generated
View File

@@ -236,7 +236,7 @@ dependencies = [
[[package]] [[package]]
name = "l-s" name = "l-s"
version = "0.3.4" version = "0.4.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "l-s" name = "l-s"
version = "0.3.4" version = "0.4.0"
authors = ["licsber <admin@licsber.site>"] authors = ["licsber <admin@licsber.site>"]
edition = "2021" edition = "2021"

View File

@@ -11,7 +11,7 @@ use std::time::Instant;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::Parser; use clap::Parser;
use meta::{calc_xxh128, scan_dir_xxh128, DirSnapshot, FileMeta}; use meta::{calc_xxh128_with_callback, scan_dir_xxh128, DirSnapshot, FileMeta, ProgressTracker};
fn main() -> Result<()> { fn main() -> Result<()> {
let started = Instant::now(); let started = Instant::now();
@@ -43,8 +43,18 @@ fn process_file(path: &Path) -> Result<()> {
.map(|n| n.to_string_lossy().to_string()) .map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
let save_path = meta_dir.join(format!("{basename}.json")); let save_path = meta_dir.join(format!("{basename}.json"));
// 获取文件大小
let file_size = fs::metadata(path)
.with_context(|| format!("无法读取文件信息: {}", path.display()))?
.len();
if !save_path.exists() { if !save_path.exists() {
let meta = FileMeta::from_path(path)?; let tracker = ProgressTracker::new_single_file(file_size, &basename);
let on_bytes = tracker.bytes_callback();
let on_iop = tracker.iop_callback();
let meta = FileMeta::from_path_with_callback(path, on_bytes, on_iop)?;
tracker.finish("处理完成");
let json = meta.to_pretty_json()?; let json = meta.to_pretty_json()?;
println!("{}", json); println!("{}", json);
fs::write(&save_path, json)?; fs::write(&save_path, json)?;
@@ -54,7 +64,14 @@ fn process_file(path: &Path) -> Result<()> {
let existing = File::open(&save_path) let existing = File::open(&save_path)
.with_context(|| format!("无法读取历史元数据: {}", save_path.display()))?; .with_context(|| format!("无法读取历史元数据: {}", save_path.display()))?;
let old_meta = FileMeta::from_reader(existing)?; let old_meta = FileMeta::from_reader(existing)?;
let fast_hash = calc_xxh128(path)?;
// 使用进度条计算快速哈希
let tracker = ProgressTracker::new_single_file(file_size, &basename);
let on_bytes = tracker.bytes_callback();
let on_iop = tracker.iop_callback();
let fast_hash = calc_xxh128_with_callback(path, on_bytes, on_iop)?;
tracker.finish("校验完成");
if fast_hash == old_meta.xxh128 { if fast_hash == old_meta.xxh128 {
println!("校验通过."); println!("校验通过.");
return Ok(()); return Ok(());
@@ -62,7 +79,11 @@ fn process_file(path: &Path) -> Result<()> {
println!("校验失败!"); println!("校验失败!");
println!("现校验文件:"); println!("现校验文件:");
let meta = FileMeta::from_path(path)?; let tracker = ProgressTracker::new_single_file(file_size, &basename);
let on_bytes = tracker.bytes_callback();
let on_iop = tracker.iop_callback();
let meta = FileMeta::from_path_with_callback(path, on_bytes, on_iop)?;
tracker.finish("处理完成");
println!("{}", meta.to_pretty_json()?); println!("{}", meta.to_pretty_json()?);
println!("原校验文件:"); println!("原校验文件:");
println!("{}", old_meta.to_pretty_json()?); println!("{}", old_meta.to_pretty_json()?);

View File

@@ -32,10 +32,6 @@ pub struct FileMeta {
} }
impl FileMeta { impl FileMeta {
pub fn from_path(path: &Path) -> Result<Self> {
Self::from_path_with_callback(path, |_| {}, || {})
}
pub fn from_path_with_callback<F1, F2>(path: &Path, mut on_bytes_read: F1, mut on_iop: F2) -> Result<Self> pub fn from_path_with_callback<F1, F2>(path: &Path, mut on_bytes_read: F1, mut on_iop: F2) -> Result<Self>
where where
F1: FnMut(u64), F1: FnMut(u64),
@@ -124,10 +120,6 @@ impl FileMeta {
} }
} }
pub fn calc_xxh128(path: &Path) -> Result<String> {
calc_xxh128_with_callback(path, |_| {}, || {})
}
pub fn calc_xxh128_with_callback<F1, F2>(path: &Path, mut on_bytes_read: F1, mut on_iop: F2) -> Result<String> pub fn calc_xxh128_with_callback<F1, F2>(path: &Path, mut on_bytes_read: F1, mut on_iop: F2) -> Result<String>
where where
F1: FnMut(u64), F1: FnMut(u64),

View File

@@ -2,5 +2,6 @@ mod file;
mod progress; mod progress;
mod tree; mod tree;
pub use file::{calc_xxh128, FileMeta}; pub use file::{calc_xxh128_with_callback, FileMeta};
pub use progress::ProgressTracker;
pub use tree::{scan_dir_xxh128, DirSnapshot}; pub use tree::{scan_dir_xxh128, DirSnapshot};

View File

@@ -2,14 +2,20 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use crate::utils::friendly_size; use crate::utils::friendly_size;
/// 进度跟踪器,封装进度条和 IO 统计信息 /// 进度跟踪器,封装进度条和 IO 统计信息
pub struct ProgressTracker { pub struct ProgressTracker {
progress_bar: Option<ProgressBar>, // MultiProgress 必须保持存活,否则进度条会消失
#[allow(dead_code)]
_multi: Option<MultiProgress>,
file_progress_bar: Option<ProgressBar>, // 文件数量进度条
current_file_bar: Option<ProgressBar>, // 当前文件进度条
bytes_read: Arc<AtomicU64>, bytes_read: Arc<AtomicU64>,
current_file_bytes: Arc<AtomicU64>, // 当前文件已读字节数
current_file_size: Arc<AtomicU64>, // 当前文件总大小
iops: Arc<AtomicU64>, iops: Arc<AtomicU64>,
start_time: Instant, start_time: Instant,
last_update: Arc<AtomicU64>, last_update: Arc<AtomicU64>,
@@ -19,23 +25,41 @@ pub struct ProgressTracker {
impl ProgressTracker { impl ProgressTracker {
/// 创建新的进度跟踪器 /// 创建新的进度跟踪器
pub fn new(total_files: u64, message: &str) -> Self { pub fn new(total_files: u64, message: &str) -> Self {
let progress_bar = if total_files > 0 { let (multi, file_progress_bar, current_file_bar) = if total_files > 0 {
let pb = ProgressBar::new(total_files); let multi = MultiProgress::new();
pb.set_style(
// 文件数量进度条
let file_pb = multi.add(ProgressBar::new(total_files));
file_pb.set_style(
ProgressStyle::default_bar() ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({percent}%) {msg}") .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({percent}%) {msg}")
.unwrap() .unwrap()
.progress_chars("#>-"), .progress_chars("#>-"),
); );
pb.set_message(message.to_string()); file_pb.set_message(message.to_string());
Some(pb)
// 当前文件进度条初始隐藏通过设置长度为0来隐藏
let current_pb = multi.add(ProgressBar::new(0));
current_pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise}")
.unwrap()
.progress_chars("=>-"),
);
current_pb.set_length(0); // 设置为0长度来隐藏
(Some(multi), Some(file_pb), Some(current_pb))
} else { } else {
None (None, None, None)
}; };
Self { Self {
progress_bar, _multi: multi,
file_progress_bar,
current_file_bar,
bytes_read: Arc::new(AtomicU64::new(0)), bytes_read: Arc::new(AtomicU64::new(0)),
current_file_bytes: Arc::new(AtomicU64::new(0)),
current_file_size: Arc::new(AtomicU64::new(0)),
iops: Arc::new(AtomicU64::new(0)), iops: Arc::new(AtomicU64::new(0)),
start_time: Instant::now(), start_time: Instant::now(),
last_update: Arc::new(AtomicU64::new(0)), last_update: Arc::new(AtomicU64::new(0)),
@@ -43,9 +67,63 @@ impl ProgressTracker {
} }
} }
/// Creates a progress tracker for a single file (used by `process_file`).
///
/// Only the per-file byte progress bar is created; `file_progress_bar` is
/// left as `None`, which is how the other methods recognise single-file mode.
///
/// * `file_size` - total size of the file in bytes; used as the bar length
///   and stored in `current_file_size`.
/// * `file_name` - name shown in the bar's message.
pub fn new_single_file(file_size: u64, file_name: &str) -> Self {
    let multi = MultiProgress::new();

    // Byte-level progress bar for the one file being processed.
    let current_pb = multi.add(ProgressBar::new(file_size));
    current_pb.set_style(
        ProgressStyle::default_bar()
            // The template must contain `{msg}`: without that placeholder the
            // text given to `set_message` below (and to `finish_with_message`
            // when the tracker is finished) is never rendered.
            .template("{spinner:.yellow} [{elapsed_precise}] [{wide_bar:.yellow/red}] {bytes}/{total_bytes} ({percent}%) [{bytes_per_sec}] ETA: {eta_precise} {msg}")
            .expect("progress bar template is a valid constant")
            .progress_chars("=>-"),
    );
    current_pb.set_message(format!("处理: {}", file_name));

    Self {
        // Kept in the struct so the `MultiProgress` outlives this function.
        _multi: Some(multi),
        file_progress_bar: None,
        current_file_bar: Some(current_pb),
        bytes_read: Arc::new(AtomicU64::new(0)),
        current_file_bytes: Arc::new(AtomicU64::new(0)),
        current_file_size: Arc::new(AtomicU64::new(file_size)),
        iops: Arc::new(AtomicU64::new(0)),
        start_time: Instant::now(),
        last_update: Arc::new(AtomicU64::new(0)),
        last_bytes: Arc::new(AtomicU64::new(0)),
    }
}
/// Begins tracking a new file.
///
/// When a per-file bar exists, it is resized to `file_size`, rewound to
/// position 0 and given a message containing `file_name`; the per-file size
/// and byte counters are then reset. Without a per-file bar this is a no-op.
pub fn start_file(&self, file_size: u64, file_name: &str) {
    // No per-file bar means there is nothing to update.
    let Some(bar) = self.current_file_bar.as_ref() else {
        return;
    };

    bar.set_length(file_size);
    bar.set_position(0);
    bar.set_message(format!("处理: {}", file_name));

    // Reset the per-file counters.
    self.current_file_size.store(file_size, Ordering::Relaxed);
    self.current_file_bytes.store(0, Ordering::Relaxed);
}
/// Marks the current file as done.
///
/// With a file-count bar present (multi-file mode) the per-file bar has its
/// length set to 0, which the original author uses to hide it. Otherwise
/// (single-file mode) the per-file bar is finished. Does nothing when there
/// is no per-file bar.
pub fn finish_current_file(&self) {
    let Some(bar) = self.current_file_bar.as_ref() else {
        return;
    };

    if self.file_progress_bar.is_none() {
        // Single-file mode: this bar is the only one, so finish it.
        bar.finish();
    } else {
        // Multi-file mode: set the length to 0 until the next file starts.
        bar.set_length(0);
    }
}
/// 完成一个文件的处理 /// 完成一个文件的处理
pub fn finish_file(&self) { pub fn finish_file(&self) {
if let Some(pb) = &self.progress_bar { self.finish_current_file();
if let Some(pb) = &self.file_progress_bar {
pb.inc(1); pb.inc(1);
// 每 10 个文件或每 0.5 秒更新一次消息,减少开销 // 每 10 个文件或每 0.5 秒更新一次消息,减少开销
let files_processed = pb.position(); let files_processed = pb.position();
@@ -61,7 +139,7 @@ impl ProgressTracker {
/// 更新进度条消息 /// 更新进度条消息
fn update_message(&self) { fn update_message(&self) {
if let Some(pb) = &self.progress_bar { if let Some(pb) = &self.file_progress_bar {
let elapsed = self.start_time.elapsed().as_secs_f64(); let elapsed = self.start_time.elapsed().as_secs_f64();
let total_bytes = self.bytes_read.load(Ordering::Relaxed); let total_bytes = self.bytes_read.load(Ordering::Relaxed);
let total_ops = self.iops.load(Ordering::Relaxed); let total_ops = self.iops.load(Ordering::Relaxed);
@@ -70,31 +148,66 @@ impl ProgressTracker {
let speed_bytes_per_sec = total_bytes as f64 / elapsed; let speed_bytes_per_sec = total_bytes as f64 / elapsed;
let speed_str = friendly_size(speed_bytes_per_sec as u64); let speed_str = friendly_size(speed_bytes_per_sec as u64);
let iops = total_ops as f64 / elapsed; let iops = total_ops as f64 / elapsed;
pb.set_message(format!("IO速度: {}/s | IOPS: {:.0}", speed_str, iops));
// 计算预估剩余时间
let files_processed = pb.position();
let files_total = pb.length().unwrap_or(0);
let files_remaining = files_total.saturating_sub(files_processed);
let avg_time_per_file = if files_processed > 0 {
elapsed / files_processed as f64
} else {
0.0
};
let eta_seconds = avg_time_per_file * files_remaining as f64;
let eta_str = if eta_seconds > 0.0 {
format!("ETA: {:.0}s", eta_seconds)
} else {
String::new()
};
pb.set_message(format!("IO速度: {}/s | IOPS: {:.0} | {}", speed_str, iops, eta_str));
} }
} }
} }
/// 完成所有处理 /// 完成所有处理
pub fn finish(&self, message: &str) { pub fn finish(&self, message: &str) {
if let Some(pb) = &self.progress_bar { if let Some(pb) = &self.file_progress_bar {
pb.finish_with_message(message.to_string()); pb.finish_with_message(message.to_string());
} }
if let Some(pb) = &self.current_file_bar {
if self.file_progress_bar.is_none() {
// 单文件模式下完成
pb.finish_with_message(message.to_string());
} else {
// 多文件模式下隐藏通过设置长度为0
pb.set_length(0);
}
}
} }
/// 获取字节读取回调(可 move /// 获取字节读取回调(可 move
/// 在读取过程中定期更新消息,提供更高粒度的进度反馈 /// 在读取过程中定期更新消息,提供更高粒度的进度反馈
pub fn bytes_callback(&self) -> impl FnMut(u64) { pub fn bytes_callback(&self) -> impl FnMut(u64) {
let bytes_read = self.bytes_read.clone(); let bytes_read = self.bytes_read.clone();
let current_file_bytes = self.current_file_bytes.clone();
let last_bytes = self.last_bytes.clone(); let last_bytes = self.last_bytes.clone();
let last_update = self.last_update.clone(); let last_update = self.last_update.clone();
let start_time = self.start_time; let start_time = self.start_time;
let progress_bar = self.progress_bar.clone(); let file_progress_bar = self.file_progress_bar.clone();
let current_file_bar = self.current_file_bar.clone();
let bytes_read_clone = self.bytes_read.clone(); let bytes_read_clone = self.bytes_read.clone();
let iops = self.iops.clone(); let iops = self.iops.clone();
move |bytes| { move |bytes| {
bytes_read.fetch_add(bytes, Ordering::Relaxed); bytes_read.fetch_add(bytes, Ordering::Relaxed);
current_file_bytes.fetch_add(bytes, Ordering::Relaxed);
// 更新当前文件进度条
if let Some(pb) = &current_file_bar {
let current_bytes = current_file_bytes.load(Ordering::Relaxed);
pb.set_position(current_bytes);
}
// 每读取 16MB约4次缓冲区读取或每 200ms 更新一次消息,提供更细粒度的反馈 // 每读取 16MB约4次缓冲区读取或每 200ms 更新一次消息,提供更细粒度的反馈
let total_bytes = bytes_read.load(Ordering::Relaxed); let total_bytes = bytes_read.load(Ordering::Relaxed);
@@ -109,7 +222,7 @@ impl ProgressTracker {
const UPDATE_TIME_THRESHOLD: u64 = 200; // 200ms const UPDATE_TIME_THRESHOLD: u64 = 200; // 200ms
if bytes_diff >= UPDATE_BYTES_THRESHOLD || time_diff >= UPDATE_TIME_THRESHOLD { if bytes_diff >= UPDATE_BYTES_THRESHOLD || time_diff >= UPDATE_TIME_THRESHOLD {
if let Some(pb) = &progress_bar { if let Some(pb) = &file_progress_bar {
let elapsed = start_time.elapsed().as_secs_f64(); let elapsed = start_time.elapsed().as_secs_f64();
let total_bytes = bytes_read_clone.load(Ordering::Relaxed); let total_bytes = bytes_read_clone.load(Ordering::Relaxed);
let total_ops = iops.load(Ordering::Relaxed); let total_ops = iops.load(Ordering::Relaxed);
@@ -118,7 +231,24 @@ impl ProgressTracker {
let speed_bytes_per_sec = total_bytes as f64 / elapsed; let speed_bytes_per_sec = total_bytes as f64 / elapsed;
let speed_str = friendly_size(speed_bytes_per_sec as u64); let speed_str = friendly_size(speed_bytes_per_sec as u64);
let iops_value = total_ops as f64 / elapsed; let iops_value = total_ops as f64 / elapsed;
pb.set_message(format!("IO速度: {}/s | IOPS: {:.0}", speed_str, iops_value));
// 计算预估剩余时间
let files_processed = pb.position();
let files_total = pb.length().unwrap_or(0);
let files_remaining = files_total.saturating_sub(files_processed);
let avg_time_per_file = if files_processed > 0 {
elapsed / files_processed as f64
} else {
0.0
};
let eta_seconds = avg_time_per_file * files_remaining as f64;
let eta_str = if eta_seconds > 0.0 {
format!("ETA: {:.0}s", eta_seconds)
} else {
String::new()
};
pb.set_message(format!("IO速度: {}/s | IOPS: {:.0} | {}", speed_str, iops_value, eta_str));
} }
} }

View File

@@ -71,6 +71,12 @@ impl DirSnapshot {
continue; continue;
} }
// 获取文件大小并开始跟踪
let file_size = entry.metadata()
.map(|m| m.len())
.unwrap_or(0);
tracker.start_file(file_size, &name);
let on_bytes = tracker.bytes_callback(); let on_bytes = tracker.bytes_callback();
let on_iop = tracker.iop_callback(); let on_iop = tracker.iop_callback();
let meta = FileMeta::from_path_with_callback(&full_path, on_bytes, on_iop)?; let meta = FileMeta::from_path_with_callback(&full_path, on_bytes, on_iop)?;
@@ -181,6 +187,12 @@ fn walk_dir_with_progress(
continue; continue;
} }
// 获取文件大小并开始跟踪
let file_size = entry.metadata()
.map(|m| m.len())
.unwrap_or(0);
tracker.start_file(file_size, &name);
let on_bytes = tracker.bytes_callback(); let on_bytes = tracker.bytes_callback();
let on_iop = tracker.iop_callback(); let on_iop = tracker.iop_callback();
let hash = calc_xxh128_with_callback(&full_path, on_bytes, on_iop)?; let hash = calc_xxh128_with_callback(&full_path, on_bytes, on_iop)?;