diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index c8090382094ef..514a7e0a0bead 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -124,6 +124,7 @@ impl FileStream { } FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { + self.file_stream_metrics.files_opened.add(1); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); let next = self.start_next_file().transpose(); @@ -150,6 +151,7 @@ impl FileStream { self.file_stream_metrics.file_open_errors.add(1); match self.on_error { OnError::Skip => { + self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_opening.stop(); self.state = FileStreamState::Idle } @@ -179,6 +181,15 @@ impl FileStream { batch } else { let batch = batch.slice(0, *remain); + // Count this file, the prefetched next file + // (if any), and all remaining files we will + // never open. 
+ let done = 1 + + self.file_iter.len() + + usize::from(next.is_some()); + self.file_stream_metrics + .files_processed + .add(done); self.state = FileStreamState::Limit; *remain = 0; batch @@ -196,26 +207,29 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error - OnError::Skip => match mem::take(next) { - Some(future) => { - self.file_stream_metrics.time_opening.start(); - - match future { - NextOpen::Pending(future) => { - self.state = - FileStreamState::Open { future } - } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin(std::future::ready( - reader, - )), + OnError::Skip => { + self.file_stream_metrics.files_processed.add(1); + match mem::take(next) { + Some(future) => { + self.file_stream_metrics.time_opening.start(); + + match future { + NextOpen::Pending(future) => { + self.state = + FileStreamState::Open { future } + } + NextOpen::Ready(reader) => { + self.state = FileStreamState::Open { + future: Box::pin( + std::future::ready(reader), + ), + } } } } + None => return Poll::Ready(None), } - None => return Poll::Ready(None), - }, + } OnError::Fail => { self.state = FileStreamState::Error; return Poll::Ready(Some(Err(err))); @@ -223,6 +237,7 @@ impl FileStream { } } None => { + self.file_stream_metrics.files_processed.add(1); self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); @@ -399,6 +414,22 @@ pub struct FileStreamMetrics { /// If using `OnError::Skip` this will provide a count of the number of files /// which were skipped and will not be included in the scan results. pub file_scan_errors: Count, + /// Count of files successfully opened or evaluated for processing. + /// At t=end (completion of a query) this is expected to equal `files_processed`, and both values are equal + /// to the total number of files in the query; unless the query itself fails. 
+ /// This value is intended to always be greater than or equal to `files_processed`. + /// Note that this value does *not* mean the file was actually scanned. + /// NOTE(review): in this change the counter is bumped only when a file's open future resolves successfully; + /// files skipped after an open error or discarded by a `LIMIT` are added to `files_processed` only, so the relations above may not hold in those cases; confirm. + pub files_opened: Count, + /// Count of files completely processed / closed (opened, pruned, or skipped due to limit). + /// At t=0 (the beginning of a query) this is 0. + /// At t=end (completion of a query) this is expected to equal `files_opened`, and both values are equal + /// to the total number of files in the query; unless the query itself fails. + /// This value is intended to always be less than or equal to `files_opened`; NOTE(review): files skipped after an open error or discarded by a `LIMIT` are counted here but not in `files_opened`; confirm. + /// We increment this value for any processing of a file, even if that processing is + /// discarding it because we hit a `LIMIT` (in this case the current file, any prefetched next file, and all files still queued are added at once). + pub files_processed: Count, } impl FileStreamMetrics { @@ -433,6 +464,11 @@ impl FileStreamMetrics { let file_scan_errors = MetricBuilder::new(metrics).counter("file_scan_errors", partition); + let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition); + + let files_processed = + MetricBuilder::new(metrics).counter("files_processed", partition); + Self { time_opening, time_scanning_until_data, @@ -440,6 +476,8 @@ impl FileStreamMetrics { time_processing, file_open_errors, file_scan_errors, + files_opened, + files_processed, } } }