Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Comment thread
shehab-ali marked this conversation as resolved.
Outdated
42 changes: 36 additions & 6 deletions src/distributed_planner/task_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,31 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;

let mut new_file_scan = file_scan.clone();
new_file_scan.file_groups.clear();
for file_group in file_scan.file_groups.clone() {
new_file_scan
.file_groups
.extend(file_group.split_files(task_count));
}
let input_group_count = file_scan.file_groups.len().max(1);
let all_partitioned_files = file_scan
.file_groups
.iter()
.flat_map(|file_group| file_group.iter().cloned())
.collect::<Vec<_>>();
let file_groups =
rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One requirement for this type of changes is that they should prove through the benchmarks that they indeed bring performance benefits, but this change does not seem to be meeting this criteria.

Maybe there are opportunities for further re-splitting PartitionedFiles?

Also, you might want to try ClickBench instead, which has a greater number of files VS TPC-H or TPC-DS, you probably would get some better numbers there.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that this PR shows 0 diffs in the plan snapshots makes me thing that it might be collaterally doing exactly the same thing as the previous code.

Maybe that's the reason why you see no performance impact?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reran benchmarks and clickbench and did a few tries to detect which ones were actual regressions vs noise. Updated the description with findings. Seems like the change is overall worth it for the performance wins.

@gabotechs gabotechs May 27, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still very suspicious about the plan snapshots not having changed at all, it really looks like the files are being distributed exactly the same as before. That's worth double checking.

Do you see the plans actually changing when running the remote benchmarks against main?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a look myself, and I think we really are not doing anything different from what we have on main, the two file distribution algorithms (main vs this PR) happen to be equivalent.

Take TPC-H Q1 as an example:

