Adaptive task count assignation #432
PR factored out from #416.

This is one PR from the following stack of PRs:
- #422 <- you are here
- #424
- #416
- #425
- #426
- #427
- #432

Previously, we were force-propagating a max task count assignation below the NetworkBroadcast so that the remote build side never has more tasks than the stage above.

In a dynamic task count assignation context, we can no longer do this: by the time you realize a remote build side is going to have more tasks than the stage above, the build side might have already started executing, and by that time its task count is set in stone.

This is fine: in a broadcast, the build side can be arbitrarily expensive to execute. What matters there is not that the build side is cheap to execute, but that it returns a small amount of data. A build side can return very little data (just a couple of rows) and still be very expensive to execute.

This is actually the reason why there is a small speedup in benchmarks.

---

<details><summary>tpch_sf10 1.09 faster ✔</summary>

```text
=== Comparing tpch_sf10 results from engine 'datafusion-distributed-main' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] ===
q1: prev=1269 ms, new=1286 ms, diff=1.01 slower ✖
q2: prev= 390 ms, new= 395 ms, diff=1.01 slower ✖
q3: prev= 784 ms, new= 826 ms, diff=1.05 slower ✖
q4: prev= 413 ms, new= 392 ms, diff=1.05 faster ✔
q5: prev=1306 ms, new=1242 ms, diff=1.05 faster ✔
q6: prev= 534 ms, new= 528 ms, diff=1.01 faster ✔
q7: prev=1483 ms, new=1420 ms, diff=1.04 faster ✔
q8: prev=3001 ms, new=1585 ms, diff=1.89 faster ✅
q9: prev=2054 ms, new=2009 ms, diff=1.02 faster ✔
q10: prev= 951 ms, new= 921 ms, diff=1.03 faster ✔
q11: prev= 322 ms, new= 304 ms, diff=1.06 faster ✔
q12: prev= 670 ms, new= 676 ms, diff=1.01 slower ✖
q13: prev= 624 ms, new= 613 ms, diff=1.02 faster ✔
q14: prev= 594 ms, new= 546 ms, diff=1.09 faster ✔
q15: prev= 778 ms, new= 756 ms, diff=1.03 faster ✔
q16: prev= 223 ms, new= 219 ms, diff=1.02 faster ✔
q17: prev=1644 ms, new=1733 ms, diff=1.05 slower ✖
q18: prev=1884 ms, new=1966 ms, diff=1.04 slower ✖
q19: prev= 802 ms, new= 727 ms, diff=1.10 faster ✔
q20: prev= 784 ms, new= 706 ms, diff=1.11 faster ✔
q21: prev=2112 ms, new=1925 ms, diff=1.10 faster ✔
q22: prev= 251 ms, new= 261 ms, diff=1.04 slower ✖
TOTAL: prev=68651.703894 ms, new=63144.566305999986 ms, diff=1.09 faster ✔
```

</details>

<details><summary>tpcds_sf1 1.02 faster ✔</summary>

```text
=== Comparing tpcds_sf1 results from engine 'datafusion-distributed-dynamic-task-allocation' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] ===
q1: prev= 260 ms, new= 336 ms, diff=1.29 slower ❌
q2: prev= 290 ms, new= 321 ms, diff=1.11 slower ✖
q3: prev= 181 ms, new= 215 ms, diff=1.19 slower ✖
q4: prev=2039 ms, new=2184 ms, diff=1.07 slower ✖
q5: prev= 333 ms, new= 325 ms, diff=1.02 faster ✔
q6: prev= 622 ms, new= 676 ms, diff=1.09 slower ✖
q7: prev= 225 ms, new= 225 ms, diff=1.00 slower ✖
q8: prev= 312 ms, new= 200 ms, diff=1.56 faster ✅
q9: prev= 242 ms, new= 189 ms, diff=1.28 faster ✅
q10: prev= 480 ms, new= 494 ms, diff=1.03 slower ✖
q11: prev=1511 ms, new=1382 ms, diff=1.09 faster ✔
q12: prev= 262 ms, new= 292 ms, diff=1.11 slower ✖
q13: prev= 477 ms, new= 487 ms, diff=1.02 slower ✖
q14: prev= 637 ms, new= 782 ms, diff=1.23 slower ❌
q15: prev= 170 ms, new= 144 ms, diff=1.18 faster ✔
q16: prev= 350 ms, new= 379 ms, diff=1.08 slower ✖
q17: prev= 229 ms, new= 250 ms, diff=1.09 slower ✖
q18: prev= 295 ms, new= 281 ms, diff=1.05 faster ✔
q19: prev= 286 ms, new= 254 ms, diff=1.13 faster ✔
q20: prev= 206 ms, new= 143 ms, diff=1.44 faster ✅
q21: prev= 305 ms, new= 282 ms, diff=1.08 faster ✔
q22: prev= 390 ms, new= 401 ms, diff=1.03 slower ✖
q23: prev= 672 ms, new= 640 ms, diff=1.05 faster ✔
q24: prev= 368 ms, new= 376 ms, diff=1.02 slower ✖
q25: prev= 203 ms, new= 279 ms, diff=1.37 slower ❌
q26: prev= 147 ms, new= 198 ms, diff=1.35 slower ❌
q27: prev= 406 ms, new= 358 ms, diff=1.13 faster ✔
q28: prev= 195 ms, new= 161 ms, diff=1.21 faster ✅
q29: prev= 237 ms, new= 219 ms, diff=1.08 faster ✔
q31: prev= 343 ms, new= 327 ms, diff=1.05 faster ✔
q32: prev= 142 ms, new= 152 ms, diff=1.07 slower ✖
q33: prev= 277 ms, new= 211 ms, diff=1.31 faster ✅
q34: prev= 199 ms, new= 188 ms, diff=1.06 faster ✔
q35: prev= 514 ms, new= 498 ms, diff=1.03 faster ✔
q36: prev= 341 ms, new= 311 ms, diff=1.10 faster ✔
q37: prev= 256 ms, new= 302 ms, diff=1.18 slower ✖
q38: prev= 228 ms, new= 245 ms, diff=1.07 slower ✖
q39: prev= 259 ms, new= 266 ms, diff=1.03 slower ✖
q40: prev= 281 ms, new= 325 ms, diff=1.16 slower ✖
q41: prev= 87 ms, new= 90 ms, diff=1.03 slower ✖
q42: prev= 116 ms, new= 124 ms, diff=1.07 slower ✖
q43: prev= 190 ms, new= 132 ms, diff=1.44 faster ✅
q44: prev= 214 ms, new= 144 ms, diff=1.49 faster ✅
q45: prev= 244 ms, new= 186 ms, diff=1.31 faster ✅
q46: prev= 355 ms, new= 288 ms, diff=1.23 faster ✅
q47: prev= 374 ms, new= 387 ms, diff=1.03 slower ✖
q48: prev= 384 ms, new= 360 ms, diff=1.07 faster ✔
q49: prev= 285 ms, new= 229 ms, diff=1.24 faster ✅
q50: prev= 352 ms, new= 343 ms, diff=1.03 faster ✔
q51: prev= 305 ms, new= 224 ms, diff=1.36 faster ✅
q52: prev= 138 ms, new= 127 ms, diff=1.09 faster ✔
q53: prev= 143 ms, new= 158 ms, diff=1.10 slower ✖
q54: prev= 331 ms, new= 271 ms, diff=1.22 faster ✅
q55: prev= 132 ms, new= 145 ms, diff=1.10 slower ✖
q56: prev= 298 ms, new= 233 ms, diff=1.28 faster ✅
q57: prev= 335 ms, new= 354 ms, diff=1.06 slower ✖
q58: prev= 280 ms, new= 284 ms, diff=1.01 slower ✖
q59: prev= 293 ms, new= 270 ms, diff=1.09 faster ✔
q60: prev= 361 ms, new= 311 ms, diff=1.16 faster ✔
q61: prev= 856 ms, new= 849 ms, diff=1.01 faster ✔
q62: prev= 639 ms, new= 665 ms, diff=1.04 slower ✖
q63: prev= 224 ms, new= 148 ms, diff=1.51 faster ✅
q64: prev=1159 ms, new=1193 ms, diff=1.03 slower ✖
q65: prev= 229 ms, new= 228 ms, diff=1.00 faster ✔
q66: prev= 730 ms, new= 714 ms, diff=1.02 faster ✔
q67: prev= 406 ms, new= 420 ms, diff=1.03 slower ✖
q68: prev= 289 ms, new= 320 ms, diff=1.11 slower ✖
q69: prev= 513 ms, new= 570 ms, diff=1.11 slower ✖
q70: prev= 394 ms, new= 386 ms, diff=1.02 faster ✔
q71: prev= 250 ms, new= 329 ms, diff=1.32 slower ❌
q72: prev=6644 ms, new=6609 ms, diff=1.01 faster ✔
q73: prev= 201 ms, new= 210 ms, diff=1.04 slower ✖
q74: prev= 797 ms, new= 743 ms, diff=1.07 faster ✔
q75: prev= 375 ms, new= 452 ms, diff=1.21 slower ❌
q76: prev= 165 ms, new= 230 ms, diff=1.39 slower ❌
q77: prev= 232 ms, new= 271 ms, diff=1.17 slower ✖
q78: prev= 341 ms, new= 353 ms, diff=1.04 slower ✖
q79: prev= 226 ms, new= 228 ms, diff=1.01 slower ✖
q80: prev= 332 ms, new= 336 ms, diff=1.01 slower ✖
q81: prev= 216 ms, new= 191 ms, diff=1.13 faster ✔
q82: prev= 258 ms, new= 262 ms, diff=1.02 slower ✖
q83: prev= 240 ms, new= 287 ms, diff=1.20 slower ✖
q84: prev= 240 ms, new= 228 ms, diff=1.05 faster ✔
q85: prev= 455 ms, new= 364 ms, diff=1.25 faster ✅
q86: prev= 124 ms, new= 138 ms, diff=1.11 slower ✖
q87: prev= 203 ms, new= 208 ms, diff=1.02 slower ✖
q88: prev= 404 ms, new= 350 ms, diff=1.15 faster ✔
q89: prev= 237 ms, new= 167 ms, diff=1.42 faster ✅
q90: prev= 189 ms, new= 187 ms, diff=1.01 faster ✔
q91: prev= 377 ms, new= 328 ms, diff=1.15 faster ✔
q92: prev= 284 ms, new= 131 ms, diff=2.17 faster ✅
q93: prev= 154 ms, new= 142 ms, diff=1.08 faster ✔
q94: prev= 302 ms, new= 308 ms, diff=1.02 slower ✖
q95: prev= 365 ms, new= 290 ms, diff=1.26 faster ✅
q96: prev= 177 ms, new= 157 ms, diff=1.13 faster ✔
q97: prev= 235 ms, new= 170 ms, diff=1.38 faster ✅
q98: prev= 165 ms, new= 159 ms, diff=1.04 faster ✔
q99: prev= 951 ms, new= 995 ms, diff=1.05 slower ✖
TOTAL: prev=123029.07797800002 ms, new=120962.55825200005 ms, diff=1.02 faster ✔
```

</details>
An independent refactor factored out from #416.

This is one PR from the following stack of PRs:
- #422
- #424 <- you are here
- #416
- #425
- #426
- #427
- #432

Previously the stage struct was a "hidden" state machine that could have two states:

1. A state where the Stage contains the input plan and is locally accessible and traversable.
```rust
pub struct Stage {
    query_id: ...
    num: ...
    plan: Some(plan),
    tasks: vec![None, None, None]
}
```

2. A state where the input plan is serialized, and the worker URLs are assigned. This happens in `DistributedExec` right before execution, in `prepare_plan()`.
```rust
pub struct Stage {
    query_id: ...
    num: ...
    plan: None,
    tasks: vec![Some("http://1"), Some("http://2"), Some("http://3")]
}
```

This PR makes this behavior explicit, and represents it with an `enum`:
```rust
pub enum Stage {
    Local(LocalStage),
    Remote(RemoteStage),
}

pub struct LocalStage {
    pub query_id: Uuid,
    pub num: usize,
    pub plan: Arc<dyn ExecutionPlan>,
    pub tasks: usize,
}

pub struct RemoteStage {
    pub query_id: Uuid,
    pub num: usize,
    pub workers: Vec<Url>,
}
```
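To illustrate why the explicit `enum` helps, here is a minimal sketch with simplified stand-in types (a `String` instead of `Arc<dyn ExecutionPlan>` and `Url`, no `query_id`). The `task_count` and `into_remote` helpers are hypothetical and are not taken from the PR; they only show that each state can be handled exhaustively with `match`.

```rust
// Simplified stand-ins for the types in the PR description (assumption:
// plain strings replace the real plan and URL types).
#[derive(Debug, Clone, PartialEq)]
pub struct LocalStage {
    pub num: usize,
    pub plan: String,
    pub tasks: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct RemoteStage {
    pub num: usize,
    pub workers: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Stage {
    Local(LocalStage),
    Remote(RemoteStage),
}

impl Stage {
    /// Hypothetical helper: the task count is explicit in both states.
    pub fn task_count(&self) -> usize {
        match self {
            Stage::Local(local) => local.tasks,
            Stage::Remote(remote) => remote.workers.len(),
        }
    }

    /// Hypothetical transition: a local stage becomes remote once one
    /// worker has been assigned per task.
    pub fn into_remote(self, workers: Vec<String>) -> Result<Stage, String> {
        match self {
            Stage::Local(local) if local.tasks == workers.len() => {
                Ok(Stage::Remote(RemoteStage { num: local.num, workers }))
            }
            Stage::Local(local) => Err(format!(
                "stage {} expects {} workers, got {}",
                local.num,
                local.tasks,
                workers.len()
            )),
            Stage::Remote(_) => Err("stage is already remote".to_string()),
        }
    }
}
```

With the previous struct, forgetting to check whether `plan` was `Some` or `None` would only fail at runtime; with the `enum`, the compiler requires both arms.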
… planner (#416)

This is a preparatory step towards:
- #377

This is one PR from the following stack of PRs:
- #422
- #424
- #416 <- you are here
- #425
- #426
- #427
- #432

The main purpose of this PR is to perform distributed planning in a single pass, rather than the current two passes that communicate with each other via an intermediate struct (`AnnotatedPlan`). This change cascades into several other changes that produce a nicer public API for building custom distributed plans, but also produce a big diff.

## Dropping the two-step annotation + NB injection

In a dynamic task assignation context, choosing the task count for a stage based on the previous one can no longer be done statically. After "annotating" a stage, and before "annotating" the one above, we need to be able to send it for execution, collect runtime metrics, and based on that decide the task count for the stage above.

This means that the stage below must be ready to be sent for execution before the full annotation process has finished. We therefore need to do everything there is to be done during the "annotation" process itself; we can no longer divide the distribution process into several steps that each recurse over the whole plan.

## Network boundaries no longer mutate their children

In order for network boundaries to know what mutations to apply to their children, they need to know how many consumer tasks they are going to be running, but this might not be known until execution time. If we want to dynamically assign tasks to stages, there's no way at planning time to know how to mutate the children. For example, we do not know how to scale up a `RepartitionExec` if we don't know how many `NetworkShuffleExec`s are going to be consuming it.

The responsibility of preparing network boundary inputs (e.g., scaling `RepartitionExec`) is now factored out into a separate `network_boundary_scale_input()` function that can be called either at planning time or at execution time. Right now, it's still only called at planning time.
This is a preparatory step towards: - #377 This is one PR from the following stack of PRs: - #422 - #424 - #416 - #425 <- you are here - #426 - #427 - #432 Removes `impl_set_plan.rs` in favor of just inlining its contents to `impl_coordinator_channel.rs`. In future changes, the relationship between `impl_set_plan.rs` and `impl_coordinator_channel.rs` will get more complex, increasing the function signature `impl_set_plan.rs` exposes to `impl_coordinator_channel.rs`. This proves that the split between those two files does not make sense, as they have never been able to evolve independently, so we may as well just not pay the price of a complex function signature in between.
This is a preparatory step towards:
- #377

This is one PR from the following stack of PRs:
- #422
- #424
- #416
- #425
- #426 <- you are here
- #427
- #432

`distributed.rs` contains the `DistributedExec` node, which has evolved towards acting as a "coordinator". It's in charge of assigning tasks to worker URLs, setting the subplans in the appropriate workers, collecting metrics, streaming work units, etc. Soon, it will evolve even more as we prepare for adaptive query execution.

This PR ships two things:
- A refactor that dismantles the old `distributed.rs` into smaller reusable modules in the `coordinator/` module.
- Bypassing the metrics collection machinery if metrics collection is disabled.
jayshrivastava
left a comment
There was a problem hiding this comment.
Pretty promising results. Left a few comments. I also had codex do a sweep:
Last Commit Review Findings
Review target: last commit, Support dynamic task count assignation.
1. Blocking: dynamic planning can deadlock on set-but-never-executed stages
DistributedExec::execute keeps the end-query guard alive until after drain_pending_tasks():
src/coordinator/distributed.rs:199
src/coordinator/distributed.rs:218
At the same time, send_plan_task keeps the coordinator-to-worker request stream open with keep_stream_alive:
src/coordinator/query_coordinator.rs:184
The worker response stream also waits on the metrics stream:
src/worker/impl_coordinator_channel.rs:180
If the dynamic planner sets and samples a stage but later never executes that stage, metrics never completes. Then drain_pending_tasks() waits for the worker response stream to end, but the response stream is kept alive by the coordinator channel, and that channel only closes when the guard drops. The guard cannot drop because execution is waiting inside drain_pending_tasks().
This is a circular wait.
2. Blocking: task-data invalidation is still coupled to execution finish
The commit rationale says task entry invalidation moves from task execution finish to coordinator-channel lifetime, but the old invalidation path is still present:
src/worker/impl_execute_task.rs:97
The new coordinator-channel invalidation is also present:
src/worker/impl_coordinator_channel.rs:167
That means task data can still be removed when partition execution drains, even while the coordinator channel is intentionally held open. This keeps the old early-cleanup behavior and can break dynamic cases with late consumers, retries, or future execution requests that rely on the coordinator channel still owning task-data lifetime.
3. High: per-column throughput stats are almost always zero
set_per_col_bytes_per_second computes:
ready / total_ready * total_bytes_per_second
at:
src/execution_plans/sampler.rs:357
Because this is integer division, any column with ready < total_ready becomes 0 before multiplication. In ordinary multi-column cases, most or all columns will report zero bytes per second.
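The truncation described above can be reproduced in isolation. This is a minimal sketch with hypothetical function names and plain `u64` inputs (the real types in `sampler.rs` are not shown on this page); it widens to `u128` before multiplying, which is one overflow-safe option among several.

```rust
/// Divide-then-multiply, as in the reported expression: the quotient
/// `ready / total_ready` truncates to 0 whenever `ready < total_ready`.
pub fn per_col_truncating(ready: u64, total_ready: u64, total_bytes_per_second: u64) -> u64 {
    if total_ready == 0 {
        return 0;
    }
    ready / total_ready * total_bytes_per_second
}

/// Multiply-then-divide, widened to u128 so the product cannot overflow.
/// The result is clamped back into the u64 range.
pub fn per_col_multiply_first(ready: u64, total_ready: u64, total_bytes_per_second: u64) -> u64 {
    if total_ready == 0 {
        return 0;
    }
    let scaled = (ready as u128 * total_bytes_per_second as u128) / total_ready as u128;
    scaled.min(u64::MAX as u128) as u64
}
```

For a column holding 3 of 10 ready units at 1000 bytes per second in total, the first function reports 0 and the second reports 300.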
The fix is to multiply before dividing, with overflow-safe arithmetic if needed:
ready.saturating_mul(total_bytes_per_second as u64) / total_ready
4. High: bytes_per_partition_per_second = 0 can panic
The config setter accepts any usize:
src/distributed_ext.rs:761
But dynamic planning uses the value as a divisor:
src/coordinator/prepare_dynamic_plan.rs:49
If a user sets bytes_per_partition_per_second to 0, planning can panic in div_ceil. The setter should reject zero, or planning should clamp or error before dividing.
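One way to reject zero at the setter is to carry the value as a `NonZeroUsize`, so the divisor cannot be zero by construction. The sketch below is an assumption-laden illustration: both function names are hypothetical, the error type is a plain `String`, and dividing a sampled throughput by the configured value is only a guess at what the planner divides.

```rust
use std::num::NonZeroUsize;

/// Hypothetical validation step for the config setter: zero is rejected
/// up front instead of reaching a division later.
pub fn validate_bytes_per_partition_per_second(value: usize) -> Result<NonZeroUsize, String> {
    NonZeroUsize::new(value)
        .ok_or_else(|| "bytes_per_partition_per_second must be greater than zero".to_string())
}

/// Hypothetical sizing helper: with a non-zero divisor, `div_ceil` cannot
/// panic. At least one partition is always returned.
pub fn partitions_for_throughput(bytes_per_second: usize, per_partition: NonZeroUsize) -> usize {
    bytes_per_second.div_ceil(per_partition.get()).max(1)
}
```

Clamping inside the planner would also avoid the panic, but validating in the setter reports the misconfiguration closer to where it was made.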
5. High: remote partition statistics ignore the requested partition
Stage::partition_statistics returns the whole remote stage statistics regardless of the requested partition:
src/stage.rs:149
If DataFusion calls partition_statistics(Some(partition)), each partition can appear to contain the full stage statistics. That can inflate rows and bytes by partition count and distort AQE decisions.
The safer behavior is either:
- return unknown stats for per-partition requests when only global stats are available, or
- track real per-partition stats and return the matching partition.
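The first option can be sketched as follows. The `Precision` and `StageStats` types here are simplified stand-ins written for this example (DataFusion has its own `Precision` and `Statistics` types with more fields), and the function is a free function rather than the real `Stage::partition_statistics` method.

```rust
/// Simplified stand-in for a statistic that may be known or unknown.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Simplified stand-in for stage-level statistics.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StageStats {
    pub num_rows: Precision,
    pub total_byte_size: Precision,
}

/// Returns the global stats only when the whole stage is requested.
/// A per-partition request gets unknown stats, so callers that sum over
/// partitions cannot multiply the stage totals by the partition count.
pub fn partition_statistics(global: &StageStats, partition: Option<usize>) -> StageStats {
    match partition {
        None => *global,
        Some(_) => StageStats {
            num_rows: Precision::Absent,
            total_byte_size: Precision::Absent,
        },
    }
}
```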
6. Medium: SamplerExec::execute can panic on stale partition ids
SamplerExec::execute indexes into partition_samplers before validating the partition:
src/execution_plans/sampler.rs:547
If a stale or mismatched dynamic partition id reaches this plan, this panics instead of returning a DataFusion execution error.
This should be changed to get(partition) and return exec_err! when the partition is invalid.
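A sketch of the bounds-checked lookup, using a hypothetical `SamplerSketch` struct with string placeholders for the per-partition samplers and a `String` error in place of DataFusion's `exec_err!`:

```rust
/// Hypothetical stand-in for a plan node holding one sampler per partition.
pub struct SamplerSketch {
    pub partition_samplers: Vec<String>,
}

impl SamplerSketch {
    /// Looks up the sampler with `get`, so an out-of-range partition id
    /// becomes an error value instead of an index-out-of-bounds panic.
    pub fn execute(&self, partition: usize) -> Result<&str, String> {
        self.partition_samplers
            .get(partition)
            .map(String::as_str)
            .ok_or_else(|| {
                format!(
                    "invalid partition {partition}: only {} partitions available",
                    self.partition_samplers.len()
                )
            })
    }
}
```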
7. Medium: dynamic coalesce boundaries discard runtime stats
Dynamic planning explicitly skips runtime-stat collection for NetworkCoalesceExec:
src/coordinator/prepare_dynamic_plan.rs:98
This may be fine for the final gather stage, but if a coalesce boundary feeds another planned stage, downstream task sizing gets unknown stats. That can make later dynamic decisions less accurate or effectively static.
If NetworkCoalesceExec can appear below additional distributed operators, it should either gather stats too or document why it is always terminal for dynamic planning.
8. Medium: plan reconstruction is destructive
PlanReconstructor removes entries from stage_map while reconstructing:
src/coordinator/prepare_dynamic_plan.rs:157
That makes reconstruction one-shot. If a stage is referenced by more than one network boundary, or reconstruction is retried or reused for diagnostics, the second lookup fails.
Prefer borrowing and cloning from the map instead of removing entries.
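The difference between the two lookups can be shown with a small sketch. It uses a standard `HashMap` and `Arc<str>` placeholders instead of the `DashMap<usize, (Arc<dyn ExecutionPlan>, MetricsSet)>` from the diff, and both method names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for the reconstructor's stage map.
pub struct ReconstructorSketch {
    pub stage_map: HashMap<usize, Arc<str>>,
}

impl ReconstructorSketch {
    /// Destructive lookup: the entry is gone after the first call.
    pub fn take_plan(&mut self, stage: usize) -> Option<Arc<str>> {
        self.stage_map.remove(&stage)
    }

    /// Non-destructive lookup: cloning an `Arc` only bumps a reference
    /// count, so the same stage can be looked up repeatedly.
    pub fn plan(&self, stage: usize) -> Option<Arc<str>> {
        self.stage_map.get(&stage).cloned()
    }
}
```

The trade-off is that the non-destructive version keeps every plan alive for as long as the map lives.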
9. Medium: sampler/load-info errors can surface too late
Worker load-info errors are emitted through the same response stream as metrics:
src/worker/impl_coordinator_channel.rs:180
The dynamic planner waits on the separate load-info receiver, while send_plan_task sees response-stream errors later:
src/coordinator/query_coordinator.rs:202
This can let planning continue with partial or zero stats, while the actual sampler error surfaces later during pending-task draining. Runtime-stat collection should receive explicit errors, not just missing load-info messages.
10. Low: the new lifecycle behavior needs a targeted regression test
The highest-risk scenario is:
- Dynamic planner sends SetPlanRequest for a stage.
- Worker starts sampling and sends LoadInfo.
- AQE chooses a different shape and that stage never reaches execute_task.
- The query still must finish and drain_pending_tasks() must not hang.
Existing correctness tests likely exercise stages that are eventually consumed, so they may not catch this lifecycle bug. Add a targeted test for a planned-but-abandoned dynamic stage and assert that pending-task draining completes.
| /// Maximum number of record batches buffered by a sampler. | ||
| max_batches_buffered: MaxGaugeMetric, | ||
| /// Peak memory buffered by any partition sampler during the sampling phase. | ||
| max_mem_used: Gauge, |
There was a problem hiding this comment.
I don't think this measures what you want. Right now we do this for every batch in all partitions.
self.max_mem_used.add(batch_size);
So this measures the size of all the batches pushed to the buffer.
You probably want to use an int to store the total size of bytes in the buffer on every push/pop. Then make this a MaxGaugeMetric and update it on every push/pop.
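The push/pop bookkeeping suggested here could look roughly like the sketch below. `PeakBytes` is a hypothetical type built on standard atomics; it is not the crate's `MaxGaugeMetric`, and it assumes callers never pop more bytes than they pushed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical tracker: `current` follows the bytes held right now,
/// `peak` remembers the highest value `current` ever reached.
#[derive(Default)]
pub struct PeakBytes {
    current: AtomicUsize,
    peak: AtomicUsize,
}

impl PeakBytes {
    /// Call when a batch of `bytes` enters the buffer.
    pub fn on_push(&self, bytes: usize) {
        let now = self.current.fetch_add(bytes, Ordering::Relaxed) + bytes;
        self.peak.fetch_max(now, Ordering::Relaxed);
    }

    /// Call when a batch of `bytes` leaves the buffer.
    /// Assumption: `bytes` was previously pushed, so this cannot underflow.
    pub fn on_pop(&self, bytes: usize) {
        self.current.fetch_sub(bytes, Ordering::Relaxed);
    }

    pub fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }
}
```

If entries are only ever pushed and never popped, the running sum and the peak coincide, and a plain additive counter gives the same number.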
There was a problem hiding this comment.
Also none of these metrics are tagged by partition. We should probably do that. SamplerExecMetrics::new() can take a partition id.
There was a problem hiding this comment.
And the comment on MaxGaugeMetric should say that it only stores a max.
/// Similar to DataFusion's Gauge metric, but aggregates between instances using `max` instead of
/// `sum`.
I honestly thought it was a regular Gauge with a different aggregator.
There was a problem hiding this comment.
It is indeed a regular Gauge with a max aggregator.
So this measures the size of all the batches pushed to the buffer.
This is equal to the max memory used. Note that entries in the buffer are never popped. Instead, they are converted into a chained stream of the buffered entries + the ones that are yet to come, but this is an only-push buffer.
This is actually not really a buffer, it's more like a peek, where some entries are peeked and then streamed back as-is without any extra buffering in the middle.
I'll update the name.
| let metrics: Arc<LazyLock<_, Box<dyn FnOnce() -> SamplerExecMetrics + Send>>> = | ||
| Arc::new(LazyLock::new(Box::new(move || { | ||
| SamplerExecMetrics::new(&metric_set_clone) | ||
| }))); |
There was a problem hiding this comment.
I don't understand this, or the comment.
otherwise the coordinator side will register them when they are never relevant there
There was a problem hiding this comment.
I extended the comment a bit to explain it more.
| let n_cols = self.input.schema().fields.len(); | ||
|
|
||
| let mut reporter = LoadInfoDropHandler { | ||
| load_info: pb::LoadInfo { |
There was a problem hiding this comment.
Nit: can we implement new on this?
There was a problem hiding this comment.
This is a protobuf message autogenerated, and typically further methods are not implemented for protobuf messages.
I can add a separate function though, I'll do that.
| Some(self.worker_connections.metrics.clone_inner()) | ||
| } | ||
|
|
||
| fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> { |
There was a problem hiding this comment.
I wonder if this could use a little test coverage. Maybe it's covered elsewhere, I'm still reviewing.
There was a problem hiding this comment.
You might notice that there are very few new unit tests overall. This is mainly because this new feature is covered by all the tpch, tpcds and clickbench integration tests, which run the full queries with AQE enabled.
There was a problem hiding this comment.
I have explicitly not added almost any new unit test because of this, and because of the amount of from scratch iterations I've done over this, but as this stabilizes, if you see opportunities for covering things that are not already covered by the integration tests, those are more than welcome
| stage_map: DashMap<usize, (Arc<dyn ExecutionPlan>, MetricsSet)>, | ||
| } | ||
|
|
||
| impl PlanReconstructor { |
There was a problem hiding this comment.
this feels super random. it would be nice if it was shared between here and prepare_network_boundaries which also does things like insert_producer_head
There was a problem hiding this comment.
prepare_network_boundaries and this code are doing very different things:
- prepare_network_boundaries is actually laying out the plan that will eventually get executed and sent to workers.
- This is just reconstructing the plan dynamically as different stages get sent to the workers during AQE, so that we can then have something to visualize. The insert_producer_head, for example, is really just for the sake of a future visualization.
There was a problem hiding this comment.
I'm going to add some comments about this
| return Ok(TreeNodeRecursion::Continue); | ||
| }; | ||
|
|
||
| if let Stage::Remote(remote) = nb.input_stage() |
There was a problem hiding this comment.
Would it be ok to add the remote stage to the NetworkBoundaryBuilder? So you don't have to recurse here. You could store an Option<RemoteStage>
There was a problem hiding this comment.
It would be a bit messy, because there can be an arbitrary number of RemoteStages present in the subplan here.
I think it's not too bad: recursing is negligible as long as no Arc operations or plan restructures happen during the recursion. .apply takes the plan by reference, and at no point does it need an owned reference to an Arc<dyn ExecutionPlan>, so the cost of such a recursion is ~0, unlike if you were doing a transform_down() that returns Transformed::yes at some point in the query.
| if d_cfg.dynamic_task_count { | ||
| // The task count will be decided dynamically at execution time. | ||
| return Ok(Arc::new( | ||
| DistributedExec::new(plan).with_metrics_collection(d_cfg.collect_metrics), |
There was a problem hiding this comment.
Just a little bit of a smell.
The dynamic prepare plan does a lot of stuff that inject_network_boundaries, prepare_network_boundaries is supposed to do. Should we move those function calls to the static prepare plan? So the DistributedExec is always responsible for that stuff?
The downside is that the prepare static plan which happens below during planning would happen during execution in DistributedExec
There was a problem hiding this comment.
Yeah, I think that's a big drawback. For static planning, we really like to see the full plan before it's sent for execution.
The split of responsibilities seems correct here:
- static planner: we plan statically inside distributed_query_planner.rs, so all further logic is here (e.g., prepare_network_boundaries)
- dynamic planner: we plan during execution, so the logic is in DistributedExec
| })); | ||
|
|
||
| // Stream back the metrics once the task finishes executing. | ||
| // The oneshot receiver resolves when impl_execute_task sends the collected |
There was a problem hiding this comment.
Also I noticed that we still have the old code in the impl_execute_task.rs
if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
There was a problem hiding this comment.
Yeah, we still need that old code for a couple of things:
- Marking execution as finished for stage metrics
- Sending the runtime metrics over the wire back to the coordinator
Additionally, the cache entry is still promptly invalidated here rather than waiting for the coordinator->worker channel to drop.
1. This should not be a problem: if the query finishes streaming arrow data, it does not matter what is waiting for what, everything is abruptly cancelled (except the metrics collection). This is actually not even related to this PR, although I think there might be better ways of doing this. As it's not related to this PR, I think it might be worth evaluating in a different one.
2. This is fine, the only consequence of this is that task data will get invalidated a bit earlier, which is acceptable and even desirable. Again, this did not change in this PR.
3. 👍 This is a good call, doing this.
4. 👍 Doing that.
5. Because there are repartitions in between samplers and network boundaries, it's not possible to attribute some statistics to specific partitions. One thing that comes to mind is, if a specific partition is requested, maybe it's better to just divide the stats by the number of partitions available, assuming that the data is going to be perfectly evenly distributed across partitions.
6. 👍 Sounds reasonable.
7. This is fine and expected. We don't care about sampling these boundaries because we always know that the stage above should have
8. This is fine, we are good with taking owned references for this, we don't need to hold them in memory.
9. I think this is fine, the timing for all of this should be negligible, and I'm afraid that doing it differently could bring complexity to the code.
10. I have actually already fought this, as it was surfaced by the existing tests. This situation happens in many TPCH and TPC-DS queries, so I think we are covered on this front.
This is one PR from the following stack of PRs:
- #422
- #424
- #416
- #425
- #426
- #427
- #432 <- you are here
This PR implements support for dynamic task count assignation, allowing the distributed planner to determine the optimal number of tasks for each stage based on runtime characteristics and available resources. This enables adaptive query execution where task counts are optimized for the specific execution context rather than being determined statically.
There are two fundamental pieces delivered in this PR that make adaptive task count assignation possible:
## SamplerExec

This is a new `ExecutionPlan` implementation that peeks a few initial record batches before execution, gathers statistics over them, and then reports a `LoadInfo` message to the `prepare_dynamic_plan` running on the coordinator.

[Diagram: two worker-side stages, each laid out bottom to top as `DataSourceExec`, `...`, `SamplerExec`, `ProducerHead` (`RepartitionExec` or `BroadcastExec`). Each `SamplerExec` sends a `LoadInfo` message up to the `DistributedExec` node on the coordinator, which sits above a `...` node.]

Each `SamplerExec` node contains one `PartitionSampler` per partition, and each partition's sampling is treated independently.

The way a `PartitionSampler` works is that, before actual execution, it will peek a representative number of `RecordBatch`es and gather stats over them (per column byte count, per column NDV, per column null count, total row count, etc.). For that, the following things happen:

- It first takes the `RecordBatch`es that are available synchronously without `async` gaps. Some operators like `Aggregate(Partial)` retain big internal representations in memory that can yield more than 1 `RecordBatch` synchronously, because they were already computed and stored in memory.
- Once an `async` gap is reached, it pulls just 1 more record batch, and it measures the time it takes in this `async` gap:
  - this indicates how fast `RecordBatch`es are arriving.

This velocity information is what really matters towards deciding how many tasks should be assigned.
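The velocity measurement can be pictured with a small sketch. Everything here is an assumption made for illustration: the function name is hypothetical, and dividing the size of the batch pulled across the `async` gap by the time spent waiting for it is a guess at how the rate is derived, not a description of the actual `PartitionSampler`.

```rust
use std::time::Duration;

/// Hypothetical helper: bytes of the batch that arrived after the async
/// gap, divided by how long the gap lasted. Returns `None` when no time
/// elapsed, since a rate is undefined in that case.
pub fn bytes_per_second(batch_bytes: u64, gap: Duration) -> Option<u64> {
    let secs = gap.as_secs_f64();
    if secs <= 0.0 {
        return None;
    }
    Some((batch_bytes as f64 / secs) as u64)
}
```

For example, a 1000 byte batch that took 500 ms to arrive corresponds to 2000 bytes per second.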
## prepare_dynamic_plan

This is analogous to the existing `prepare_static_plan`, but with a runtime-based dynamic task count assignation.

In order to understand how it works, let's use the classical aggregation example.

The plan is walked from bottom to top until we delimit a stage, same as before.

We'll use runtime information of "Stage 1" in order to determine the task count for the future "Stage 2". For that, the plan is modified and a `SamplerExec` is inserted just below the producer head (`RepartitionExec`).

Let's say that the `TaskEstimator` for the leaf node `DataSourceExec` decided a `Desired(3)` task estimation. This subplan is then immediately sent to three workers, and upon setting the plan on the workers, all the `SamplerExec`s are kicked off and they start sampling even before any call to `.execute()` in any node.

All the `SamplerExec`s start sampling, and they start reporting `LoadInfo` messages to the `prepare_dynamic_plan` function, which builds the next `NetworkShuffleExec` boundary with some pre-defined `Statistics`: the ones that were collected at runtime by the `SamplerExec`s below.

While delimiting the next stage, the one that contains the `ProjectionExec` and the `Aggregate(final)`, the task count for that stage is decided purely on its compute cost, based on stats. At this point, the stats we have in that slice of the plan are very accurate, as we managed to gather them and condense them in `NetworkShuffleExec`, which at that point acts as a leaf node (as its stage was already set to `Stage::Remote` because it was already sent and started sampling).

Based on the compute cost inferred from the runtime stats, we realize that there's very little data flowing through the `SamplerExec`, and as a consequence, the compute cost of "Stage 2" is estimated to be very low, so we collapse early to a single node.

## Benchmarks

Extracted by comparing runs of this same branch with `--dynamic false` vs `--dynamic true`:

TPCH SF1: Tasks 279 -> 318, prev=14631 ms, new=6727 ms, diff=2.17 faster ✅
TPCH SF10: Tasks 363 -> 734, prev=35577 ms, new=13014 ms, diff=2.73 faster ✅
TPCH SF100: Tasks 387 -> 1175, prev=307187 ms, new=62943 ms, diff=4.88 faster ✅
TPCDS SF1: Tasks 3079 -> 2770, prev=62815 ms, new=44737 ms, diff=1.40 faster ✅
ClickBench 0-100: Tasks 912 -> 609, prev=35116 ms, new=34185 ms, diff=1.03 faster ✔