Is your feature request related to a problem or challenge?
CoalescePartitionsExec carries an optional fetch: Option<usize> so LimitPushdown can fold GlobalLimit(N) -> CoalescePartitionsExec
into CoalescePartitionsExec(fetch=N). The fetch semantics were
tightened in #14418 and #18245, the latter making them consistent
across single-partition and multi-partition cases.
UnionExec has no equivalent. When LimitPushdown sees GlobalLimit(N) -> UnionExec it has to leave a wrapping CoalescePartitionsExec(fetch=N) (or a separate GlobalLimitExec)
on top to enforce the cap. Any downstream optimizer pass that peels
the Union apart for rewriting (e.g. distributed execution rewrites)
has to remember to re-attach that wrapper, otherwise the union
returns N rows per child and the LIMIT is silently violated.
This is the same class of bug #18245 closed for CoalescePartitionsExec, but here the inconsistency is single-child
vs multi-child Union.
Describe the solution you'd like
Add fetch: Option<usize> to UnionExec, plus:
- a with_fetch() builder method
- execute() honors fetch: stop emitting once N rows have been produced and cancel pending children, the same way SortPreservingMergeExec does it
- LimitPushdown learns to fold GlobalLimit(N) -> Union into Union(fetch=N), mirroring the existing fold for Coalesce
- with_new_children preserves fetch, so rewrites that reassemble a Union from transformed children cannot silently drop the cap
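To make the proposed shape concrete, here is a minimal sketch using a toy, in-memory union. `ToyUnion` and its row representation are hypothetical stand-ins, not DataFusion's `UnionExec` or `ExecutionPlan` API; the sketch only illustrates the three behaviors listed above (a `with_fetch` builder, an `execute` that stops at the cap, and a `with_new_children` that carries the cap along).

```rust
/// Toy stand-in for a union plan node. This is NOT DataFusion's UnionExec;
/// children are modeled as already-materialized lists of rows.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyUnion {
    children: Vec<Vec<i64>>,
    /// Cap on the TOTAL number of rows emitted across all children.
    fetch: Option<usize>,
}

impl ToyUnion {
    pub fn new(children: Vec<Vec<i64>>) -> Self {
        Self { children, fetch: None }
    }

    /// Builder: attach (or clear) the cap on the union node itself.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Reassemble the union from transformed children, preserving the cap.
    pub fn with_new_children(&self, children: Vec<Vec<i64>>) -> Self {
        Self { children, fetch: self.fetch }
    }

    /// Emit rows child by child; `take` stops pulling once the cap is reached,
    /// so later children are never read after N rows have been produced.
    pub fn execute(&self) -> Vec<i64> {
        let limit = self.fetch.unwrap_or(usize::MAX);
        self.children.iter().flatten().copied().take(limit).collect()
    }
}
```

A real implementation would operate on record-batch streams and would need to cancel in-flight child streams rather than simply not iterating them; the toy only captures the row-count contract.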
Describe alternatives you've considered
Keeping a separate CoalescePartitionsExec(fetch=N) / GlobalLimitExec on top of Union. This works as long as every
consumer of Union remembers to preserve that wrapper across
rewrites. We have hit a bug in a downstream fork where a
peel-and-reassemble pass dropped the wrapper, surfacing only in
multi-shard topologies. Encoding the cap on the Union node itself
removes the whole class of "forgot to re-attach the wrapper" bugs.
Additional context
Prior fetch work in this area: #14418, #18245, #21170. Happy to
put up the PR if maintainers agree on the shape.
Invariants to preserve
There are two distinct senses of "pushing the fetch down" through Union, and
the difference matters for correctness. The same distinction already applies
to CoalescePartitionsExec.fetch (see #18245); I am making it explicit here so
that any future LimitPushdown work on Union follows the same template.
Forbidden: substitution pushdown (replacing the cap)
Union(fetch=N)      →   Union                 ← cap gone, total = N × K
  child A                 child A
                            LocalLimit(N)
  child B                 child B
                            LocalLimit(N)
Union.fetch MUST NOT be replaced by per-child LocalLimit(N). The fetch
field encodes "total output across all children is at most N rows" and lives
on the Union node itself. Folding it down into each child recreates the
exact "each child returns N rows, total N×K" miscount this issue is meant
to fix.
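The miscount can be checked with simple arithmetic. The sketch below is a hypothetical row-count model (the function name and parameters are illustrative, not part of DataFusion): each child contributes at most its local limit, and the union then applies its own cap, if any.

```rust
/// Rows emitted by a union whose i-th child has `child_rows[i]` rows available.
/// `per_child_limit` models a LocalLimit(N) placed in every child;
/// `union_fetch` models a cap held on the union node itself.
pub fn union_output_rows(
    child_rows: &[usize],
    per_child_limit: Option<usize>,
    union_fetch: Option<usize>,
) -> usize {
    // Each child emits at most its local limit.
    let total: usize = child_rows
        .iter()
        .map(|&rows| match per_child_limit {
            Some(n) => rows.min(n),
            None => rows,
        })
        .sum();
    // The union-level cap, when present, bounds the combined output.
    match union_fetch {
        Some(n) => total.min(n),
        None => total,
    }
}
```

With N = 10 and K = 3 children of 100 rows each, `union_output_rows(&[100, 100, 100], None, Some(10))` is 10, while the substituted plan `union_output_rows(&[100, 100, 100], Some(10), None)` is 30, that is, N × K.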
Encouraged: early-exit pushdown (replicating the cap)
Union(fetch=N)      →   Union(fetch=N)        ← cap stays, plus per-child early stop
  child A                 child A
                            LocalLimit(N)
  child B                 child B
                            LocalLimit(N)
LimitPushdown SHOULD additionally introduce a LocalLimit(N) inside each
Union child as an early-exit optimization, as long as Union(fetch=N)
stays in place above the Union. Each child stops at N rows (saves IO);
the Union still caps total output at N (correctness). Both constraints
coexist; the smaller one dominates in practice. This is the same
optimization template LimitPushdown already applies for CoalescePartitionsExec.fetch (see #18245).
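As a rough illustration of why replication is safe, the following toy model (hypothetical names, and it assumes children are concatenated in order and produce their rows eagerly) returns both the union's output and the number of rows the children had to produce. Adding a per-child limit leaves the output unchanged and only reduces the work done below the union.

```rust
/// Toy model: every child eagerly produces its rows (truncated by an optional
/// per-child limit), then the union keeps only the first `fetch` rows.
/// Returns (union output, total rows produced by all children).
pub fn run_union(
    children: &[Vec<u32>],
    local_limit: Option<usize>,
    fetch: usize,
) -> (Vec<u32>, usize) {
    let mut rows_produced = 0;
    let mut buffered: Vec<u32> = Vec::new();
    for child in children {
        // LocalLimit(N) inside the child: stop after at most N rows.
        let keep = local_limit.map_or(child.len(), |n| child.len().min(n));
        rows_produced += keep;
        buffered.extend_from_slice(&child[..keep]);
    }
    // Union(fetch=N): the cap on total output stays on the union node.
    buffered.truncate(fetch);
    (buffered, rows_produced)
}
```

For two children of 100 rows each and N = 5, the output is the same five rows in both cases, but the children produce 200 rows without the local limits and 10 rows with them.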
Required: with_new_children round-trips fetch
UnionExec::with_new_children MUST preserve fetch, so optimizer rewrites
that reassemble the Union from transformed children cannot silently drop
the cap. The bug class this issue fixes is precisely "peel-and-reassemble
pass forgot to re-attach the cap node"; encoding the cap on the Union
itself removes the cap-as-separate-node footgun, but only if the field
round-trips through with_new_children.
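The round-trip requirement can be expressed as a small property over a toy plan tree. `Plan`, `rewrite_children`, and the scan names below are hypothetical and only mimic the shape of a peel-and-reassemble pass; they are not DataFusion types.

```rust
/// Toy plan tree, used only to illustrate the round-trip invariant.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan(&'static str),
    Union { children: Vec<Plan>, fetch: Option<usize> },
}

impl Plan {
    pub fn children(&self) -> Vec<Plan> {
        match self {
            Plan::Scan(_) => vec![],
            Plan::Union { children, .. } => children.clone(),
        }
    }

    /// Rebuild this node around new children. For Union the fetch is copied
    /// over, so the cap survives any rewrite that goes through this method.
    pub fn with_new_children(&self, new_children: Vec<Plan>) -> Plan {
        match self {
            Plan::Scan(_) => self.clone(),
            Plan::Union { fetch, .. } => Plan::Union {
                children: new_children,
                fetch: *fetch,
            },
        }
    }
}

/// A peel-and-reassemble pass: transform each child, then rebuild the parent.
pub fn rewrite_children(plan: &Plan, f: impl Fn(&Plan) -> Plan) -> Plan {
    let new_children: Vec<Plan> = plan.children().iter().map(|child| f(child)).collect();
    plan.with_new_children(new_children)
}
```

Because the pass never constructs `Plan::Union` directly, it has no opportunity to forget the cap; a unit test along these lines on the real operator would guard the invariant.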