
Conversation

@danielhumanmod

Which issue does this PR close?

Rationale for this change

The previous logic of the EnsureCooperative optimizer rule lacked context awareness regarding ancestor nodes, making it non-idempotent across multiple runs.

Specifically, we need to ensure that:

  1. Idempotency: Running the rule multiple times does not produce nested CooperativeExec wrappers.
  2. Context Awareness: If a subtree is already protected by a CooperativeExec, we should skip and not double-wrap its children.

What changes are included in this PR?

To solve this, we cannot rely solely on transform_up (which lacks parent context) or transform_down (which makes safe mutation difficult). This PR adopts transform_down_up with a depth counter to strictly enforce that nodes are only wrapped when they are not currently under a CooperativeExec scope.
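To make the depth-counter idea concrete, here is a minimal, hypothetical sketch using a toy plan tree instead of DataFusion's `ExecutionPlan` and `TreeNode` APIs. The names (`Node`, `Coop`, `wrap_leaves`) are illustrative only; the actual PR operates on physical plans via `transform_down_up`.

```rust
use std::cell::Cell;

// Toy stand-in for a physical plan tree (assumption: not the PR's real types).
#[derive(Clone, Debug, PartialEq)]
enum Node {
    Leaf(&'static str),
    Coop(Box<Node>),
    Op(&'static str, Box<Node>),
}

// Down/up walk: bump the depth when entering a Coop scope (the "down" half),
// recurse, then restore it on the way back out (the "up" half). A leaf is
// wrapped only when depth == 0, i.e. when no Coop ancestor protects it,
// which makes the rule idempotent across repeated runs.
fn wrap_leaves(node: Node, depth: &Cell<usize>) -> Node {
    match node {
        Node::Coop(child) => {
            depth.set(depth.get() + 1);
            let child = wrap_leaves(*child, depth);
            depth.set(depth.get() - 1);
            Node::Coop(Box::new(child))
        }
        Node::Op(name, child) => Node::Op(name, Box::new(wrap_leaves(*child, depth))),
        Node::Leaf(name) => {
            if depth.get() == 0 {
                Node::Coop(Box::new(Node::Leaf(name)))
            } else {
                Node::Leaf(name)
            }
        }
    }
}

fn main() {
    let plan = Node::Op("filter", Box::new(Node::Leaf("scan")));
    let once = wrap_leaves(plan, &Cell::new(0));
    // Running the rule a second time must not add another Coop wrapper.
    let twice = wrap_leaves(once.clone(), &Cell::new(0));
    assert_eq!(once, twice);
    println!("{:?}", once);
}
```

A plain `transform_up` cannot express the `depth == 0` check because it visits children before it ever sees the enclosing CooperativeExec; carrying the counter through the down phase is what supplies the missing parent context.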

Are these changes tested?

Yes, unit test coverage has been extended.

Are there any user-facing changes?

No

@github-actions github-actions bot added the optimizer Optimizer rules label Jan 11, 2026
@danielhumanmod
Author

cc @milenkovicm

Member

@xudong963 left a comment

Also, cc @pepijnve

// 1. Node is a leaf or exchange point
// 2. Node is not already cooperative
// 3. Not under any CooperativeExec (depth == 0)
if (is_leaf || is_exchange) && !is_cooperative && coop_depth.get() == 0 {
Contributor

There aren't any implementations in the library that you could use to test this, but I'm not sure this is 100% correct if someone ever implements a non-cooperative exchange operator (i.e. one that doesn't use a Tokio mpsc::channel). I'll see if I can come up with a test case for this.

Contributor

@pepijnve Jan 12, 2026

Here's some very contrived test code (in details section below) that illustrates this. The code will output

aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          exch Eager NonCooperative
            filter Lazy NonCooperative
              scan Lazy NonCooperative

Notice that there's a coop missing around the final scan.

Before the fix, the same code produced the output below, with the incorrect double coop. The adjacent double CooperativeExec is not intentional, but having coop at two separate levels is.

aggr Lazy NonCooperative
  filter Lazy NonCooperative
    exch Eager Cooperative
      filter Lazy NonCooperative
        CooperativeExec
          CooperativeExec
            exch Eager NonCooperative
              filter Lazy NonCooperative
                CooperativeExec
                  scan Lazy NonCooperative
Details
#[tokio::test]
async fn test_exchange() {
    let scan = Arc::new(DummyExec::new("scan".to_string(), None, SchedulingType::NonCooperative, EvaluationType::Lazy));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(scan), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Eager));
    let coop = Arc::new(CooperativeExec::new(exchange));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(coop), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let exchange = Arc::new(DummyExec::new("exch".to_string(), Some(filter), SchedulingType::Cooperative, EvaluationType::Eager));
    let filter = Arc::new(DummyExec::new("filter".to_string(), Some(exchange), SchedulingType::NonCooperative, EvaluationType::Lazy));
    let aggregate = Arc::new(DummyExec::new("aggr".to_string(), Some(filter), SchedulingType::NonCooperative, EvaluationType::Lazy));

    let config = ConfigOptions::new();
    let optimized = EnsureCooperative::new()
        .optimize(aggregate as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    let display = displayable(optimized.as_ref()).indent(true).to_string();

    println!("{}", display);
}

#[derive(Debug)]
struct DummyExec {
    name: String,
    input: Option<Arc<dyn ExecutionPlan>>,
    scheduling_type: SchedulingType,
    evaluation_type: EvaluationType,
    properties: PlanProperties,
}

impl DummyExec {
    fn new(
        name: String,
        input: Option<Arc<dyn ExecutionPlan>>,
        scheduling_type: SchedulingType,
        evaluation_type: EvaluationType,
    ) -> Self {
        DummyExec {
            name,
            input,
            scheduling_type,
            evaluation_type,
            properties: PlanProperties::new(
                EquivalenceProperties::new(Arc::new(Schema::empty())),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded
            ).with_scheduling_type(scheduling_type).with_evaluation_type(evaluation_type),
        }
    }
}

impl DisplayAs for DummyExec {
    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "{} {:?} {:?}", self.name, self.evaluation_type, self.scheduling_type)
    }
}

impl ExecutionPlan for DummyExec {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        match &self.input {
            None => vec![],
            Some(i) => vec![i],
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(DummyExec::new(
            self.name.clone(),
            match children.len() {
                0 => None,
                _ => Some(children[0].clone()),
            },
            self.scheduling_type,
            self.evaluation_type,
        )))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        todo!()
    }
}

Author

Great catch! I totally missed the case where an Eager node breaks the cooperative chain.

My plan is to maintain an ancestry stack that tracks both SchedulingType and EvaluationType. The new logic checks the stack bottom-up: a node is only considered "protected" (and thus skips wrapping) if it encounters a Cooperative ancestor before any Eager pipeline breaker.

I have also added a test case to cover this scenario. Thanks for the insight!
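The bottom-up protection check described above can be sketched as follows. This is a hypothetical illustration: `Ancestor` and `is_protected` are made-up names standing in for the ancestry-stack entries, not the PR's actual API.

```rust
// One entry per ancestor on the path from the root to the current node.
#[derive(Clone, Copy)]
struct Ancestor {
    cooperative: bool, // e.g. a CooperativeExec / SchedulingType::Cooperative node
    eager: bool,       // e.g. an EvaluationType::Eager exchange that breaks the chain
}

// Walk the stack from the nearest ancestor outward: the node is protected
// only if a Cooperative ancestor appears before any Eager pipeline breaker.
fn is_protected(stack: &[Ancestor]) -> bool {
    for a in stack.iter().rev() {
        if a.cooperative {
            return true;
        }
        if a.eager {
            return false;
        }
    }
    false
}

fn main() {
    // A Coop directly above the node: protected, skip wrapping.
    let s1 = [Ancestor { cooperative: true, eager: false }];
    // An Eager exchange sits between the node and the nearest Coop:
    // the chain is broken, so the node still needs its own wrapper.
    let s2 = [
        Ancestor { cooperative: true, eager: false },
        Ancestor { cooperative: false, eager: true },
    ];
    assert!(is_protected(&s1));
    assert!(!is_protected(&s2));
    println!("ok");
}
```

This is exactly the shape of @pepijnve's counterexample: the inner `exch Eager NonCooperative` invalidates the protection of the outer CooperativeExec, so the `scan` below it must be wrapped anyway.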

@pepijnve
Contributor

@danielhumanmod thanks for fixing this. I had completely forgotten about the need for idempotence when I wrote this.

@milenkovicm
Contributor

Just for illustration, this is what I was getting when the physical optimizer ran multiple times (I would bet it ran 3 times in this example 😀):

    AdaptiveDatafusionExec: is_final=false, plan_id=1, stage_id=pending
      ProjectionExec: expr=[big_col@1 as big_col, big_col@0 as big_col]
        CrossJoinExec
          CoalescePartitionsExec
            ExchangeExec: partitioning=None, plan_id=2, stage_id=pending, stage_resolved=false
              CooperativeExec
                CooperativeExec
                  CooperativeExec
                    MockPartitionedScan: num_partitions=2, statistics=[Rows=Inexact(1024), Bytes=Inexact(8192), [(Col[0]:)]]
          ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0, stage_id=0, stage_resolved=true
            CooperativeExec
              CooperativeExec
                CooperativeExec
                  StatisticsExec: col_count=1, row_count=Inexact(262144)

My naive (wrong) thinking was that changing to transform_down would fix it.

Anyway, thanks @danielhumanmod



Development

Successfully merging this pull request may close these issues.

[BUG] EnsureCooperative is not idempotent

4 participants