No eager buffering in network connections #477
  fn default() -> Self {
      let cache = Cache::builder()
-         .time_to_idle(Duration::from_secs(60))
+         .time_to_idle(Duration::from_mins(10))
As we are no longer eagerly calling the remote streams, there's now a legitimate use case where the time gap between when the plan is sent over CoordinatorChannel and when it is executed with ExecuteTask is significant.
This typically happens in JOINs, where the stage on the build side is called immediately, but the stage on the probe side is delayed until the left side has been fully gathered. Having 10 mins here gives the join enough headroom to fully collect the build side before it starts executing the probe side.
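To illustrate why the longer idle window matters, here is a minimal std-only sketch of time-to-idle semantics. `IdleCache` and `survives` are hypothetical names made up for this example, not the cache type used in this PR, and the 5 minute gap is just an assumed build-side collection time.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical, minimal time-to-idle cache (illustration only, not the
/// cache used in this PR). Time is passed in explicitly so the behavior is
/// deterministic.
pub struct IdleCache<V> {
    time_to_idle: Duration,
    /// Value plus the instant of the last insert or successful read.
    entries: HashMap<u64, (V, Instant)>,
}

impl<V> IdleCache<V> {
    pub fn new(time_to_idle: Duration) -> Self {
        Self { time_to_idle, entries: HashMap::new() }
    }

    pub fn insert(&mut self, key: u64, value: V, now: Instant) {
        self.entries.insert(key, (value, now));
    }

    /// Returns the value if it has been idle for less than `time_to_idle`,
    /// refreshing its last-access time. Otherwise the entry is evicted.
    pub fn get(&mut self, key: u64, now: Instant) -> Option<&V> {
        let expired = match self.entries.get(&key) {
            Some((_, last)) => now.duration_since(*last) >= self.time_to_idle,
            None => return None,
        };
        if expired {
            self.entries.remove(&key);
            return None;
        }
        let entry = self.entries.get_mut(&key)?;
        entry.1 = now;
        Some(&entry.0)
    }
}

/// Stores a plan at `t0` and reads it back for the first time after `gap`.
pub fn survives(time_to_idle: Duration, gap: Duration) -> bool {
    let t0 = Instant::now();
    let mut cache = IdleCache::new(time_to_idle);
    cache.insert(1, "plan", t0);
    cache.get(1, t0 + gap).is_some()
}
```

With an assumed 5 minute gap between sending the plan and executing the probe side, the entry is gone under a 60 second idle window but still present under a 10 minute one.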
The idea makes sense here, and I understand the issue with buffering memory ahead of late consumers.
I am curious, though: do you see a case where the old behavior was actually beneficial? Previously we were effectively prefetching / buffering data before it was actually requested, so it was ready to go right away when the consumer asked for it. I was thinking there may be cases where the time to first batch for the consumer is now slower without this.
Could we bench this? But I trust your judgement on the call 👍
EDIT:
I am seeing your point about the JOINs, and yeah, I think I agree here: this was more like a happy accident from unintended behavior.
Doing that now. I do expect some performance impact.
@gabotechs nice, feel free to ping when some results are ready 😄
This is the benchmark comparison against I'll try now running with
Now against With hash join buffering enabled, things look as before
@gabotechs noice, thank you 🙇
I think the CI failure is because of #480
This is one PR from the following stack of PRs:

- #477
- #463 <- you are here
- #464
- #478
- #479
- #486
- #432

This PR introduces a NetworkBoundaryBuilder argument to the network boundary injection logic, allowing more flexible and configurable strategies for determining which exchanges require network communication. This enables better optimization of data movement across distributed tasks.
This is one PR from the following stack of PRs:

- #477
- #463
- #464 <- you are here
- #478
- #479
- #486
- #432

This PR introduces a MaxGauge metric to provide better tracking of peak values in distributed metrics collection. This enables more accurate monitoring of resource utilization and helps identify bottlenecks in the execution pipeline.
This is one PR from the following stack of PRs:

- #477
- #463
- #464
- #478 <- you are here
- #479
- #486
- #432

---

Introduces the `ProducerHead` type:

```rust
pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a [BroadcastExec].
    BroadcastExec { output_partitions: usize },
    /// The head node should be a [RepartitionExec].
    RepartitionExec { partitioning: Partitioning },
}
```

This is passed over the network while remotely executing tasks in order to set the appropriate node at the head of a stage.

Today, this is a no-op because the right head node in stages is ensured statically at planning time, but in follow-up PRs, network boundaries can get swapped and reorganized arbitrarily. One example that happens in AQE:

1. A JOIN is planned as a CollectLeft

```js
HashJoinExec: mode=CollectLeft
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: unknown size
  DistributedLeafExec: unknown size
```

2. While collecting runtime statistics, it turns out that `Stage 1` is huge, and during AQE the JOIN sides are swapped

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: big size
```

3. `Stage 1` is now on the probe side, so it needs to be rewritten to a `NetworkShuffleExec`, otherwise duplicate data will be returned:

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkShuffleExec
      RepartitionExec // <- dynamically swapped at runtime based on the passed `ProducerHead`
        DistributedLeafExec: big size
```

Passing a `ProducerHead` at execution time unlocks two things:

1. dynamically set the fanout width, accounting for a dynamically scaled upper stage
2. dynamically set the correct operator, `BroadcastExec` or `RepartitionExec`, based on the network boundary above, which might have changed because of AQE
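As a rough sketch of the second point, the consumer side could derive the `ProducerHead` from the kind of network boundary sitting above the stage. Everything except the `ProducerHead` variants is an assumption made for illustration: `BoundaryKind`, `producer_head_for`, and the simplified `Partitioning` stand-in are hypothetical and not the project's actual API.

```rust
/// Simplified stand-in for DataFusion's `Partitioning` (assumption, so the
/// example compiles without external crates).
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    Hash { partitions: usize },
}

/// Mirrors the enum from the PR description above.
#[derive(Debug, Clone, PartialEq)]
pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a BroadcastExec.
    BroadcastExec { output_partitions: usize },
    /// The head node should be a RepartitionExec.
    RepartitionExec { partitioning: Partitioning },
}

/// Hypothetical: the kind of network boundary directly above a stage.
#[derive(Debug, Clone, Copy)]
pub enum BoundaryKind {
    Broadcast,
    Shuffle,
    Other,
}

/// Hypothetical helper: picks the head node a producer stage should use,
/// given the boundary above it and the fanout width chosen at runtime.
pub fn producer_head_for(boundary: BoundaryKind, fanout: usize) -> ProducerHead {
    match boundary {
        BoundaryKind::Broadcast => ProducerHead::BroadcastExec {
            output_partitions: fanout,
        },
        BoundaryKind::Shuffle => ProducerHead::RepartitionExec {
            partitioning: Partitioning::Hash { partitions: fanout },
        },
        BoundaryKind::Other => ProducerHead::None,
    }
}
```

In the AQE example, the boundary above `Stage 1` changes from broadcast to shuffle, so the same stage would receive a `RepartitionExec` head instead of a `BroadcastExec` one, with the fanout decided at execution time.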
This is one PR from the following stack of PRs:

- #477 <- you are here
- #463
- #464
- #478
- #479
- #486
- #432
Network boundaries in this project are currently breaking one assumption from upstream DataFusion:

`SendableRecordBatchStream`s yield record batches in two situations:
https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/physical-plan/src/execution_plan.rs#L973-L988

Today, network boundaries pulling from remote sources are breaking this rule, because they start yielding `RecordBatch`es over the network even if no poll has ever happened to the `SendableRecordBatchStream` returned by the network boundary.

This has two consequences:
Consequence 2 is nice, but it should be delivered using standard upstream mechanisms:
https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/common/src/config.rs#L695-L709
Not accidentally by how remote network boundaries work.
This PR makes it so that remote network boundaries only start the network stream on first poll, instead of on the `.execute()` call, as stated by the `EvaluationType::Eager` docs.
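A minimal sketch of the "start on first poll" idea, using a synchronous `Iterator` as a stand-in for an async stream so it needs nothing beyond std. `LazyRemoteStream`, `State`, and `demo` are hypothetical names for illustration, not the types changed in this PR, and the closure stands in for opening the remote network stream.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// State of a lazily started source.
enum State<F, I> {
    /// Created (the analogue of `.execute()`), but nothing pulled yet.
    NotStarted(F),
    /// The first pull happened and the underlying source is open.
    Running(I),
    /// Transient placeholder while the state is being swapped.
    Taken,
}

/// Hypothetical wrapper: `open` is only called on the first `next()`.
pub struct LazyRemoteStream<F, I> {
    state: State<F, I>,
}

impl<F, I> LazyRemoteStream<F, I>
where
    F: FnOnce() -> I,
    I: Iterator,
{
    pub fn new(open: F) -> Self {
        Self { state: State::NotStarted(open) }
    }
}

impl<F, I> Iterator for LazyRemoteStream<F, I>
where
    F: FnOnce() -> I,
    I: Iterator,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Take the state out so the `FnOnce` can be called by value.
        let mut inner = match std::mem::replace(&mut self.state, State::Taken) {
            State::NotStarted(open) => open(), // first pull: open the source now
            State::Running(inner) => inner,
            State::Taken => return None,
        };
        let item = inner.next();
        self.state = State::Running(inner);
        item
    }
}

/// Returns (opens before the first pull, opens after it, all items pulled).
pub fn demo() -> (u32, u32, Vec<u32>) {
    let opened = Rc::new(Cell::new(0u32));
    let counter = Rc::clone(&opened);
    let mut stream = LazyRemoteStream::new(move || {
        // Stand-in for starting the remote network stream.
        counter.set(counter.get() + 1);
        vec![1u32, 2, 3].into_iter()
    });
    let before = opened.get();
    let first = stream.next();
    let after = opened.get();
    let mut batches: Vec<u32> = first.into_iter().collect();
    batches.extend(stream);
    (before, after, batches)
}
```

Constructing the wrapper does no work; the source is opened exactly once, when the consumer first pulls from it.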