Skip to content

Add cost model#486

Open
gabotechs wants to merge 1 commit into
gabrielmusat/task-spawner-refactor-and-cache-invalidationfrom
gabrielmusat/cost-calculation
Open

Add cost model#486
gabotechs wants to merge 1 commit into
gabrielmusat/task-spawner-refactor-and-cache-invalidationfrom
gabrielmusat/cost-calculation

Conversation

@gabotechs

@gabotechs gabotechs commented Jun 8, 2026

Copy link
Copy Markdown
Collaborator

This is one PR from the following stack of PRs:


Mostly rescued from:

This PR is pretty big, but the whole purpose is to expose a very simple function:

pub(crate) fn calculate_cost(plan: &Arc<dyn ExecutionPlan>, cfg: &DistributedConfig) -> Result<usize>;

While keeping everything else private to the rest of the codebase.


This PR introduces a cost model used in Adaptive task count assignation, that attributes a compute cost to a plan based on:

  1. The Statistics available by normal DataFusion mechanisms (ExecutionPlan::partition_statistics())
  2. The compute time complexity of the different operators in a plan

The whole meat of this PR lives in the src/distributed_planner/statistics/compute_per_node.rs file: it's in charge of attributing a time complexity to the different ExecutionPlan implementations using big O notation. For example:

        let plan = plan_costs(r#"SELECT * FROM weather WHERE "MinTemp" > 5"#, 1).await;
        assert_snapshot!(plan, @r"
        O((out_Cols+Col0)) | FilterExec: MinTemp@0 > 5
         O(out_Cols) | DataSourceExec: file_groups={...}
        ");

In this example:

  • DataSourceExec needs to allocate all the RecordBatches for all the output columns, as it's the data source, so it's time complexity is O(out_Cols), as its compute complexity linearly scales with the amount of columns it needs to allocate.
  • FilterExec needs to:
    1. Perform a linear predicate evaluation over the MinTemp column (O(Col0))
    2. Allocate full new RecordBatches for all output columns based on rows filtered by the evaulated predicate (O(out_Cols))

The tests in compute_per_node.rs contains several examples of this for the different operators, which can go from pretty simple (FilterExec, ProjectionExec), to very complex (HashJoinExec).

Once the time complexity in big O notation is attributed to a node, doing a simple multiplication math of the columns appearing in the notation by the estimated byte_size of those columns (given by Statistics), gives the compute cost measured in bytes.

@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch from bda1c2d to 2879131 Compare June 8, 2026 12:19
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch from bda802f to 4c66ecb Compare June 8, 2026 12:19
@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch from 2879131 to 4e96136 Compare June 8, 2026 14:57
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch from 4c66ecb to 26eeb03 Compare June 8, 2026 15:11
@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch 2 times, most recently from 5ad9e3f to 4cb23c0 Compare June 9, 2026 06:42
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch 2 times, most recently from 3db9e03 to 20ed9aa Compare June 9, 2026 09:28
@gabotechs gabotechs marked this pull request as ready for review June 10, 2026 06:45
gabotechs added a commit that referenced this pull request Jun 11, 2026
This is one PR from the following stack of PRs:
- #477
- #463
<- you are here
- #464
- #478
- #479
- #486
- #432

This PR introduces a NetworkBoundaryBuilder argument to the network
boundary injection logic, allowing more flexible and configurable
strategies for determining which exchanges require network
communication. This enables better optimization of data movement across
distributed tasks.
gabotechs added a commit that referenced this pull request Jun 11, 2026
This is one PR from the following stack of PRs:

- #477
- #463
- #464
<- you are here
- #478
- #479
- #486
- #432

This PR introduces a MaxGauge metric to provide better tracking of peak
values in distributed metrics collection. This enables more accurate
monitoring of resource utilization and helps identify bottlenecks in the
execution pipeline.
gabotechs added a commit that referenced this pull request Jun 11, 2026
This is one PR from the following stack of PRs:
- #477
- #463
- #464
- #478
<- you are here
- #479
- #486
- #432

---

Introduces the `ProducerHead` type:

```rust
pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a [BroadcastExec].
    BroadcastExec { output_partitions: usize },
    /// The head node should be a [RepartitionExec].
    RepartitionExec { partitioning: Partitioning },
}
```

Which is passed over the network while remotely executing tasks in order
to set the appropriate node at the head of a stage.

Today, this is a noop because the right head node in stages is ensured
statically at planning time, but in follow up PRs, network boundaries
can get swamped and reorganized arbitrarily.

One example that happens in AQE:

1. A JOIN is planned as a CollectLeft

```js
HashJoinExec: mode=CollectLeft
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: unknown size
  DistributedLeafExec: unknown size
```

2. While collecting runtime statistics, it happens that `Stage 1` is
huge, and during AQE the JOINs are swapped

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: big size
```

3. The `Stage 1` is now on the probe side, so it needs to be rewritten
to a `NetworkShuffleExec`, otherwise duplicate data will be returned:

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkShuffleExec
      RepartitionExec // <- dynamically swapped at runtime based on the passed `ProducerHead`
        DistributedLeafExec: big size
```

Passing a `ProducerHead` at execution time unlocks two things:
1. dynamically set the fanout width accounting for a dynamically scaled
upper stage
2. dynamically set the correct operator `BroadcastExec` or
`RepartitionExec` based on the network boundary above, which might have
changed because of AQE
@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch 2 times, most recently from 4c906e5 to 6b7d1a4 Compare June 11, 2026 08:00
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch 2 times, most recently from b176c44 to 8422609 Compare June 11, 2026 08:27
@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch from 6b7d1a4 to fc261f1 Compare June 12, 2026 08:12
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch 2 times, most recently from 156eaf7 to dad19f2 Compare June 12, 2026 09:36
@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch from fc261f1 to a4ceac5 Compare June 12, 2026 11:03
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch from dad19f2 to b8e0d65 Compare June 12, 2026 11:03
@gabotechs gabotechs force-pushed the gabrielmusat/task-spawner-refactor-and-cache-invalidation branch from a4ceac5 to 1c864d5 Compare June 14, 2026 17:20
@gabotechs gabotechs force-pushed the gabrielmusat/cost-calculation branch from b8e0d65 to 5575ad2 Compare June 14, 2026 17:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant