Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions crates/integrations/datafusion/public-api.txt

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions crates/integrations/datafusion/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::catalog::{CatalogProvider, SchemaProvider};
use futures::future::try_join_all;
use iceberg::{Catalog, NamespaceIdent, Result};

use crate::IcebergCatalogConfig;
use crate::schema::IcebergSchemaProvider;

/// Provides an interface to manage and access multiple schemas
Expand All @@ -47,6 +48,24 @@ impl IcebergCatalogProvider {
/// attempts to create a schema provider for each namespace, and
/// collects these providers into a `HashMap`.
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
Self::try_new_impl(client, None).await
}

/// Like [`try_new`](Self::try_new), but threads a serializable
/// [`IcebergCatalogConfig`] into every schema and table provider it creates,
/// so the catalog's tables can be queried by a distributed engine such as
/// Ballista. The `client` must already be built from the same `config`.
pub async fn try_new_with_config(
client: Arc<dyn Catalog>,
config: IcebergCatalogConfig,
) -> Result<Self> {
Self::try_new_impl(client, Some(config)).await
}

async fn try_new_impl(
client: Arc<dyn Catalog>,
config: Option<IcebergCatalogConfig>,
) -> Result<Self> {
// TODO:
// Schemas and providers should be cached and evicted based on time
// As of right now; schemas might become stale.
Expand All @@ -63,6 +82,7 @@ impl IcebergCatalogProvider {
.map(|name| {
IcebergSchemaProvider::try_new(
client.clone(),
config.clone(),
NamespaceIdent::new(name.clone()),
)
})
Expand Down
54 changes: 54 additions & 0 deletions crates/integrations/datafusion/src/catalog_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

/// A serializable description of the catalog (and storage) that backs an
/// [`IcebergTableProvider`](crate::table::IcebergTableProvider).
///
/// This is the minimal, self-contained handle needed to *reconstruct* a catalog
/// and its associated `FileIO` on a remote node. It deliberately holds only
/// plain data (no live connections) so that distributed query engines such as
/// Ballista can serialize it, ship it to executors, and rebuild the catalog
/// there via a catalog loader (e.g. `iceberg-catalog-loader`) and the storage
/// via `FileIOBuilder::with_props`.
///
/// The `props` map carries both the catalog connection properties (e.g. the
/// REST catalog URI) and the storage/`FileIO` properties (e.g. S3 endpoint and
/// credentials); in practice these live together in a single map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IcebergCatalogConfig {
/// The catalog type, e.g. `"rest"`, `"sql"`, `"glue"`.
pub r#type: String,
pub name: String,
/// Catalog connection and storage properties.
pub props: HashMap<String, String>,
}

impl IcebergCatalogConfig {
pub fn new(
r#type: impl Into<String>,
name: impl Into<String>,
props: HashMap<String, String>,
) -> Self {
Self {
r#type: r#type.into(),
name: name.into(),
props,
}
}
}
3 changes: 3 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
mod catalog;
pub use catalog::*;

mod catalog_config;
pub use catalog_config::*;

mod error;
pub use error::*;

Expand Down
44 changes: 37 additions & 7 deletions crates/integrations/datafusion/src/physical_plan/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ use crate::to_datafusion_error;
/// IcebergCommitExec is responsible for collecting the files written and use
/// [`Transaction::fast_append`] to commit the data files written.
#[derive(Debug)]
pub(crate) struct IcebergCommitExec {
pub struct IcebergCommitExec {
table: Table,
catalog: Arc<dyn Catalog>,
input: Arc<dyn ExecutionPlan>,
schema: ArrowSchemaRef,
count_schema: ArrowSchemaRef,
plan_properties: Arc<PlanProperties>,
/// Optional serializable catalog/storage config, populated when this node is
/// built through a config-backed provider so it can be reconstructed on a
/// remote node by a distributed engine.
catalog_config: Option<crate::IcebergCatalogConfig>,
}

impl IcebergCommitExec {
Expand All @@ -68,9 +72,31 @@ impl IcebergCommitExec {
schema,
count_schema,
plan_properties,
catalog_config: None,
}
}

/// Attaches a serializable catalog/storage config to this node so that a
/// distributed engine can reconstruct it (including the catalog) on a remote
/// node.
pub fn with_catalog_config(
mut self,
catalog_config: Option<crate::IcebergCatalogConfig>,
) -> Self {
self.catalog_config = catalog_config;
self
}

/// Returns the serializable catalog/storage config, if any.
pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> {
self.catalog_config.as_ref()
}

/// Returns the table this node commits to.
pub fn table(&self) -> &Table {
&self.table
}

// Compute the plan properties for this execution plan
fn compute_properties(schema: ArrowSchemaRef) -> Arc<PlanProperties> {
Arc::new(PlanProperties::new(
Expand Down Expand Up @@ -160,12 +186,15 @@ impl ExecutionPlan for IcebergCommitExec {
)));
}

Ok(Arc::new(IcebergCommitExec::new(
self.table.clone(),
self.catalog.clone(),
children[0].clone(),
self.schema.clone(),
)))
Ok(Arc::new(
IcebergCommitExec::new(
self.table.clone(),
self.catalog.clone(),
children[0].clone(),
self.schema.clone(),
)
.with_catalog_config(self.catalog_config.clone()),
))
}

fn execute(
Expand Down Expand Up @@ -592,6 +621,7 @@ mod tests {

let iceberg_table_provider = IcebergTableProvider::try_new(
catalog.clone(),
None,
namespace.clone(),
table_name.to_string(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ impl IcebergMetadataScan {
properties,
}
}

/// Returns the metadata-table provider this node scans, so a distributed
/// engine can serialize the catalog config + table identifier + metadata type
/// it carries and rebuild it on a remote node.
pub fn provider(&self) -> &IcebergMetadataTableProvider {
&self.provider
}
}

impl DisplayAs for IcebergMetadataScan {
Expand Down
5 changes: 4 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub(crate) mod write;

pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";

pub use commit::IcebergCommitExec;
pub use expr_to_predicate::convert_filters_to_predicate;
pub use project::project_with_partition;
pub use metadata_scan::IcebergMetadataScan;
pub use project::{PartitionExpr, project_with_partition};
pub use scan::IcebergTableScan;
pub use write::IcebergWriteExec;
51 changes: 36 additions & 15 deletions crates/integrations/datafusion/src/physical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use iceberg::arrow::{
PROJECTED_PARTITION_VALUE_COLUMN, PartitionValueCalculator, schema_to_arrow_schema,
strip_metadata_from_schema,
};
use iceberg::spec::PartitionSpec;
use iceberg::spec::{PartitionSpec, SchemaRef};
use iceberg::table::Table;

use crate::to_datafusion_error;
Expand Down Expand Up @@ -79,10 +79,6 @@ pub fn project_with_partition(
)));
}

let calculator =
PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref())
.map_err(to_datafusion_error)?;

