Skip to content

Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator#450

Merged
gabotechs merged 6 commits into
datafusion-contrib:mainfrom
shehab-ali:sa/partition-task
Jun 4, 2026
Merged

Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator#450
gabotechs merged 6 commits into
datafusion-contrib:mainfrom
shehab-ali:sa/partition-task

Conversation

@shehab-ali

@shehab-ali shehab-ali commented May 11, 2026

Copy link
Copy Markdown
Contributor

Motivation

Issue #310 highlighted that FileScanConfigTaskEstimator::scale_up_leaf_node — the function that inflates a leaf DataSourceExec's partition count so it can be sliced across task_count distributed tasks — was splitting each input file group independently. When a group's file count wasn't divisible by task_count, the remainder always fell on the earlier sub-groups. With many input groups, that boundary skew compounded: some downstream tasks ended up with consistently more files than others, leaving workers idle at the tail of each stage.

Change

Flatten all files across all input groups, then round-robin them into N × task_count output groups (where N is the original input group count). This preserves the total fan-out PartitionIsolatorExec expects while smoothing skew across the original group boundaries.

let input_group_count = file_scan.file_groups.len().max(1);
let all_partitioned_files = file_scan
    .file_groups
    .iter()
    .flat_map(|file_group| file_group.iter().cloned())
    .collect::<Vec<_>>();
let file_groups =
    rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();

rebalance_round_robin caps its target group count at items.len() so we don't emit empty partitions when there are fewer files than the requested fan-out.

Why this is needed

The previous per-group split_files was correct per group but had no global view. Two effects compounded:

  1. Tail skew: workers assigned to later-numbered partitions in each group consistently got fewer files. On Clickbench (43 queries, 100 input files), this skew is what allows queries like q2 and q1 to land 3–4× faster — those queries' bottleneck is the slowest scan task, not the total work.
  2. Idle workers: with 16 workers and skewed file assignment, some workers spent ~25% of stage time waiting for the laggards to finish.

Global round-robin spreads partitioned files evenly across every output partition, so the slowest task carries the same load as the median.

Benchmark Results

Spotlight: with q28 rebalancing changes the plan

clickbench_0-100/q28 (REGEXP_REPLACE(...) ... GROUP BY ... HAVING COUNT(*) > 100000) is the clearest single-query demonstration of the change. The leaf scan's output partition count visibly differs between main and this PR, and the latency moves with it.

Focused re-run on the 16-worker EC2 cluster (1 warmup + 5 iterations):

Branch per-iter (ms) avg leaf file_groups
main 5753 / 5873 / 5483 / 6023 / 5629 5752 92
this PR 4217 / 4139 / 4018 / 3777 / 3780 3985 107

1.44× faster with the leaf DataSourceExec now emitting 107 groups instead of 92 — the per-group split_files could not produce that fan-out for the hits table's non-uniform input layout.

New unit test

test_scale_up_leaf_node_rebalances_across_input_groups in src/distributed_planner/task_estimator.rs exercises the non-uniform case the existing integration fixtures miss (3 input groups × 5 files, task_count=3). It asserts that early output groups mix files from multiple input groups — a property the old per-group split_files could not produce regardless of task_count.

What we tried and rejected

An alternative target_groups = (file_count / files_per_task).max(task_count) was tested to try to recover the small q37-class regressions by packing more files per group. It collapsed group count down to task_count, eliminated intra-task parallelism, and regressed q20/q24/q25 by 3–4×. The current input_group_count * task_count formula is preserved because it keeps the per-task partition fan-out the downstream operators rely on.

Test Plan

  • cargo test -p datafusion-distributed task_estimator — new unit tests cover (a) group-boundary skew rebalancing and (b) the cap-to-file-count edge case
  • TPCH sf10 on remote EC2 cluster: 1.06× faster total
  • Clickbench 0–100 on remote EC2 cluster: 1.18× faster total
  • Visual inspection of generated plans on scan-heavy queries confirms output partition count is preserved