TPC-H Q1 With changes rolled back
┌───── DistributedExec ── Tasks: t0:[p0]plan_bytes_sent_0=89.22 K, plan_send_latency_avg_0=1.54ms, plan_send_latency_max_0=1.86ms
│ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], metrics=[output_rows_0=4, elapsed_compute_0=19.64µs, output_bytes_0=608.0 B, output_batches_0=1][Stage 2] => NetworkCoalesceExec: output_partitions=24, input_tasks=3, metrics=[elapsed_compute_0=692.10µs, msg_count_0=28, network_latency_count_0=28, max_mem_used_0=8.69 K, network_latency_sum_0=4.81ms, bytes_transferred_0=name:bytes_transferred 23.1 KB, network_latency_first_0=name:network_latency_first 178.43µs, network_latency_max_0=name:network_latency_max 309.74µs, network_latency_min_0=name:network_latency_min 98.64µs, network_latency_p50_0=name:network_latency_p50 167.77µs, network_latency_p95_0=name:network_latency_p95 226.47µs]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=5.95µs, elapsed_compute_1=8.37µs, elapsed_compute_2=12.66µs, output_bytes_0=0.0 B, output_bytes_1=0.0 B, output_bytes_2=0.0 B, output_batches_0=0, output_batches_1=0, output_batches_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0]ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(Int64(1))@9 as count_order], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=6.20µs, elapsed_compute_1=16.54µs, elapsed_compute_2=6.01µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, expr_0_eval_time_0=287ns, expr_0_eval_time_1=521ns, expr_0_eval_time_2=920ns, expr_1_eval_time_0=167ns, expr_1_eval_time_1=206ns, expr_1_eval_time_2=75ns, expr_2_eval_time_0=129ns, expr_2_eval_time_1=286ns, expr_2_eval_time_2=197ns, expr_3_eval_time_0=147ns, expr_3_eval_time_1=277ns, expr_3_eval_time_2=132ns, expr_4_eval_time_0=257ns, expr_4_eval_time_1=231ns, expr_4_eval_time_2=149ns, expr_5_eval_time_0=216ns, expr_5_eval_time_1=296ns, expr_5_eval_time_2=135ns, expr_6_eval_time_0=441ns, expr_6_eval_time_1=201ns, expr_6_eval_time_2=105ns, expr_7_eval_time_0=155ns, expr_7_eval_time_1=7.52µs, expr_7_eval_time_2=135ns, expr_8_eval_time_0=74ns, expr_8_eval_time_1=216ns, expr_8_eval_time_2=134ns, expr_9_eval_time_0=218ns, expr_9_eval_time_1=257ns, expr_9_eval_time_2=256ns]AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=298.56µs, elapsed_compute_1=345.21µs, elapsed_compute_2=344.54µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, peak_mem_used_0=2.83 K, peak_mem_used_1=5.15 K, peak_mem_used_2=2.83 K, aggregate_arguments_time_0=12.00µs, aggregate_arguments_time_1=21.93µs, aggregate_arguments_time_2=12.26µs, aggregation_time_0=133.11µs, aggregation_time_1=161.59µs, aggregation_time_2=161.20µs, emitting_time_0=24.87µs, emitting_time_1=57.49µs, emitting_time_2=38.60µs, time_calculating_group_ids_0=15.47µs, time_calculating_group_ids_1=31.67µs, time_calculating_group_ids_2=30.81µs][Stage 1] => NetworkShuffleExec: output_partitions=8, input_tasks=3, metrics=[elapsed_compute_0=1.51ms, elapsed_compute_1=2.22ms, elapsed_compute_2=1.53ms, msg_count_0=27, msg_count_1=30, msg_count_2=27, network_latency_count_0=27, network_latency_count_1=30, network_latency_count_2=27, max_mem_used_0=28.21 K, max_mem_used_1=29.21 K, max_mem_used_2=19.17 K, network_latency_sum_0=4.72ms, network_latency_sum_1=8.81ms, network_latency_sum_2=7.41ms, bytes_transferred_0=name:bytes_transferred 35.6 KB, bytes_transferred_1=name:bytes_transferred 43.8 KB, bytes_transferred_2=name:bytes_transferred 35.6 KB, network_latency_first_0=name:network_latency_first 149.20µs, network_latency_first_1=name:network_latency_first 187.74µs, network_latency_first_2=name:network_latency_first 219.48µs, network_latency_max_0=name:network_latency_max 361.92µs, network_latency_max_1=name:network_latency_max 594.90µs, network_latency_max_2=name:network_latency_max 462.14µs, network_latency_min_0=name:network_latency_min 75.50µs, network_latency_min_1=name:network_latency_min 160.96µs, network_latency_min_2=name:network_latency_min 147.28µs, network_latency_p50_0=name:network_latency_p50 145.85µs, network_latency_p50_1=name:network_latency_p50 311.88µs, network_latency_p50_2=name:network_latency_p50 235.71µs, network_latency_p95_0=name:network_latency_p95 331.17µs, network_latency_p95_1=name:network_latency_p95 421.00µs, network_latency_p95_2=name:network_latency_p95 438.18µs]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p23] t1:[p0..p23] t2:[p0..p23]RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 24), input_partitions=8, metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=275.47µs, elapsed_compute_1=339.73µs, elapsed_compute_2=317.16µs, output_bytes_0=5.5 MB, output_bytes_1=5.5 MB, output_bytes_2=5.5 MB, output_batches_0=4, output_batches_1=4, output_batches_2=4, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, fetch_time_0=7.14s, fetch_time_1=5.31s, fetch_time_2=5.62s, repartition_time_0=155.40µs, repartition_time_1=130.79µs, repartition_time_2=125.20µs, send_time_0=116.70µs, send_time_1=91.15µs, send_time_2=88.74µs]AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=1.60s, elapsed_compute_1=1.17s, elapsed_compute_2=1.07s, output_bytes_0=4.1 KB, output_bytes_1=3.4 KB, output_bytes_2=3.4 KB, output_batches_0=6, output_batches_1=5, output_batches_2=5, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, peak_mem_used_0=3.15 M, peak_mem_used_1=2.63 M, peak_mem_used_2=2.63 M, aggregate_arguments_time_0=592.06ms, aggregate_arguments_time_1=429.92ms, aggregate_arguments_time_2=397.83ms, aggregation_time_0=2.05s, aggregation_time_1=1.51s, aggregation_time_2=1.38s, emitting_time_0=43.50µs, emitting_time_1=37.62µs, emitting_time_2=35.21µs, time_calculating_group_ids_0=564.97ms, time_calculating_group_ids_1=411.26ms, time_calculating_group_ids_2=370.47ms, reduction_factor_0=0.00011% (24/22.21 M), reduction_factor_1=0.00011% (20/18.47 M), reduction_factor_2=0.00011% (20/18.46 M)]ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=587.36ms, elapsed_compute_1=432.58ms, elapsed_compute_2=400.44ms, output_bytes_0=2.3 GB, output_bytes_1=1979.9 MB, output_bytes_2=1976.6 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, expr_0_eval_time_0=584.74ms, expr_0_eval_time_1=430.36ms, expr_0_eval_time_2=398.56ms, expr_1_eval_time_0=225.74µs, expr_1_eval_time_1=181.10µs, expr_1_eval_time_2=168.70µs, expr_2_eval_time_0=80.79µs, expr_2_eval_time_1=65.89µs, expr_2_eval_time_2=56.27µs, expr_3_eval_time_0=95.11µs, expr_3_eval_time_1=67.88µs, expr_3_eval_time_2=73.53µs, expr_4_eval_time_0=102.33µs, expr_4_eval_time_1=67.25µs, expr_4_eval_time_2=71.12µs, expr_5_eval_time_0=166.15µs, expr_5_eval_time_1=118.82µs, expr_5_eval_time_2=116.43µs, expr_6_eval_time_0=95.72µs, expr_6_eval_time_1=69.22µs, expr_6_eval_time_2=63.65µs]FilterExec: l_shipdate@6 <= 1998-09-02, projection=[l_quantity@0, l_extendedprice@1, l_discount@2, l_tax@3, l_returnflag@4, l_linestatus@5], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=680.57ms, elapsed_compute_1=523.34ms, elapsed_compute_2=470.96ms, output_bytes_0=2040.0 MB, output_bytes_1=1698.0 MB, output_bytes_2=1695.0 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, selectivity_0=99% (22.21 M/22.53 M), selectivity_1=99% (18.47 M/18.74 M), selectivity_2=99% (18.46 M/18.72 M)]PartitionIsolatorExec: tasks=3 partitions=23, metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M]DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=203.90µs, bloom_filter_eval_time_1=226.95µs, bloom_filter_eval_time_2=225.73µs, metadata_load_time_0=625.07µs, metadata_load_time_1=734.40µs, metadata_load_time_2=860.51µs, page_index_eval_time_0=204.27µs, page_index_eval_time_1=198.23µs, page_index_eval_time_2=206.15µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=263.84µs, statistics_eval_time_1=239.83µs, statistics_eval_time_2=292.26µs, time_elapsed_opening_0=2.09ms, time_elapsed_opening_1=3.25ms, time_elapsed_opening_2=2.52ms, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=916.20ms, time_elapsed_processing_2=859.68ms, time_elapsed_scanning_total_0=7.12s, time_elapsed_scanning_total_1=5.30s, time_elapsed_scanning_total_2=5.61s, time_elapsed_scanning_until_data_0=583.23ms, time_elapsed_scanning_until_data_1=479.05ms, time_elapsed_scanning_until_data_2=852.57ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]
    └──────────────────────────────────────────────────
