Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Comment thread
shehab-ali marked this conversation as resolved.
Outdated
129 changes: 125 additions & 4 deletions src/distributed_planner/task_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,16 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;

let mut file_scan_template = file_scan.clone();
let mut file_groups = VecDeque::with_capacity(file_scan.file_groups.len() * task_count);
for file_group in file_scan_template.file_groups.drain(..) {
file_groups.extend(file_group.split_files(task_count));
}
let input_group_count = file_scan_template.file_groups.len().max(1);
let all_partitioned_files = file_scan_template
.file_groups
.iter()
.flat_map(|file_group| file_group.iter().cloned())
.collect::<Vec<_>>();
file_scan_template.file_groups.clear();
let rebalanced =
rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
let mut file_groups: VecDeque<FileGroup> = rebalanced.into_iter().map(Into::into).collect();
let expected_partitions = plan.output_partitioning().partition_count();

let dle = DistributedLeafExec::new(
Expand All @@ -299,6 +305,17 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
}
}

/// Deals `items` one at a time, in order, into at most `target_groups` groups.
///
/// Item `i` lands in group `i % n`, where `n` is the effective group count, so group
/// sizes differ by at most one and the larger groups come first.
///
/// The effective group count is `target_groups` clamped to `1..=items.len()`:
/// * it never exceeds the number of items, so no empty group is produced;
/// * it is never zero while there are items to place, so a `target_groups` of `0`
///   yields a single group holding everything instead of panicking on `% 0`.
///
/// An empty `items` returns an empty `Vec` (no groups at all).
fn rebalance_round_robin<T>(items: Vec<T>, target_groups: usize) -> Vec<Vec<T>> {
    if items.is_empty() {
        return Vec::new();
    }

    // `items` is non-empty here, so `1 <= items.len()` and the clamp bounds are valid.
    let group_count = target_groups.clamp(1, items.len());

    let mut groups: Vec<Vec<T>> = std::iter::repeat_with(Vec::new)
        .take(group_count)
        .collect();
    for (idx, item) in items.into_iter().enumerate() {
        groups[idx % group_count].push(item);
    }
    groups
}

/// Tries multiple user-provided [TaskEstimator]s until one returns an estimation. If none
/// returns an estimation, a set of default [TaskEstimator] implementations is tried. Right
/// now the only default [TaskEstimator] is [FileScanConfigTaskEstimator].
Expand Down Expand Up @@ -402,6 +419,110 @@ mod tests {
Ok(())
}

#[test]
fn test_rebalance_round_robin_fixes_group_boundary_skew() {
    // Eight items dealt over five groups: the first three groups receive a second
    // item, the last two keep a single one.
    let groups = rebalance_round_robin((0..8).collect::<Vec<_>>(), 5);
    let sizes: Vec<usize> = groups.iter().map(|group| group.len()).collect();
    assert_eq!(sizes, vec![2, 2, 2, 1, 1]);
}

#[test]
fn test_rebalance_round_robin_caps_partitions_to_file_count() {
    // Requesting more groups than there are items must not create empty groups:
    // three items yield exactly three singleton groups.
    let groups = rebalance_round_robin(vec![10, 20, 30], 5);
    let sizes: Vec<usize> = groups.iter().map(|group| group.len()).collect();
    assert_eq!(sizes, vec![1, 1, 1]);
}

/// End-to-end check that `scale_up_leaf_node` actually rebalances files **across**
/// input group boundaries. With 3 input groups of 5 files each and `task_count = 3`,
/// the global round-robin packing interleaves files from different input groups into
/// the same output group — something the prior per-group `split_files` approach
/// could never do, since it kept each input group's files clustered together.
///
/// Fixture: input groups `g0`, `g1`, `g2`, each holding files `f0`..`f4` (15 files in
/// total). The test asserts 3 task variants, 9 output file groups overall, all 15 files
/// still present, and at least 6 output groups that mix files from different inputs.
#[test]
fn test_scale_up_leaf_node_rebalances_across_input_groups() {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
};
use datafusion::execution::object_store::ObjectStoreUrl;
Comment thread
shehab-ali marked this conversation as resolved.
Outdated

// Build a single-column scan whose file paths encode their input group as the
// leading path segment (`g{group}/f{index}.parquet`), so provenance can be
// recovered from the path later on.
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let source: Arc<dyn FileSource> = Arc::new(ParquetSource::new(Arc::clone(&schema)));
let url = ObjectStoreUrl::parse("file:///").unwrap();
let mut builder = FileScanConfigBuilder::new(url, source);
for g in 0..3 {
let files: Vec<PartitionedFile> = (0..5)
.map(|i| PartitionedFile::new(format!("g{}/f{}.parquet", g, i), 1024))
.collect();
builder = builder.with_file_group(FileGroup::new(files));
}
let scan = builder.build();
let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(scan);

// Scale the leaf up to 3 tasks using default configuration options.
let cfg = ConfigOptions::default();
let scaled = FileScanConfigTaskEstimator
.scale_up_leaf_node(&plan, 3, &cfg)
.expect("scale_up should produce a plan");

let dle = scaled
.as_any()
.downcast_ref::<DistributedLeafExec>()
.expect("expected DistributedLeafExec wrapper");

// Walk each per-task variant and collect every file group's file paths.
let mut groups: Vec<Vec<String>> = Vec::new();
for variant in &dle.variants {
let dse: &DataSourceExec = variant.as_any().downcast_ref().unwrap();
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref().unwrap();
for fg in &file_scan.file_groups {
groups.push(
fg.iter()
.map(|f| f.object_meta.location.to_string())
.collect(),
);
}
}

// 15 files round-robin into 3 input groups * 3 task_count = 9 output groups,
// dealt across 3 task variants of 3 partitions each.
assert_eq!(dle.variants.len(), 3, "expected 3 task variants");
assert_eq!(
groups.len(),
9,
"expected 9 output groups, got {:?}",
groups
);
// No file may be lost or duplicated by the rebalancing.
let total: usize = groups.iter().map(Vec::len).sum();
assert_eq!(total, 15);

// At least six of the nine output groups must contain files from multiple
// distinct input groups. The prior per-group `split_files` algorithm kept
// each task's partitions confined to a single input group, so this assertion
// would fail under the old behavior.
//
// Why six: dealing 15 files over 9 groups gives six two-file groups and three
// single-file groups. A two-file group holds flattened indices `k` and `k + 9`,
// and each input group spans only 5 consecutive indices, so the pair always comes
// from two different input groups.
// `prefix` extracts the leading path segment (the `g{n}` input-group marker).
let prefix = |path: &str| path.split('/').next().unwrap_or("").to_string();
let cross_group_count = groups
.iter()
.filter(|files| {
files
.iter()
.map(|p| prefix(p))
.collect::<std::collections::BTreeSet<_>>()
.len()
>= 2
})
.count();
assert!(
cross_group_count >= 6,
"expected >=6 output groups mixing files from multiple input groups; \
got {} (groups: {:?})",
cross_group_count,
groups
);
}

impl CombinedTaskEstimator {
fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) {
self.user_provided.push(Arc::new(value));
Expand Down
Loading