diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 8b4ae0863f72..a3f58ba18343 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -18,12 +18,10 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.DESTINATION; -import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.PARTITION; -import static org.apache.beam.sdk.io.iceberg.RecordWriterManager.getPartitionDataPath; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import java.util.List; import java.util.Map; -import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; @@ -31,24 +29,24 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.PartitionField; -import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.apache.iceberg.util.PropertyUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,9 +85,6 @@ private static class WriteDoFn extends DoFn>, FileWriteRes private final IcebergCatalogConfig catalogConfig; private final String filePrefix; private final Schema dataSchema; - private transient @MonotonicNonNull Map specIds; - private transient @MonotonicNonNull Map> - partitionFieldMaps; WriteDoFn( IcebergCatalogConfig catalogConfig, @@ -102,67 +97,48 @@ private static class WriteDoFn extends DoFn>, FileWriteRes this.dataSchema = dataSchema; } - @Setup - public void setup() { - partitionFieldMaps = Maps.newHashMap(); - specIds = Maps.newHashMap(); - } - @ProcessElement public void processElement( - @Element KV> element, OutputReceiver out) + @Element KV> element, + BoundedWindow window, + PaneInfo paneInfo, + OutputReceiver out) throws Exception { String tableIdentifier = checkStateNotNull(element.getKey().getString(DESTINATION)); - String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); Table table = getOrCreateTable(destination, dataSchema); - partitionPath = - getPartitionDataPath( - partitionPath, getPartitionFieldMap(destination.getTableIdentifier(), table)); - - StructLike partitionData = - table.spec().isPartitioned() - ? DataFiles.data(table.spec(), partitionPath) - : new PartitionKey(table.spec(), table.schema()); - - String fileName = - destination - .getFileFormat() - .addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID())); - - RecordWriter writer = - new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); + + long maxFileSize = + PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + WindowedValue windowedDestination = + WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); + RecordWriterManager writer = + new RecordWriterManager(catalogConfig, filePrefix, maxFileSize, Integer.MAX_VALUE); try { for (Row row : element.getValue()) { - Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); - writer.write(record); + writer.write(windowedDestination, row); } } finally { writer.close(); } - SerializableDataFile sdf = SerializableDataFile.from(writer.getDataFile(), partitionPath); - out.output( - FileWriteResult.builder() - .setTableIdentifier(destination.getTableIdentifier()) - .setSerializableDataFile(sdf) - .build()); - } - - private Map getPartitionFieldMap( - TableIdentifier identifier, Table table) { - @Nullable Integer specId = checkStateNotNull(specIds).get(identifier); - if (specId != null && specId == table.spec().specId()) { - return checkStateNotNull(checkStateNotNull(partitionFieldMaps).get(identifier)); + @Nullable + List serializableDataFiles = + writer.getSerializableDataFiles().get(windowedDestination); + if (serializableDataFiles == null) { + return; } - Map partitionFieldMap = Maps.newHashMap(); - for (PartitionField partitionField : table.spec().fields()) { - partitionFieldMap.put(partitionField.name(), partitionField); + for (SerializableDataFile dataFile : serializableDataFiles) { + out.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setSerializableDataFile(dataFile) + .build()); } - checkStateNotNull(specIds).put(identifier, table.spec().specId()); - checkStateNotNull(partitionFieldMaps).put(identifier, partitionFieldMap); - return partitionFieldMap; } Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index c5fc5a6b6fe7..36d7ea8dc69f 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -58,9 +58,11 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; @@ -491,6 +493,56 @@ public void writePartitionedData(boolean autosharding) { p.run(); } + @Test + public void testWritingPartitionedDataRespectsFileSizePropertyWithSpillOver() { + Schema schema = Schema.builder().addStringField("str").addInt32Field("int").build(); + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).identity("str").build(); + String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); + + Table table = + warehouse.createTable( + TableIdentifier.parse(identifier), + icebergSchema, + spec, + ImmutableMap.of(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "1")); + Map config = + ImmutableMap.of( + "table", + identifier, + "catalog_properties", + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), + "distribution_mode", + distributionMode.name()); + + List rows = + Arrays.asList( + Row.withSchema(schema).addValues("a", 1).build(), + Row.withSchema(schema).addValues("a", 2).build(), + Row.withSchema(schema).addValues("a", 3).build(), + Row.withSchema(schema).addValues("b", 1).build(), + Row.withSchema(schema).addValues("b", 2).build(), + Row.withSchema(schema).addValues("b", 3).build()); + + testPipeline + .apply("Records To Add", Create.of(rows)) + .setRowSchema(schema) + .apply(Managed.write(Managed.ICEBERG).withConfig(config)); + testPipeline.run().waitUntilFinish(); + + Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions()); + PCollection readRows = + p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); + PAssert.that(readRows).containsInAnyOrder(rows); + + table.refresh(); + List datafiles = new ArrayList<>(); + table.snapshots().forEach(s -> s.addedDataFiles(table.io()).forEach(datafiles::add)); + assertEquals(6, datafiles.size()); + + p.run(); + } + @Test public void testWriteCreateTableWithPartitionSpec() { String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);