TPC-H Q1 With changes incliuded
┌───── DistributedExec ── Tasks: t0:[p0]plan_bytes_sent_0=89.23 K, plan_send_latency_avg_0=1.73ms, plan_send_latency_max_0=2.68ms
│ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], metrics=[output_rows_0=4, elapsed_compute_0=18.89µs, output_bytes_0=608.0 B, output_batches_0=1][Stage 2] => NetworkCoalesceExec: output_partitions=24, input_tasks=3, metrics=[elapsed_compute_0=954.86µs, msg_count_0=28, network_latency_count_0=28, max_mem_used_0=8.02 K, network_latency_sum_0=5.96ms, bytes_transferred_0=name:bytes_transferred 23.1 KB, network_latency_first_0=name:network_latency_first 183.19µs, network_latency_max_0=name:network_latency_max 310.71µs, network_latency_min_0=name:network_latency_min 140.34µs, network_latency_p50_0=name:network_latency_p50 213.28µs, network_latency_p95_0=name:network_latency_p95 276.61µs]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=4.36µs, elapsed_compute_1=15.59µs, elapsed_compute_2=4.90µs, output_bytes_0=0.0 B, output_bytes_1=0.0 B, output_bytes_2=0.0 B, output_batches_0=0, output_batches_1=0, output_batches_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0]ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(Int64(1))@9 as count_order], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=5.12µs, elapsed_compute_1=12.71µs, elapsed_compute_2=4.81µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, expr_0_eval_time_0=240ns, expr_0_eval_time_1=1.08µs, expr_0_eval_time_2=206ns, expr_1_eval_time_0=127ns, expr_1_eval_time_1=369ns, expr_1_eval_time_2=133ns, expr_2_eval_time_0=155ns, expr_2_eval_time_1=274ns, expr_2_eval_time_2=153ns, expr_3_eval_time_0=135ns, expr_3_eval_time_1=315ns, expr_3_eval_time_2=133ns, expr_4_eval_time_0=137ns, expr_4_eval_time_1=370ns, expr_4_eval_time_2=137ns, expr_5_eval_time_0=156ns, expr_5_eval_time_1=671ns, expr_5_eval_time_2=160ns, expr_6_eval_time_0=136ns, expr_6_eval_time_1=165ns, expr_6_eval_time_2=132ns, expr_7_eval_time_0=136ns, expr_7_eval_time_1=276ns, expr_7_eval_time_2=135ns, expr_8_eval_time_0=134ns, expr_8_eval_time_1=253ns, expr_8_eval_time_2=119ns, expr_9_eval_time_0=237ns, expr_9_eval_time_1=463ns, expr_9_eval_time_2=221ns]AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=1, output_rows_1=2, output_rows_2=1, elapsed_compute_0=236.22µs, elapsed_compute_1=307.03µs, elapsed_compute_2=320.92µs, output_bytes_0=608.0 B, output_bytes_1=1216.0 B, output_bytes_2=608.0 B, output_batches_0=1, output_batches_1=2, output_batches_2=1, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, peak_mem_used_0=2.74 K, peak_mem_used_1=4.96 K, peak_mem_used_2=2.74 K, aggregate_arguments_time_0=12.21µs, aggregate_arguments_time_1=26.66µs, aggregate_arguments_time_2=12.99µs, aggregation_time_0=87.38µs, aggregation_time_1=142.93µs, aggregation_time_2=103.72µs, emitting_time_0=8.00µs, emitting_time_1=18.06µs, emitting_time_2=20.59µs, time_calculating_group_ids_0=16.48µs, time_calculating_group_ids_1=27.35µs, time_calculating_group_ids_2=15.43µs][Stage 1] => NetworkShuffleExec: output_partitions=8, input_tasks=3, metrics=[elapsed_compute_0=1.67ms, elapsed_compute_1=3.05ms, elapsed_compute_2=2.51ms, msg_count_0=27, msg_count_1=30, msg_count_2=27, network_latency_count_0=27, network_latency_count_1=30, network_latency_count_2=27, max_mem_used_0=19.51 K, max_mem_used_1=31.81 K, max_mem_used_2=23.88 K, network_latency_sum_0=6.84ms, network_latency_sum_1=8.22ms, network_latency_sum_2=5.12ms, bytes_transferred_0=name:bytes_transferred 35.6 KB, bytes_transferred_1=name:bytes_transferred 43.8 KB, bytes_transferred_2=name:bytes_transferred 35.6 KB, network_latency_first_0=name:network_latency_first 186.04µs, network_latency_first_1=name:network_latency_first 170.14µs, network_latency_first_2=name:network_latency_first 189.94µs, network_latency_max_0=name:network_latency_max 448.69µs, network_latency_max_1=name:network_latency_max 479.85µs, network_latency_max_2=name:network_latency_max 258.36µs, network_latency_min_0=name:network_latency_min 129.43µs, network_latency_min_1=name:network_latency_min 155.84µs, network_latency_min_2=name:network_latency_min 83.94µs, network_latency_p50_0=name:network_latency_p50 221.98µs, network_latency_p50_1=name:network_latency_p50 265.76µs, network_latency_p50_2=name:network_latency_p50 196.88µs, network_latency_p95_0=name:network_latency_p95 388.63µs, network_latency_p95_1=name:network_latency_p95 373.39µs, network_latency_p95_2=name:network_latency_p95 245.33µs]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p23] t1:[p0..p23] t2:[p0..p23]RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 24), input_partitions=8, metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=315.46µs, elapsed_compute_1=280.17µs, elapsed_compute_2=261.70µs, output_bytes_0=5.5 MB, output_bytes_1=5.5 MB, output_bytes_2=5.5 MB, output_batches_0=4, output_batches_1=4, output_batches_2=4, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, fetch_time_0=7.74s, fetch_time_1=7.87s, fetch_time_2=6.50s, repartition_time_0=184.70µs, repartition_time_1=156.25µs, repartition_time_2=175.73µs, send_time_0=110.07µs, send_time_1=103.09µs, send_time_2=110.93µs]AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))], metrics=[output_rows_0=24, output_rows_1=20, output_rows_2=20, elapsed_compute_0=1.55s, elapsed_compute_1=1.09s, elapsed_compute_2=1.12s, output_bytes_0=4.1 KB, output_bytes_1=3.4 KB, output_bytes_2=3.4 KB, output_batches_0=6, output_batches_1=5, output_batches_2=5, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0.0 B, spilled_bytes_1=0.0 B, spilled_bytes_2=0.0 B, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, peak_mem_used_0=3.15 M, peak_mem_used_1=2.63 M, peak_mem_used_2=2.63 M, aggregate_arguments_time_0=572.17ms, aggregate_arguments_time_1=398.12ms, aggregate_arguments_time_2=414.16ms, aggregation_time_0=1.99s, aggregation_time_1=1.42s, aggregation_time_2=1.43s, emitting_time_0=65.62µs, emitting_time_1=66.52µs, emitting_time_2=60.03µs, time_calculating_group_ids_0=548.17ms, time_calculating_group_ids_1=380.51ms, time_calculating_group_ids_2=392.05ms, reduction_factor_0=0.00011% (24/22.21 M), reduction_factor_1=0.00011% (20/18.47 M), reduction_factor_2=0.00011% (20/18.46 M)]ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=572.59ms, elapsed_compute_1=395.93ms, elapsed_compute_2=418.19ms, output_bytes_0=2.3 GB, output_bytes_1=1979.9 MB, output_bytes_2=1976.6 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, expr_0_eval_time_0=570.09ms, expr_0_eval_time_1=394.00ms, expr_0_eval_time_2=416.27ms, expr_1_eval_time_0=232.87µs, expr_1_eval_time_1=155.01µs, expr_1_eval_time_2=165.62µs, expr_2_eval_time_0=89.42µs, expr_2_eval_time_1=64.03µs, expr_2_eval_time_2=61.63µs, expr_3_eval_time_0=97.32µs, expr_3_eval_time_1=73.44µs, expr_3_eval_time_2=71.84µs, expr_4_eval_time_0=83.77µs, expr_4_eval_time_1=79.62µs, expr_4_eval_time_2=65.81µs, expr_5_eval_time_0=108.56µs, expr_5_eval_time_1=88.77µs, expr_5_eval_time_2=93.48µs, expr_6_eval_time_0=84.67µs, expr_6_eval_time_1=88.75µs, expr_6_eval_time_2=68.42µs]FilterExec: l_shipdate@6 <= 1998-09-02, projection=[l_quantity@0, l_extendedprice@1, l_discount@2, l_tax@3, l_returnflag@4, l_linestatus@5], metrics=[output_rows_0=22.21 M, output_rows_1=18.47 M, output_rows_2=18.46 M, elapsed_compute_0=667.22ms, elapsed_compute_1=498.78ms, elapsed_compute_2=485.06ms, output_bytes_0=2040.0 MB, output_bytes_1=1698.0 MB, output_bytes_2=1695.0 MB, output_batches_0=680, output_batches_1=566, output_batches_2=565, selectivity_0=99% (22.21 M/22.53 M), selectivity_1=99% (18.47 M/18.74 M), selectivity_2=99% (18.46 M/18.72 M)]PartitionIsolatorExec: tasks=3 partitions=23, metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M]DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=181.47µs, bloom_filter_eval_time_1=145.15µs, bloom_filter_eval_time_2=151.24µs, metadata_load_time_0=953.55ms, metadata_load_time_1=1.36s, metadata_load_time_2=1.10s, page_index_eval_time_0=159.14µs, page_index_eval_time_1=128.17µs, page_index_eval_time_2=138.31µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=358.54µs, statistics_eval_time_1=355.26µs, statistics_eval_time_2=289.19µs, time_elapsed_opening_0=956.45ms, time_elapsed_opening_1=1.36s, time_elapsed_opening_2=1.10s, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=878.36ms, time_elapsed_processing_2=896.36ms, time_elapsed_scanning_total_0=6.78s, time_elapsed_scanning_total_1=6.50s, time_elapsed_scanning_total_2=5.39s, time_elapsed_scanning_until_data_0=671.53ms, time_elapsed_scanning_until_data_1=728.33ms, time_elapsed_scanning_until_data_2=735.26ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]
    └──────────────────────────────────────────────────

