Make `DataFrame::create_physical_plan` borrow the DataFrame instead of consuming it.

- datafusion/core/src/dataframe/mod.rs: the method receiver changes from `self`
  to `&self`, and the one in-file caller drops its `self.clone()`.
- datafusion/core/src/dataframe/parquet.rs: adds a test that builds a physical
  plan and then still calls `collect` on the same DataFrame.

diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 2292f5855bfde..a3bf286855205 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -294,8 +294,11 @@ impl DataFrame {
         self.session_state.create_logical_expr(sql, df_schema)
     }
 
-    /// Consume the DataFrame and produce a physical plan
-    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
+    /// Create a physical plan from this DataFrame.
+    ///
+    /// The `DataFrame` remains accessible after this call, so you can inspect
+    /// the plan and still call [`DataFrame::collect`] or other execution methods.
+    pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
         self.session_state.create_physical_plan(&self.plan).await
     }
 
@@ -2392,7 +2395,7 @@ impl DataFrame {
         } else {
             let context = SessionContext::new_with_state((*self.session_state).clone());
             // The schema is consistent with the output
-            let plan = self.clone().create_physical_plan().await?;
+            let plan = self.create_physical_plan().await?;
             let schema = plan.schema();
             let task_ctx = Arc::new(self.task_ctx());
             let partitions = collect_partitioned(plan, task_ctx).await?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 54dadfd78cbc2..d68490a305819 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -478,4 +478,25 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that `create_physical_plan` does not consume the `DataFrame`, so
+    /// callers can inspect (e.g. log) the physical plan and then still call
+    /// `write_parquet` or any other execution method on the same `DataFrame`.
+    #[tokio::test]
+    async fn create_physical_plan_does_not_consume_dataframe() -> Result<()> {
+        use crate::prelude::CsvReadOptions;
+        let ctx = SessionContext::new();
+        let df = ctx
+            .read_csv("tests/data/example.csv", CsvReadOptions::new())
+            .await?;
+
+        // Obtain the physical plan for inspection without consuming `df`.
+        let _physical_plan = df.create_physical_plan().await?;
+
+        // `df` is still usable — collect the results.
+        let batches = df.collect().await?;
+        assert!(!batches.is_empty());
+
+        Ok(())
+    }
 }