Notes / limitations

  • This PR focuses on balancing and file-count capping behavior.
  • It does not introduce safeguards for cross-side co-partitioning guarantees in partitioned joins; those may require follow-up work.
  • Small regressions (q37, q14, q33) on sparse-file workloads where each output group ends up with a single file are documented above. They are bounded (<100ms absolute on sub-second queries) and dwarfed by the gains.

@shehab-ali shehab-ali marked this pull request as ready for review May 12, 2026 19:53
Comment on lines +282 to +288
let all_partitioned_files = file_scan
.file_groups
.iter()
.flat_map(|file_group| file_group.iter().cloned())
.collect::<Vec<_>>();
let file_groups = rebalance_round_robin(all_partitioned_files, task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! sounds like a good thing to try. Can you try running the benchmarks for this and report the outcome?

@shehab-ali shehab-ali changed the title Add global rebalancing in file splitting between tasks Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator May 27, 2026
Comment on lines +283 to +290
let all_partitioned_files = file_scan
.file_groups
.iter()
.flat_map(|file_group| file_group.iter().cloned())
.collect::<Vec<_>>();
let file_groups =
rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One requirement for this type of changes is that they should prove through the benchmarks that they indeed bring performance benefits, but this change does not seem to be meeting this criteria.

Maybe there are opportunities for further re-splitting PartitionedFiles?

Also, you might want to try ClickBench instead, which has a greater number of files VS TPC-H or TPC-DS, you probably would get some better numbers there.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that this PR shows 0 diffs in the plan snapshots makes me thing that it might be collaterally doing exactly the same thing as the previous code.

Maybe that's the reason why you see no performance impact?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reran benchmarks and clickbench and did a few tries to detect which ones were actual regressions vs noise. Updated the description with findings. Seems like the change is overall worth it for the performance wins.

@gabotechs gabotechs May 27, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still very suspicious about the plan snapshots not having changed at all, it really looks like the files are being distributed exactly the same as before. That's worth double checking.

Do you see the plans actually changing when running the remote benchmarks against main?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a look myself, and I think we really are not doing anything different from what we have on main, the two file distribution algorithms (main vs this PR) happen to be equivalent.

Take TPC-H Q1 as an example:

TPC-H Q1 With changes rolled back
┌───── DistributedExec ── Tasks: t0:[p0]plan_bytes_sent_0=89.22 K, plan_send_latency_avg_0=1.54ms, plan_send_latency_max_0=1.86ms
│ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], metrics=[output_rows_0=4, elapsed_compute_0=19.64µs, output_bytes_0=608.0 B, output_batches_0=1][Stage 2] => NetworkCoalesceExec: output_partitions=24, input_tasks=3, metrics=[elapsed_compute_0=692.10µs, msg_count_0=28, network_latency_count_0=28, max_mem_used_0=8.69 K, network_latency_sum_0=4.81ms, bytes_transferred_0=name:bytes_transferred 23.1 KB, network_latency_first_0=name:network_latency_first 178.43µs, network_latency_max_0=name:network_latency_max 309.74µs, network_latency_min_0=name:network_latency_min 98.64µs, network_latency_p50_0=name:network_latency_p50 167.77µs, network_latency_p95_0=name:network_latency_p95 226.47µs]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=5.95µs, elapsed_compute_1=8.37µs, elapsed_compute_2=12.66µs, output_bytes_0=0.0 B, output_bytes_1=0.0 B, output_bytes_2=0.0 B, output_batches_0=0, output_batches_1=0, output_batches_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0]ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(Int64(1))@9 as count_order], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=6.20µs, elapsed_compute_1=16.54µs, elapsed_compute_2=6.01µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, expr_0_eval_time_0=287ns, expr_0_eval_time_1=521ns, expr_0_eval_time_2=920ns, expr_1_eval_time_0=167ns, expr_1_eval_time_1=206ns, expr_1_eval_time_2=75ns, expr_2_eval_time_0=129ns, expr_2_eval_time_1=286ns, expr_2_eval_time_2=197ns, expr_3_eval_time_0=147ns, expr_3_eval_time_1=277ns, expr_3_eval_time_2=132ns, expr_4_eval_time_0=257ns, expr_4_eval_time_1=231ns, expr_4_eval_time_2=149ns, expr_5_eval_time_0=216ns, expr_5_eval_time_1=296ns, expr_5_eval_time_2=135ns, expr_6_eval_time_0=441ns, expr_6_eval_time_1=201ns, expr_6_eval_time_2=105ns, expr_7_eval_time_0=155ns, expr_7_eval_time_1=7.52µs, expr_7_eval_time_2=135ns, expr_8_eval_time_0=74ns, expr_8_eval_time_1=216ns, expr_8_eval_time_2=134ns, expr_9_eval_time_0=218ns, expr_9_eval_time_1=257ns, expr_9_eval_time_2=256ns]AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=298.56µs, elapsed_compute_1=345.21µs, elapsed_compute_2=344.54µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, peak_mem_used_0=2.83 K, peak_mem_used_1=5.15 K, peak_mem_used_2=2.83 K, aggregate_arguments_time_0=12.00µs, aggregate_arguments_time_1=21.93µs, aggregate_arguments_time_2=12.26µs, aggregation_time_0=133.11µs, aggregation_time_1=161.59µs, aggregation_time_2=161.20µs, emitting_time_0=24.87µs, emitting_time_1=57.49µs, emitting_time_2=38.60µs, time_calculating_group_ids_0=15.47µs, time_calculating_group_ids_1=31.67µs, time_calculating_group_ids_2=30.81µs][Stage 1] => NetworkShuffleExec: output_partitions=8, input_tasks=3, metrics=[elapsed_compute_0=1.51ms, elapsed_compute_1=2.22ms, elapsed_compute_2=1.53ms, msg_count_0=27, msg_count_1=30, msg_count_2=27, network_latency_count_0=27, network_latency_count_1=30, network_latency_count_2=27, max_mem_used_0=28.21 K, max_mem_used_1=29.21 K, max_mem_used_2=19.17 K, network_latency_sum_0=4.72ms, network_latency_sum_1=8.81ms, network_latency_sum_2=7.41ms, bytes_transferred_0=name:bytes_transferred 35.6 KB, bytes_transferred_1=name:bytes_transferred 43.8 KB, bytes_transferred_2=name:bytes_transferred 35.6 KB, network_latency_first_0=name:network_latency_first 149.20µs, network_latency_first_1=name:network_latency_first 187.74µs, network_latency_first_2=name:network_latency_first 219.48µs, network_latency_max_0=name:network_latency_max 361.92µs, network_latency_max_1=name:network_latency_max 594.90µs, network_latency_max_2=name:network_latency_max 462.14µs, network_latency_min_0=name:network_latency_min 75.50µs, network_latency_min_1=name:network_latency_min 160.96µs, network_latency_min_2=name:network_latency_min 147.28µs, network_latency_p50_0=name:network_latency_p50 145.85µs, network_latency_p50_1=name:network_latency_p50 311.88µs, network_latency_p50_2=name:network_latency_p50 235.71µs, network_latency_p95_0=name:network_latency_p95 331.17µs, network_latency_p95_1=name:network_latency_p95 421.00µs, network_latency_p95_2=name:network_latency_p95 438.18µs]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p23] t1:[p0..p23] t2:[p0..p23]RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 24), input_partitions=8, metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=275.47µs, elapsed_compute_1=339.73µs, elapsed_compute_2=317.16µs, output_bytes_0=5.5 MB, output_bytes_1=5.5 MB, output_bytes_2=5.5 MB, output_batches_0=4, output_batches_1=4, output_batches_2=4, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, fetch_time_0=7.14s, fetch_time_1=5.31s, fetch_time_2=5.62s, repartition_time_0=155.40µs, repartition_time_1=130.79µs, repartition_time_2=125.20µs, send_time_0=116.70µs, send_time_1=91.15µs, send_time_2=88.74µs]AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=1.60s, elapsed_compute_1=1.17s, elapsed_compute_2=1.07s, output_bytes_0=4.1 KB, output_bytes_1=3.4 KB, output_bytes_2=3.4 KB, output_batches_0=6, output_batches_1=5, output_batches_2=5, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, peak_mem_used_0=3.15 M, peak_mem_used_1=2.63 M, peak_mem_used_2=2.63 M, aggregate_arguments_time_0=592.06ms, aggregate_arguments_time_1=429.92ms, aggregate_arguments_time_2=397.83ms, aggregation_time_0=2.05s, aggregation_time_1=1.51s, aggregation_time_2=1.38s, emitting_time_0=43.50µs, emitting_time_1=37.62µs, emitting_time_2=35.21µs, time_calculating_group_ids_0=564.97ms, time_calculating_group_ids_1=411.26ms, time_calculating_group_ids_2=370.47ms, reduction_factor_0=0.00011% (24/22.21 M), reduction_factor_1=0.00011% (20/18.47 M), reduction_factor_2=0.00011% (20/18.46 M)]ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=587.36ms, elapsed_compute_1=432.58ms, elapsed_compute_2=400.44ms, output_bytes_0=2.3 GB, output_bytes_1=1979.9 MB, output_bytes_2=1976.6 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, expr_0_eval_time_0=584.74ms, expr_0_eval_time_1=430.36ms, expr_0_eval_time_2=398.56ms, expr_1_eval_time_0=225.74µs, expr_1_eval_time_1=181.10µs, expr_1_eval_time_2=168.70µs, expr_2_eval_time_0=80.79µs, expr_2_eval_time_1=65.89µs, expr_2_eval_time_2=56.27µs, expr_3_eval_time_0=95.11µs, expr_3_eval_time_1=67.88µs, expr_3_eval_time_2=73.53µs, expr_4_eval_time_0=102.33µs, expr_4_eval_time_1=67.25µs, expr_4_eval_time_2=71.12µs, expr_5_eval_time_0=166.15µs, expr_5_eval_time_1=118.82µs, expr_5_eval_time_2=116.43µs, expr_6_eval_time_0=95.72µs, expr_6_eval_time_1=69.22µs, expr_6_eval_time_2=63.65µs]FilterExec: l_shipdate@6 <= 1998-09-02, projection=[l_quantity@0, l_extendedprice@1, l_discount@2, l_tax@3, l_returnflag@4, l_linestatus@5], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=680.57ms, elapsed_compute_1=523.34ms, elapsed_compute_2=470.96ms, output_bytes_0=2040.0 MB, output_bytes_1=1698.0 MB, output_bytes_2=1695.0 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, selectivity_0=99% (22.21 M/22.53 M), selectivity_1=99% (18.47 M/18.74 M), selectivity_2=99% (18.46 M/18.72 M)]PartitionIsolatorExec: tasks=3 partitions=23, metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M]DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=203.90µs, bloom_filter_eval_time_1=226.95µs, bloom_filter_eval_time_2=225.73µs, metadata_load_time_0=625.07µs, metadata_load_time_1=734.40µs, metadata_load_time_2=860.51µs, page_index_eval_time_0=204.27µs, page_index_eval_time_1=198.23µs, page_index_eval_time_2=206.15µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=263.84µs, statistics_eval_time_1=239.83µs, statistics_eval_time_2=292.26µs, time_elapsed_opening_0=2.09ms, time_elapsed_opening_1=3.25ms, time_elapsed_opening_2=2.52ms, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=916.20ms, time_elapsed_processing_2=859.68ms, time_elapsed_scanning_total_0=7.12s, time_elapsed_scanning_total_1=5.30s, time_elapsed_scanning_total_2=5.61s, time_elapsed_scanning_until_data_0=583.23ms, time_elapsed_scanning_until_data_1=479.05ms, time_elapsed_scanning_until_data_2=852.57ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]
    └──────────────────────────────────────────────────
