Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 75 additions & 36 deletions src/coordinator/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::common::{require_one_child, serialize_uuid};
use crate::coordinator::metrics_store::MetricsStore;
use crate::coordinator::prepare_static_plan::prepare_static_plan;
use crate::coordinator::query_coordinator::QueryCoordinator;
use crate::distributed_planner::NetworkBoundaryExt;
use crate::worker::generated::worker::TaskKey;
use datafusion::common::internal_datafusion_err;
use datafusion::common::runtime::JoinSet;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::common::{Result, exec_err};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
Expand All @@ -14,8 +14,7 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::StreamExt;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

/// [ExecutionPlan] that executes the inner plan in distributed mode.
/// Before executing it, two modifications are lazily performed on the plan:
Expand All @@ -26,22 +25,39 @@ use std::sync::Mutex;
/// over the wire.
#[derive(Debug)]
pub struct DistributedExec {
plan: Arc<dyn ExecutionPlan>,
prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
/// Initial [ExecutionPlan] present before execution.
/// - If the plan was distributed statically, this will be the final distributed plan with all
/// the appropriate network boundaries in it.
/// - If the plan is going to be distributed dynamically during execution, this is the initial
/// non-distributed plan.
base_plan: Arc<dyn ExecutionPlan>,
/// Resulting [ExecutionPlan] after execution ready for visualization purposes.
/// - If the plan was distributed statically, this is equal to the base plan.
/// - If the plan is going to be distributed dynamically during execution, this is the resulting
/// plan re-calculated based on runtime statistics.
plan_for_viz: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
/// The head stage meant to be executed locally on [DistributedExec::execute].
head_stage: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
/// DataFusion metrics.
metrics: ExecutionPlanMetricsSet,
/// Storage where metrics collected from workers at runtime will place their results as they
/// finish their respective remote tasks.
pub(crate) metrics_store: Option<Arc<MetricsStore>>,
}

pub(super) struct PreparedPlan {
/// The head stage meant to be executed locally by the coordinator.
pub(super) head_stage: Arc<dyn ExecutionPlan>,
pub(super) join_set: JoinSet<Result<()>>,
/// A final representation of the plan for visualization purposes.
pub(super) plan_for_viz: Arc<dyn ExecutionPlan>,
}

