Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions datafusion/datasource/src/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl FileStream {
}
FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
self.file_stream_metrics.files_processed.add(1);
// include time needed to start opening in `start_next_file`
self.file_stream_metrics.time_opening.stop();
let next = self.start_next_file().transpose();
Expand Down Expand Up @@ -179,6 +180,13 @@ impl FileStream {
batch
} else {
let batch = batch.slice(0, *remain);
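// The limit has been reached, so no further files will be read.
// Also count as processed the prefetched next file (if any) and
// the files still queued in `file_iter`, which we will never open.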
let unprocessed = self.file_iter.len()
+ usize::from(next.is_some());
self.file_stream_metrics
.files_processed
.add(unprocessed);
self.state = FileStreamState::Limit;
*remain = 0;
batch
Expand Down Expand Up @@ -223,6 +231,7 @@ impl FileStream {
}
}
None => {
self.file_stream_metrics.files_scanned.add(1);
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();

Expand Down Expand Up @@ -399,6 +408,12 @@ pub struct FileStreamMetrics {
/// If using `OnError::Skip` this will provide a count of the number of files
/// which were skipped and will not be included in the scan results.
pub file_scan_errors: Count,
/// Count of files processed (opened, pruned, or skipped due to limit).
/// When the stream completes, this equals the total number of files
/// assigned to this partition.
pub files_processed: Count,
/// Count of files completely scanned (reader stream fully consumed).
pub files_scanned: Count,
}

impl FileStreamMetrics {
Expand Down Expand Up @@ -433,13 +448,21 @@ impl FileStreamMetrics {
let file_scan_errors =
MetricBuilder::new(metrics).counter("file_scan_errors", partition);

let files_processed =
MetricBuilder::new(metrics).counter("files_processed", partition);

let files_scanned =
MetricBuilder::new(metrics).counter("files_scanned", partition);

Self {
time_opening,
time_scanning_until_data,
time_scanning_total,
time_processing,
file_open_errors,
file_scan_errors,
files_processed,
files_scanned,
}
}
}
Expand Down