let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
Vec::with_capacity(input_schema.fields().len() + 1);

Expand All @@ -91,26 +87,51 @@ pub fn project_with_partition(
projection_exprs.push((column_expr, field.name().clone()));
}

let partition_expr = Arc::new(PartitionExpr::new(calculator, partition_spec.clone()));
let partition_expr = Arc::new(PartitionExpr::try_new(
partition_spec.clone(),
table_schema.clone(),
)?);
projection_exprs.push((partition_expr, PROJECTED_PARTITION_VALUE_COLUMN.to_string()));

let projection = ProjectionExec::try_new(projection_exprs, input)?;
Ok(Arc::new(projection))
}

/// PhysicalExpr implementation for partition value calculation
/// `PhysicalExpr` that computes Iceberg partition values for each input row.
///
/// Alongside the live (non-serializable) [`PartitionValueCalculator`], it retains
/// the [`PartitionSpec`] and table schema it was built from. A distributed engine
/// can serialize those two — both are self-contained iceberg spec types — and
/// rebuild an equivalent expression on a remote node via [`PartitionExpr::try_new`].
#[derive(Debug, Clone)]
struct PartitionExpr {
pub struct PartitionExpr {
calculator: Arc<PartitionValueCalculator>,
partition_spec: Arc<PartitionSpec>,
table_schema: SchemaRef,
}

impl PartitionExpr {
fn new(calculator: PartitionValueCalculator, partition_spec: Arc<PartitionSpec>) -> Self {
Self {
/// Builds a partition expression from a partition spec and the table schema
/// it is bound to, constructing the underlying [`PartitionValueCalculator`].
pub fn try_new(partition_spec: Arc<PartitionSpec>, table_schema: SchemaRef) -> DFResult<Self> {
let calculator =
PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref())
.map_err(to_datafusion_error)?;
Ok(Self {
calculator: Arc::new(calculator),
partition_spec,
}
table_schema,
})
}

/// The partition spec whose values this expression computes.
pub fn partition_spec(&self) -> &Arc<PartitionSpec> {
&self.partition_spec
}

/// The table schema the partition values are derived from.
pub fn table_schema(&self) -> &SchemaRef {
&self.table_schema
}
}

Expand Down Expand Up @@ -248,16 +269,16 @@ mod tests {

let input = Arc::new(EmptyExec::new(arrow_schema.clone()));

let calculator = PartitionValueCalculator::try_new(&partition_spec, &table_schema).unwrap();

let mut projection_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
Vec::with_capacity(arrow_schema.fields().len() + 1);
for (i, field) in arrow_schema.fields().iter().enumerate() {
let column_expr = Arc::new(Column::new(field.name(), i));
projection_exprs.push((column_expr, field.name().clone()));
}

let partition_expr = Arc::new(PartitionExpr::new(calculator, partition_spec));
let partition_expr = Arc::new(
PartitionExpr::try_new(partition_spec, Arc::new(table_schema.clone())).unwrap(),
);
projection_exprs.push((partition_expr, PROJECTED_PARTITION_VALUE_COLUMN.to_string()));

let projection = ProjectionExec::try_new(projection_exprs, input).unwrap();
Expand Down Expand Up @@ -302,7 +323,7 @@ mod tests {
let partition_spec = Arc::new(partition_spec);
let calculator = PartitionValueCalculator::try_new(&partition_spec, &table_schema).unwrap();
let partition_type = calculator.partition_arrow_type().clone();
let expr = PartitionExpr::new(calculator, partition_spec);
let expr = PartitionExpr::try_new(partition_spec, Arc::new(table_schema.clone())).unwrap();

assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type);
assert!(!expr.nullable(&arrow_schema).unwrap());
Expand Down
33 changes: 32 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ pub struct IcebergTableScan {
predicates: Option<Predicate>,
/// Optional limit on the number of rows to return
limit: Option<usize>,
/// Optional serializable catalog/storage config, populated when this scan is
/// built through a config-backed provider so it can be reconstructed on a
/// remote node by a distributed engine.
catalog_config: Option<crate::IcebergCatalogConfig>,
}

impl IcebergTableScan {
/// Creates a new [`IcebergTableScan`] object.
pub(crate) fn new(
pub fn new(
table: Table,
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
Expand All @@ -80,9 +84,36 @@ impl IcebergTableScan {
projection,
predicates,
limit,
catalog_config: None,
}
}

/// Attaches a serializable catalog/storage config to this scan so that a
/// distributed engine can reconstruct it on a remote node.
pub fn with_catalog_config(
mut self,
catalog_config: Option<crate::IcebergCatalogConfig>,
) -> Self {
self.catalog_config = catalog_config;
self
}

/// Returns the serializable catalog/storage config, if any.
pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> {
self.catalog_config.as_ref()
}

/// Replaces the scan's pushed-down filter predicate.
///
/// `IcebergTableScan::new` derives the predicate from DataFusion `Expr`
/// filters; this setter lets a distributed engine restore an already-built
/// [`Predicate`] directly (e.g. after deserializing it), so file pruning is
/// preserved on remote nodes.
pub fn with_predicates(mut self, predicates: Option<Predicate>) -> Self {
self.predicates = predicates;
self
}

pub fn table(&self) -> &Table {
&self.table
}
Expand Down
Loading
Loading