impl DistributedExec {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
pub fn new(base_plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
plan,
prepared_plan: Arc::new(Mutex::new(None)),
base_plan,
plan_for_viz: Arc::new(Mutex::new(None)),
head_stage: Arc::new(Mutex::new(None)),
metrics: ExecutionPlanMetricsSet::new(),
metrics_store: None,
}
Expand All @@ -68,7 +84,10 @@ impl DistributedExec {
let Some(task_metrics) = &self.metrics_store else {
return;
};
let _ = self.plan.apply(|plan| {
let Some(plan) = self.plan_for_viz.lock().unwrap().as_ref().cloned() else {
return;
};
let _ = plan.apply(|plan| {
if let Some(boundary) = plan.as_network_boundary() {
let stage = boundary.input_stage();
for i in 0..stage.task_count() {
Expand All @@ -93,15 +112,27 @@ impl DistributedExec {
/// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
/// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
/// called.
pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
self.prepared_plan
pub(crate) fn plan_for_viz(&self) -> Result<Arc<dyn ExecutionPlan>> {
self.plan_for_viz
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
.clone()
.ok_or_else(|| {
internal_datafusion_err!("No prepared plan found. Was execute() called?")
})
}

/// Returns the head stage that was actually executed. Unlike [`Self::plan_for_viz`] (which is
/// reconstructed for visualization, with `Stage::Local` boundaries and rebuilt ancestor
/// `Arc`s), this returns the original `Arc` instances whose metrics were populated during
/// execution.
pub(crate) fn head_stage(&self) -> Result<Arc<dyn ExecutionPlan>> {
self.head_stage
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock head stage: {}", e))?
.clone()
.ok_or_else(|| internal_datafusion_err!("No head stage found. Was execute() called?"))
}
}

impl DisplayAs for DistributedExec {
Expand All @@ -116,20 +147,21 @@ impl ExecutionPlan for DistributedExec {
}

fn properties(&self) -> &Arc<PlanProperties> {
self.plan.properties()
self.base_plan.properties()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.plan]
vec![&self.base_plan]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DistributedExec {
plan: require_one_child(&children)?,
prepared_plan: self.prepared_plan.clone(),
base_plan: require_one_child(&children)?,
plan_for_viz: Arc::new(Mutex::new(None)),
head_stage: Arc::new(Mutex::new(None)),
metrics: self.metrics.clone(),
metrics_store: self.metrics_store.clone(),
}))
Expand All @@ -150,36 +182,43 @@ impl ExecutionPlan for DistributedExec {
);
}

let PreparedPlan {
head_stage,
join_set,
} = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
{
let mut guard = self
.prepared_plan
.lock()
.map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
*guard = Some(head_stage.clone());
}
let base_plan = Arc::clone(&self.base_plan);
let plan_for_viz = Arc::clone(&self.plan_for_viz);
let head_stage = Arc::clone(&self.head_stage);

let query_coordinator = QueryCoordinator::new(
Arc::clone(&context),
&self.metrics,
self.metrics_store.clone(),
);

let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
let tx = builder.tx();
// Spawn the task that pulls data from child...

builder.spawn(async move {
let mut stream = head_stage.execute(partition, context)?;
let _guard = query_coordinator.end_query_guard();

let result = prepare_static_plan(&query_coordinator, &base_plan)?;

plan_for_viz
.lock()
.expect("poisoned lock")
.replace(result.plan_for_viz);
head_stage
.lock()
.expect("poisoned lock")
.replace(Arc::clone(&result.head_stage));
let mut stream = result.head_stage.execute(partition, context)?;
while let Some(msg) = stream.next().await {
if tx.send(msg).await.is_err() {
break; // channel closed
}
}
drop(tx);
query_coordinator.drain_pending_tasks().await?;
Ok(())
});
// ...in parallel to the one that feeds the plan to workers.
builder.spawn(async move {
for res in join_set.join_all().await {
res?;
}
Ok(())
});

Ok(builder.build())
}

Expand Down
62 changes: 62 additions & 0 deletions src/coordinator/latency_metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use datafusion::common::instant::Instant;
use datafusion::physical_expr_common::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricValue, Time,
};
use std::fmt::Display;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// DataFusion metrics system is pretty limited from an API standpoint. This intermediate struct
/// bridges the gaps that are not satisfied by upstream API for measuring latency.
pub(super) struct LatencyMetric {
max: Time,
avg: Time,
max_latency_micros: AtomicU64,
sum_latency_micros: AtomicU64,
count_latency_micros: AtomicU64,
}
Comment on lines +11 to +17

@gabotechs gabotechs Jun 2, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the contents of this file are unchanged code that previously lived inside task_spawner.rs. I just extracted it to its own file as it's kind of isolated.


impl Drop for LatencyMetric {
fn drop(&mut self) {
self.max.add_duration(Duration::from_micros(
self.max_latency_micros.load(Ordering::Relaxed),
));
self.avg.add_duration(Duration::from_micros(
self.sum_latency_micros.load(Ordering::Relaxed)
/ self.count_latency_micros.load(Ordering::Relaxed).max(1),
));
}
}

impl LatencyMetric {
pub(super) fn new(
name: impl Display,
builder: impl Fn(MetricBuilder) -> MetricBuilder,
metrics: &ExecutionPlanMetricsSet,
) -> Self {
let max = Time::new();
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
name: format!("{name}_max").into(),
time: max.clone(),
});
let avg = Time::new();
builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
name: format!("{name}_avg").into(),
time: avg.clone(),
});
Self {
max,
avg,
max_latency_micros: AtomicU64::new(0),
sum_latency_micros: AtomicU64::new(0),
count_latency_micros: AtomicU64::new(0),
}
}

pub(super) fn record(&self, start: &Instant) {
let micros = start.elapsed().as_micros() as u64;
self.max_latency_micros.fetch_max(micros, Ordering::Relaxed);
self.sum_latency_micros.fetch_add(micros, Ordering::Relaxed);
self.count_latency_micros.fetch_add(1, Ordering::Relaxed);
}
}
3 changes: 2 additions & 1 deletion src/coordinator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
mod distributed;
mod latency_metric;
mod metrics_store;
mod prepare_static_plan;
mod task_spawner;
mod query_coordinator;

