Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,11 @@ impl DataFrame {
self.session_state.create_logical_expr(sql, df_schema)
}

/// Consume the DataFrame and produce a physical plan
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
/// Create a physical plan from this DataFrame.
///
/// The `DataFrame` remains accessible after this call, so you can inspect
/// the plan and still call [`DataFrame::collect`] or other execution methods.
pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
self.session_state.create_physical_plan(&self.plan).await
}

Expand Down Expand Up @@ -2392,7 +2395,7 @@ impl DataFrame {
} else {
let context = SessionContext::new_with_state((*self.session_state).clone());
// The schema is consistent with the output
let plan = self.clone().create_physical_plan().await?;
let plan = self.create_physical_plan().await?;
let schema = plan.schema();
let task_ctx = Arc::new(self.task_ctx());
let partitions = collect_partitioned(plan, task_ctx).await?;
Expand Down
21 changes: 21 additions & 0 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,4 +478,25 @@ mod tests {

Ok(())
}

/// Checks that `create_physical_plan` only borrows the `DataFrame`.
///
/// A caller should be able to obtain the physical plan (e.g. to log it) and
/// then still run an execution method on the very same `DataFrame`; here the
/// follow-up execution is `collect`.
#[tokio::test]
async fn create_physical_plan_does_not_consume_dataframe() -> Result<()> {
    use crate::prelude::CsvReadOptions;

    let session = SessionContext::new();
    let frame = session
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;

    // Planning must not move `frame`; the plan itself is not examined here.
    let _plan = frame.create_physical_plan().await?;

    // `frame` is still owned by this scope, so it can be executed.
    let collected = frame.collect().await?;
    assert!(!collected.is_empty());

    Ok(())
}
}