From 214a11036d658ae0cabf208cfa30d06f03ec3f47 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Jun 2026 17:09:03 -0400 Subject: [PATCH 1/3] use recordwritermanager to spill over when file size gets too large --- .../iceberg/WritePartitionedRowsToFiles.java | 49 +++++++++++------ ...ebergWriteSchemaTransformProviderTest.java | 52 +++++++++++++++++++ 2 files changed, 85 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 8b4ae0863f72..316ffae07613 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.iceberg.RecordWriterManager.getPartitionDataPath; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; @@ -31,9 +32,13 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionField; @@ -41,13 +46,14 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.util.PropertyUtil; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -110,7 +116,10 @@ public void setup() { @ProcessElement public void processElement( - @Element KV> element, OutputReceiver out) + @Element KV> element, + BoundedWindow window, + PaneInfo paneInfo, + OutputReceiver out) throws Exception { String tableIdentifier = checkStateNotNull(element.getKey().getString(DESTINATION)); String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); @@ -130,24 +139,32 @@ public void processElement( destination .getFileFormat() .addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID())); - - RecordWriter writer = - new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); - try { + System.out.println(partitionData + fileName); + + long maxFileSize = + PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + WindowedValue windowedDestination = + WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); + RecordWriterManager writer = + new RecordWriterManager(catalogConfig, filePrefix, maxFileSize, Integer.MAX_VALUE); + try (writer) { for (Row row : element.getValue()) { - Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); - writer.write(record); + writer.write(windowedDestination, row); } - } finally { - writer.close(); } - SerializableDataFile sdf = SerializableDataFile.from(writer.getDataFile(), partitionPath); - out.output( - FileWriteResult.builder() - .setTableIdentifier(destination.getTableIdentifier()) - .setSerializableDataFile(sdf) - .build()); + List serializableDataFiles = + checkStateNotNull(writer.getSerializableDataFiles().get(windowedDestination)); + for (SerializableDataFile dataFile : serializableDataFiles) { + out.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setSerializableDataFile(dataFile) + .build()); + } } private Map getPartitionFieldMap( diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index c5fc5a6b6fe7..36d7ea8dc69f 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -58,9 +58,11 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; @@ -491,6 +493,56 @@ public void writePartitionedData(boolean autosharding) { p.run(); } + @Test + public void testWritingPartitionedDataRespectsFileSizePropertyWithSpillOver() { + Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build(); + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).identity("str").build(); + String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); + + Table table = + warehouse.createTable( + TableIdentifier.parse(identifier), + icebergSchema, + spec, + ImmutableMap.of(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "1")); + Map config = + ImmutableMap.of( + "table", + identifier, + "catalog_properties", + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), + "distribution_mode", + distributionMode.name()); + + List rows = + Arrays.asList( + Row.withSchema(schema).addValues("a", 1).build(), + Row.withSchema(schema).addValues("a", 2).build(), + Row.withSchema(schema).addValues("a", 3).build(), + Row.withSchema(schema).addValues("b", 1).build(), + Row.withSchema(schema).addValues("b", 2).build(), + Row.withSchema(schema).addValues("b", 3).build()); + + testPipeline + .apply("Records To Add", Create.of(rows)) + .setRowSchema(schema) + .apply(Managed.write(Managed.ICEBERG).withConfig(config)); + testPipeline.run().waitUntilFinish(); + + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + PCollection readRows = + p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); + PAssert.that(readRows).containsInAnyOrder(rows); + + table.refresh(); + List datafiles = new ArrayList<>(); + table.snapshots().forEach(s -> s.addedDataFiles(table.io()).forEach(datafiles::add)); + assertEquals(6, datafiles.size()); + + p.run(); + } + @Test public void testWriteCreateTableWithPartitionSpec() { String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); From f627cb17586ce989cd203571e9c1714cf57d4e92 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Jun 2026 17:29:03 -0400 Subject: [PATCH 2/3] cleanup unused code --- .../iceberg/WritePartitionedRowsToFiles.java | 51 ++----------------- 1 file changed, 3 insertions(+), 48 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 316ffae07613..21e2d1b7e2de 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -18,13 +18,10 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.DESTINATION; -import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.PARTITION; -import static org.apache.beam.sdk.io.iceberg.RecordWriterManager.getPartitionDataPath; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import java.util.List; import java.util.Map; -import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; @@ -40,11 +37,7 @@ import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.PartitionField; -import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; @@ -54,7 +47,6 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.util.PropertyUtil; -import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,9 +85,6 @@ private static class WriteDoFn extends DoFn>, FileWriteRes private final IcebergCatalogConfig catalogConfig; private final String filePrefix; private final Schema dataSchema; - private transient @MonotonicNonNull Map specIds; - private transient @MonotonicNonNull Map> - partitionFieldMaps; WriteDoFn( IcebergCatalogConfig catalogConfig, @@ -108,12 +97,6 @@ private static class WriteDoFn extends DoFn>, FileWriteRes this.dataSchema = dataSchema; } - @Setup - public void setup() { - partitionFieldMaps = Maps.newHashMap(); - specIds = Maps.newHashMap(); - } - @ProcessElement public void processElement( @Element KV> element, @@ -122,24 +105,9 @@ public void processElement( OutputReceiver out) throws Exception { String tableIdentifier = checkStateNotNull(element.getKey().getString(DESTINATION)); - String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); Table table = getOrCreateTable(destination, dataSchema); - partitionPath = - getPartitionDataPath( - partitionPath, getPartitionFieldMap(destination.getTableIdentifier(), table)); - - StructLike partitionData = - table.spec().isPartitioned() - ? DataFiles.data(table.spec(), partitionPath) - : new PartitionKey(table.spec(), table.schema()); - - String fileName = - destination - .getFileFormat() - .addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID())); - System.out.println(partitionData + fileName); long maxFileSize = PropertyUtil.propertyAsLong( @@ -150,10 +118,12 @@ public void processElement( WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); RecordWriterManager writer = new RecordWriterManager(catalogConfig, filePrefix, maxFileSize, Integer.MAX_VALUE); - try (writer) { + try { for (Row row : element.getValue()) { writer.write(windowedDestination, row); } + } finally { + writer.close(); } List serializableDataFiles = @@ -167,21 +137,6 @@ public void processElement( } } - private Map getPartitionFieldMap( - TableIdentifier identifier, Table table) { - @Nullable Integer specId = checkStateNotNull(specIds).get(identifier); - if (specId != null && specId == table.spec().specId()) { - return checkStateNotNull(checkStateNotNull(partitionFieldMaps).get(identifier)); - } - Map partitionFieldMap = Maps.newHashMap(); - for (PartitionField partitionField : table.spec().fields()) { - partitionFieldMap.put(partitionField.name(), partitionField); - } - checkStateNotNull(specIds).put(identifier, table.spec().specId()); - checkStateNotNull(partitionFieldMaps).put(identifier, partitionFieldMap); - return partitionFieldMap; - } - Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { TableIdentifier identifier = destination.getTableIdentifier(); return TableCache.getAndRefreshIfStale( From 5f49c8815d32c7d0c1c083677f2c1bd2b54811b7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Jun 2026 17:31:12 -0400 Subject: [PATCH 3/3] address comments --- .../beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 21e2d1b7e2de..a3f58ba18343 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -126,8 +126,12 @@ public void processElement( writer.close(); } + @Nullable List serializableDataFiles = - checkStateNotNull(writer.getSerializableDataFiles().get(windowedDestination)); + writer.getSerializableDataFiles().get(windowedDestination); + if (serializableDataFiles == null) { + return; + } for (SerializableDataFile dataFile : serializableDataFiles) { out.output( FileWriteResult.builder()