From a233246394d65fd7dd984a858f79b6f4f01bef74 Mon Sep 17 00:00:00 2001 From: Shehab Ali Date: Wed, 27 May 2026 09:35:58 -0400 Subject: [PATCH 1/4] Add global rebalancing in file splitting between tasks --- .vscode/settings.json | 1 + src/distributed_planner/task_estimator.rs | 42 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index 60534e3a..da7e6eeb 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -279,17 +279,31 @@ impl TaskEstimator for FileScanConfigTaskEstimator { let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?; let mut new_file_scan = file_scan.clone(); - new_file_scan.file_groups.clear(); - for file_group in file_scan.file_groups.clone() { - new_file_scan - .file_groups - .extend(file_group.split_files(task_count)); - } + let input_group_count = file_scan.file_groups.len().max(1); + let all_partitioned_files = file_scan + .file_groups + .iter() + .flat_map(|file_group| file_group.iter().cloned()) + .collect::>(); + let file_groups = + rebalance_round_robin(all_partitioned_files, input_group_count * task_count); + new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect(); let plan = DataSourceExec::from_data_source(new_file_scan); Some(Arc::new(PartitionIsolatorExec::new(plan, task_count))) } } +fn rebalance_round_robin(items: Vec, target_groups: usize) -> Vec> { + let target_groups = target_groups.min(items.len()); + let mut groups = (0..target_groups) + .map(|_| Vec::new()) + .collect::>>(); + for (idx, item) in items.into_iter().enumerate() { + groups[idx % target_groups].push(item); + } + groups +} + /// Tries multiple user-provided [TaskEstimator]s until one returns an estimation. If none /// returns an estimation, a set of default [TaskEstimation] implementations is tried. Right /// now the only default [TaskEstimation] is [FileScanConfigTaskEstimator]. @@ -393,6 +407,22 @@ mod tests { Ok(()) } + #[test] + fn test_rebalance_round_robin_fixes_group_boundary_skew() { + let items = (0..8).collect::>(); + let groups = rebalance_round_robin(items, 5); + let sizes = groups.iter().map(Vec::len).collect::>(); + assert_eq!(sizes, vec![2, 2, 2, 1, 1]); + } + + #[test] + fn test_rebalance_round_robin_caps_partitions_to_file_count() { + let items = vec![10, 20, 30]; + let groups = rebalance_round_robin(items, 5); + let sizes = groups.iter().map(Vec::len).collect::>(); + assert_eq!(sizes, vec![1, 1, 1]); + } + impl CombinedTaskEstimator { fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) { self.user_provided.push(Arc::new(value)); From 1e5784a912e0ffe81a691dfc4a7be2522a1bd4ec Mon Sep 17 00:00:00 2001 From: Shehab Ali Date: Wed, 27 May 2026 16:44:09 -0400 Subject: [PATCH 2/4] Test cross-input-group rebalancing in scale_up_leaf_node Adds a unit test that exercises the non-uniform case the integration fixtures miss: 3 input groups of 5 files each. Asserts the early output groups mix files from multiple input groups, which the prior per-group split_files algorithm could not produce. --- src/distributed_planner/task_estimator.rs | 75 +++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index da7e6eeb..a234d6f8 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -423,6 +423,81 @@ mod tests { assert_eq!(sizes, vec![1, 1, 1]); } + /// End-to-end check that `scale_up_leaf_node` actually rebalances files **across** + /// input group boundaries. With 3 input groups of 5 files each and `task_count = 3`, + /// the global round-robin packing interleaves files from different input groups into + /// the same output group — something the prior per-group `split_files` approach + /// could never do, since it kept each input group's files clustered together. + #[test] + fn test_scale_up_leaf_node_rebalances_across_input_groups() { + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::datasource::listing::PartitionedFile; + use datafusion::datasource::physical_plan::{ + FileGroup, FileScanConfigBuilder, FileSource, ParquetSource, + }; + use datafusion::execution::object_store::ObjectStoreUrl; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let source: Arc = Arc::new(ParquetSource::new(Arc::clone(&schema))); + let url = ObjectStoreUrl::parse("file:///").unwrap(); + let mut builder = FileScanConfigBuilder::new(url, source); + for g in 0..3 { + let files: Vec = (0..5) + .map(|i| PartitionedFile::new(format!("g{}/f{}.parquet", g, i), 1024)) + .collect(); + builder = builder.with_file_group(FileGroup::new(files)); + } + let scan = builder.build(); + let plan: Arc = DataSourceExec::from_data_source(scan); + + let cfg = ConfigOptions::default(); + let scaled = FileScanConfigTaskEstimator + .scale_up_leaf_node(&plan, 3, &cfg) + .expect("scale_up should produce a plan"); + + let isolator = scaled + .as_any() + .downcast_ref::() + .expect("expected PartitionIsolatorExec wrapper"); + let inner = isolator + .children() + .first() + .map(|c| Arc::clone(*c)) + .unwrap(); + let dse: &DataSourceExec = inner.as_any().downcast_ref().unwrap(); + let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref().unwrap(); + + let groups: Vec> = file_scan + .file_groups + .iter() + .map(|g| { + g.iter() + .map(|f| f.object_meta.location.to_string()) + .collect() + }) + .collect(); + + // 15 files round-robin into 3 * 3 = 9 output groups -> sizes [2,2,2,2,2,2,1,1,1]. + assert_eq!(groups.len(), 9, "expected 9 output groups, got {:?}", groups); + let total: usize = groups.iter().map(Vec::len).sum(); + assert_eq!(total, 15); + + // Each of the first 5 output groups must contain files from at least two + // distinct input groups -- this is the cross-group rebalancing that the prior + // per-group `split_files` algorithm could not produce. + let prefix = |path: &str| path.split('/').next().unwrap_or("").to_string(); + for (idx, files) in groups.iter().enumerate().take(5) { + let distinct_inputs: std::collections::BTreeSet = + files.iter().map(|p| prefix(p)).collect(); + assert!( + distinct_inputs.len() >= 2, + "output group {} must mix files from multiple input groups; got {:?}", + idx, + files + ); + } + } + impl CombinedTaskEstimator { fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) { self.user_provided.push(Arc::new(value)); From c24ff0eed891c4c52db8a7332625e0b1760311df Mon Sep 17 00:00:00 2001 From: Shehab Ali Date: Mon, 1 Jun 2026 10:00:57 -0400 Subject: [PATCH 3/4] nit --- src/distributed_planner/task_estimator.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index 281e443c..c01d0df5 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -284,8 +284,7 @@ impl TaskEstimator for FileScanConfigTaskEstimator { file_scan_template.file_groups.clear(); let rebalanced = rebalance_round_robin(all_partitioned_files, input_group_count * task_count); - let mut file_groups: VecDeque = - rebalanced.into_iter().map(Into::into).collect(); + let mut file_groups: VecDeque = rebalanced.into_iter().map(Into::into).collect(); let expected_partitions = plan.output_partitioning().partition_count(); let dle = DistributedLeafExec::new( @@ -477,8 +476,7 @@ mod tests { let mut groups: Vec> = Vec::new(); for variant in &dle.variants { let dse: &DataSourceExec = variant.as_any().downcast_ref().unwrap(); - let file_scan: &FileScanConfig = - dse.data_source().as_any().downcast_ref().unwrap(); + let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref().unwrap(); for fg in &file_scan.file_groups { groups.push( fg.iter() @@ -491,7 +489,12 @@ mod tests { // 15 files round-robin into 3 input groups * 3 task_count = 9 output groups, // dealt across 3 task variants of 3 partitions each. assert_eq!(dle.variants.len(), 3, "expected 3 task variants"); - assert_eq!(groups.len(), 9, "expected 9 output groups, got {:?}", groups); + assert_eq!( + groups.len(), + 9, + "expected 9 output groups, got {:?}", + groups + ); let total: usize = groups.iter().map(Vec::len).sum(); assert_eq!(total, 15); From 6ab08cf56c196c176dd7eec134b935861d739c08 Mon Sep 17 00:00:00 2001 From: Shehab Ali Date: Thu, 4 Jun 2026 09:50:34 -0400 Subject: [PATCH 4/4] remove test --- .vscode/settings.json | 1 - src/distributed_planner/task_estimator.rs | 88 ----------------------- 2 files changed, 89 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 9e26dfee..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1 +0,0 @@ -{} \ No newline at end of file diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index c01d0df5..4232b5c5 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -435,94 +435,6 @@ mod tests { assert_eq!(sizes, vec![1, 1, 1]); } - /// End-to-end check that `scale_up_leaf_node` actually rebalances files **across** - /// input group boundaries. With 3 input groups of 5 files each and `task_count = 3`, - /// the global round-robin packing interleaves files from different input groups into - /// the same output group — something the prior per-group `split_files` approach - /// could never do, since it kept each input group's files clustered together. - #[test] - fn test_scale_up_leaf_node_rebalances_across_input_groups() { - use datafusion::arrow::datatypes::{DataType, Field, Schema}; - use datafusion::datasource::listing::PartitionedFile; - use datafusion::datasource::physical_plan::{ - FileGroup, FileScanConfigBuilder, FileSource, ParquetSource, - }; - use datafusion::execution::object_store::ObjectStoreUrl; - - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let source: Arc = Arc::new(ParquetSource::new(Arc::clone(&schema))); - let url = ObjectStoreUrl::parse("file:///").unwrap(); - let mut builder = FileScanConfigBuilder::new(url, source); - for g in 0..3 { - let files: Vec = (0..5) - .map(|i| PartitionedFile::new(format!("g{}/f{}.parquet", g, i), 1024)) - .collect(); - builder = builder.with_file_group(FileGroup::new(files)); - } - let scan = builder.build(); - let plan: Arc = DataSourceExec::from_data_source(scan); - - let cfg = ConfigOptions::default(); - let scaled = FileScanConfigTaskEstimator - .scale_up_leaf_node(&plan, 3, &cfg) - .expect("scale_up should produce a plan"); - - let dle = scaled - .as_any() - .downcast_ref::() - .expect("expected DistributedLeafExec wrapper"); - - // Walk each per-task variant and collect every file group's file paths. - let mut groups: Vec> = Vec::new(); - for variant in &dle.variants { - let dse: &DataSourceExec = variant.as_any().downcast_ref().unwrap(); - let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref().unwrap(); - for fg in &file_scan.file_groups { - groups.push( - fg.iter() - .map(|f| f.object_meta.location.to_string()) - .collect(), - ); - } - } - - // 15 files round-robin into 3 input groups * 3 task_count = 9 output groups, - // dealt across 3 task variants of 3 partitions each. - assert_eq!(dle.variants.len(), 3, "expected 3 task variants"); - assert_eq!( - groups.len(), - 9, - "expected 9 output groups, got {:?}", - groups - ); - let total: usize = groups.iter().map(Vec::len).sum(); - assert_eq!(total, 15); - - // At least six of the nine output groups must contain files from multiple - // distinct input groups. The prior per-group `split_files` algorithm kept - // each task's partitions confined to a single input group, so this assertion - // would fail under the old behavior. - let prefix = |path: &str| path.split('/').next().unwrap_or("").to_string(); - let cross_group_count = groups - .iter() - .filter(|files| { - files - .iter() - .map(|p| prefix(p)) - .collect::>() - .len() - >= 2 - }) - .count(); - assert!( - cross_group_count >= 6, - "expected >=6 output groups mixing files from multiple input groups; \ - got {} (groups: {:?})", - cross_group_count, - groups - ); - } - impl CombinedTaskEstimator { fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) { self.user_provided.push(Arc::new(value));