TPC-H Q1 With changes incliuded
┌───── DistributedExec ── Tasks: t0:[p0]plan_bytes_sent_0=89.23 K, plan_send_latency_avg_0=1.73ms, plan_send_latency_max_0=2.68ms
│ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], metrics=[output_rows_0=4, elapsed_compute_0=18.89µs, output_bytes_0=608.0 B, output_batches_0=1][Stage 2] => NetworkCoalesceExec: output_partitions=24, input_tasks=3, metrics=[elapsed_compute_0=954.86µs, msg_count_0=28, network_latency_count_0=28, max_mem_used_0=8.02 K, network_latency_sum_0=5.96ms, bytes_transferred_0=name:bytes_transferred 23.1 KB, network_latency_first_0=name:network_latency_first 183.19µs, network_latency_max_0=name:network_latency_max 310.71µs, network_latency_min_0=name:network_latency_min 140.34µs, network_latency_p50_0=name:network_latency_p50 213.28µs, network_latency_p95_0=name:network_latency_p95 276.61µs]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=4.36µs, elapsed_compute_1=15.59µs, elapsed_compute_2=4.90µs, output_bytes_0=0.0 B, output_bytes_1=0.0 B, output_bytes_2=0.0 B, output_batches_0=0, output_batches_1=0, output_batches_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0]ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(Int64(1))@9 as count_order], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=5.12µs, elapsed_compute_1=12.71µs, elapsed_compute_2=4.81µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, expr_0_eval_time_0=240ns, expr_0_eval_time_1=1.08µs, expr_0_eval_time_2=206ns, expr_1_eval_time_0=127ns, expr_1_eval_time_1=369ns, expr_1_eval_time_2=133ns, expr_2_eval_time_0=155ns, expr_2_eval_time_1=274ns, expr_2_eval_time_2=153ns, expr_3_eval_time_0=135ns, expr_3_eval_time_1=315ns, expr_3_eval_time_2=133ns, expr_4_eval_time_0=137ns, expr_4_eval_time_1=370ns, expr_4_eval_time_2=137ns, expr_5_eval_time_0=156ns, expr_5_eval_time_1=671ns, expr_5_eval_time_2=160ns, expr_6_eval_time_0=136ns, expr_6_eval_time_1=165ns, expr_6_eval_time_2=132ns, expr_7_eval_time_0=136ns, expr_7_eval_time_1=276ns, expr_7_eval_time_2=135ns, expr_8_eval_time_0=134ns, expr_8_eval_time_1=253ns, expr_8_eval_time_2=119ns, expr_9_eval_time_0=237ns, expr_9_eval_time_1=463ns, expr_9_eval_time_2=221ns]AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=236.22µs, elapsed_compute_1=307.03µs, elapsed_compute_2=320.92µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, peak_mem_used_0=2.74 K, peak_mem_used_1=4.96 K, peak_mem_used_2=2.74 K, aggregate_arguments_time_0=12.21µs, aggregate_arguments_time_1=26.66µs, aggregate_arguments_time_2=12.99µs, aggregation_time_0=87.38µs, aggregation_time_1=142.93µs, aggregation_time_2=103.72µs, emitting_time_0=8.00µs, emitting_time_1=18.06µs, emitting_time_2=20.59µs, time_calculating_group_ids_0=16.48µs, time_calculating_group_ids_1=27.35µs, time_calculating_group_ids_2=15.43µs][Stage 1] => NetworkShuffleExec: output_partitions=8, input_tasks=3, metrics=[elapsed_compute_0=1.67ms, elapsed_compute_1=3.05ms, elapsed_compute_2=2.51ms, msg_count_0=27, msg_count_1=30, msg_count_2=27, network_latency_count_0=27, network_latency_count_1=30, network_latency_count_2=27, max_mem_used_0=19.51 K, max_mem_used_1=31.81 K, max_mem_used_2=23.88 K, network_latency_sum_0=6.84ms, network_latency_sum_1=8.22ms, network_latency_sum_2=5.12ms, bytes_transferred_0=name:bytes_transferred 35.6 KB, bytes_transferred_1=name:bytes_transferred 43.8 KB, bytes_transferred_2=name:bytes_transferred 35.6 KB, network_latency_first_0=name:network_latency_first 186.04µs, network_latency_first_1=name:network_latency_first 170.14µs, network_latency_first_2=name:network_latency_first 189.94µs, network_latency_max_0=name:network_latency_max 448.69µs, network_latency_max_1=name:network_latency_max 479.85µs, network_latency_max_2=name:network_latency_max 258.36µs, network_latency_min_0=name:network_latency_min 129.43µs, network_latency_min_1=name:network_latency_min 155.84µs, network_latency_min_2=name:network_latency_min 83.94µs, network_latency_p50_0=name:network_latency_p50 221.98µs, network_latency_p50_1=name:network_latency_p50 265.76µs, network_latency_p50_2=name:network_latency_p50 196.88µs, network_latency_p95_0=name:network_latency_p95 388.63µs, network_latency_p95_1=name:network_latency_p95 373.39µs, network_latency_p95_2=name:network_latency_p95 245.33µs]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p23] t1:[p0..p23] t2:[p0..p23]RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 24), input_partitions=8, metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=315.46µs, elapsed_compute_1=280.17µs, elapsed_compute_2=261.70µs, output_bytes_0=5.5 MB, output_bytes_1=5.5 MB, output_bytes_2=5.5 MB, output_batches_0=4, output_batches_1=4, output_batches_2=4, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, fetch_time_0=7.74s, fetch_time_1=7.87s, fetch_time_2=6.50s, repartition_time_0=184.70µs, repartition_time_1=156.25µs, repartition_time_2=175.73µs, send_time_0=110.07µs, send_time_1=103.09µs, send_time_2=110.93µs]AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=1.55s, elapsed_compute_1=1.09s, elapsed_compute_2=1.12s, output_bytes_0=4.1 KB, output_bytes_1=3.4 KB, output_bytes_2=3.4 KB, output_batches_0=6, output_batches_1=5, output_batches_2=5, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, peak_mem_used_0=3.15 M, peak_mem_used_1=2.63 M, peak_mem_used_2=2.63 M, aggregate_arguments_time_0=572.17ms, aggregate_arguments_time_1=398.12ms, aggregate_arguments_time_2=414.16ms, aggregation_time_0=1.99s, aggregation_time_1=1.42s, aggregation_time_2=1.43s, emitting_time_0=65.62µs, emitting_time_1=66.52µs, emitting_time_2=60.03µs, time_calculating_group_ids_0=548.17ms, time_calculating_group_ids_1=380.51ms, time_calculating_group_ids_2=392.05ms, reduction_factor_0=0.00011% (24/22.21 M), reduction_factor_1=0.00011% (20/18.47 M), reduction_factor_2=0.00011% (20/18.46 M)]ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=572.59ms, elapsed_compute_1=395.93ms, elapsed_compute_2=418.19ms, output_bytes_0=2.3 GB, output_bytes_1=1979.9 MB, output_bytes_2=1976.6 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, expr_0_eval_time_0=570.09ms, expr_0_eval_time_1=394.00ms, expr_0_eval_time_2=416.27ms, expr_1_eval_time_0=232.87µs, expr_1_eval_time_1=155.01µs, expr_1_eval_time_2=165.62µs, expr_2_eval_time_0=89.42µs, expr_2_eval_time_1=64.03µs, expr_2_eval_time_2=61.63µs, expr_3_eval_time_0=97.32µs, expr_3_eval_time_1=73.44µs, expr_3_eval_time_2=71.84µs, expr_4_eval_time_0=83.77µs, expr_4_eval_time_1=79.62µs, expr_4_eval_time_2=65.81µs, expr_5_eval_time_0=108.56µs, expr_5_eval_time_1=88.77µs, expr_5_eval_time_2=93.48µs, expr_6_eval_time_0=84.67µs, expr_6_eval_time_1=88.75µs, expr_6_eval_time_2=68.42µs]FilterExec: l_shipdate@6 <= 1998-09-02, projection=[l_quantity@0, l_extendedprice@1, l_discount@2, l_tax@3, l_returnflag@4, l_linestatus@5], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=667.22ms, elapsed_compute_1=498.78ms, elapsed_compute_2=485.06ms, output_bytes_0=2040.0 MB, output_bytes_1=1698.0 MB, output_bytes_2=1695.0 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, selectivity_0=99% (22.21 M/22.53 M), selectivity_1=99% (18.47 M/18.74 M), selectivity_2=99% (18.46 M/18.72 M)]PartitionIsolatorExec: tasks=3 partitions=23, metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M]DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=181.47µs, bloom_filter_eval_time_1=145.15µs, bloom_filter_eval_time_2=151.24µs, metadata_load_time_0=953.55ms, metadata_load_time_1=1.36s, metadata_load_time_2=1.10s, page_index_eval_time_0=159.14µs, page_index_eval_time_1=128.17µs, page_index_eval_time_2=138.31µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=358.54µs, statistics_eval_time_1=355.26µs, statistics_eval_time_2=289.19µs, time_elapsed_opening_0=956.45ms, time_elapsed_opening_1=1.36s, time_elapsed_opening_2=1.10s, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=878.36ms, time_elapsed_processing_2=896.36ms, time_elapsed_scanning_total_0=6.78s, time_elapsed_scanning_total_1=6.50s, time_elapsed_scanning_total_2=5.39s, time_elapsed_scanning_until_data_0=671.53ms, time_elapsed_scanning_until_data_1=728.33ms, time_elapsed_scanning_until_data_2=735.26ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]
    └──────────────────────────────────────────────────

