diff --git a/crates/integrations/datafusion/public-api.txt b/crates/integrations/datafusion/public-api.txt index d24bd9fc9e..6aac904627 100644 --- a/crates/integrations/datafusion/public-api.txt +++ b/crates/integrations/datafusion/public-api.txt @@ -2,6 +2,12 @@ pub mod iceberg_datafusion pub mod iceberg_datafusion::metadata_table pub struct iceberg_datafusion::metadata_table::IcebergMetadataTableProvider impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider @@ -13,13 +19,51 @@ pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan<'l pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub mod iceberg_datafusion::physical_plan +pub struct iceberg_datafusion::physical_plan::IcebergCommitExec +impl iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::new(table: iceberg::table::Table, catalog: alloc::sync::Arc, input: alloc::sync::Arc, schema: arrow_schema::schema::SchemaRef) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::fmt_as(&self, t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::benefits_from_input_partitioning(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::execute(&self, partition: usize, context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::name(&self) -> &str +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::required_input_distribution(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::with_new_children(self: alloc::sync::Arc, children: alloc::vec::Vec>) -> datafusion_common::error::Result> +pub struct iceberg_datafusion::physical_plan::IcebergMetadataScan +impl iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::new(provider: iceberg_datafusion::metadata_table::IcebergMetadataTableProvider) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::provider(&self) -> &iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::fmt_as(&self, _t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::execute(&self, _partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::name(&self) -> &str +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> pub struct iceberg_datafusion::physical_plan::IcebergTableScan impl iceberg_datafusion::physical_plan::IcebergTableScan +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::limit(&self) -> core::option::Option +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::new(table: iceberg::table::Table, snapshot_id: core::option::Option, schema: arrow_schema::schema::SchemaRef, projection: core::option::Option<&alloc::vec::Vec>, filters: &[datafusion_expr::expr::Expr], limit: core::option::Option) -> Self pub fn iceberg_datafusion::physical_plan::IcebergTableScan::predicates(&self) -> core::option::Option<&iceberg::expr::predicate::Predicate> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::projection(&self) -> core::option::Option<&[alloc::string::String]> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::snapshot_id(&self) -> core::option::Option pub fn iceberg_datafusion::physical_plan::IcebergTableScan::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_catalog_config(self, catalog_config: core::option::Option) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_predicates(self, predicates: core::option::Option) -> Self impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergTableScan pub fn iceberg_datafusion::physical_plan::IcebergTableScan::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergTableScan @@ -31,12 +75,61 @@ pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, _part pub fn iceberg_datafusion::physical_plan::IcebergTableScan::name(&self) -> &str pub fn iceberg_datafusion::physical_plan::IcebergTableScan::properties(&self) -> &alloc::sync::Arc pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> +pub struct iceberg_datafusion::physical_plan::IcebergWriteExec +impl iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::new(table: iceberg::table::Table, input: alloc::sync::Arc, schema: arrow_schema::schema::SchemaRef) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::fmt_as(&self, t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::benefits_from_input_partitioning(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::execute(&self, partition: usize, context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::maintains_input_order(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::name(&self) -> &str +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::with_new_children(self: alloc::sync::Arc, children: alloc::vec::Vec>) -> datafusion_common::error::Result> +pub struct iceberg_datafusion::physical_plan::PartitionExpr +impl iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::partition_spec(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::PartitionExpr::table_schema(&self) -> &iceberg::spec::schema::SchemaRef +pub fn iceberg_datafusion::physical_plan::PartitionExpr::try_new(partition_spec: alloc::sync::Arc, table_schema: iceberg::spec::schema::SchemaRef) -> datafusion_common::error::Result +impl core::clone::Clone for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::clone(&self) -> iceberg_datafusion::physical_plan::PartitionExpr +impl core::cmp::Eq for iceberg_datafusion::physical_plan::PartitionExpr +impl core::cmp::PartialEq for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::eq(&self, other: &Self) -> bool +impl core::fmt::Debug for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::fmt::Display for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::hash::Hash for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::hash(&self, state: &mut H) +impl datafusion_physical_expr_common::physical_expr::PhysicalExpr for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::PartitionExpr::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::PartitionExpr::data_type(&self, _input_schema: &arrow_schema::schema::Schema) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::evaluate(&self, batch: &arrow_array::record_batch::RecordBatch) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::fmt_sql(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::nullable(&self, _input_schema: &arrow_schema::schema::Schema) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> pub fn iceberg_datafusion::physical_plan::convert_filters_to_predicate(filters: &[datafusion_expr::expr::Expr]) -> core::option::Option pub fn iceberg_datafusion::physical_plan::project_with_partition(input: alloc::sync::Arc, table: &iceberg::table::Table) -> datafusion_common::error::Result> pub mod iceberg_datafusion::table pub mod iceberg_datafusion::table::metadata_table pub struct iceberg_datafusion::table::metadata_table::IcebergMetadataTableProvider impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider @@ -58,6 +151,24 @@ impl core::fmt::Debug for iceberg_datafusion::table_provider_factory::IcebergTab pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::table::TableProviderFactory for iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::create<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, cmd: &'life2 datafusion_expr::logical_plan::ddl::CreateExternalTable) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait +pub struct iceberg_datafusion::table::IcebergMetadataTableProvider +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> +impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +impl core::fmt::Debug for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_catalog::table::TableProvider for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, _projection: core::option::Option<&'life2 alloc::vec::Vec>, _filters: &'life3 [datafusion_expr::expr::Expr], _limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::table::IcebergStaticTableProvider impl iceberg_datafusion::IcebergStaticTableProvider pub async fn iceberg_datafusion::IcebergStaticTableProvider::try_new_from_table(table: iceberg::table::Table) -> iceberg::error::Result @@ -74,6 +185,12 @@ pub fn iceberg_datafusion::IcebergStaticTableProvider::schema(&self) -> arrow_sc pub fn iceberg_datafusion::IcebergStaticTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergStaticTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::table::IcebergTableProvider +impl iceberg_datafusion::IcebergTableProvider +pub fn iceberg_datafusion::IcebergTableProvider::config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::IcebergTableProvider::snapshot_id(&self) -> core::option::Option +pub fn iceberg_datafusion::IcebergTableProvider::table_ident(&self) -> &iceberg::catalog::TableIdent +pub async fn iceberg_datafusion::IcebergTableProvider::try_new_with_config(catalog: alloc::sync::Arc, config: iceberg_datafusion::IcebergCatalogConfig, namespace: iceberg::catalog::NamespaceIdent, name: impl core::convert::Into) -> iceberg::error::Result +pub fn iceberg_datafusion::IcebergTableProvider::with_snapshot_id(self, snapshot_id: core::option::Option) -> Self impl core::clone::Clone for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::clone(&self) -> iceberg_datafusion::IcebergTableProvider impl core::fmt::Debug for iceberg_datafusion::IcebergTableProvider @@ -96,15 +213,48 @@ impl core::fmt::Debug for iceberg_datafusion::table_provider_factory::IcebergTab pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::table::TableProviderFactory for iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::create<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, cmd: &'life2 datafusion_expr::logical_plan::ddl::CreateExternalTable) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait +pub struct iceberg_datafusion::IcebergCatalogConfig +pub iceberg_datafusion::IcebergCatalogConfig::name: alloc::string::String +pub iceberg_datafusion::IcebergCatalogConfig::props: std::collections::hash::map::HashMap +pub iceberg_datafusion::IcebergCatalogConfig::type: alloc::string::String +impl iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::new(type: impl core::convert::Into, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> Self +impl core::clone::Clone for iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::clone(&self) -> iceberg_datafusion::IcebergCatalogConfig +impl core::cmp::Eq for iceberg_datafusion::IcebergCatalogConfig +impl core::cmp::PartialEq for iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::eq(&self, other: &iceberg_datafusion::IcebergCatalogConfig) -> bool +impl core::fmt::Debug for iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::marker::StructuralPartialEq for iceberg_datafusion::IcebergCatalogConfig pub struct iceberg_datafusion::IcebergCatalogProvider impl iceberg_datafusion::IcebergCatalogProvider pub async fn iceberg_datafusion::IcebergCatalogProvider::try_new(client: alloc::sync::Arc) -> iceberg::error::Result +pub async fn iceberg_datafusion::IcebergCatalogProvider::try_new_with_config(client: alloc::sync::Arc, config: iceberg_datafusion::IcebergCatalogConfig) -> iceberg::error::Result impl core::fmt::Debug for iceberg_datafusion::IcebergCatalogProvider pub fn iceberg_datafusion::IcebergCatalogProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::catalog::CatalogProvider for iceberg_datafusion::IcebergCatalogProvider pub fn iceberg_datafusion::IcebergCatalogProvider::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::IcebergCatalogProvider::schema(&self, name: &str) -> core::option::Option> pub fn iceberg_datafusion::IcebergCatalogProvider::schema_names(&self) -> alloc::vec::Vec +pub struct iceberg_datafusion::IcebergMetadataTableProvider +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> +impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +impl core::fmt::Debug for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_catalog::table::TableProvider for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, _projection: core::option::Option<&'life2 alloc::vec::Vec>, _filters: &'life3 [datafusion_expr::expr::Expr], _limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::IcebergStaticTableProvider impl iceberg_datafusion::IcebergStaticTableProvider pub async fn iceberg_datafusion::IcebergStaticTableProvider::try_new_from_table(table: iceberg::table::Table) -> iceberg::error::Result @@ -121,6 +271,12 @@ pub fn iceberg_datafusion::IcebergStaticTableProvider::schema(&self) -> arrow_sc pub fn iceberg_datafusion::IcebergStaticTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergStaticTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::IcebergTableProvider +impl iceberg_datafusion::IcebergTableProvider +pub fn iceberg_datafusion::IcebergTableProvider::config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::IcebergTableProvider::snapshot_id(&self) -> core::option::Option +pub fn iceberg_datafusion::IcebergTableProvider::table_ident(&self) -> &iceberg::catalog::TableIdent +pub async fn iceberg_datafusion::IcebergTableProvider::try_new_with_config(catalog: alloc::sync::Arc, config: iceberg_datafusion::IcebergCatalogConfig, namespace: iceberg::catalog::NamespaceIdent, name: impl core::convert::Into) -> iceberg::error::Result +pub fn iceberg_datafusion::IcebergTableProvider::with_snapshot_id(self, snapshot_id: core::option::Option) -> Self impl core::clone::Clone for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::clone(&self) -> iceberg_datafusion::IcebergTableProvider impl core::fmt::Debug for iceberg_datafusion::IcebergTableProvider diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 69ab68f93a..21ea5afbf7 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -23,6 +23,7 @@ use datafusion::catalog::{CatalogProvider, SchemaProvider}; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; +use crate::IcebergCatalogConfig; use crate::schema::IcebergSchemaProvider; /// Provides an interface to manage and access multiple schemas @@ -47,6 +48,24 @@ impl IcebergCatalogProvider { /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. pub async fn try_new(client: Arc) -> Result { + Self::try_new_impl(client, None).await + } + + /// Like [`try_new`](Self::try_new), but threads a serializable + /// [`IcebergCatalogConfig`] into every schema and table provider it creates, + /// so the catalog's tables can be queried by a distributed engine such as + /// Ballista. The `client` must already be built from the same `config`. + pub async fn try_new_with_config( + client: Arc, + config: IcebergCatalogConfig, + ) -> Result { + Self::try_new_impl(client, Some(config)).await + } + + async fn try_new_impl( + client: Arc, + config: Option, + ) -> Result { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. @@ -63,6 +82,7 @@ impl IcebergCatalogProvider { .map(|name| { IcebergSchemaProvider::try_new( client.clone(), + config.clone(), NamespaceIdent::new(name.clone()), ) }) diff --git a/crates/integrations/datafusion/src/catalog_config.rs b/crates/integrations/datafusion/src/catalog_config.rs new file mode 100644 index 0000000000..b62e500ffe --- /dev/null +++ b/crates/integrations/datafusion/src/catalog_config.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +/// A serializable description of the catalog (and storage) that backs an +/// [`IcebergTableProvider`](crate::table::IcebergTableProvider). +/// +/// This is the minimal, self-contained handle needed to *reconstruct* a catalog +/// and its associated `FileIO` on a remote node. It deliberately holds only +/// plain data (no live connections) so that distributed query engines such as +/// Ballista can serialize it, ship it to executors, and rebuild the catalog +/// there via a catalog loader (e.g. `iceberg-catalog-loader`) and the storage +/// via `FileIOBuilder::with_props`. +/// +/// The `props` map carries both the catalog connection properties (e.g. the +/// REST catalog URI) and the storage/`FileIO` properties (e.g. S3 endpoint and +/// credentials); in practice these live together in a single map. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct IcebergCatalogConfig { + /// The catalog type, e.g. `"rest"`, `"sql"`, `"glue"`. + pub r#type: String, + pub name: String, + /// Catalog connection and storage properties. + pub props: HashMap, +} + +impl IcebergCatalogConfig { + pub fn new( + r#type: impl Into, + name: impl Into, + props: HashMap, + ) -> Self { + Self { + r#type: r#type.into(), + name: name.into(), + props, + } + } +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 4b0ea8606d..19475aeda2 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -18,6 +18,9 @@ mod catalog; pub use catalog::*; +mod catalog_config; +pub use catalog_config::*; + mod error; pub use error::*; diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 835c804908..d3a43c4006 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -41,13 +41,17 @@ use crate::to_datafusion_error; /// IcebergCommitExec is responsible for collecting the files written and use /// [`Transaction::fast_append`] to commit the data files written. #[derive(Debug)] -pub(crate) struct IcebergCommitExec { +pub struct IcebergCommitExec { table: Table, catalog: Arc, input: Arc, schema: ArrowSchemaRef, count_schema: ArrowSchemaRef, plan_properties: Arc, + /// Optional serializable catalog/storage config, populated when this node is + /// built through a config-backed provider so it can be reconstructed on a + /// remote node by a distributed engine. + catalog_config: Option, } impl IcebergCommitExec { @@ -68,9 +72,31 @@ impl IcebergCommitExec { schema, count_schema, plan_properties, + catalog_config: None, } } + /// Attaches a serializable catalog/storage config to this node so that a + /// distributed engine can reconstruct it (including the catalog) on a remote + /// node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Returns the table this node commits to. + pub fn table(&self) -> &Table { + &self.table + } + // Compute the plan properties for this execution plan fn compute_properties(schema: ArrowSchemaRef) -> Arc { Arc::new(PlanProperties::new( @@ -160,12 +186,15 @@ impl ExecutionPlan for IcebergCommitExec { ))); } - Ok(Arc::new(IcebergCommitExec::new( - self.table.clone(), - self.catalog.clone(), - children[0].clone(), - self.schema.clone(), - ))) + Ok(Arc::new( + IcebergCommitExec::new( + self.table.clone(), + self.catalog.clone(), + children[0].clone(), + self.schema.clone(), + ) + .with_catalog_config(self.catalog_config.clone()), + )) } fn execute( @@ -592,6 +621,7 @@ mod tests { let iceberg_table_provider = IcebergTableProvider::try_new( catalog.clone(), + None, namespace.clone(), table_name.to_string(), ) diff --git a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs index a1a65dec1f..d25a897cc6 100644 --- a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs @@ -45,6 +45,13 @@ impl IcebergMetadataScan { properties, } } + + /// Returns the metadata-table provider this node scans, so a distributed + /// engine can serialize the catalog config + table identifier + metadata type + /// it carries and rebuild it on a remote node. + pub fn provider(&self) -> &IcebergMetadataTableProvider { + &self.provider + } } impl DisplayAs for IcebergMetadataScan { diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index aeac30de32..024266a4e9 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -26,6 +26,9 @@ pub(crate) mod write; pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; +pub use commit::IcebergCommitExec; pub use expr_to_predicate::convert_filters_to_predicate; -pub use project::project_with_partition; +pub use metadata_scan::IcebergMetadataScan; +pub use project::{PartitionExpr, project_with_partition}; pub use scan::IcebergTableScan; +pub use write::IcebergWriteExec; diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index 670d961f91..00c3c848d6 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -30,7 +30,7 @@ use iceberg::arrow::{ PROJECTED_PARTITION_VALUE_COLUMN, PartitionValueCalculator, schema_to_arrow_schema, strip_metadata_from_schema, }; -use iceberg::spec::PartitionSpec; +use iceberg::spec::{PartitionSpec, SchemaRef}; use iceberg::table::Table; use crate::to_datafusion_error; @@ -79,10 +79,6 @@ pub fn project_with_partition( ))); } - let calculator = - PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref()) - .map_err(to_datafusion_error)?; - let mut projection_exprs: Vec<(Arc, String)> = Vec::with_capacity(input_schema.fields().len() + 1); @@ -91,26 +87,51 @@ pub fn project_with_partition( projection_exprs.push((column_expr, field.name().clone())); } - let partition_expr = Arc::new(PartitionExpr::new(calculator, partition_spec.clone())); + let partition_expr = Arc::new(PartitionExpr::try_new( + partition_spec.clone(), + table_schema.clone(), + )?); projection_exprs.push((partition_expr, PROJECTED_PARTITION_VALUE_COLUMN.to_string())); let projection = ProjectionExec::try_new(projection_exprs, input)?; Ok(Arc::new(projection)) } -/// PhysicalExpr implementation for partition value calculation +/// `PhysicalExpr` that computes Iceberg partition values for each input row. +/// +/// Alongside the live (non-serializable) [`PartitionValueCalculator`], it retains +/// the [`PartitionSpec`] and table schema it was built from. A distributed engine +/// can serialize those two — both are self-contained iceberg spec types — and +/// rebuild an equivalent expression on a remote node via [`PartitionExpr::try_new`]. #[derive(Debug, Clone)] -struct PartitionExpr { +pub struct PartitionExpr { calculator: Arc, partition_spec: Arc, + table_schema: SchemaRef, } impl PartitionExpr { - fn new(calculator: PartitionValueCalculator, partition_spec: Arc) -> Self { - Self { + /// Builds a partition expression from a partition spec and the table schema + /// it is bound to, constructing the underlying [`PartitionValueCalculator`]. + pub fn try_new(partition_spec: Arc, table_schema: SchemaRef) -> DFResult { + let calculator = + PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref()) + .map_err(to_datafusion_error)?; + Ok(Self { calculator: Arc::new(calculator), partition_spec, - } + table_schema, + }) + } + + /// The partition spec whose values this expression computes. + pub fn partition_spec(&self) -> &Arc { + &self.partition_spec + } + + /// The table schema the partition values are derived from. + pub fn table_schema(&self) -> &SchemaRef { + &self.table_schema } } @@ -248,8 +269,6 @@ mod tests { let input = Arc::new(EmptyExec::new(arrow_schema.clone())); - let calculator = PartitionValueCalculator::try_new(&partition_spec, &table_schema).unwrap(); - let mut projection_exprs: Vec<(Arc, String)> = Vec::with_capacity(arrow_schema.fields().len() + 1); for (i, field) in arrow_schema.fields().iter().enumerate() { @@ -257,7 +276,9 @@ mod tests { projection_exprs.push((column_expr, field.name().clone())); } - let partition_expr = Arc::new(PartitionExpr::new(calculator, partition_spec)); + let partition_expr = Arc::new( + PartitionExpr::try_new(partition_spec, Arc::new(table_schema.clone())).unwrap(), + ); projection_exprs.push((partition_expr, PROJECTED_PARTITION_VALUE_COLUMN.to_string())); let projection = ProjectionExec::try_new(projection_exprs, input).unwrap(); @@ -302,7 +323,7 @@ mod tests { let partition_spec = Arc::new(partition_spec); let calculator = PartitionValueCalculator::try_new(&partition_spec, &table_schema).unwrap(); let partition_type = calculator.partition_arrow_type().clone(); - let expr = PartitionExpr::new(calculator, partition_spec); + let expr = PartitionExpr::try_new(partition_spec, Arc::new(table_schema.clone())).unwrap(); assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type); assert!(!expr.nullable(&arrow_schema).unwrap()); diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 36539ae503..098af883e8 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -53,11 +53,15 @@ pub struct IcebergTableScan { predicates: Option, /// Optional limit on the number of rows to return limit: Option, + /// Optional serializable catalog/storage config, populated when this scan is + /// built through a config-backed provider so it can be reconstructed on a + /// remote node by a distributed engine. + catalog_config: Option, } impl IcebergTableScan { /// Creates a new [`IcebergTableScan`] object. - pub(crate) fn new( + pub fn new( table: Table, snapshot_id: Option, schema: ArrowSchemaRef, @@ -80,9 +84,36 @@ impl IcebergTableScan { projection, predicates, limit, + catalog_config: None, } } + /// Attaches a serializable catalog/storage config to this scan so that a + /// distributed engine can reconstruct it on a remote node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Replaces the scan's pushed-down filter predicate. + /// + /// `IcebergTableScan::new` derives the predicate from DataFusion `Expr` + /// filters; this setter lets a distributed engine restore an already-built + /// [`Predicate`] directly (e.g. after deserializing it), so file pruning is + /// preserved on remote nodes. + pub fn with_predicates(mut self, predicates: Option) -> Self { + self.predicates = predicates; + self + } + pub fn table(&self) -> &Table { &self.table } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 697eeec659..b9aa80d493 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -60,11 +60,15 @@ use crate::to_datafusion_error; /// The output of this execution plan is a record batch containing a single column with serialized /// data file information that can be used for committing the write operation to the table. #[derive(Debug)] -pub(crate) struct IcebergWriteExec { +pub struct IcebergWriteExec { table: Table, input: Arc, result_schema: ArrowSchemaRef, plan_properties: Arc, + /// Optional serializable catalog/storage config, populated when this node is + /// built through a config-backed provider so it can be reconstructed on a + /// remote node by a distributed engine. + catalog_config: Option, } impl IcebergWriteExec { @@ -76,9 +80,30 @@ impl IcebergWriteExec { input, result_schema: Self::make_result_schema(), plan_properties, + catalog_config: None, } } + /// Attaches a serializable catalog/storage config to this node so that a + /// distributed engine can reconstruct it on a remote node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Returns the table this node writes to. + pub fn table(&self) -> &Table { + &self.table + } + fn compute_properties( input: &Arc, schema: ArrowSchemaRef, @@ -172,11 +197,10 @@ impl ExecutionPlan for IcebergWriteExec { ))); } - Ok(Arc::new(Self::new( - self.table.clone(), - Arc::clone(&children[0]), - self.schema(), - ))) + Ok(Arc::new( + Self::new(self.table.clone(), Arc::clone(&children[0]), self.schema()) + .with_catalog_config(self.catalog_config.clone()), + )) } /// Executes the write operation for the given partition. diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 76cf599062..f5c7d32912 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -32,7 +32,7 @@ use iceberg::inspect::MetadataTableType; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; -use crate::to_datafusion_error; +use crate::{IcebergCatalogConfig, to_datafusion_error}; /// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing /// access to table providers within a specific namespace. @@ -42,6 +42,10 @@ pub(crate) struct IcebergSchemaProvider { catalog: Arc, /// The namespace this schema represents namespace: NamespaceIdent, + /// Optional serializable catalog/storage config. When present, every table + /// provider this schema creates carries it, so catalog-registered tables can + /// be queried by a distributed engine. + config: Option, /// A concurrent map where keys are table names /// and values are dynamic references to objects implementing the /// [`TableProvider`] trait. @@ -57,8 +61,12 @@ impl IcebergSchemaProvider { /// This method retrieves a list of table names /// attempts to create a table provider for each table name, and /// collects these providers into a `HashMap`. + /// + /// When `config` is present it is threaded into every table provider this + /// schema creates, so the tables can be queried by a distributed engine. pub(crate) async fn try_new( client: Arc, + config: Option, namespace: NamespaceIdent, ) -> Result { // TODO: @@ -75,7 +83,14 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + IcebergTableProvider::try_new( + client.clone(), + config.clone(), + namespace.clone(), + name.clone(), + ) + }) .collect::>(), ) .await?; @@ -88,6 +103,7 @@ impl IcebergSchemaProvider { Ok(IcebergSchemaProvider { catalog: client, namespace, + config, tables, }) } @@ -171,6 +187,7 @@ impl SchemaProvider for IcebergSchemaProvider { let catalog = self.catalog.clone(); let namespace = self.namespace.clone(); + let config = self.config.clone(); let tables = self.tables.clone(); let name_clone = name.clone(); @@ -189,9 +206,11 @@ impl SchemaProvider for IcebergSchemaProvider { .await .map_err(to_datafusion_error)?; - // Create a new table provider using the catalog reference + // Create a new table provider using the catalog reference, + // carrying the config so it stays distributable. let table_provider = IcebergTableProvider::try_new( catalog.clone(), + config.clone(), namespace.clone(), name_clone.clone(), ) @@ -315,13 +334,93 @@ mod tests { .await .unwrap(); - let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), namespace) + let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), None, namespace) .await .unwrap(); (provider, temp_dir) } + #[tokio::test] + async fn test_schema_provider_with_config_propagates_to_tables() { + use iceberg::TableCreation; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + + let temp_dir = TempDir::new().unwrap(); + let warehouse_path = temp_dir.path().to_str().unwrap().to_string(); + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]), + ) + .await + .unwrap(), + ); + + let namespace = NamespaceIdent::new("test_ns".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse_path}/t")) + .schema(schema) + .properties(HashMap::new()) + .build(), + ) + .await + .unwrap(); + + // With config: the table provider carries it (and is therefore distributable). + let config = crate::IcebergCatalogConfig::new("memory", "memory", HashMap::new()); + let with_config = + IcebergSchemaProvider::try_new(catalog.clone(), Some(config), namespace.clone()) + .await + .unwrap(); + let provider = with_config + .table("t") + .await + .unwrap() + .expect("table provider"); + let iceberg = provider + .as_any() + .downcast_ref::() + .expect("IcebergTableProvider"); + assert!( + iceberg.config().is_some(), + "try_new_with_config should propagate the config to its tables" + ); + + // Without config: providers stay config-less (legacy behavior). + let without_config = + IcebergSchemaProvider::try_new(catalog.clone(), None, namespace.clone()) + .await + .unwrap(); + let provider = without_config + .table("t") + .await + .unwrap() + .expect("table provider"); + let iceberg = provider + .as_any() + .downcast_ref::() + .expect("IcebergTableProvider"); + assert!(iceberg.config().is_none()); + } + #[tokio::test] async fn test_register_table_with_data_fails() { let (schema_provider, _temp_dir) = create_test_schema_provider().await; diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs b/crates/integrations/datafusion/src/table/metadata_table.rs index 38148b4084..ab3c1befa7 100644 --- a/crates/integrations/datafusion/src/table/metadata_table.rs +++ b/crates/integrations/datafusion/src/table/metadata_table.rs @@ -41,6 +41,47 @@ use crate::to_datafusion_error; pub struct IcebergMetadataTableProvider { pub(crate) table: Table, pub(crate) r#type: MetadataTableType, + /// Optional serializable catalog/storage config, populated when this provider + /// is built through a config-backed table provider so that a distributed + /// engine can reconstruct it (reload the table from the catalog) on a remote + /// node. + catalog_config: Option, +} + +impl IcebergMetadataTableProvider { + /// Creates a metadata-table provider over an already-loaded table. + pub fn new(table: Table, r#type: MetadataTableType) -> Self { + Self { + table, + r#type, + catalog_config: None, + } + } + + /// Attaches a serializable catalog/storage config so that a distributed engine + /// can reconstruct this provider on a remote node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Returns the table this provider inspects. + pub fn table(&self) -> &Table { + &self.table + } + + /// Returns which metadata table this provider serves. + pub fn metadata_type(&self) -> &MetadataTableType { + &self.r#type + } } #[async_trait] diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..aa8a2a0872 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -47,7 +47,7 @@ use iceberg::inspect::MetadataTableType; use iceberg::spec::TableProperties; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; -use metadata_table::IcebergMetadataTableProvider; +pub use metadata_table::IcebergMetadataTableProvider; use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; @@ -73,6 +73,15 @@ pub struct IcebergTableProvider { table_ident: TableIdent, /// A reference-counted arrow `Schema` (cached at construction) schema: ArrowSchemaRef, + /// Optional serializable catalog/storage config. When present, it is + /// threaded into the execution plan nodes produced by `scan`/`insert_into` + /// so that a distributed engine can reconstruct them (and their catalog and + /// storage) on remote nodes. + config: Option, + /// Optional snapshot to read. `None` reads the current snapshot (refreshed + /// from the catalog on each scan); `Some` pins reads to that snapshot for + /// time-travel. Writes always target the current table state. + snapshot_id: Option, } impl IcebergTableProvider { @@ -82,6 +91,7 @@ impl IcebergTableProvider { /// reference for future metadata refreshes on each operation. pub(crate) async fn try_new( catalog: Arc, + config: Option, namespace: NamespaceIdent, name: impl Into, ) -> Result { @@ -95,16 +105,59 @@ impl IcebergTableProvider { catalog, table_ident, schema, + config, + snapshot_id: None, }) } + /// Creates a catalog-backed table provider that carries a serializable + /// [`IcebergCatalogConfig`](crate::IcebergCatalogConfig). + /// + /// The `catalog` must already be built from the same `config`. The config is + /// threaded into the execution plan nodes this provider produces so that a + /// distributed engine (e.g. Ballista) can serialize those nodes and rebuild + /// the catalog/storage on remote executors. + pub async fn try_new_with_config( + catalog: Arc, + config: crate::IcebergCatalogConfig, + namespace: NamespaceIdent, + name: impl Into, + ) -> Result { + Self::try_new(catalog, Some(config), namespace, name).await + } + + /// Pins reads to a specific snapshot for time-travel. `None` (the default) + /// reads the current snapshot. The snapshot id is threaded into the scan + /// node, so it is serialized and honored by a distributed engine as well. + pub fn with_snapshot_id(mut self, snapshot_id: Option) -> Self { + self.snapshot_id = snapshot_id; + self + } + + /// Returns the snapshot this provider reads, if pinned for time-travel. + pub fn snapshot_id(&self) -> Option { + self.snapshot_id + } + + /// Returns the serializable catalog/storage config, if this provider was + /// created with one. + pub fn config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.config.as_ref() + } + + /// Returns the identifier of the table this provider serves. + pub fn table_ident(&self) -> &TableIdent { + &self.table_ident + } + pub(crate) async fn metadata_table( &self, r#type: MetadataTableType, ) -> Result { // Load fresh table metadata for metadata table access let table = self.catalog.load_table(&self.table_ident).await?; - Ok(IcebergMetadataTableProvider { table, r#type }) + Ok(IcebergMetadataTableProvider::new(table, r#type) + .with_catalog_config(self.config.clone())) } } @@ -136,15 +189,18 @@ impl TableProvider for IcebergTableProvider { .await .map_err(to_datafusion_error)?; - // Create scan with fresh metadata (always use current snapshot) - Ok(Arc::new(IcebergTableScan::new( - table, - None, // Always use current snapshot for catalog-backed provider - self.schema.clone(), - projection, - filters, - limit, - ))) + // Create scan with fresh metadata, honoring a pinned snapshot if set. + Ok(Arc::new( + IcebergTableScan::new( + table, + self.snapshot_id, + self.schema.clone(), + projection, + filters, + limit, + ) + .with_catalog_config(self.config.clone()), + )) } fn supports_filters_pushdown( @@ -217,21 +273,23 @@ impl TableProvider for IcebergTableProvider { sort_by_partition(repartitioned_plan)? }; - let write_plan = Arc::new(IcebergWriteExec::new( - table.clone(), - write_input, - self.schema.clone(), - )); + let write_plan = Arc::new( + IcebergWriteExec::new(table.clone(), write_input, self.schema.clone()) + .with_catalog_config(self.config.clone()), + ); // Merge the outputs of write_plan into one so we can commit all files together let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan)); - Ok(Arc::new(IcebergCommitExec::new( - table, - self.catalog.clone(), - coalesce_partitions, - self.schema.clone(), - ))) + Ok(Arc::new( + IcebergCommitExec::new( + table, + self.catalog.clone(), + coalesce_partitions, + self.schema.clone(), + ) + .with_catalog_config(self.config.clone()), + )) } } @@ -526,10 +584,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; // Test creating a catalog-backed provider - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); // Verify the schema is loaded correctly let schema = provider.schema(); @@ -542,10 +604,14 @@ mod tests { async fn test_catalog_backed_provider_scan() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -568,10 +634,14 @@ mod tests { async fn test_catalog_backed_provider_insert() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -595,10 +665,14 @@ mod tests { async fn test_physical_input_schema_consistent_with_logical_input_schema() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -720,10 +794,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_partitioned_test_catalog_and_table(Some(true)).await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); let input_schema = provider.schema(); @@ -752,10 +830,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_partitioned_test_catalog_and_table(Some(false)).await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); let input_schema = provider.schema(); @@ -812,10 +894,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); let state = ctx.state(); @@ -865,4 +951,40 @@ mod tests { "Limit should be None when not specified" ); } + + #[tokio::test] + async fn test_with_snapshot_id_pins_scan() { + use datafusion::datasource::TableProvider; + + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + // Default provider reads the current snapshot (None in the scan). + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); + assert_eq!(provider.snapshot_id(), None); + + // Pinning a snapshot threads it into the scan node, where the codec reads + // it — so time-travel is honored locally and distributed. + let pinned = provider.with_snapshot_id(Some(123)); + assert_eq!(pinned.snapshot_id(), Some(123)); + + let ctx = SessionContext::new(); + let state = ctx.state(); + let scan_plan = pinned.scan(&state, None, &[], None).await.unwrap(); + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::() + .expect("Expected IcebergTableScan"); + assert_eq!( + iceberg_scan.snapshot_id(), + Some(123), + "pinned snapshot should propagate to the scan node" + ); + } }