Gene.bordegaray/2026/02/partition index dynamic filters #20331
gene-bordegaray wants to merge 17 commits into apache:main from …02/dyn_filter_partition_indexed
Conversation
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
- RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
no dynamic filter because it's the build side of a build side... took me a second 😂
// its own filter.
predicate = predicate
    .map(|p| snapshot_physical_expr_for_partition(p, partition_index))
    .transpose()?;
Decided to only do this in the Parquet opener; doing it for all file sources by default would just do nothing, since predicates aren't passed to other openers. This does mean that users will have to implement this for their own data sources.
Given this is a large PR, I didn't want to include logic for a fallback, and doing nothing seemed out of place; could still reconsider if others have an opinion.
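To make the opener-side behavior concrete, here is a minimal, self-contained sketch of the idea. The `Expr` model and all names are illustrative stand-ins, not DataFusion's actual `PhysicalExpr`/`DynamicFilterPhysicalExpr` types: walk the predicate tree and replace each dynamic node with that partition's filter.

```rust
// Simplified stand-in for a physical expression tree; the real DataFusion
// type is `Arc<dyn PhysicalExpr>` and the dynamic node is
// `DynamicFilterPhysicalExpr`. All names here are illustrative.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    // A plain (already concrete) predicate, e.g. "b > 5".
    Literal(String),
    // A dynamic filter: one optional expression per partition.
    Dynamic(Vec<Option<Expr>>),
    And(Box<Expr>, Box<Expr>),
}

// Replace every Dynamic node with the filter for `partition`,
// falling back to `true` when no per-partition data exists
// (mirrors the lit(true) fallback discussed in this thread).
fn snapshot_for_partition(expr: &Expr, partition: usize) -> Expr {
    match expr {
        Expr::Literal(s) => Expr::Literal(s.clone()),
        Expr::Dynamic(parts) => match parts.get(partition).cloned().flatten() {
            Some(filter) => filter,
            None => Expr::Literal("true".to_string()),
        },
        Expr::And(l, r) => Expr::And(
            Box::new(snapshot_for_partition(l, partition)),
            Box::new(snapshot_for_partition(r, partition)),
        ),
    }
}

fn main() {
    let predicate = Expr::And(
        Box::new(Expr::Literal("b > 5".into())),
        Box::new(Expr::Dynamic(vec![
            Some(Expr::Literal("a >= 1 AND a <= 3".into())),
            Some(Expr::Literal("a >= 7 AND a <= 9".into())),
        ])),
    );
    // Partition 1 only sees its own bounds.
    println!("{:?}", snapshot_for_partition(&predicate, 1));
}
```

A custom data source would do the analogous replacement in its own opener before evaluating the predicate.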
LiaCastaneda
left a comment
This makes sense to me and will be very helpful for use cases where we want to avoid repartitioning data. My only concern is that API users would need to align the probe and build side partitions, but this seems like a reasonable tradeoff. Let's see what other contributors think. (This is a partial review; I will finish later today or early next week.) But so far it's looking good to me :)
// One side starts with multiple partitions while target is 1. EnforceDistribution inserts a
// hash repartition on the left child. The partitioning schemes are now misaligned:
// - Left: hash-repartitioned (repartitioned=true)
// - Right: file-grouped (repartitioned=false)
// This is a correctness bug, so we expect an error.
Can we have the other way around as well? Having a join of type Partitioned with the left preserving file partitioning and the right having a RepartitionExec.
let optimized = ensure_distribution_helper_transform_up(join, 1)?;
assert_plan!(optimized, @r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@1)]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
Would it make sense to display whether DataSourceExec is preserving partitioning? Something like preserve_partitioning=[bool]? This may be useful for users to know why there is no RepartitionExec in the plan even if the mode is Partitioned.
I think this can be a separate PR, but can create an issue 😄
💯 thank you for the reviews
LiaCastaneda
left a comment
👍 I think I'm done with my review; overall looks good, just some minor comments.
We need to make sure that in the future it's easy to revert or migrate users away from index-based routing to their custom Partitioning implementation. Since this does not introduce a new API, I don't think it should be a problem. This was previously a bug, and with this PR dynamic filtering works, but it's something to keep in mind.
NGA-TRAN
left a comment
The approach looks great, Gene. Nice work!
I do have some suggestions on comments and test data to make things clearer for reviewers and future maintenance.
datafusion/core/tests/physical_optimizer/enforce_distribution.rs
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = false;
It will be clearer if you add comments explaining why you need these settings and for which tests.
Right, usually when users decide to do these custom partitions, they must have a mechanism to enforce it. Thus, I do not think we need to worry about this at this dynamic filtering stage. We only need to provide a way to use dynamic filtering correctly, which is the purpose of this PR.
/// Per-partition filter expressions indexed by partition number.
type PartitionedFilters = Vec<Option<Arc<dyn PhysicalExpr>>>;
Just one more question -- how would evaluation be done for PartitionedFilters?
My understanding is that each partition would need to first access its corresponding PhysicalExpr and then call evaluate(), right? However, the evaluate() method of the PhysicalExpr trait has no partition number in its args, so evaluate() can't directly integrate PartitionedFilters.
The current evaluate() function remains the same and evaluates inner.expr, which, when we preserve file partitioning, holds nothing (just a lit(true) placeholder).
Snapshotting happens before evaluation.
The full path in chronological order is:
- ParquetOpener::open() is called
- snapshot_physical_expr_for_partition(predicate, partition_index) is called -> important to note that we pass the index
- snapshot_physical_expr_for_partition replaces the DynamicFilterPhysicalExpr with the filter for the partition at that index (this is a physical expr)
- evaluate() is called, which uses the snapshot expr (not the DynamicFilterPhysicalExpr); we don't need to know the partition parameter because it was already dealt with earlier
For the lit(true) concern: if has_partitioned_filters() returns false during snapshotting, then we fall back to lit(true), in which case we won't filter anything. But this is OK behavior; it's better to do this than to error.
This shouldn't happen, though, because we wait until the build side is complete before we snapshot, so we should always resolve to the real filter.
Maybe to be safe it would be good to add a debug statement if has_partitioned_filters() returns false.
Lmk if this makes sense 🙂
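The chronological path above can be sketched with a toy model (all types and names here are illustrative, not the real DataFusion API): the snapshot resolves the partition index up front, so evaluation afterwards needs no partition argument.

```rust
// Minimal model of "snapshot happens before evaluation": the opener resolves
// the per-partition filter once, and evaluation never sees a partition index.
#[derive(Clone)]
enum Filter {
    True,                       // lit(true) placeholder
    Range { lo: i64, hi: i64 }, // a partition-specific bound
}

struct DynamicFilter {
    partitioned: Vec<Filter>, // one filter per build partition
}

impl DynamicFilter {
    fn has_partitioned_filters(&self) -> bool {
        !self.partitioned.is_empty()
    }
    // Steps 2-3: snapshot for this partition index (fallback: lit(true)).
    fn snapshot_for_partition(&self, partition: usize) -> Filter {
        if self.has_partitioned_filters() {
            self.partitioned[partition].clone()
        } else {
            Filter::True
        }
    }
}

// Step 4: evaluate() on the snapshot; no partition argument is needed.
fn evaluate(filter: &Filter, value: i64) -> bool {
    match filter {
        Filter::True => true,
        Filter::Range { lo, hi } => *lo <= value && value <= *hi,
    }
}

fn main() {
    let df = DynamicFilter {
        partitioned: vec![
            Filter::Range { lo: 0, hi: 9 },
            Filter::Range { lo: 10, hi: 19 },
        ],
    };
    let snap = df.snapshot_for_partition(1); // the opener passes the index here
    println!("{}", evaluate(&snap, 12)); // 12 falls inside partition 1's bounds
}
```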
Ah got it, I missed the fact that this was already integrated into the parquet-source opener and thought evaluation was different. It's clear now and makes sense to me, thanks for explaining!
LiaCastaneda
left a comment
This lgtm 👍 it’s just missing a committer’s review.
/// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
/// partition hash join / dynamic filter consumers.
///
/// Misaligned Partitioned Hash Join Example:
Is this something DataFusion users have to think about? I.e. is this something a user can mess up or would it only happen if there was a bug in DataFusion?
Ya, this can be user-triggered, not only a DF bug. If preserve_file_partitions is enabled and the two join sides are not partition-aligned by value/index, partition-index routing is unsafe.
A user can declare that their file partitioning should be preserved, but if their data isn't actually partitioned that way, this will be a bug on the part of the user.
We now error in EnforceDistribution for incompatible schemes instead of silently allowing incorrect results.
fn should_use_partition_index(&self) -> bool {
    matches!(self.mode, PartitionMode::Partitioned)
        && self.dynamic_filter_routing_mode
            == DynamicFilterRoutingMode::PartitionIndex
Would these two ever not match?
Yes: if there is a partitioned hash join and both sides contain a repartition -> it would use CaseHash.
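A minimal sketch of that decision, using stand-in enums rather than the real DataFusion types, showing the case where the two conditions do not match:

```rust
// Illustrative model of the routing decision quoted above: partition-index
// routing needs BOTH a Partitioned join and a routing mode of PartitionIndex.
// A RepartitionExec on either side would flip the mode to CaseHash.
#[derive(PartialEq)]
enum PartitionMode {
    Partitioned,
    CollectLeft,
}

#[derive(PartialEq)]
enum DynamicFilterRoutingMode {
    PartitionIndex,
    CaseHash,
}

fn should_use_partition_index(
    mode: &PartitionMode,
    routing: &DynamicFilterRoutingMode,
) -> bool {
    matches!(mode, PartitionMode::Partitioned)
        && *routing == DynamicFilterRoutingMode::PartitionIndex
}

fn main() {
    // Partitioned join, but a repartition forced CaseHash: the two don't match.
    println!(
        "{}",
        should_use_partition_index(
            &PartitionMode::Partitioned,
            &DynamicFilterRoutingMode::CaseHash
        )
    ); // false
}
```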
/// Snapshot a `PhysicalExpr` tree, replacing any [`DynamicFilterPhysicalExpr`] that
/// has per-partition data with its partition-specific filter expression.
/// If a `DynamicFilterPhysicalExpr` does not have partitioned data, it is left unchanged.
pub fn snapshot_physical_expr_for_partition(
    expr: Arc<dyn PhysicalExpr>,
    partition: usize,
) -> Result<Arc<dyn PhysicalExpr>> {
This is an interesting way to go about it. It does kind of feel to me like we are trying to shoehorn the concept of partitions into PhysicalExpr through the snapshot() API which was never intended for this use case. I feel there is probably a much cleaner way to introduce the concept of a partition to PhysicalExpr
This is an interesting way to go about it. It does kind of feel to me like we are trying to shoehorn the concept of partitions into PhysicalExpr through the snapshot() API, which was never intended for this use case. I feel there is probably a much cleaner way to introduce the concept of a partition to PhysicalExpr.

I agree that overloading snapshot() is a bit odd. Here are two ways this can be improved upon:
1. Transmit Partition Info in the Data:
This is the suggestion @adriangb provided. I think it is an interesting approach but has large implications: injecting a partition id or something similar would mean changing projections, filter handling, etc. With the objective of this PR being to enable dynamic filtering for file-preserved partitioning, this seems like too large of a change.
2. Runtime Partition Bind:
Introduce a runtime wrapper, PartitionBoundDynamicFilterPhysicalExpr. The parquet opener binds the dynamic filters to partition i by inserting this new wrapper rather than a static snapshot. Then, this wrapper is resolved at evaluation time. This largely avoids shoehorning partitioning into PhysicalExpr through snapshot().
With this said, I am aware this approach is not the cleanest solution for this issue, but it does set us up for follow-up work. This can become a generic runtime bind API that hooks onto PhysicalExpr; by default it will be a no-op. Then for DynamicFilterPhysicalExpr we will implement the hook and eliminate our wrapper. This will provide support for any future runtime-dependent expressions without the ad-hoc plumbing we did before, and with no schema or data changes.
I decided to leave the full generic solution out of this PR to keep these changes as local as I can, but tried to lay a path forward that is maintainable. Let me know thoughts on this approach.
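A rough, hypothetical sketch of option 2 (the names are invented here, and the real design would live behind DataFusion's `PhysicalExpr` trait): the wrapper carries the bound partition index and resolves the filter lazily at evaluation time, so bounds published after binding are still seen.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical sketch of a partition-bound wrapper: instead of a static
// snapshot, the opener wraps the dynamic filter together with the partition
// it was opened for, and resolution happens lazily at evaluate() time.
struct DynamicFilter {
    // Per-partition (lo, hi) bounds, filled in once the build side finishes.
    partitioned_bounds: Mutex<Vec<Option<(i64, i64)>>>,
}

struct PartitionBound {
    inner: Arc<DynamicFilter>,
    partition: usize, // bound at open() time
}

impl PartitionBound {
    // Resolved at evaluation time, so updates made after binding are seen.
    fn evaluate(&self, value: i64) -> bool {
        match self.inner.partitioned_bounds.lock().unwrap().get(self.partition) {
            Some(Some((lo, hi))) => *lo <= value && value <= *hi,
            _ => true, // no bounds yet: keep everything (lit(true) behavior)
        }
    }
}

fn main() {
    let shared = Arc::new(DynamicFilter {
        partitioned_bounds: Mutex::new(vec![None, None]),
    });
    let bound = PartitionBound { inner: Arc::clone(&shared), partition: 1 };
    println!("{}", bound.evaluate(42)); // true: no bounds published yet
    shared.partitioned_bounds.lock().unwrap()[1] = Some((10, 19));
    println!("{}", bound.evaluate(42)); // false: lazily sees the new bounds
}
```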
I'm not familiar with this code, so I'm ramping up on it; you can expect me to take some time until I start contributing more useful comments.
For now, the big comment I left is mostly intended to trigger some discussion rather than actually jumping into code.
One way of getting your PRs in faster is to slice them into smaller chunks, for example maybe shipping the failing tests for the issue this is intended to solve first. I have to say that, even if that's generally good advice, it might not be good for some PRs; it might not be for this one, for example.
/// Note for partitioned hash join dynamic filtering:
/// preserving file partitions can allow partition-index routing (`i -> i`) instead of
/// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
/// partition hash join / dynamic filter consumers.
///
This assumption is not specific to dynamic filters. If partition indices are not aligned, data returned by the join would be wrong whether dynamic filters are there or not.
As I don't think this advice is specific to dynamic filters, I'd try to keep this doc comment more minimal. Note that this is supposed to be rendered not only as docs, but also as part of a SHOW ALL query, which might not render nicely, so ASCII graphics are probably not the most render-friendly thing for this specific case.
// - File-grouped: partition 0 = all rows where column="A" (value-based)
// - Hash-repartitioned: partition 0 = all rows where hash(column) % N == 0 (hash-based)
// These are incompatible, so the join will miss matching rows.
plan = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>()
Pretty smart and clean way of propagating whether data is getting repartitioned across steps 👍
!matches!(
    repartition.partitioning(),
    Partitioning::UnknownPartitioning(_)
)
} else if child_context.plan.children().is_empty() {
It might make sense to not use a matches! macro here and just manually match each possibility. The reason is that if, in the future, a new enum variant is added to Partitioning, the compiler will force the dev to choose the right value here manually, forcing them to think about it.
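The suggestion can be illustrated with a simplified stand-in for the `Partitioning` enum (the real enum lives in datafusion-physical-expr and differs from this sketch): with an explicit match, adding a new variant later becomes a compile error at this site.

```rust
// Simplified stand-in for DataFusion's Partitioning enum.
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
    UnknownPartitioning(usize),
}

// Instead of `!matches!(p, Partitioning::UnknownPartitioning(_))`, match
// every variant explicitly: a new variant added to the enum later will fail
// to compile here, forcing a deliberate choice instead of a silent default.
fn is_known_repartitioning(p: &Partitioning) -> bool {
    match p {
        Partitioning::RoundRobinBatch(_) => true,
        Partitioning::Hash(_, _) => true,
        Partitioning::UnknownPartitioning(_) => false,
    }
}

fn main() {
    println!("{}", is_known_repartitioning(&Partitioning::Hash(vec![], 4)));
}
```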
/// Per-partition filter expressions for partition-index routing.
/// When both sides of a hash join preserve their file partitioning (no RepartitionExec(Hash)),
/// build-partition i corresponds to probe-partition i. This allows storing per-partition
/// filters so that each partition only sees its own bounds, giving tighter filtering.
partitioned_exprs: PartitionedFilters,
}
I wonder if there is a better alternative than leaking the concept of plan partitioning into dynamic filters.
I do agree with @adriangb about leaking the concept of partitioning into expressions. I'd say:
- If we want expressions to be aware of partitions, let's do a proper API design that is as clean and future-proof as possible, and use it here.
- If we want expressions to be agnostic of partitions, let's try to find a way to implement this so that DynamicFilterExpressions are still agnostic of partitions.
It would be nice to avoid middle-ground solutions that exist just for the sake of shipping PRs faster.
Some ideas that come to mind for keeping dynamic filters agnostic of plan partitioning in their structs:
Introduce a new __partition_index REE column or something similar
In the record batch, right before calling evaluate() on the dynamic filter expression, we could introduce a new column with the partition index. I think you already explored this without success, but I would like to understand better what blockers specifically you faced; I do see this approach yielding a potentially good result.
Use Arrow's Schema metadata for threading the partition index
Arrow's schemas have the concept of metadata that can be used for threading arbitrary information. I think this should also be considered "leaking partitioning details", but I wonder if this brings an opportunity to make it cleaner?
Letting the hash join hold a Vec instead of a single HashJoinExecDynamicFilter
That way, there are as many HashJoinExecDynamicFilters as partitions, and they behave exactly the same as before, but as if there was only 1 partition.
In the record batch, right before calling evaluate() on the dynamic filter expression, we could introduce a new column with the partition index. I think you already explored this without success, but I would like to understand better what blockers specifically you faced; I do see this approach yielding a potentially good result.
I also was thinking this approach should work.
Just throwing darts at the wall in terms of helping w/ implementation: you could create a marker expression, e.g. impl PhysicalExpr for PartitionLiteral, and then modify the machinery that replaces Hive partition values in scans with the actual literals to also replace that marker with the DataFusion partition integer.
datafusion/datafusion/physical-expr-adapter/src/schema_rewriter.rs
Lines 46 to 83 in e937cad
datafusion/datafusion/datasource-parquet/src/opener.rs
Lines 217 to 261 in e937cad
You'd have to contend with "what if this expression is evaluated without replacement". I'm not sure when / why that would happen, or if it's okay that the expression only works in the context of a scan. Maybe just returning null by default is reasonable behavior.
Is the generic runtime binding for physical expressions not a good path forward? I have changed this some since @adriangb's last review and think it will provide a clean API once the follow-up work is done.
It will eliminate the wrapper and create an API over any physical expression to have runtime properties. This feels like a good approach since it's easy to comprehend, requires no changes to the schema or injecting columns, and introduces what I think will be a useful property of physical expressions as cost-based and adaptive query execution is adopted.
Cc: @gabotechs
I don't see the approach leaking the idea of partitioning into dynamic filters; rather, its goal will be to make physical expressions aware of partitioning, which I think is feasible.
Maybe I am misunderstanding what is good practice in exposing properties to parts of the plan, but it seems like a good approach to have expressions aware of runtime features.
Here is a fleshed-out PR that is my idea for the generic runtime binder: #20549
This may make my vision more clear 👍
cc: @adriangb @gabotechs
Would you be fine with investigating how other engines handle this? Looking at successful implementations of this in other engines might produce good arguments for a path forward.
Which issue does this PR close?
Closes #20195
Rationale for this change
Dynamic filter pushdown was completely disabled when preserve_file_partitions is on, due to a correctness bug.
The Problem
When preserve_file_partitions is enabled, DataFusion treats file groups as pre-partitioned data. The existing dynamic filtering used hash-based routing, which is incompatible with the value-based partitioning that file groups are kept in:
Example:
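(The original example block did not survive here.) As a hedged illustration of the mismatch, the following toy program contrasts value-based file grouping with hash-based placement; the partition-assignment functions are invented for this demo:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Value-based (file-grouped) partitioning: the partition is determined by
// the column value itself, e.g. all "A" rows live in partition 0.
fn file_group_partition(value: &str) -> usize {
    match value {
        "A" => 0,
        "B" => 1,
        _ => 2,
    }
}

// Hash-based partitioning: the partition is hash(value) % N, the scheme
// used by RepartitionExec(Hash).
fn hash_partition(value: &str, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    (h.finish() as usize) % n
}

fn main() {
    // The two schemes generally disagree, so build-partition i and
    // probe-partition i no longer hold the same key values.
    for v in ["A", "B", "C"] {
        println!(
            "{v}: file-group={} hash={}",
            file_group_partition(v),
            hash_partition(v, 3)
        );
    }
}
```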
For this reason it was disabled; this PR re-enables it via PartitionIndex routing for dynamic filters.
What changes are included in this PR?
Partition-Indexed dynamic filtering
New routing mode that uses direct partition-to-partition mapping:
Example:
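(The original example block did not survive here.) A hypothetical sketch of the i -> i mapping, with invented function names: probe partition i simply reads the filter produced by build partition i, with no hashing involved.

```rust
// Partition-index routing: build partition i produces a filter that probe
// partition i consumes directly. Illustrative only; the real per-partition
// filters are physical expressions, not strings.
fn route_partition_index(filters: &[&str], probe_partition: usize) -> Option<String> {
    filters.get(probe_partition).map(|f| f.to_string())
}

fn main() {
    let per_partition_filters = ["a BETWEEN 1 AND 3", "a BETWEEN 7 AND 9"];
    // Probe partition 1 sees only the bounds its matching build partition produced.
    println!("{:?}", route_partition_index(&per_partition_filters, 1));
}
```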
Alignment Detection
Detects compatible partitioning to enable safe optimization:
match (left.repartitioned, right.repartitioned) {
In the case there is a RepartitionExec in the path leading from the DataSourceExec to either the build or probe side of a Partitioned Hash Join, this falls back to CaseHash.
The reason is that RepartitionExec uses hash(value) % N to distribute rows, breaking the value-based partition alignment. When hash-partitioned, partition 0 no longer contains 'A' exclusively, breaking the partition-index assumptions. With hash partitioning, use:
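A hedged sketch of the CaseHash-style fallback (simplified; in DataFusion the routing is built as a CASE expression over per-partition filters, not a plain function): the filter for a row is selected by hash(key) % N, matching how RepartitionExec(Hash) placed rows, so it stays correct even when partition indices are not value-aligned.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// CaseHash-style routing (simplified): choose the filter for a probe row by
// hashing its join key, i.e. CASE hash(key) % N WHEN 0 THEN f0 ... END.
// This matches how RepartitionExec(Hash) assigned rows to partitions.
fn case_hash_route<'a>(filters: &'a [&'a str], key: &str) -> &'a str {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    filters[(h.finish() as usize) % filters.len()]
}

fn main() {
    let filters = ["a BETWEEN 1 AND 3", "a BETWEEN 7 AND 9"];
    // The bucket depends on the hash of the key, not on any partition index.
    println!("{}", case_hash_route(&filters, "A"));
}
```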
Are these changes tested?
sqllogictests: test_files/preserve_file_partitioning.slt
Integration tests: datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Unit tests: in affected files
Are there any user-facing changes?
Yes: a new error message can appear if partitioned hash joins are not aligned properly, and the dynamic filtering display for partition-index routing is a bit different than for CASE routing.
cc: @NGA-TRAN @LiaCastaneda @adriangb @gabotechs