Here's the scoped diff for the two DataSourceExecs:

DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=203.90µs, bloom_filter_eval_time_1=226.95µs, bloom_filter_eval_time_2=225.73µs, metadata_load_time_0=625.07µs, metadata_load_time_1=734.40µs, metadata_load_time_2=860.51µs, page_index_eval_time_0=204.27µs, page_index_eval_time_1=198.23µs, page_index_eval_time_2=206.15µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=263.84µs, statistics_eval_time_1=239.83µs, statistics_eval_time_2=292.26µs, time_elapsed_opening_0=2.09ms, time_elapsed_opening_1=3.25ms, time_elapsed_opening_2=2.52ms, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=916.20ms, time_elapsed_processing_2=859.68ms, time_elapsed_scanning_total_0=7.12s, time_elapsed_scanning_total_1=5.30s, time_elapsed_scanning_total_2=5.61s, time_elapsed_scanning_until_data_0=583.23ms, time_elapsed_scanning_until_data_1=479.05ms, time_elapsed_scanning_until_data_2=852.57ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]DataSourceExec: file_groups={23 groups: [[tpch_sf10/lineitem/part-0.parquet:0..104241221], [tpch_sf10/lineitem/part-1.parquet:0..102592606], [tpch_sf10/lineitem/part-1.parquet:102592606..104028426], [tpch_sf10/lineitem/part-10.parquet:0..103284844], [tpch_sf10/lineitem/part-11.parquet:0..102113163], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows_0=22.53 M, output_rows_1=18.74 M, output_rows_2=18.72 M, elapsed_compute_0=8ns, elapsed_compute_1=8ns, elapsed_compute_2=7ns, output_bytes_0=2.1 GB, output_bytes_1=1787.1 MB, output_bytes_2=1785.2 MB, output_batches_0=2.75 K, output_batches_1=2.29 K, output_batches_2=2.29 K, files_ranges_pruned_statistics_0=8 total → 8 matched, files_ranges_pruned_statistics_1=8 total → 8 matched, files_ranges_pruned_statistics_2=7 total → 7 matched, row_groups_pruned_statistics_0=24 total → 24 matched, row_groups_pruned_statistics_1=20 total → 20 matched, row_groups_pruned_statistics_2=20 total → 20 matched, row_groups_pruned_bloom_filter_0=24 total → 24 matched, row_groups_pruned_bloom_filter_1=20 total → 20 matched, row_groups_pruned_bloom_filter_2=20 total → 20 matched, page_index_pages_pruned_0=1.12 K total → 1.12 K matched, page_index_pages_pruned_1=931 total → 931 matched, page_index_pages_pruned_2=930 total → 930 matched, page_index_rows_pruned_0=22.53 M total → 22.53 M matched, page_index_rows_pruned_1=18.74 M total → 18.74 M matched, page_index_rows_pruned_2=18.72 M total → 18.72 M matched, limit_pruned_row_groups_0=0 total → 0 matched, limit_pruned_row_groups_1=0 total → 0 matched, limit_pruned_row_groups_2=0 total → 0 matched, batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=154.8 M, bytes_scanned_1=128.8 M, bytes_scanned_2=128.7 M, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, bloom_filter_eval_time_0=181.47µs, bloom_filter_eval_time_1=145.15µs, bloom_filter_eval_time_2=151.24µs, metadata_load_time_0=953.55ms, metadata_load_time_1=1.36s, metadata_load_time_2=1.10s, page_index_eval_time_0=159.14µs, page_index_eval_time_1=128.17µs, page_index_eval_time_2=138.31µs, row_pushdown_eval_time_0=16ns, row_pushdown_eval_time_1=16ns, row_pushdown_eval_time_2=14ns, statistics_eval_time_0=358.54µs, statistics_eval_time_1=355.26µs, statistics_eval_time_2=289.19µs, time_elapsed_opening_0=956.45ms, time_elapsed_opening_1=1.36s, time_elapsed_opening_2=1.10s, time_elapsed_processing_0=1.22s, time_elapsed_processing_1=878.36ms, time_elapsed_processing_2=896.36ms, time_elapsed_scanning_total_0=6.78s, time_elapsed_scanning_total_1=6.50s, time_elapsed_scanning_total_2=5.39s, time_elapsed_scanning_until_data_0=671.53ms, time_elapsed_scanning_until_data_1=728.33ms, time_elapsed_scanning_until_data_2=735.26ms, scan_efficiency_ratio_0=19% (154.8 M/828.6 M), scan_efficiency_ratio_1=16% (128.8 M/826.7 M), scan_efficiency_ratio_2=18% (128.7 M/723.0 M)]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not being able to reproduce the performance benefits, from what I see what you reported is typically within the noise threshold.

Maybe try running with --iterations 10?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see. It looks like the queries specifically in the benchmarks were not affected by this change. I just added a test and I ran benchmarks on q28. I added the change in the description (Spotlight: q28) that should explain the performance improvement with it

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 this is pretty interesting, I do am seeing some performance improvements in ClickBench (x1.35) but not in q28, q28 breaks even.

TOTAL: prev=671813.4324120003 ms, new=498864.71842399996 ms, diff=1.35 faster ✅
      q0: prev=   2 ms, new=   2 ms, diff=1.00 slower ✖
      q1: prev= 514 ms, new= 444 ms, diff=1.16 faster ✔
      q2: prev= 621 ms, new= 534 ms, diff=1.16 faster ✔
      q3: prev= 546 ms, new= 527 ms, diff=1.04 faster ✔
      q4: prev= 581 ms, new= 542 ms, diff=1.07 faster ✔
      q5: prev= 808 ms, new= 721 ms, diff=1.12 faster ✔
      q6: prev=   2 ms, new=   2 ms, diff=1.00 slower ✖
      q7: prev= 453 ms, new= 440 ms, diff=1.03 faster ✔
      q8: prev= 679 ms, new= 649 ms, diff=1.05 faster ✔
      q9: prev=7112 ms, new= 812 ms, diff=8.76 faster ✅
     q10: prev= 701 ms, new= 618 ms, diff=1.13 faster ✔
     q11: prev= 675 ms, new= 591 ms, diff=1.14 faster ✔
     q12: prev= 823 ms, new=1158 ms, diff=1.41 slower ❌
     q13: prev=1045 ms, new= 849 ms, diff=1.23 faster ✅
     q14: prev= 818 ms, new= 722 ms, diff=1.13 faster ✔
     q15: prev= 756 ms, new= 551 ms, diff=1.37 faster ✅
     q16: prev=1573 ms, new= 865 ms, diff=1.82 faster ✅
     q17: prev=1010 ms, new= 928 ms, diff=1.09 faster ✔
     q18: prev=1321 ms, new=1388 ms, diff=1.05 slower ✖
     q19: prev= 481 ms, new= 453 ms, diff=1.06 faster ✔
     q20: prev=2709 ms, new=2149 ms, diff=1.26 faster ✅
     q21: prev=3316 ms, new=2613 ms, diff=1.27 faster ✅
     q22: prev=3788 ms, new=2922 ms, diff=1.30 faster ✅
     q23: prev=9854 ms, new=7151 ms, diff=1.38 faster ✅
     q24: prev=1350 ms, new= 639 ms, diff=2.11 faster ✅
     q25: prev= 776 ms, new= 517 ms, diff=1.50 faster ✅
     q26: prev= 794 ms, new= 576 ms, diff=1.38 faster ✅
     q27: prev=2948 ms, new=2464 ms, diff=1.20 faster ✔
     q28: prev=5889 ms, new=5968 ms, diff=1.01 slower ✖
     q29: prev= 547 ms, new= 484 ms, diff=1.13 faster ✔
     q30: prev= 781 ms, new= 649 ms, diff=1.20 faster ✅
     q31: prev=1265 ms, new= 912 ms, diff=1.39 faster ✅
     q32: prev=1724 ms, new=1107 ms, diff=1.56 faster ✅
     q33: prev=4158 ms, new=2982 ms, diff=1.39 faster ✅
     q34: prev=3586 ms, new=3004 ms, diff=1.19 faster ✔
     q35: prev= 752 ms, new= 667 ms, diff=1.13 faster ✔
     q36: prev= 426 ms, new= 381 ms, diff=1.12 faster ✔
     q37: prev= 288 ms, new= 249 ms, diff=1.16 faster ✔
     q38: prev= 446 ms, new= 380 ms, diff=1.17 faster ✔
     q39: prev= 628 ms, new= 632 ms, diff=1.01 slower ✖
     q40: prev= 252 ms, new= 215 ms, diff=1.17 faster ✔
     q41: prev= 177 ms, new= 209 ms, diff=1.18 slower ✖
     q42: prev= 184 ms, new= 200 ms, diff=1.09 slower ✖

Let's do this then 👍, the code changed a bit here after #467, so you'll need to update with main, although the idea and code should be pretty much the same.

let plan = DataSourceExec::from_data_source(new_file_scan);
Some(Arc::new(PartitionIsolatorExec::new(plan, task_count)))
}
}

