Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 113 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ pub trait ExtensionPlanner {
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;

/// Plan a [`LogicalPlan::TableScan`] into a physical plan.
///
/// Override this hook to support scans whose [`TableSource`] is valid but
/// is not a [`TableProvider`].
///
/// The default implementation declines every scan by returning `Ok(None)`.
///
/// # Returns
///
/// * `Ok(Some(plan))` - this planner knows how to plan the `scan`; the
///   returned `plan` is used for it
/// * `Ok(None)` - this planner does not know how to plan the `scan`; the
///   scan is offered to the next [`ExtensionPlanner`], and
///   [`DefaultPhysicalPlanner`] falls back to its built-in table scan
///   planning when no extension planner accepts it
/// * `Err` - this planner knows how to plan the `scan` but failed while
///   doing so
///
/// [`TableSource`]: datafusion_expr::TableSource
/// [`TableProvider`]: datafusion_catalog::TableProvider
async fn plan_table_scan(
    &self,
    _planner: &dyn PhysicalPlanner,
    _scan: &TableScan,
    _session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // Opt-out by default so existing implementors keep their behavior
    // without having to add this method.
    Ok(None)
}
}

/// Default single node physical query planner that converts a
Expand Down Expand Up @@ -278,7 +298,8 @@ struct LogicalNode<'a> {

impl DefaultPhysicalPlanner {
/// Create a physical planner that uses `extension_planners` to
/// plan user-defined logical nodes [`LogicalPlan::Extension`].
/// plan user-defined logical nodes [`LogicalPlan::Extension`]
/// or user-defined table sources in [`LogicalPlan::TableScan`].
/// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
/// plan.
pub fn with_extension_planners(
Expand Down Expand Up @@ -455,13 +476,22 @@ impl DefaultPhysicalPlanner {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec_node: Arc<dyn ExecutionPlan> = match node {
// Leaves (no children)
LogicalPlan::TableScan(TableScan {
source,
projection,
filters,
fetch,
..
}) => {
LogicalPlan::TableScan(scan) => {
for planner in &self.extension_planners {
if let Some(plan) =
planner.plan_table_scan(self, scan, session_state).await?
{
return Ok(plan);
}
}

let TableScan {
source,
projection,
filters,
fetch,
..
} = scan;
let source = source_as_provider(source)?;
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
Expand Down Expand Up @@ -2889,7 +2919,9 @@ mod tests {
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::builder::subquery_alias;
use datafusion_expr::{LogicalPlanBuilder, UserDefinedLogicalNodeCore, col, lit};
use datafusion_expr::{
LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, col, lit,
};
use datafusion_functions_aggregate::count::count_all;
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_physical_expr::EquivalenceProperties;
Expand Down Expand Up @@ -4413,4 +4445,76 @@ digraph {
assert_contains!(&err_str, "field nullability at index");
assert_contains!(&err_str, "field metadata at index");
}

/// Minimal [`TableSource`] used by the table scan extension planner test.
///
/// It only carries a schema and relies on the trait's provided methods for
/// everything else.
#[derive(Debug)]
struct MockTableSource {
    /// Schema reported by [`TableSource::schema`].
    schema: SchemaRef,
}

impl TableSource for MockTableSource {
    /// Hands out a new reference to the stored schema.
    fn schema(&self) -> SchemaRef {
        let stored = &self.schema;
        Arc::clone(stored)
    }

    /// Exposes `self` as [`Any`] so callers can test for this concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// [`ExtensionPlanner`] that only plans table scans over
/// [`MockTableSource`], turning them into an [`EmptyExec`].
struct MockTableScanExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for MockTableScanExtensionPlanner {
    /// Declines every user-defined logical node.
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        _node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        _physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }

    /// Plans `scan` as an [`EmptyExec`] with the scan's projected schema when
    /// its source is a [`MockTableSource`]; declines with `Ok(None)` otherwise.
    async fn plan_table_scan(
        &self,
        _planner: &dyn PhysicalPlanner,
        scan: &TableScan,
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Any other source is left for someone else to plan.
        if !scan.source.as_any().is::<MockTableSource>() {
            return Ok(None);
        }

        let projected_schema = Arc::clone(scan.projected_schema.inner());
        Ok(Some(Arc::new(EmptyExec::new(projected_schema))))
    }
}

/// A scan over a [`MockTableSource`] must be planned by the registered
/// [`MockTableScanExtensionPlanner`], producing an [`EmptyExec`] that keeps
/// the table schema.
#[tokio::test]
async fn test_table_scan_extension_planner() {
    // Logical side: an unprojected scan of a single-column mock table.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let mock_source = Arc::new(MockTableSource {
        schema: Arc::clone(&schema),
    });
    let scan_plan = LogicalPlanBuilder::scan("test", mock_source, None)
        .unwrap()
        .build()
        .unwrap();

    // Physical side: a default planner with the mock extension registered.
    let extension_planner = Arc::new(MockTableScanExtensionPlanner);
    let physical_planner =
        DefaultPhysicalPlanner::with_extension_planners(vec![extension_planner]);
    let session_state = make_session_state();

    let physical_plan = physical_planner
        .create_physical_plan(&scan_plan, &session_state)
        .await
        .unwrap();

    assert_eq!(physical_plan.schema(), schema);
    assert!(physical_plan.as_any().is::<EmptyExec>());
}
}
Loading