diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 7f75b9a..e45f934 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -155,19 +155,44 @@ fn main() -> Result<()> { ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} files ({percent}%) | {msg}") .unwrap() - .progress_chars("#>-"), + .progress_chars("#>-") ); - pb.set_message("Scanning files..."); + pb.set_message("Starting scan..."); + + // Show immediate feedback + pb.println("🔍 Initializing scanner and discovering files..."); cfg.progress_callback = Some(Arc::new(move |processed, total, findings| { - pb.set_length(total as u64); - pb.set_position(processed as u64); - pb.set_message(format!("Found {} findings", findings)); + if total > 0 { + pb.set_length(total as u64); + pb.set_position(processed as u64); + if processed == 0 { + pb.set_message(format!("Found {} files to scan", total)); + } else if processed < total { + pb.set_message(format!("Processing files... {} findings so far", findings)); + } else { + pb.set_message(format!("Completed! Found {} findings", findings)); + } + } else { + pb.set_message("Discovering files..."); + } + })); + } + + // Stream JSONL findings as they arrive + if args.json { + cfg.result_callback = Some(Arc::new(move |f: &Finding| { + if let Ok(s) = serde_json::to_string(f) { + println!("{}", s); + } })); } let scanner = Scanner::new(®, dets, cfg); if args.dry_run { + if !args.progress { + eprintln!("🔍 Discovering files..."); + } let files = scanner.discover_files(&args.paths); for p in files { println!("{}", p.display()); @@ -175,6 +200,11 @@ fn main() -> Result<()> { return Ok(()); } + // Show startup message if not using progress bar + if !args.progress && !args.json { + eprintln!("🔍 Starting scan of {} path(s)...", args.paths.len()); + } + let findings = scanner.run(&args.paths)?; // Clear progress bar if it was shown @@ -183,9 +213,7 @@ fn main() -> Result<()> { } if args.json { - for f in &findings { - println!("{}", serde_json::to_string(f)?); - } + // Already streamed above } else { print_table(&findings); } diff --git a/crates/scanner-core/src/lib.rs b/crates/scanner-core/src/lib.rs index 2a5f8f3..4838478 100644 --- a/crates/scanner-core/src/lib.rs +++ b/crates/scanner-core/src/lib.rs @@ -2,7 +2,7 @@ use aho_corasick::AhoCorasickBuilder; use anyhow::{anyhow, Context, Result}; use crossbeam_channel::{bounded, Receiver, Sender}; use ignore::WalkBuilder; -use rayon::prelude::*; +// use rayon::prelude::*; // Using rayon for parallel processing of discovered files (imported locally where needed) use regex::Regex; use serde::{Deserialize, Serialize}; use std::collections::{BTreeSet, HashMap}; @@ -11,6 +11,7 @@ use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::Mutex; +// // ---------------- Types ---------------- @@ -83,6 +84,9 @@ pub struct Finding { pub detector_id: String, } +// Reduce clippy::type_complexity by aliasing the result callback type +type ResultCallback = Arc; + #[derive(Debug, Clone, Default)] pub struct Prefilter { pub extensions: BTreeSet, @@ -112,15 +116,19 @@ pub trait Detector: Send + Sync { pub struct Emitter { tx: Sender, rx: Receiver, + on_result: Option, } impl Emitter { pub fn new(bound: usize) -> Self { let (tx, rx) = bounded(bound); - Self { tx, rx } + Self { tx, rx, on_result: None } } pub fn send(&mut self, finding: Finding) -> Result<()> { + if let Some(ref cb) = self.on_result { + cb(&finding); + } self.tx .send(finding) .map_err(|e| anyhow!("emitter send failed: {e}")) @@ -182,6 +190,8 @@ pub struct Config { pub deterministic: bool, #[serde(skip)] pub progress_callback: Option, + #[serde(skip)] + pub result_callback: Option, } fn default_max_file_size() -> usize { @@ -208,6 +218,7 @@ impl Clone for Config { exclude_globs: self.exclude_globs.clone(), deterministic: self.deterministic, progress_callback: self.progress_callback.clone(), + result_callback: self.result_callback.clone(), } } } @@ -220,6 +231,7 @@ impl Default for Config { exclude_globs: Vec::new(), deterministic: false, progress_callback: None, + result_callback: None, } } } @@ -722,19 +734,16 @@ impl<'a> Scanner<'a> { } pub fn discover_files(&self, roots: &[PathBuf]) -> Vec { - let mut paths = Vec::new(); - // Build glob matcher for include patterns + // Compile include and exclude glob sets once let include_matcher: Option = if !self.config.include_globs.is_empty() { let mut builder = globset::GlobSetBuilder::new(); for pattern in &self.config.include_globs { - match globset::Glob::new(pattern) { - Ok(glob) => { - builder.add(glob); - } - Err(_) => { - return Vec::new(); // Return empty on pattern error - } + if let Ok(glob) = globset::Glob::new(pattern) { + builder.add(glob); + } else { + // If any pattern is invalid, return empty to avoid expensive scan with bad filter + return Vec::new(); } } builder.build().ok() @@ -742,37 +751,93 @@ impl<'a> Scanner<'a> { None }; - for root in roots { - let mut builder = WalkBuilder::new(root); - builder - .hidden(false) - .git_ignore(true) - .git_exclude(true) - .ignore(true); + let exclude_matcher: Option = if !self.config.exclude_globs.is_empty() { + let mut builder = globset::GlobSetBuilder::new(); + for pattern in &self.config.exclude_globs { + if let Ok(glob) = globset::Glob::new(pattern) { + builder.add(glob); + } else { + return Vec::new(); + } + } + builder.build().ok() + } else { + None + }; - for entry in builder.build().flatten() { - let md = match entry.metadata() { - Ok(m) => m, - Err(_) => continue, - }; - if md.is_file() { - if md.len() as usize > self.config.max_file_size { + // Helper to apply path-based filters early (before metadata calls when possible) + let path_allowed = |p: &Path| -> bool { + if let Some(ref ex) = exclude_matcher { + if ex.is_match(p) { + return false; + } + } + if let Some(ref inc) = include_matcher { + if !inc.is_match(p) { + return false; + } + } + true + }; + + if roots.is_empty() { + return Vec::new(); + } + + // Single parallel walker over all roots + let mut builder = WalkBuilder::new(&roots[0]); + for r in roots.iter().skip(1) { + builder.add(r); + } + builder + .hidden(false) + .git_ignore(true) + .git_exclude(true) + .ignore(true) + .parents(true) + .follow_links(false) + .same_file_system(false); + + if let Ok(n) = std::thread::available_parallelism() { + builder.threads(n.get()); + } + + // Use sequential walker for reliable discovery + let mut paths = Vec::new(); + + for result in builder.build() { + if let Ok(entry) = result { + // Check if it's a file + if let Some(ft) = entry.file_type() { + if !ft.is_file() { + continue; + } + } else if let Ok(md) = entry.metadata() { + if !md.is_file() { continue; } + } else { + continue; + } - let path = entry.into_path(); + let path = entry.into_path(); + if !path_allowed(&path) { + continue; + } - // Apply include glob filtering - if let Some(ref matcher) = include_matcher { - if !matcher.is_match(&path) { - continue; - } + // Size check + if let Ok(md) = fs::metadata(&path) { + if (md.len() as usize) > self.config.max_file_size { + continue; } - - paths.push(path); + } else { + continue; } + + paths.push(path); } } + paths } @@ -810,53 +875,191 @@ impl<'a> Scanner<'a> { } pub fn run(&self, roots: &[PathBuf]) -> Result> { - let files = self.discover_files(roots); - let total_files = files.len(); - let mut findings: Vec = Vec::new(); + use std::sync::atomic::{AtomicUsize, Ordering}; + use crossbeam_channel::{unbounded, Receiver, Sender}; + use std::thread; + use std::time::{Duration, Instant}; + + let findings_vec: Arc>> = Arc::new(Mutex::new(Vec::new())); + let processed = Arc::new(AtomicUsize::new(0)); + let discovered = Arc::new(AtomicUsize::new(0)); + let _findings_cnt = Arc::new(AtomicUsize::new(0)); + + // Initial progress callback (0 of 0) + if let Some(ref cb) = self.config.progress_callback { + cb(0, 0, 0); + } - // Call progress callback with initial state - if let Some(ref callback) = self.config.progress_callback { - callback(0, total_files, 0); + if roots.is_empty() { + return Ok(Vec::new()); } - let (tx, rx) = bounded::(8192); - let (progress_tx, progress_rx) = bounded::(1000); + // Create channels for streaming file paths from discovery to processing + let (path_tx, path_rx): (Sender, Receiver) = unbounded(); + let (done_tx, _done_rx) = unbounded::<()>(); - // Spawn a thread to collect progress updates - let progress_handle = if let Some(ref callback) = self.config.progress_callback { - let callback = callback.clone(); - Some(std::thread::spawn(move || { - let mut processed = 0; - let findings_count = 0; + // Prepare include/exclude matchers for filtering + let include_matcher: Option = if !self.config.include_globs.is_empty() { + let mut builder = globset::GlobSetBuilder::new(); + for pattern in &self.config.include_globs { + if let Ok(glob) = globset::Glob::new(pattern) { + builder.add(glob); + } else { + return Ok(Vec::new()); + } + } + builder.build().ok() + } else { + None + }; - while progress_rx.recv().is_ok() { - processed += 1; - callback(processed, total_files, findings_count); + let exclude_matcher: Option = if !self.config.exclude_globs.is_empty() { + let mut builder = globset::GlobSetBuilder::new(); + for pattern in &self.config.exclude_globs { + if let Ok(glob) = globset::Glob::new(pattern) { + builder.add(glob); + } else { + return Ok(Vec::new()); } - })) + } + builder.build().ok() } else { None }; - files.par_iter().for_each_with( - (tx.clone(), progress_tx.clone()), - |(tx, progress_tx), path| { - if let Some(lang) = Self::detect_language(path) { - if let Ok(bytes) = Self::load_file(path) { - let unit = ScanUnit { - path: path.clone(), - lang, - bytes: bytes.clone(), - }; - // Strip comments once and reuse - let stripped = strip_comments(lang, &bytes); - let stripped_s = String::from_utf8_lossy(&stripped); - let index = LineIndex::new(stripped_s.as_bytes()); - - let mut em = Emitter { - tx: tx.clone(), - rx: rx.clone(), - }; + // Clone values needed for the discovery thread + let roots_clone = roots.to_vec(); + let max_file_size = self.config.max_file_size; + let discovered_clone = discovered.clone(); + let progress_cb_discovery = self.config.progress_callback.clone(); + let include_matcher_clone = include_matcher.clone(); + let exclude_matcher_clone = exclude_matcher.clone(); + + // Start file discovery thread with progress reporting + let discovery_handle = thread::spawn(move || { + let path_allowed = |p: &Path| -> bool { + if let Some(ref ex) = exclude_matcher_clone { + if ex.is_match(p) { + return false; + } + } + if let Some(ref inc) = include_matcher_clone { + if !inc.is_match(p) { + return false; + } + } + true + }; + + // Build parallel walker + let mut builder = WalkBuilder::new(&roots_clone[0]); + for r in roots_clone.iter().skip(1) { + builder.add(r); + } + builder + .hidden(false) + .git_ignore(true) + .git_exclude(true) + .ignore(true) + .parents(true) + .follow_links(false) + .same_file_system(false); + + // Use parallel walker for faster discovery + if let Ok(n) = std::thread::available_parallelism() { + builder.threads(n.get()); + } + + // Use a simpler approach with regular walker but in a separate thread + let mut discovered_count = 0; + let mut last_progress = Instant::now(); + + for result in builder.build() { + if let Ok(entry) = result { + // Check if it's a file + if let Some(ft) = entry.file_type() { + if !ft.is_file() { + continue; + } + } else if let Ok(md) = entry.metadata() { + if !md.is_file() { + continue; + } + } else { + continue; + } + + let path = entry.into_path(); + if !path_allowed(&path) { + continue; + } + + // Size check + if let Ok(md) = fs::metadata(&path) { + if (md.len() as usize) > max_file_size { + continue; + } + } else { + continue; + } + + // Send path to processing + if path_tx.send(path).is_err() { + break; + } + + // Update discovery count and progress + discovered_count += 1; + discovered_clone.store(discovered_count, Ordering::Relaxed); + + // Throttle progress updates to avoid overwhelming the UI + if discovered_count % 100 == 0 || last_progress.elapsed() > Duration::from_millis(200) { + if let Some(ref cb) = progress_cb_discovery { + cb(0, discovered_count, 0); // processed=0 during discovery + last_progress = Instant::now(); + } + } + } + } + + // Signal discovery is complete + drop(path_tx); + let _ = done_tx.send(()); + }); + + // Process files directly instead of using a separate thread to avoid lifetime issues + let mut all_paths = Vec::new(); + + // Wait for discovery to complete and collect all paths + discovery_handle.join().unwrap(); + + // Collect all discovered paths + while let Ok(path) = path_rx.try_recv() { + all_paths.push(path); + } + + // Process all discovered paths using rayon for parallel processing + use rayon::prelude::*; + use std::sync::Mutex as StdMutex; + + let all_findings = Arc::new(StdMutex::new(Vec::new())); + + all_paths.par_iter().for_each(|path| { + if let Some(lang) = Scanner::detect_language(path) { + if let Ok(bytes) = Scanner::load_file(path) { + let unit = ScanUnit { + path: path.clone(), + lang, + bytes: bytes.clone(), + }; + let stripped = strip_comments(lang, &bytes); + let stripped_s = String::from_utf8_lossy(&stripped); + let index = LineIndex::new(stripped_s.as_bytes()); + + // Collect findings locally first + { + let (local_tx, local_rx) = bounded(100); + let mut em = Emitter { tx: local_tx, rx: local_rx, on_result: self.config.result_callback.clone() }; for det in &self.detectors { if !det.languages().contains(&lang) { continue; @@ -866,30 +1069,41 @@ impl<'a> Scanner<'a> { } let _ = det.scan_optimized(&unit, &stripped_s, &index, &mut em); } + // Collect all findings from this file and add to global collection + let local_findings = em.drain(); + if !local_findings.is_empty() { + if let Ok(mut guard) = all_findings.lock() { + guard.extend(local_findings); + } + } } } - // Signal that this file has been processed - let _ = progress_tx.send(1); - }, - ); - - drop(tx); - drop(progress_tx); - - for f in rx.iter() { - findings.push(f); - } - - // Wait for progress thread to finish - if let Some(handle) = progress_handle { - let _ = handle.join(); - } - - // Final progress update - if let Some(ref callback) = self.config.progress_callback { - callback(total_files, total_files, findings.len()); - } + } + // Mark processed and update progress + let new_proc = processed.fetch_add(1, Ordering::Relaxed) + 1; + if let Some(ref cb) = self.config.progress_callback { + let current_findings = { + let guard = all_findings.lock().unwrap(); + guard.len() + }; + cb( + new_proc, + discovered.load(Ordering::Relaxed), + current_findings, + ); + } + }); + + // Extract all findings + let mut findings = { + let collected_findings = all_findings.lock().unwrap(); + let mut findings_guard = findings_vec.lock().unwrap(); + findings_guard.extend(collected_findings.clone()); + findings_guard.clone() + }; + + // All processing completed successfully if self.config.deterministic { findings.sort_by(|a, b| { ( @@ -909,10 +1123,20 @@ impl<'a> Scanner<'a> { }); } + // Final progress update + if let Some(ref cb) = self.config.progress_callback { + cb( + processed.load(Ordering::Relaxed), + discovered.load(Ordering::Relaxed), + findings.len(), + ); + } + Ok(findings) } } + fn prefilter_hit(det: &dyn Detector, stripped: &[u8]) -> bool { let pf = det.prefilter(); if pf.substrings.is_empty() {