Refactor task spawner into QueryCoordinator and StageCoordinator#479
Open
gabotechs wants to merge 2 commits into
gabotechs
commented
Jun 2, 2026
Comment on lines +11 to +17

    pub(super) struct LatencyMetric {
        max: Time,
        avg: Time,
        max_latency_micros: AtomicU64,
        sum_latency_micros: AtomicU64,
        count_latency_micros: AtomicU64,
    }

Collaborator
Author
All the contents of this file are unchanged code that previously lived inside task_spawner.rs. I just extracted it to its own file as it's kind of isolated.
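
As a rough illustration of how the three atomic counters in `LatencyMetric` can back a max and an average, here is a minimal std-only sketch. The `LatencySketch` name and the `record`, `max` and `avg` methods are assumptions made for this example, not the code from the PR, and the DataFusion `Time` fields are left out.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Stand-in for the atomic half of `LatencyMetric` (hypothetical name).
/// The `max`/`avg` `Time` fields from DataFusion are omitted on purpose.
#[derive(Default)]
pub struct LatencySketch {
    max_latency_micros: AtomicU64,
    sum_latency_micros: AtomicU64,
    count_latency_micros: AtomicU64,
}

impl LatencySketch {
    /// Hypothetical method: records one observed latency sample.
    pub fn record(&self, latency: Duration) {
        let micros = latency.as_micros() as u64;
        // `fetch_max` keeps the largest value seen so far without a lock.
        self.max_latency_micros.fetch_max(micros, Ordering::Relaxed);
        self.sum_latency_micros.fetch_add(micros, Ordering::Relaxed);
        self.count_latency_micros.fetch_add(1, Ordering::Relaxed);
    }

    /// Largest latency recorded so far (zero if nothing was recorded).
    pub fn max(&self) -> Duration {
        Duration::from_micros(self.max_latency_micros.load(Ordering::Relaxed))
    }

    /// Average latency, or `None` if nothing was recorded yet.
    pub fn avg(&self) -> Option<Duration> {
        let count = self.count_latency_micros.load(Ordering::Relaxed);
        if count == 0 {
            return None;
        }
        let sum = self.sum_latency_micros.load(Ordering::Relaxed);
        Some(Duration::from_micros(sum / count))
    }
}
```

Keeping a sum and a count separately means the average can be derived at read time, which is presumably why the struct carries both alongside the max.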
Comment on lines -69 to +65

    plan.children()[0].clone(),
    distributed_exec.plan_for_viz()?,

Collaborator
Author
During AQE, the plan meant for visualization is the one that gets constructed dynamically during execution, not necessarily the one that arrived at `DistributedExec` in the first place. This `plan_for_viz()` method ensures the appropriate plan is rewritten with metrics for visualization purposes.
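
To make the idea concrete, here is a minimal sketch of a node that returns its original child for visualization unless a final plan has been stored. Everything except the `plan_for_viz` name is an assumption for illustration: `Plan` is a toy stand-in for DataFusion's `ExecutionPlan`, and `DistributedExecSketch`, `set_final_plan` and the `String` error type are hypothetical.

```rust
use std::sync::{Arc, Mutex};

/// Toy stand-in for a physical plan; the real code works on `ExecutionPlan`.
#[derive(Debug, PartialEq)]
pub struct Plan(pub &'static str);

/// Hypothetical simplification of `DistributedExec`.
pub struct DistributedExecSketch {
    /// The plan that arrived as child at construction time.
    child: Arc<Plan>,
    /// Slot for the plan built dynamically during execution, if any.
    final_plan: Mutex<Option<Arc<Plan>>>,
}

impl DistributedExecSketch {
    pub fn new(child: Arc<Plan>) -> Self {
        Self { child, final_plan: Mutex::new(None) }
    }

    /// Hypothetical hook a dynamic planner could call once the final plan is known.
    pub fn set_final_plan(&self, plan: Arc<Plan>) -> Result<(), String> {
        let mut slot = self.final_plan.lock().map_err(|e| e.to_string())?;
        *slot = Some(plan);
        Ok(())
    }

    /// Returns the stored final plan, falling back to the original child.
    pub fn plan_for_viz(&self) -> Result<Arc<Plan>, String> {
        let slot = self.final_plan.lock().map_err(|e| e.to_string())?;
        Ok(match &*slot {
            Some(plan) => Arc::clone(plan),
            None => Arc::clone(&self.child),
        })
    }
}
```

With a static planner the slot is never filled, so `plan_for_viz()` behaves like `plan.children()[0].clone()` did before.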
    }

    /// DataFusion metrics system is pretty limited from an API standpoint. This intermediate struct
    /// bridges the gaps that are not satisfied by upstream API for measuring latency.
    pub(super) struct LatencyMetric {

Collaborator
Author
Pretty unfortunate diff here:
- `LatencyMetric` was just moved to `latency_metric.rs` as-is
- New `keep_stream_alive` and `NotifyGuard` are introduced
- `CoordinatorToWorkerMetrics` was moved from the top of this file to the bottom as-is
gabotechs
added a commit
that referenced
this pull request
Jun 2, 2026
This is one PR from the following stack of PRs:
- #477 <- you are here
- #463
- #464
- #478
- #479
- #432

---

Network boundaries in this project are currently breaking one assumption from upstream DataFusion: `SendableRecordBatchStream`s yield record batches in two situations:
- If explicitly polled
- Eagerly on a spawned task triggered by the first poll

https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/physical-plan/src/execution_plan.rs#L973-L988

Today, network boundaries pulling from remote sources are breaking this rule, because they start yielding `RecordBatches` over the network even if no poll has ever happened to the `SendableRecordBatchStream` returned by the network boundary.

This has two consequences:
1. Greater memory consumption, as data accumulates in the network boundaries while nobody is polling for it.
2. Greater speed on JOINs, as an artifact of eagerly buffering right sides even before they are ever polled.

Consequence 2 is nice, but it should be delivered using standard upstream mechanisms:

https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/common/src/config.rs#L695-L709

Not accidentally by how remote network boundaries work.

---

This PR makes it so that remote network boundaries only start the network stream on first poll, instead of on the `.execute()` call, as stated by the `EvaluationType::Eager` docs.
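
The "start on first poll" behavior can be sketched with a small wrapper that defers opening its source until it is first asked for an item. This is an assumption-laden illustration rather than the PR's code: `LazyStream` is a hypothetical name, and a plain `Iterator` stands in for `SendableRecordBatchStream` so the example stays std-only.

```rust
/// Wrapper that defers opening its source until the first `next()` call.
/// Hypothetical type; `Iterator` stands in for an async record batch stream.
pub struct LazyStream<I, F> {
    /// Closure that opens the (possibly remote) source; consumed on first poll.
    open: Option<F>,
    /// The opened source, populated lazily.
    inner: Option<I>,
}

impl<I: Iterator, F: FnOnce() -> I> LazyStream<I, F> {
    /// Building the wrapper does no work, mirroring a cheap `.execute()` call.
    pub fn new(open: F) -> Self {
        Self { open: Some(open), inner: None }
    }
}

impl<I: Iterator, F: FnOnce() -> I> Iterator for LazyStream<I, F> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // First poll: run the opening closure exactly once.
        if let Some(open) = self.open.take() {
            self.inner = Some(open());
        }
        self.inner.as_mut()?.next()
    }
}
```

Nothing is buffered before the consumer asks for data, which is the memory-side benefit described above. Eager buffering for JOINs would then have to come from the upstream configuration instead.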
gabotechs
added a commit
that referenced
this pull request
Jun 11, 2026
This is one PR from the following stack of PRs:
- #477
- #463 <- you are here
- #464
- #478
- #479
- #486
- #432

This PR introduces a `NetworkBoundaryBuilder` argument to the network boundary injection logic, allowing more flexible and configurable strategies for determining which exchanges require network communication. This enables better optimization of data movement across distributed tasks.
gabotechs
added a commit
that referenced
this pull request
Jun 11, 2026
This is one PR from the following stack of PRs:
- #477
- #463
- #464 <- you are here
- #478
- #479
- #486
- #432

This PR introduces a `MaxGauge` metric to provide better tracking of peak values in distributed metrics collection. This enables more accurate monitoring of resource utilization and helps identify bottlenecks in the execution pipeline.
gabotechs
added a commit
that referenced
this pull request
Jun 11, 2026
This is one PR from the following stack of PRs:
- #477
- #463
- #464
- #478 <- you are here
- #479
- #486
- #432

---

Introduces the `ProducerHead` type:

```rust
pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a [BroadcastExec].
    BroadcastExec { output_partitions: usize },
    /// The head node should be a [RepartitionExec].
    RepartitionExec { partitioning: Partitioning },
}
```

Which is passed over the network while remotely executing tasks in order to set the appropriate node at the head of a stage.

Today, this is a noop because the right head node in stages is ensured statically at planning time, but in follow-up PRs, network boundaries can get swapped and reorganized arbitrarily.

One example that happens in AQE:

1. A JOIN is planned as a CollectLeft

```js
HashJoinExec: mode=CollectLeft
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: unknown size
  DistributedLeafExec: unknown size
```

2. While collecting runtime statistics, it turns out that `Stage 1` is huge, and during AQE the JOIN sides are swapped

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: big size
```

3. `Stage 1` is now on the probe side, so it needs to be rewritten to a `NetworkShuffleExec`, otherwise duplicate data will be returned:

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkShuffleExec
      RepartitionExec // <- dynamically swapped at runtime based on the passed `ProducerHead`
        DistributedLeafExec: big size
```

Passing a `ProducerHead` at execution time unlocks two things:
1. dynamically set the fanout width accounting for a dynamically scaled upper stage
2. dynamically set the correct operator, `BroadcastExec` or `RepartitionExec`, based on the network boundary above, which might have changed because of AQE
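
A hedged sketch of how a worker might apply a received `ProducerHead` to the plan it was handed. Only the `ProducerHead` enum shape comes from the description above. The toy `Node` tree, the simplified `Partitioning` stand-in and the `strip_head` and `apply_head` helpers are assumptions made for this example, not the project's API.

```rust
/// Simplified stand-in for DataFusion's `Partitioning` (assumption).
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
}

#[derive(Debug, Clone, PartialEq)]
pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a broadcast.
    BroadcastExec { output_partitions: usize },
    /// The head node should be a repartition.
    RepartitionExec { partitioning: Partitioning },
}

/// Toy plan tree used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    Leaf(&'static str),
    Broadcast { output_partitions: usize, input: Box<Node> },
    Repartition { partitioning: Partitioning, input: Box<Node> },
}

/// Removes an existing broadcast/repartition head, if there is one.
fn strip_head(plan: Node) -> Node {
    match plan {
        Node::Broadcast { input, .. } | Node::Repartition { input, .. } => *input,
        other => other,
    }
}

/// Hypothetical helper: installs the head requested by the coordinator.
pub fn apply_head(plan: Node, head: ProducerHead) -> Node {
    match head {
        // Nothing requested: keep the statically planned stage as-is.
        ProducerHead::None => plan,
        ProducerHead::BroadcastExec { output_partitions } => Node::Broadcast {
            output_partitions,
            input: Box::new(strip_head(plan)),
        },
        ProducerHead::RepartitionExec { partitioning } => Node::Repartition {
            partitioning,
            input: Box::new(strip_head(plan)),
        },
    }
}
```

In the AQE example above, step 3 would correspond to calling `apply_head` with a `RepartitionExec` head on a stage that was originally planned with a `BroadcastExec` on top.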
This is one PR from the following stack of PRs:
This is a pure refactor PR with a couple of implementation detail changes:

1. Reducing the bloat of complex functions in already complicated parts of the codebase.

   For this, two key structs are introduced:
   - `QueryCoordinator`: scoped to a whole distributed query, it holds references to pieces of data global to a query's lifetime, like the `TaskContext`, the metrics, the `JoinSet` used for spawning tasks, etc. It is also in charge of building `StageCoordinator` instances, which are scoped per-stage instead of per-query.
   - `StageCoordinator`: this is the old `CoordinatorToWorkerTaskSpawner`, but with some more methods that allow reusability and some better naming. It handles all the comms between workers and coordinator needed for driving a stage forward.

   This allows reducing the bloat in `prepare_static_plan` and the future `prepare_dynamic_plan` functions.

2. Ensuring a coordinator->worker channel is held active for as long as the `DistributedExec` node is executing the query on the coordinator.

   For the static planner, this is a noop, as the previous model worked fine, but this will become important in the future for the dynamic planner. In the dynamic planner, the plan can be set for some stages that might never reach execution, so instead of coupling the task entry cache invalidation to the end of task execution, it is coupled to the lifetime of the coordinator->worker channel.

   This has one collateral effect: `WorkUnit` feeds can no longer rely on the global `coordinator->worker` EOS signal to know that no further `WorkUnit` is going to be sent by the coordinator, so they need an explicit EOS message that signals that no further `WorkUnit`s will be received, even though the `coordinator->worker` channel will still be alive for a while.

3. Add a `plan_for_viz` field in `DistributedExec`.

   This is a new slot in `DistributedExec` that holds a reference to the plan that is supposed to be rewritten with metrics for visualization purposes.

   Again, for the static planner this is a noop, because the plan meant for visualization is equal to the plan that arrived as a child of `DistributedExec` in the first place. However, during dynamic planning, the plan that arrives at `DistributedExec` is not going to be the same as the final one after execution, so we need a slot for storing that final plan.
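
The explicit EOS message for `WorkUnit` feeds described above can be sketched with a plain channel. This is only an illustration under assumptions: `CoordinatorMsg`, `WorkUnitFeedEos` and `drain_feed` are hypothetical names, work units are reduced to `u32`, and `std::sync::mpsc` stands in for the real coordinator->worker stream.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical message type for the coordinator->worker channel.
#[derive(Debug, PartialEq)]
pub enum CoordinatorMsg {
    /// One unit of work (reduced to an id for this sketch).
    WorkUnit(u32),
    /// Explicit end-of-stream for the work unit feed. The channel itself
    /// may stay open long after this message is sent.
    WorkUnitFeedEos,
}

/// Drains work units until the explicit EOS marker arrives.
/// Returns the collected units and whether EOS was seen; `false` means
/// the channel was closed before any EOS marker.
pub fn drain_feed(rx: &Receiver<CoordinatorMsg>) -> (Vec<u32>, bool) {
    let mut units = Vec::new();
    for msg in rx.iter() {
        match msg {
            CoordinatorMsg::WorkUnit(unit) => units.push(unit),
            // Stop here instead of waiting for the sender to be dropped.
            CoordinatorMsg::WorkUnitFeedEos => return (units, true),
        }
    }
    (units, false)
}
```

Because the loop returns on the marker, the worker side finishes consuming the feed even while the sender is still alive, which is the situation created by tying the channel lifetime to `DistributedExec`.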