
Add fetch to UnionExec for LIMIT pushdown consistency #23238

Description

@zhuqi-lucas

Is your feature request related to a problem or challenge?

CoalescePartitionsExec carries an optional fetch: Option<usize> so
LimitPushdown can fold GlobalLimit(N) -> CoalescePartitionsExec
into CoalescePartitionsExec(fetch=N). The fetch semantics were
tightened in #14418 and #18245, the latter making them consistent
across single-partition and multi-partition cases.
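For readers less familiar with LimitPushdown, the existing fold can be sketched on a toy plan enum. This is a minimal illustration, not DataFusion code. `Plan`, its variants, and `fold_limit` are hypothetical names. The sketch assumes the limit has no skip/offset, and it keeps the tighter of the two caps when the coalesce already carries a fetch.

```rust
/// Toy physical plan; variants are hypothetical stand-ins for DataFusion operators.
#[derive(Debug, PartialEq)]
enum Plan {
    /// Leaf node with some number of partitions.
    Scan { partitions: usize },
    /// Stands in for GlobalLimitExec (no skip/offset in this sketch).
    GlobalLimit { fetch: usize, input: Box<Plan> },
    /// Stands in for CoalescePartitionsExec with its optional fetch.
    Coalesce { fetch: Option<usize>, input: Box<Plan> },
}

/// Fold `GlobalLimit(N) -> Coalesce` into `Coalesce(fetch = N)`.
/// If the coalesce already has a fetch, keep the tighter of the two caps.
fn fold_limit(plan: Plan) -> Plan {
    match plan {
        Plan::GlobalLimit { fetch, input } => match *input {
            Plan::Coalesce { fetch: inner, input } => Plan::Coalesce {
                fetch: Some(inner.map_or(fetch, |m| m.min(fetch))),
                input,
            },
            // Any other input: leave the limit node in place.
            other => Plan::GlobalLimit { fetch, input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::GlobalLimit {
        fetch: 10,
        input: Box::new(Plan::Coalesce {
            fetch: None,
            input: Box::new(Plan::Scan { partitions: 4 }),
        }),
    };
    let folded = fold_limit(plan);
    // The limit node is gone; the cap now lives on the coalesce itself.
    assert_eq!(
        folded,
        Plan::Coalesce { fetch: Some(10), input: Box::new(Plan::Scan { partitions: 4 }) }
    );
    println!("{:?}", folded);
}
```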

UnionExec has no equivalent. When LimitPushdown sees
GlobalLimit(N) -> UnionExec, it has to leave a wrapping
CoalescePartitionsExec(fetch=N) (or a separate GlobalLimitExec)
on top to enforce the cap. Any downstream optimizer pass that peels
the Union apart for rewriting (e.g. distributed execution rewrites)
has to remember to re-attach that wrapper; otherwise the union
returns up to N rows per child (up to N×K rows for K children) and
the LIMIT is silently violated.
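The failure mode can be reproduced on a toy model. In this sketch, all names are hypothetical and none are DataFusion APIs. `Limit` stands in for whichever node carries the cap, and `peel_and_reassemble` is an invented rewrite that rebuilds the Union but forgets the wrapper. Row counts are computed directly from the plan shape rather than by executing anything.

```rust
/// Toy plan; hypothetical stand-ins for DataFusion operators.
#[derive(Debug)]
enum Plan {
    /// Leaf producing `rows` rows.
    Scan { rows: usize },
    /// Row cap (stands in for LocalLimit, GlobalLimitExec or Coalesce(fetch)).
    Limit { fetch: usize, input: Box<Plan> },
    /// Today's UnionExec: concatenates children, no fetch of its own.
    Union { children: Vec<Plan> },
}

/// Number of rows a plan emits.
fn row_count(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { rows } => *rows,
        Plan::Limit { fetch, input } => row_count(input).min(*fetch),
        Plan::Union { children } => children.iter().map(row_count).sum(),
    }
}

/// Builds `Limit(n) -> Union` over `k` children of shape `Limit(n) -> Scan(rows)`:
/// a per-child limit plus the cap as a separate wrapper node.
fn limited_union(n: usize, k: usize, rows: usize) -> Plan {
    let children = (0..k)
        .map(|_| Plan::Limit { fetch: n, input: Box::new(Plan::Scan { rows }) })
        .collect();
    Plan::Limit { fetch: n, input: Box::new(Plan::Union { children }) }
}

/// Hypothetical peel-and-reassemble pass: it rebuilds the Union
/// but forgets to re-attach the cap wrapper above it.
fn peel_and_reassemble(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { fetch, input } => match *input {
            // BUG: the wrapper is dropped here.
            Plan::Union { children } => Plan::Union { children },
            other => Plan::Limit { fetch, input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    let plan = limited_union(10, 3, 100);
    println!("before rewrite: {} rows", row_count(&plan)); // 10
    let rewritten = peel_and_reassemble(plan);
    println!("after rewrite: {} rows", row_count(&rewritten)); // 30 = N x K
}
```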

This is the same class of bug #18245 closed for
CoalescePartitionsExec, but here the inconsistency is single-child
vs multi-child Union.

Describe the solution you'd like

Add fetch: Option<usize> to UnionExec, plus:

  • with_fetch() builder method
  • execute() honors fetch: stop emitting once N rows have been produced
    and cancel pending children, the same way SortPreservingMergeExec does it
  • LimitPushdown learns to fold GlobalLimit(N) -> Union into
    Union(fetch=N), mirroring the existing fold for Coalesce
  • with_new_children preserves fetch so rewrites that reassemble
    a Union from transformed children cannot silently drop the cap
  • Behavior is consistent across single-child and multi-child Union
    (the analog of #18245, "CoalescePartitionsExec fetch is not consistent with one partition and more than one partition", for this operator)
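A minimal single-stream model of the proposed operator might look like the sketch below. It assumes children are in-memory row vectors rather than partitioned async streams. `ToyUnion` and its methods are hypothetical and only mirror the proposed `with_fetch()`, `with_new_children` and `execute()` behavior. In this model, "never started" stands in for cancelling pending children.

```rust
/// Hypothetical, single-stream model of the proposed `UnionExec` with `fetch`.
/// Children are in-memory row vectors; this is not DataFusion's API.
#[derive(Clone, Debug)]
struct ToyUnion {
    children: Vec<Vec<u64>>,
    fetch: Option<usize>,
}

impl ToyUnion {
    fn new(children: Vec<Vec<u64>>) -> Self {
        Self { children, fetch: None }
    }

    /// Builder in the spirit of the proposed `with_fetch()`.
    fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Rebuild with transformed children; `fetch` is carried over so a
    /// rewrite cannot silently drop the cap.
    fn with_new_children(&self, children: Vec<Vec<u64>>) -> Self {
        Self { children, fetch: self.fetch }
    }

    /// Emit rows child by child, stopping once `fetch` rows are produced.
    /// Returns the rows and how many children were actually started;
    /// children after the cutoff are never started (the toy analog of
    /// cancelling pending children).
    fn execute(&self) -> (Vec<u64>, usize) {
        let cap = self.fetch.unwrap_or(usize::MAX);
        let mut out = Vec::new();
        let mut started = 0;
        for child in &self.children {
            if out.len() >= cap {
                break;
            }
            started += 1;
            let remaining = cap - out.len();
            out.extend(child.iter().copied().take(remaining));
        }
        (out, started)
    }
}

fn main() {
    let union = ToyUnion::new(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]])
        .with_fetch(Some(4));
    let (rows, started) = union.execute();
    println!("{:?} from {} children", rows, started); // [1, 2, 3, 4] from 2 children

    // A rewrite that rebuilds the union keeps the cap.
    let rebuilt = union.with_new_children(vec![vec![0; 5], vec![0; 5]]);
    println!("rebuilt emits {} rows", rebuilt.execute().0.len()); // 4
}
```

The cap is checked against total output rather than per child. As a result, a single-child union and a multi-child union with the same fetch emit the same number of rows, which is the consistency property the last bullet asks for.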

Describe alternatives you've considered

Keeping a separate CoalescePartitionsExec(fetch=N) /
GlobalLimitExec on top of Union. This works as long as every
consumer of Union remembers to preserve that wrapper across
rewrites. We have hit a bug in a downstream fork where a
peel-and-reassemble pass dropped the wrapper; it surfaced only in
multi-shard topologies. Encoding the cap on the Union node itself
removes the whole class of "forgot to re-attach the wrapper" bugs.

Additional context

Prior fetch work in this area: #14418, #18245, #21170. Happy to
put up the PR if maintainers agree on the shape.

Invariants to preserve

There are two distinct senses of "pushing the fetch down" through Union, and
the difference matters for correctness. The same distinction already applies
to CoalescePartitionsExec.fetch (see #18245); making it explicit here so
any future LimitPushdown work on Union follows the same template.

Forbidden: substitution pushdown (replacing the cap)

Union(fetch=N)   →   Union              ← cap gone, total = N × K
  child A              child A
                         LocalLimit(N)
  child B              child B
                         LocalLimit(N)

Union.fetch MUST NOT be replaced by per-child LocalLimit(N). The fetch
field encodes "total output across all children is at most N rows" and lives
on the Union node itself. Folding it down into each child recreates the
exact "each child returns N rows, total N×K" miscount (for K children) that
this issue is meant to fix.
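To make the forbidden rewrite concrete, the sketch below uses a toy plan in which `Union` carries the proposed `fetch`. The types are hypothetical, not DataFusion's. `substitute_fetch` is an invented name for the rewrite to avoid; with K = 3 children, it turns a 10-row cap into 30 rows.

```rust
/// Toy plan with the proposed Union fetch; hypothetical, not DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { rows: usize },
    /// Stands in for a per-child LocalLimit.
    LocalLimit { fetch: usize, input: Box<Plan> },
    /// `fetch` caps the total output across all children.
    Union { fetch: Option<usize>, children: Vec<Plan> },
}

fn row_count(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { rows } => *rows,
        Plan::LocalLimit { fetch, input } => row_count(input).min(*fetch),
        Plan::Union { fetch, children } => {
            let total: usize = children.iter().map(row_count).sum();
            fetch.map_or(total, |n| total.min(n))
        }
    }
}

/// FORBIDDEN: replaces the Union's total cap with per-child LocalLimits.
fn substitute_fetch(plan: Plan) -> Plan {
    match plan {
        Plan::Union { fetch: Some(n), children } => Plan::Union {
            fetch: None, // cap gone
            children: children
                .into_iter()
                .map(|c| Plan::LocalLimit { fetch: n, input: Box::new(c) })
                .collect(),
        },
        other => other,
    }
}

fn main() {
    let (n, k) = (10, 3);
    let union = Plan::Union {
        fetch: Some(n),
        children: (0..k).map(|_| Plan::Scan { rows: 100 }).collect(),
    };
    println!("with cap: {}", row_count(&union)); // 10
    println!("substituted: {}", row_count(&substitute_fetch(union))); // 30 = N x K
}
```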

Encouraged: early-exit pushdown (replicating the cap)

Union(fetch=N)   →   Union(fetch=N)     ← cap stays, plus per-child early stop
  child A              child A
                         LocalLimit(N)
  child B              child B
                         LocalLimit(N)

LimitPushdown SHOULD additionally introduce a LocalLimit(N) inside each
Union child as an early-exit optimization, as long as fetch=N stays in
place on the Union node itself. Each child stops at N rows (saves IO);
the Union still caps total output at N (correctness). Both constraints
coexist; the smaller one dominates in practice. This is the same
optimization template LimitPushdown already applies for
CoalescePartitionsExec.fetch (see #18245).
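The permitted rewrite can be sketched on the same kind of toy plan. The types and `push_limit_into_union` are hypothetical. The rewrite folds `GlobalLimit(N) -> Union` into `Union(fetch=N)` and then replicates the cap as a `LocalLimit` wrapping each child. The sketch assumes no skip/offset and keeps the tighter cap if the Union already has a fetch. The assertion in `main` checks that the rewrite does not change the row count.

```rust
/// Toy plan with the proposed Union fetch; hypothetical, not DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { rows: usize },
    /// Stands in for GlobalLimitExec (no skip/offset in this sketch).
    GlobalLimit { fetch: usize, input: Box<Plan> },
    /// Stands in for LocalLimitExec: a per-child early-exit bound.
    LocalLimit { fetch: usize, input: Box<Plan> },
    /// `fetch` caps the total output across all children.
    Union { fetch: Option<usize>, children: Vec<Plan> },
}

fn row_count(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { rows } => *rows,
        Plan::GlobalLimit { fetch, input } | Plan::LocalLimit { fetch, input } => {
            row_count(input).min(*fetch)
        }
        Plan::Union { fetch, children } => {
            let total: usize = children.iter().map(row_count).sum();
            fetch.map_or(total, |n| total.min(n))
        }
    }
}

/// Encouraged rewrite: fold `GlobalLimit(N) -> Union` into `Union(fetch = N)`
/// and replicate the cap as a LocalLimit over each child for early exit.
fn push_limit_into_union(plan: Plan) -> Plan {
    match plan {
        Plan::GlobalLimit { fetch: n, input } => match *input {
            Plan::Union { fetch, children } => {
                let cap = fetch.map_or(n, |m| m.min(n));
                Plan::Union {
                    fetch: Some(cap), // total cap stays on the Union
                    children: children
                        .into_iter()
                        .map(|c| Plan::LocalLimit { fetch: cap, input: Box::new(c) })
                        .collect(),
                }
            }
            other => Plan::GlobalLimit { fetch: n, input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    let scans = vec![Plan::Scan { rows: 100 }, Plan::Scan { rows: 100 }, Plan::Scan { rows: 100 }];
    let plan = Plan::GlobalLimit {
        fetch: 10,
        input: Box::new(Plan::Union { fetch: None, children: scans }),
    };
    let before = row_count(&plan);
    let after = push_limit_into_union(plan);
    // Same answer (10 rows), but each child now stops after 10 rows.
    assert_eq!(before, row_count(&after));
    println!("{:?}", after);
}
```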

Required: with_new_children round-trips fetch

UnionExec::with_new_children MUST preserve fetch, so optimizer rewrites
that reassemble the Union from transformed children cannot silently drop
the cap. The bug class this issue fixes is precisely "peel-and-reassemble
pass forgot to re-attach the cap node"; encoding the cap on the Union
itself removes the cap-as-separate-node footgun, but only if the field
round-trips through with_new_children.
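The round-trip requirement can be sketched with a hypothetical `Node` trait. The trait is loosely modeled on the shape of `ExecutionPlan::with_new_children`, but the real DataFusion signature differs; for example, it takes `self: Arc<Self>` and returns a `Result`. `rebuild_bottom_up` plays the role of a generic peel-and-reassemble pass. The final assertion is the kind of check a unit test for the real change could make.

```rust
use std::sync::Arc;

/// Hypothetical plan-node trait; not DataFusion's `ExecutionPlan`.
trait Node: std::fmt::Debug {
    fn children(&self) -> Vec<Arc<dyn Node>>;
    fn with_new_children(&self, children: Vec<Arc<dyn Node>>) -> Arc<dyn Node>;
    fn fetch(&self) -> Option<usize> {
        None
    }
}

#[derive(Debug)]
struct Scan {
    name: &'static str,
}

impl Node for Scan {
    fn children(&self) -> Vec<Arc<dyn Node>> {
        vec![]
    }
    fn with_new_children(&self, _children: Vec<Arc<dyn Node>>) -> Arc<dyn Node> {
        Arc::new(Scan { name: self.name })
    }
}

#[derive(Debug)]
struct Union {
    children: Vec<Arc<dyn Node>>,
    fetch: Option<usize>,
}

impl Node for Union {
    fn children(&self) -> Vec<Arc<dyn Node>> {
        self.children.clone()
    }
    fn with_new_children(&self, children: Vec<Arc<dyn Node>>) -> Arc<dyn Node> {
        // Carry `fetch` across the rebuild; dropping it here would
        // silently remove the cap.
        Arc::new(Union { children, fetch: self.fetch })
    }
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}

/// Generic bottom-up rewrite: rebuild every child, then rebuild the
/// parent through `with_new_children` (the peel-and-reassemble pattern).
fn rebuild_bottom_up(node: &Arc<dyn Node>) -> Arc<dyn Node> {
    let new_children: Vec<Arc<dyn Node>> =
        node.children().iter().map(rebuild_bottom_up).collect();
    node.with_new_children(new_children)
}

fn main() {
    let plan: Arc<dyn Node> = Arc::new(Union {
        children: vec![
            Arc::new(Scan { name: "a" }) as Arc<dyn Node>,
            Arc::new(Scan { name: "b" }) as Arc<dyn Node>,
        ],
        fetch: Some(10),
    });
    let rebuilt = rebuild_bottom_up(&plan);
    // Fails if Union::with_new_children ever stops carrying `fetch`.
    assert_eq!(rebuilt.fetch(), Some(10));
    println!("{:?}", rebuilt);
}
```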
