From 6d97b4df401ec946aadbaf5f5c31ba88fdc16230 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 27 Jun 2026 15:33:46 -0700 Subject: [PATCH 1/3] fix(streamer): write non-partitioned sample-writes table for record-size estimation The first-batch sample write now rewrites each record with an empty partition path so the auxiliary sample-writes table is non-partitioned. Previously, when the incoming batch spanned many source partitions, the bulk insert (run with parallelism 1) emitted at least one tiny file per partition path, which slowed the sample write and inflated the per-record size estimate through per-file metadata overhead. Writing a single non-partitioned file yields a faster sample write and a more accurate estimate. --- .../streamer/SparkSampleWritesUtils.java | 11 ++- .../TestSparkSampleWritesUtils.java | 72 ++++++++++++++++++- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java index c50ff3484cad2..16a32a2fa6f0c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -108,7 +110,14 @@ private static Pair doSampleWrites(JavaSparkContext jsc, Option try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) { int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE); return recordsOpt.map(records -> { - List samples = records.coalesce(1).take(size); + // Rewrite each record with an empty partition path so the sample-writes table is + // effectively non-partitioned. The bulk-insert writer routes records to files by + // HoodieRecord.getPartitionPath(); without this, an input that spans many source + // partitions fans out into many tiny files even though parallelism is 1, slowing + // the sample write and inflating per-file metadata in the size estimate. + List samples = records.coalesce(1).take(size).stream() + .map(r -> r.newInstance(new HoodieKey(r.getRecordKey(), ""))) + .collect(Collectors.toList()); if (samples.isEmpty()) { return emptyRes; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java index 2706a97e5d5c0..84a163e8da654 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java @@ -32,12 +32,19 @@ import org.apache.hudi.utilities.config.HoodieStreamerConfig; import org.apache.hudi.utilities.streamer.SparkSampleWritesUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -86,7 +93,7 @@ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Excepti } @Test - public void overwriteRecordSizeEstimateForEmptyTable() { + public void overwriteRecordSizeEstimateForEmptyTable() throws IOException { int originalRecordSize = 100; TypedProperties props = new TypedProperties(); props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true"); @@ -99,9 +106,70 @@ public void overwriteRecordSizeEstimateForEmptyTable() { .build(); String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1); + // dataGen round-robins across the three DEFAULT_PARTITION_PATHS, so the input spans + // multiple source partitions. The sample-writes table must still write non-partitioned. JavaRDD records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2); Option writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig); assertTrue(writeConfigOpt.isPresent()); - assertEquals(779.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0); + // The 2000 records now land in a single non-partitioned file instead of being diluted + // across the three source partitions, so the per-record estimate is lower and more + // accurate (one parquet footer/dictionary amortized over all records) than the previous + // multi-file value of ~779. + assertEquals(337.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0); + assertSampleWritesNonPartitioned(); + } + + @Test + public void sampleWritesAreNonPartitionedEvenForManyPartitionInput() throws IOException { + int recordsPerPartition = 50; + String[] partitionPaths = IntStream.range(0, 20) + .mapToObj(i -> String.format("year=2024/month=01/day=%02d", i + 1)) + .toArray(String[]::new); + HoodieTestDataGenerator manyPartitionGen = new HoodieTestDataGenerator(partitionPaths); + + TypedProperties props = new TypedProperties(); + props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true"); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withProperties(props) + .forTable("foo") + .withPath(basePath()) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .build(); + + String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1); + List allRecords = new ArrayList<>(); + for (String partition : partitionPaths) { + allRecords.addAll(manyPartitionGen.generateInsertsForPartition(commitTime, recordsPerPartition, partition)); + } + JavaRDD records = jsc().parallelize(allRecords, 4); + + Option writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), writeConfig); + assertTrue(writeConfigOpt.isPresent(), "Sample write should produce a record-size estimate."); + assertSampleWritesNonPartitioned(); + } + + /** + * Walks the sample-writes folder and fails if any data files were placed under a + * source-partition subdirectory. A non-partitioned sample-writes table places its data + * files directly under each run directory; a partitioned write would fan them out across + * subdirectories named after the source partition paths. + */ + private void assertSampleWritesNonPartitioned() throws IOException { + Path sampleWritesPath = new Path(basePath(), ".hoodie/.aux/.sample_writes"); + FileSystem fs = sampleWritesPath.getFileSystem(jsc().hadoopConfiguration()); + assertTrue(fs.exists(sampleWritesPath), "Sample-writes folder should exist after a sample write."); + FileStatus[] runs = fs.listStatus(sampleWritesPath); + assertTrue(runs.length > 0, "Sample-writes folder should contain at least one run."); + for (FileStatus run : runs) { + List partitionDirs = new ArrayList<>(); + for (FileStatus entry : fs.listStatus(run.getPath())) { + if (entry.isDirectory() && !entry.getPath().getName().equals(".hoodie")) { + partitionDirs.add(entry.getPath().getName()); + } + } + assertTrue(partitionDirs.isEmpty(), + "Sample-writes run at " + run.getPath() + " should have no source partition subdirectories, but found: " + + Arrays.toString(partitionDirs.toArray())); + } } } From 05bd495713173c13fa4e7e0f4c9d51aab7888007 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Mon, 29 Jun 2026 21:52:21 -0700 Subject: [PATCH 2/3] Simplify code comments and drop pre-fix references in tests --- .../utilities/streamer/SparkSampleWritesUtils.java | 7 ++----- .../deltastreamer/TestSparkSampleWritesUtils.java | 13 +++---------- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java index 16a32a2fa6f0c..b85d5241196df 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java @@ -110,11 +110,8 @@ private static Pair doSampleWrites(JavaSparkContext jsc, Option try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) { int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE); return recordsOpt.map(records -> { - // Rewrite each record with an empty partition path so the sample-writes table is - // effectively non-partitioned. The bulk-insert writer routes records to files by - // HoodieRecord.getPartitionPath(); without this, an input that spans many source - // partitions fans out into many tiny files even though parallelism is 1, slowing - // the sample write and inflating per-file metadata in the size estimate. + // Empty partition path so all sampled records write to a single non-partitioned file, + // instead of fanning out into one tiny file per source partition and skewing the estimate. List samples = records.coalesce(1).take(size).stream() .map(r -> r.newInstance(new HoodieKey(r.getRecordKey(), ""))) .collect(Collectors.toList()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java index 84a163e8da654..69032a008cc88 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java @@ -106,15 +106,10 @@ public void overwriteRecordSizeEstimateForEmptyTable() throws IOException { .build(); String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1); - // dataGen round-robins across the three DEFAULT_PARTITION_PATHS, so the input spans - // multiple source partitions. The sample-writes table must still write non-partitioned. + // Input spans multiple source partitions; the sample-writes table must still be non-partitioned. JavaRDD records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2); Option writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig); assertTrue(writeConfigOpt.isPresent()); - // The 2000 records now land in a single non-partitioned file instead of being diluted - // across the three source partitions, so the per-record estimate is lower and more - // accurate (one parquet footer/dictionary amortized over all records) than the previous - // multi-file value of ~779. assertEquals(337.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0); assertSampleWritesNonPartitioned(); } @@ -149,10 +144,8 @@ public void sampleWritesAreNonPartitionedEvenForManyPartitionInput() throws IOEx } /** - * Walks the sample-writes folder and fails if any data files were placed under a - * source-partition subdirectory. A non-partitioned sample-writes table places its data - * files directly under each run directory; a partitioned write would fan them out across - * subdirectories named after the source partition paths. + * Fails if the sample-writes folder contains any source-partition subdirectory, i.e. the + * sample write was not flattened into a single non-partitioned file. */ private void assertSampleWritesNonPartitioned() throws IOException { Path sampleWritesPath = new Path(basePath(), ".hoodie/.aux/.sample_writes"); From 4ec295beec0140130332350eaf4f7702a9d4ac6d Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Mon, 29 Jun 2026 22:46:14 -0700 Subject: [PATCH 3/3] Make sample-writes test methods package-private --- .../utilities/deltastreamer/TestSparkSampleWritesUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java index 69032a008cc88..f751477927146 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java @@ -93,7 +93,7 @@ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Excepti } @Test - public void overwriteRecordSizeEstimateForEmptyTable() throws IOException { + void overwriteRecordSizeEstimateForEmptyTable() throws IOException { int originalRecordSize = 100; TypedProperties props = new TypedProperties(); props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true"); @@ -115,7 +115,7 @@ public void overwriteRecordSizeEstimateForEmptyTable() throws IOException { } @Test - public void sampleWritesAreNonPartitionedEvenForManyPartitionInput() throws IOException { + void sampleWritesAreNonPartitionedEvenForManyPartitionInput() throws IOException { int recordsPerPartition = 50; String[] partitionPaths = IntStream.range(0, 20) .mapToObj(i -> String.format("year=2024/month=01/day=%02d", i + 1))