From 667dfeb8bd90e87175b8d5c91379afcdf3faecac Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:07:29 +0000 Subject: [PATCH 01/10] Add files_opened and files_scanned metrics to FileStreamMetrics Track file-level progress in FileStream with two new counters: - files_opened: incremented when a file is successfully opened - files_scanned: incremented when a file's reader stream is fully consumed Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_stream.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index c8090382094ef..64da87d3da6de 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -124,6 +124,7 @@ impl FileStream { } FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { + self.file_stream_metrics.files_opened.add(1); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); let next = self.start_next_file().transpose(); @@ -223,6 +224,7 @@ impl FileStream { } } None => { + self.file_stream_metrics.files_scanned.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); @@ -399,6 +401,10 @@ pub struct FileStreamMetrics { /// If using `OnError::Skip` this will provide a count of the number of files /// which were skipped and will not be included in the scan results. pub file_scan_errors: Count, + /// Count of files successfully opened. + pub files_opened: Count, + /// Count of files completely scanned (reader stream fully consumed). 
+ pub files_scanned: Count, } impl FileStreamMetrics { @@ -433,6 +439,12 @@ impl FileStreamMetrics { let file_scan_errors = MetricBuilder::new(metrics).counter("file_scan_errors", partition); + let files_opened = + MetricBuilder::new(metrics).counter("files_opened", partition); + + let files_scanned = + MetricBuilder::new(metrics).counter("files_scanned", partition); + Self { time_opening, time_scanning_until_data, @@ -440,6 +452,8 @@ impl FileStreamMetrics { time_processing, file_open_errors, file_scan_errors, + files_opened, + files_scanned, } } } From 20115ebbdb4c8261d633c05bb39a6671c3548ce3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:11:22 +0000 Subject: [PATCH 02/10] fmt --- datafusion/datasource/src/file_stream.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 64da87d3da6de..c13b4a1d39b5e 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -439,8 +439,7 @@ impl FileStreamMetrics { let file_scan_errors = MetricBuilder::new(metrics).counter("file_scan_errors", partition); - let files_opened = - MetricBuilder::new(metrics).counter("files_opened", partition); + let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); let files_scanned = MetricBuilder::new(metrics).counter("files_scanned", partition); From 8809b35acd53c208f69a49854b1d9768d738e488 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 1 Mar 2026 16:40:14 +0100 Subject: [PATCH 03/10] Rename files_opened to files_processed and account for limit Rename `files_opened` metric to `files_processed` so it reflects all files assigned to the partition, not just those that were opened. 
When a LIMIT terminates the stream early, the remaining files (including any prefetched next file) are counted so that `files_processed` always equals the total partition file count. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_stream.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index c13b4a1d39b5e..1737f3e1d5f94 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -124,7 +124,7 @@ impl FileStream { } FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { - self.file_stream_metrics.files_opened.add(1); + self.file_stream_metrics.files_processed.add(1); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); let next = self.start_next_file().transpose(); @@ -180,6 +180,13 @@ impl FileStream { batch } else { let batch = batch.slice(0, *remain); + // Count the prefetched next file (if any) and + // all remaining files we will never open. + let unprocessed = self.file_iter.len() + + usize::from(next.is_some()); + self.file_stream_metrics + .files_processed + .add(unprocessed); self.state = FileStreamState::Limit; *remain = 0; batch @@ -401,8 +408,10 @@ pub struct FileStreamMetrics { /// If using `OnError::Skip` this will provide a count of the number of files /// which were skipped and will not be included in the scan results. pub file_scan_errors: Count, - /// Count of files successfully opened. - pub files_opened: Count, + /// Count of files processed (opened, pruned, or skipped due to limit). + /// When the stream completes, this equals the total number of files + /// assigned to this partition. + pub files_processed: Count, /// Count of files completely scanned (reader stream fully consumed). 
pub files_scanned: Count, } @@ -439,7 +448,8 @@ impl FileStreamMetrics { let file_scan_errors = MetricBuilder::new(metrics).counter("file_scan_errors", partition); - let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); + let files_processed = + MetricBuilder::new(metrics).counter("files_processed", partition); let files_scanned = MetricBuilder::new(metrics).counter("files_scanned", partition); @@ -451,7 +461,7 @@ impl FileStreamMetrics { time_processing, file_open_errors, file_scan_errors, - files_opened, + files_processed, files_scanned, } } From 93d19721689bd0293625a59d02fef1a3ada21243 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 2 Mar 2026 16:32:34 +0100 Subject: [PATCH 04/10] Increment files_processed on completion, not on open Move the files_processed metric so it only increments when we are truly done with a file (consumed, errored+skipped, or limit), not at file-open time. This makes the metric semantics match the name. 
Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_stream.rs | 48 +++++++++++++----------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 1737f3e1d5f94..94521a45db4fe 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -124,7 +124,6 @@ impl FileStream { } FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { - self.file_stream_metrics.files_processed.add(1); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); let next = self.start_next_file().transpose(); @@ -151,6 +150,7 @@ impl FileStream { self.file_stream_metrics.file_open_errors.add(1); match self.on_error { OnError::Skip => { + self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_opening.stop(); self.state = FileStreamState::Idle } @@ -180,13 +180,15 @@ impl FileStream { batch } else { let batch = batch.slice(0, *remain); - // Count the prefetched next file (if any) and - // all remaining files we will never open. - let unprocessed = self.file_iter.len() + // Count this file, the prefetched next file + // (if any), and all remaining files we will + // never open. 
+ let done = 1 + + self.file_iter.len() + usize::from(next.is_some()); self.file_stream_metrics .files_processed - .add(unprocessed); + .add(done); self.state = FileStreamState::Limit; *remain = 0; batch @@ -204,26 +206,29 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error - OnError::Skip => match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = - FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin(std::future::ready( - reader, - )), + OnError::Skip => { + self.file_stream_metrics.files_processed.add(1); + match mem::take(next) { + Some(future) => { + self.file_stream_metrics.time_opening.start(); + + match future { + NextOpen::Pending(future) => { + self.state = + FileStreamState::Open { future } + } + NextOpen::Ready(reader) => { + self.state = FileStreamState::Open { + future: Box::pin( + std::future::ready(reader), + ), + } } } } + None => return Poll::Ready(None), } - None => return Poll::Ready(None), - }, + } OnError::Fail => { self.state = FileStreamState::Error; return Poll::Ready(Some(Err(err))); @@ -231,6 +236,7 @@ impl FileStream { } } None => { + self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.files_scanned.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); From b81ded0b6cd44f6e34be43ba792050b8b8eda0dc Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 2 Mar 2026 16:53:10 +0100 Subject: [PATCH 05/10] Add files_opened metric to FileStreamMetrics Incremented when a file is successfully opened, complementing the existing files_processed (done with file) and files_scanned (fully consumed) metrics. 
Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_stream.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 94521a45db4fe..fcf0864a3a670 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -124,6 +124,7 @@ impl FileStream { } FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { + self.file_stream_metrics.files_opened.add(1); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); let next = self.start_next_file().transpose(); @@ -414,6 +415,8 @@ pub struct FileStreamMetrics { /// If using `OnError::Skip` this will provide a count of the number of files /// which were skipped and will not be included in the scan results. pub file_scan_errors: Count, + /// Count of files successfully opened. + pub files_opened: Count, /// Count of files processed (opened, pruned, or skipped due to limit). /// When the stream completes, this equals the total number of files /// assigned to this partition. @@ -454,6 +457,9 @@ impl FileStreamMetrics { let file_scan_errors = MetricBuilder::new(metrics).counter("file_scan_errors", partition); + let files_opened = + MetricBuilder::new(metrics).counter("files_opened", partition); + let files_processed = MetricBuilder::new(metrics).counter("files_processed", partition); @@ -467,6 +473,7 @@ impl FileStreamMetrics { time_processing, file_open_errors, file_scan_errors, + files_opened, files_processed, files_scanned, } From cac97634410727ceb4de1dab071b95cffe90787e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 2 Mar 2026 17:01:22 +0100 Subject: [PATCH 06/10] Rename files_processed to files_closed Pairs better with files_opened and avoids confusion with files_scanned. 
Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_stream.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index fcf0864a3a670..80a0605a52c60 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -151,7 +151,7 @@ impl FileStream { self.file_stream_metrics.file_open_errors.add(1); match self.on_error { OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); + self.file_stream_metrics.files_closed.add(1); self.file_stream_metrics.time_opening.stop(); self.state = FileStreamState::Idle } @@ -188,7 +188,7 @@ impl FileStream { + self.file_iter.len() + usize::from(next.is_some()); self.file_stream_metrics - .files_processed + .files_closed .add(done); self.state = FileStreamState::Limit; *remain = 0; @@ -208,7 +208,7 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); + self.file_stream_metrics.files_closed.add(1); match mem::take(next) { Some(future) => { self.file_stream_metrics.time_opening.start(); @@ -237,7 +237,7 @@ impl FileStream { } } None => { - self.file_stream_metrics.files_processed.add(1); + self.file_stream_metrics.files_closed.add(1); self.file_stream_metrics.files_scanned.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); @@ -417,10 +417,10 @@ pub struct FileStreamMetrics { pub file_scan_errors: Count, /// Count of files successfully opened. pub files_opened: Count, - /// Count of files processed (opened, pruned, or skipped due to limit). + /// Count of files closed (opened, pruned, or skipped due to limit). /// When the stream completes, this equals the total number of files /// assigned to this partition. 
- pub files_processed: Count, + pub files_closed: Count, /// Count of files completely scanned (reader stream fully consumed). pub files_scanned: Count, } @@ -460,8 +460,8 @@ impl FileStreamMetrics { let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); - let files_processed = - MetricBuilder::new(metrics).counter("files_processed", partition); + let files_closed = + MetricBuilder::new(metrics).counter("files_closed", partition); let files_scanned = MetricBuilder::new(metrics).counter("files_scanned", partition); @@ -474,7 +474,7 @@ impl FileStreamMetrics { file_open_errors, file_scan_errors, files_opened, - files_processed, + files_closed, files_scanned, } } From 1c7edf6e81ac8c6646d9283769296ce0213df6b4 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 2 Mar 2026 17:09:26 +0100 Subject: [PATCH 07/10] fmt --- datafusion/datasource/src/file_stream.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 80a0605a52c60..a27b3d5b5b9b3 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -187,9 +187,7 @@ impl FileStream { let done = 1 + self.file_iter.len() + usize::from(next.is_some()); - self.file_stream_metrics - .files_closed - .add(done); + self.file_stream_metrics.files_closed.add(done); self.state = FileStreamState::Limit; *remain = 0; batch @@ -457,11 +455,9 @@ impl FileStreamMetrics { let file_scan_errors = MetricBuilder::new(metrics).counter("file_scan_errors", partition); - let files_opened = - MetricBuilder::new(metrics).counter("files_opened", partition); + let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); - let files_closed = - MetricBuilder::new(metrics).counter("files_closed", partition); + let files_closed = MetricBuilder::new(metrics).counter("files_closed", partition); 
let files_scanned = MetricBuilder::new(metrics).counter("files_scanned", partition); From 555cc33c04303c0ecdb6006b6b7a6763270b3ea8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 3 Mar 2026 07:37:04 +0100 Subject: [PATCH 08/10] remove files_scanned --- datafusion/datasource/src/file_stream.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index a27b3d5b5b9b3..bd45bfdc52f15 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -236,7 +236,6 @@ impl FileStream { } None => { self.file_stream_metrics.files_closed.add(1); - self.file_stream_metrics.files_scanned.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); @@ -419,8 +418,6 @@ pub struct FileStreamMetrics { /// When the stream completes, this equals the total number of files /// assigned to this partition. pub files_closed: Count, - /// Count of files completely scanned (reader stream fully consumed). 
- pub files_scanned: Count, } impl FileStreamMetrics { @@ -459,9 +456,6 @@ impl FileStreamMetrics { let files_closed = MetricBuilder::new(metrics).counter("files_closed", partition); - let files_scanned = - MetricBuilder::new(metrics).counter("files_scanned", partition); - Self { time_opening, time_scanning_until_data, @@ -471,7 +465,6 @@ impl FileStreamMetrics { file_scan_errors, files_opened, files_closed, - files_scanned, } } } From 4674ee246b3b14ef5923c8b682c749211de58177 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 3 Mar 2026 07:41:50 +0100 Subject: [PATCH 09/10] Rename files_closed to files_processed in metrics Add docstrings --- datafusion/datasource/src/file_stream.rs | 32 ++++++++++++++++-------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index bd45bfdc52f15..958f3f3bfa20f 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -151,7 +151,7 @@ impl FileStream { self.file_stream_metrics.file_open_errors.add(1); match self.on_error { OnError::Skip => { - self.file_stream_metrics.files_closed.add(1); + self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_opening.stop(); self.state = FileStreamState::Idle } @@ -187,7 +187,7 @@ impl FileStream { let done = 1 + self.file_iter.len() + usize::from(next.is_some()); - self.file_stream_metrics.files_closed.add(done); + self.file_stream_metrics.files_processed.add(done); self.state = FileStreamState::Limit; *remain = 0; batch @@ -206,7 +206,7 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => { - self.file_stream_metrics.files_closed.add(1); + self.file_stream_metrics.files_processed.add(1); match mem::take(next) { Some(future) => { self.file_stream_metrics.time_opening.start(); @@ -235,7 +235,7 @@ 
impl FileStream { } } None => { - self.file_stream_metrics.files_closed.add(1); + self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); @@ -412,12 +412,22 @@ pub struct FileStreamMetrics { /// If using `OnError::Skip` this will provide a count of the number of files /// which were skipped and will not be included in the scan results. pub file_scan_errors: Count, - /// Count of files successfully opened. + /// Count of files successfully opened or evaluated for processing. + /// At t=end (completion of a query) this is equal to `files_processed`, and both values are equal + /// to the total number of files in the query; unless the query itself fails. + /// This value will always be greater than or equal to `files_processed`. + /// Note that this value does *not* mean the file was actually scanned. + /// We increment this value for any processing of a file, even if that processing is + /// discarding it because we hit a `LIMIT` (in this case `files_opened` and `files_processed` are both incremented at the same time). pub files_opened: Count, - /// Count of files closed (opened, pruned, or skipped due to limit). - /// When the stream completes, this equals the total number of files - /// assigned to this partition. - pub files_closed: Count, + /// Count of files completely processed / closed (opened, pruned, or skipped due to limit). + /// At t=0 (the beginning of a query) this is 0. + /// At t=end (completion of a query) this is equal to `files_opened`, and both values are equal + /// to the total number of files in the query; unless the query itself fails. + /// This value will always be less than or equal to `files_opened`. + /// We increment this value for any processing of a file, even if that processing is + /// discarding it because we hit a `LIMIT` (in this case `files_opened` and `files_processed` are both incremented at the same time). 
+ pub files_processed: Count, } impl FileStreamMetrics { @@ -454,7 +464,7 @@ impl FileStreamMetrics { let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); - let files_closed = MetricBuilder::new(metrics).counter("files_closed", partition); + let files_processed = MetricBuilder::new(metrics).counter("files_processed", partition); Self { time_opening, @@ -464,7 +474,7 @@ impl FileStreamMetrics { file_open_errors, file_scan_errors, files_opened, - files_closed, + files_processed, } } } From 401635476cd1be0cf140c42bd1c2fd756c4ea881 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 3 Mar 2026 07:45:39 +0100 Subject: [PATCH 10/10] fmt --- datafusion/datasource/src/file_stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 958f3f3bfa20f..514a7e0a0bead 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -187,7 +187,9 @@ impl FileStream { let done = 1 + self.file_iter.len() + usize::from(next.is_some()); - self.file_stream_metrics.files_processed.add(done); + self.file_stream_metrics + .files_processed + .add(done); self.state = FileStreamState::Limit; *remain = 0; batch @@ -464,7 +466,8 @@ impl FileStreamMetrics { let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); - let files_processed = MetricBuilder::new(metrics).counter("files_processed", partition); + let files_processed = + MetricBuilder::new(metrics).counter("files_processed", partition); Self { time_opening,