fn rebalance_round_robin<T>(items: Vec<T>, target_groups: usize) -> Vec<Vec<T>> {
let target_groups = target_groups.min(items.len());
let mut groups = (0..target_groups)
.map(|_| Vec::new())
.collect::<Vec<Vec<T>>>();
for (idx, item) in items.into_iter().enumerate() {
groups[idx % target_groups].push(item);
}
groups
}

/// Tries multiple user-provided [TaskEstimator]s until one returns an estimation. If none
/// returns an estimation, a set of default [TaskEstimation] implementations is tried. Right
/// now the only default [TaskEstimation] is [FileScanConfigTaskEstimator].
Expand Down Expand Up @@ -393,6 +407,22 @@ mod tests {
Ok(())
}

#[test]
fn test_rebalance_round_robin_fixes_group_boundary_skew() {
let items = (0..8).collect::<Vec<_>>();
let groups = rebalance_round_robin(items, 5);
let sizes = groups.iter().map(Vec::len).collect::<Vec<_>>();
assert_eq!(sizes, vec![2, 2, 2, 1, 1]);
}

#[test]
fn test_rebalance_round_robin_caps_partitions_to_file_count() {
let items = vec![10, 20, 30];
let groups = rebalance_round_robin(items, 5);
let sizes = groups.iter().map(Vec::len).collect::<Vec<_>>();
assert_eq!(sizes, vec![1, 1, 1]);
}

impl CombinedTaskEstimator {
fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) {
self.user_provided.push(Arc::new(value));
Expand Down
Loading