pub use distributed::DistributedExec;
pub(crate) use metrics_store::MetricsStore;
67 changes: 11 additions & 56 deletions src/coordinator/prepare_static_plan.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
use crate::coordinator::MetricsStore;
use crate::coordinator::distributed::PreparedPlan;
use crate::coordinator::task_spawner::{
CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner,
};
use crate::coordinator::query_coordinator::QueryCoordinator;
use crate::stage::RemoteStage;
use crate::{
DistributedConfig, NetworkBoundaryExt, Stage, TaskEstimator, TaskRoutingContext,
get_distributed_worker_resolver,
};
use datafusion::common::runtime::JoinSet;
use crate::{NetworkBoundaryExt, Stage};
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{Result, exec_err};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use rand::Rng;
use std::sync::Arc;

/// Prepares the distributed plan for execution, which implies:
Expand All @@ -28,18 +18,9 @@ use std::sync::Arc;
/// 4. Spawn a background task per worker that waits for the worker to finish and collects
/// its metrics into [DistributedExec::task_metrics] via the coordinator channel.
pub(super) fn prepare_static_plan(
query_coordinator: &QueryCoordinator,
base_plan: &Arc<dyn ExecutionPlan>,
metrics: &ExecutionPlanMetricsSet,
task_metrics: &Option<Arc<MetricsStore>>,
ctx: &Arc<TaskContext>,
) -> Result<PreparedPlan> {
let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;

let available_urls = worker_resolver.get_urls()?;

let metrics = CoordinatorToWorkerMetrics::new(metrics);

let mut join_set = JoinSet::new();
let prepared = Arc::clone(base_plan).transform_up(|plan| {
// The following logic is just applied on network boundaries.
let Some(plan) = plan.as_network_boundary() else {
Expand All @@ -50,46 +31,18 @@ pub(super) fn prepare_static_plan(
return exec_err!("Input stage from network boundary was not in Local state");
};

let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;
let task_estimator = &d_cfg.__private_task_estimator;

let mut spawner =
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, task_metrics, ctx, &mut join_set)?;
let mut stage_coordinator = query_coordinator.stage_coordinator(stage);

let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
task_ctx: Arc::clone(ctx),
plan: &stage.plan,
task_count: stage.tasks,
available_urls: &available_urls,
}) {
Ok(Some(routed_urls)) => routed_urls,
// If the user has not defined custom routing with a `route_tasks` implementation, we
// default to round-robin task assignation from a randomized starting point.
Ok(None) => {
let start_idx = rand::rng().random_range(0..available_urls.len());
(0..stage.tasks)
.map(|i| available_urls[(start_idx + i) % available_urls.len()].clone())
.collect()
}
Err(e) => return exec_err!("error routing tasks to workers: {e}"),
};

if routed_urls.len() != stage.tasks {
return exec_err!(
"number of tasks ({}) was not equal to number of urls ({}) at execution time",
stage.tasks,
routed_urls.len()
);
}
let routed_urls = stage_coordinator.routed_urls()?;

let mut workers = Vec::with_capacity(stage.tasks);
for (i, routed_url) in routed_urls.into_iter().enumerate() {
workers.push(routed_url.clone());
// Spawn a task that sends the subplan to the chosen URL.
// There will be as many spawned tasks as workers.
let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?;
spawner.metrics_collection_task(i, worker_rx);
spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?;
let (worker_tx, worker_rx) = stage_coordinator.send_plan_task(i, routed_url)?;
stage_coordinator.worker_to_coordinator_task(i, worker_rx);
stage_coordinator.coordinator_to_worker_task(i, worker_tx)?;
}

Ok(Transformed::yes(plan.with_input_stage(Stage::Remote(
Expand All @@ -102,6 +55,8 @@ pub(super) fn prepare_static_plan(
})?;
Ok(PreparedPlan {
head_stage: prepared.data,
join_set,
// If the plan was statically planned, the base plan is the same one that will be used for
// visualization.
plan_for_viz: Arc::clone(base_plan),
})
}
Loading
Loading