Here's the scoped diff for the two DataSourceExecs:

DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=203.90µs, bloom_filter_eval_time_1=226.95µs, bloom_filter_eval_time_2=225.73µs, metadata_load_time_0=625.07µs, metadata_load_time_1=734.40µs, metadata_load_time_2=860.51µs, page_index_eval_time_0=204.27µs, page_index_eval_time_1=198.23µs, page_index_eval_time_2=206.15µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=263.84µs, statistics_eval_time_1=239.83µs, statistics_eval_time_2=292.26µs, time_elapsed_opening_0=2.09ms, time_elapsed_opening_1=3.25ms, time_elapsed_opening_2=2.52ms, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=916.20ms, time_elapsed_processing_2=859.68ms, time_elapsed_scanning_total_0=7.12s, time_elapsed_scanning_total_1=5.30s, time_elapsed_scanning_total_2=5.61s, time_elapsed_scanning_until_data_0=583.23ms, time_elapsed_scanning_until_data_1=479.05ms, time_elapsed_scanning_until_data_2=852.57ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=181.47µs, bloom_filter_eval_time_1=145.15µs, bloom_filter_eval_time_2=151.24µs, metadata_load_time_0=953.55ms, metadata_load_time_1=1.36s, metadata_load_time_2=1.10s, page_index_eval_time_0=159.14µs, page_index_eval_time_1=128.17µs, page_index_eval_time_2=138.31µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=358.54µs, statistics_eval_time_1=355.26µs, statistics_eval_time_2=289.19µs, time_elapsed_opening_0=956.45ms, time_elapsed_opening_1=1.36s, time_elapsed_opening_2=1.10s, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=878.36ms, time_elapsed_processing_2=896.36ms, time_elapsed_scanning_total_0=6.78s, time_elapsed_scanning_total_1=6.50s, time_elapsed_scanning_total_2=5.39s, time_elapsed_scanning_until_data_0=671.53ms, time_elapsed_scanning_until_data_1=728.33ms, time_elapsed_scanning_until_data_2=735.26ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not being able to reproduce the performance benefits, from what I see what you reported is typically within the noise threshold.

