Perf: Extend TopK aggregation optimization to general aggregates #20568
Dandandan wants to merge 14 commits into apache:main
Push limit into AggregateExec for queries like GROUP BY ... ORDER BY COUNT(*) DESC LIMIT K, not just MIN/MAX and DISTINCT. After aggregation completes, applies a partial sort + take to emit only the top-K rows, avoiding full materialization downstream. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
run benchmarks
Serialize and deserialize the new sort_column_index field in LimitOptions so that general aggregate TopK plans survive protobuf roundtrip. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🤖: Benchmark completed
Extends the TopK-aggregate fusion optimization to support multi-column ORDER BY clauses (e.g. ORDER BY COUNT(*) DESC, name ASC LIMIT K). Previously only single-column sort was supported. Uses lexsort_to_indices for efficient multi-column partial sort during the aggregate emit phase. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
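As a rough illustration of the multi-column case, the sketch below mimics a lexicographic sort to indices with a limit using only the Rust standard library. It is not the arrow `lexsort_to_indices` kernel and not the code in this PR: `SortKey`, `compare_rows`, and `lexsort_indices_with_limit` are hypothetical names, rows are modeled as plain `i64` vectors, and `select_nth_unstable_by` stands in for the partial sort.

```rust
use std::cmp::Ordering;

/// Hypothetical description of one ORDER BY key: which column, which direction.
#[derive(Clone, Copy)]
struct SortKey {
    column: usize,
    descending: bool,
}

/// Compares two rows key by key; the first key that differs decides the order.
fn compare_rows(a: &[i64], b: &[i64], keys: &[SortKey]) -> Ordering {
    for key in keys {
        let ord = a[key.column].cmp(&b[key.column]);
        let ord = if key.descending { ord.reverse() } else { ord };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

/// Returns the indices of the first `limit` rows in lexicographic order.
/// Only the selected prefix is fully sorted; the rest is merely partitioned away.
fn lexsort_indices_with_limit(rows: &[Vec<i64>], keys: &[SortKey], limit: usize) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..rows.len()).collect();
    let limit = limit.min(indices.len());
    if limit == 0 {
        return Vec::new();
    }
    if limit < indices.len() {
        // Partition so that the `limit` smallest rows (in sort order) come first.
        indices.select_nth_unstable_by(limit - 1, |&i, &j| compare_rows(&rows[i], &rows[j], keys));
        indices.truncate(limit);
    }
    // Sort just the surviving prefix.
    indices.sort_by(|&i, &j| compare_rows(&rows[i], &rows[j], keys));
    indices
}
```

With rows of the form `[count, name_id]` and keys `count DESC, name_id ASC`, calling `lexsort_indices_with_limit(&rows, &keys, 3)` yields the indices of the three leading rows, which a caller could then use to take those rows from each output column.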
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Two fixes:
1. Don't set group_values_soft_limit when topk_sort_columns is set: flushing early resets accumulator state, corrupting results (TPC-H Q13).
2. Don't declare output ordering for FinalPartitioned/SinglePartitioned: EnforceSorting replaces SortPreservingMergeExec with CoalescePartitionsExec, losing the merge-sort (DISTINCT ON).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
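To make the first fix more concrete, here is a toy model of the failure mode, assuming a simplified aggregator that emits and clears its state whenever the number of groups reaches a threshold. This is not DataFusion's code path: `count_groups` and `flush_at` are hypothetical, and the real soft-limit logic may differ in detail. The point is only that a COUNT needs every input row for a key before the value is final.

```rust
use std::collections::HashMap;

/// Counts occurrences per key. If `flush_at` is set, the accumulated state is
/// emitted and cleared whenever the number of groups reaches that threshold
/// (a hypothetical stand-in for an early flush).
fn count_groups(keys: &[&str], flush_at: Option<usize>) -> Vec<(String, u64)> {
    let mut state: HashMap<String, u64> = HashMap::new();
    let mut emitted: Vec<(String, u64)> = Vec::new();
    for key in keys {
        *state.entry(key.to_string()).or_insert(0) += 1;
        if let Some(limit) = flush_at {
            if state.len() >= limit {
                // Early flush: partial counts leave the operator and state is reset.
                emitted.extend(state.drain());
            }
        }
    }
    // Final emit of whatever is left.
    emitted.extend(state.drain());
    emitted.sort();
    emitted
}
```

For the input `["a", "b", "a", "a", "b"]`, the version without a flush reports `a = 3` and `b = 2`, while flushing at two groups emits several partial rows per key, so any top-K taken over that output would rank on incomplete counts.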
Pull request overview
This pull request extends the TopK aggregation optimization to general aggregates (COUNT, SUM, AVG, etc.), beyond the existing support for MIN/MAX and DISTINCT. The optimization pushes limit information into AggregateExec for queries like GROUP BY ... ORDER BY COUNT(*) DESC LIMIT K. Instead of materializing all aggregation groups and then sorting, the optimizer configures AggregateExec to apply a partial sort + take operation after aggregation completes, emitting only the top-K rows.
Changes:
- Extended LimitOptions to support multi-column sort specifications via new topk_sort_columns field
- Added TopKEmit configuration to GroupedHashAggregateStream that applies lexicographic partial sort after aggregation
- Updated TopKAggregation optimizer to recognize general aggregate patterns and set appropriate limit options
- Added protobuf support for serializing/deserializing topk_sort_columns
- Updated test expectations for TPC-H queries to show the new topk optimization in action
Reviewed changes
Copilot reviewed 14 out of 16 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/aggregates/mod.rs | Extended LimitOptions with topk_sort_columns field, added output ordering declaration logic, updated display formatting |
| datafusion/physical-plan/src/aggregates/row_hash.rs | Added TopKEmit struct and apply_topk method, integrated topk selection into aggregation emit flow, prevented early flush for general topk |
| datafusion/physical-optimizer/src/topk_aggregation.rs | Refactored to support multi-column sort, added general aggregate topk path, implemented SortExec elimination when ordering is satisfied |
| datafusion/proto/proto/datafusion.proto | Added AggLimitSortColumn message and topk_sort_columns field to AggLimit |
| datafusion/proto/src/physical_plan/mod.rs | Added serialization/deserialization logic for topk_sort_columns |
| datafusion/proto/src/generated/prost.rs | Generated protobuf Rust code for new messages |
| datafusion/proto/src/generated/pbjson.rs | Generated JSON serialization code for new messages |
| datafusion/physical-optimizer/src/combine_partial_final_agg.rs | Updated to properly clone limit_options |
| datafusion/sqllogictest/test_files/*.slt.part | Updated plan expectations showing new topk optimization |
| datafusion/sqllogictest/test_files/aggregate.slt | Added tests for general aggregate topk optimization |
| datafusion/core/tests/sql/explain_analyze.rs | Updated test expectation with topk information |
Let's not do it for now, seems not that impactful with the soft limit removed.
Push limit into AggregateExec for queries like GROUP BY ... ORDER BY COUNT(*) DESC LIMIT K, not just MIN/MAX and DISTINCT. After aggregation completes, applies a partial sort + take to emit only the top-K rows, avoiding full materialization downstream.
Which issue does this PR close?
Rationale for this change
Performance 🚀
What changes are included in this PR?
TopK-Aggregate fusion for general aggregates — Extends the existing TopK optimization (which only worked for MIN/MAX and DISTINCT) to general aggregates like COUNT, SUM, AVG, etc.
For queries like SELECT col, COUNT(*) AS c FROM t GROUP BY col ORDER BY c DESC LIMIT 10, instead of materializing all groups from the hash aggregation and then sorting, the optimizer pushes the limit into AggregateExec. At emit time, it uses a partial sort (lexsort_to_indices) to extract only the top-K rows, avoiding a full sort.
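The "partial sort + take" idea can be sketched with the standard library alone. The snippet below is an illustration under assumptions, not the PR's implementation: `top_k_by_count` and `by_count_desc` are hypothetical names, the aggregated groups are modeled as `(String, u64)` pairs, and `select_nth_unstable_by` is swapped in for the arrow kernel mentioned above.

```rust
use std::cmp::Ordering;

/// Orders groups by count descending, breaking ties by key ascending.
fn by_count_desc(a: &(String, u64), b: &(String, u64)) -> Ordering {
    b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))
}

/// Returns the `k` groups with the largest counts, in sorted order.
/// `groups` stands in for the output of a completed hash aggregation.
fn top_k_by_count(mut groups: Vec<(String, u64)>, k: usize) -> Vec<(String, u64)> {
    if k == 0 {
        return Vec::new();
    }
    if k < groups.len() {
        // Partial sort: move the k leading groups to the front in O(n) on average.
        groups.select_nth_unstable_by(k - 1, by_count_desc);
        // Take: drop everything that cannot be part of the result.
        groups.truncate(k);
    }
    // Only the k survivors are fully sorted.
    groups.sort_unstable_by(by_count_desc);
    groups
}
```

For `LIMIT 10` over millions of groups, this does one linear selection pass plus a sort of ten rows, rather than sorting every group and discarding almost all of them.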
Are these changes tested?
Yes, additional + existing tests
Are there any user-facing changes?
Small changes to LimitOptions and AggregateExec::limit_options