Maybe try running with --iterations 10?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see. It looks like the queries specifically in the benchmarks were not affected by this change. I just added a test and I ran benchmarks on q28. I added the change in the description (Spotlight: q28) that should explain the performance improvement with it

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 this is pretty interesting, I do am seeing some performance improvements in ClickBench (x1.35) but not in q28, q28 breaks even.

TOTAL: prev=671813.4324120003 ms, new=498864.71842399996 ms, diff=1.35 faster ✅
      q0: prev=   2 ms, new=   2 ms, diff=1.00 slower ✖
      q1: prev= 514 ms, new= 444 ms, diff=1.16 faster ✔
      q2: prev= 621 ms, new= 534 ms, diff=1.16 faster ✔
      q3: prev= 546 ms, new= 527 ms, diff=1.04 faster ✔
      q4: prev= 581 ms, new= 542 ms, diff=1.07 faster ✔
      q5: prev= 808 ms, new= 721 ms, diff=1.12 faster ✔
      q6: prev=   2 ms, new=   2 ms, diff=1.00 slower ✖
      q7: prev= 453 ms, new= 440 ms, diff=1.03 faster ✔
      q8: prev= 679 ms, new= 649 ms, diff=1.05 faster ✔
      q9: prev=7112 ms, new= 812 ms, diff=8.76 faster ✅
     q10: prev= 701 ms, new= 618 ms, diff=1.13 faster ✔
     q11: prev= 675 ms, new= 591 ms, diff=1.14 faster ✔
     q12: prev= 823 ms, new=1158 ms, diff=1.41 slower ❌
     q13: prev=1045 ms, new= 849 ms, diff=1.23 faster ✅
     q14: prev= 818 ms, new= 722 ms, diff=1.13 faster ✔
     q15: prev= 756 ms, new= 551 ms, diff=1.37 faster ✅
     q16: prev=1573 ms, new= 865 ms, diff=1.82 faster ✅
     q17: prev=1010 ms, new= 928 ms, diff=1.09 faster ✔
     q18: prev=1321 ms, new=1388 ms, diff=1.05 slower ✖
     q19: prev= 481 ms, new= 453 ms, diff=1.06 faster ✔
     q20: prev=2709 ms, new=2149 ms, diff=1.26 faster ✅
     q21: prev=3316 ms, new=2613 ms, diff=1.27 faster ✅
     q22: prev=3788 ms, new=2922 ms, diff=1.30 faster ✅
     q23: prev=9854 ms, new=7151 ms, diff=1.38 faster ✅
     q24: prev=1350 ms, new= 639 ms, diff=2.11 faster ✅
     q25: prev= 776 ms, new= 517 ms, diff=1.50 faster ✅
     q26: prev= 794 ms, new= 576 ms, diff=1.38 faster ✅
     q27: prev=2948 ms, new=2464 ms, diff=1.20 faster ✔
     q28: prev=5889 ms, new=5968 ms, diff=1.01 slower ✖
     q29: prev= 547 ms, new= 484 ms, diff=1.13 faster ✔
     q30: prev= 781 ms, new= 649 ms, diff=1.20 faster ✅
     q31: prev=1265 ms, new= 912 ms, diff=1.39 faster ✅
     q32: prev=1724 ms, new=1107 ms, diff=1.56 faster ✅
     q33: prev=4158 ms, new=2982 ms, diff=1.39 faster ✅
     q34: prev=3586 ms, new=3004 ms, diff=1.19 faster ✔
     q35: prev= 752 ms, new= 667 ms, diff=1.13 faster ✔
     q36: prev= 426 ms, new= 381 ms, diff=1.12 faster ✔
     q37: prev= 288 ms, new= 249 ms, diff=1.16 faster ✔
     q38: prev= 446 ms, new= 380 ms, diff=1.17 faster ✔
     q39: prev= 628 ms, new= 632 ms, diff=1.01 slower ✖
     q40: prev= 252 ms, new= 215 ms, diff=1.17 faster ✔
     q41: prev= 177 ms, new= 209 ms, diff=1.18 slower ✖
     q42: prev= 184 ms, new= 200 ms, diff=1.09 slower ✖

Let's do this then 👍, the code changed a bit here after #467, so you'll need to update with main, although the idea and code should be pretty much the same.

Adds a unit test that exercises the non-uniform case the integration
fixtures miss: 3 input groups of 5 files each. Asserts the early output
groups mix files from multiple input groups, which the prior per-group
split_files algorithm could not produce.
# Conflicts:
#	src/distributed_planner/task_estimator.rs
@shehab-ali shehab-ali force-pushed the sa/partition-task branch from f845242 to c24ff0e Compare June 1, 2026 14:00
@gabotechs

Copy link
Copy Markdown
Collaborator

If you update this branch with main I think the CI failures should go away

Comment thread .vscode/settings.json Outdated
Comment thread src/distributed_planner/task_estimator.rs Outdated

@gabotechs gabotechs left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Looks good, thanks @shehab-ali!

@gabotechs gabotechs merged commit 8d92830 into datafusion-contrib:main Jun 4, 2026
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants