@@ -25,6 +25,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -44,6 +45,7 @@
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -108,7 +110,14 @@ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option
try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
return recordsOpt.map(records -> {
List<HoodieRecord> samples = records.coalesce(1).take(size);
// Rewrite each record with an empty partition path so the sample-writes table is
// effectively non-partitioned. The bulk-insert writer routes records to files by
// HoodieRecord.getPartitionPath(); without this, an input that spans many source
// partitions fans out into many tiny files even though parallelism is 1, slowing
// the sample write and inflating per-file metadata in the size estimate.
List<HoodieRecord> samples = records.coalesce(1).take(size).stream()
.map(r -> r.newInstance(new HoodieKey(r.getRecordKey(), "")))

Contributor

🤖 Collapsing the sample to a single non-partitioned file maximizes parquet footer/dictionary amortization, which is why the estimate drops so much (~779 → ~337). But the real target table is partitioned, so its per-record on-disk size sits somewhere between these depending on how many records each partition actually accumulates per commit. Since this estimate drives copyOnWriteRecordSizeEstimate (file bin-packing), could the single-file layout under-estimate and lead to larger-than-target files for sources whose partitions stay sparse? Curious whether you considered keeping a bounded number of partitions rather than fully flattening to one.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.
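
To make the amortization argument concrete, here is a minimal sketch of a linear size model: total bytes are a per-record payload plus a fixed footer/dictionary cost per file. The model, the class name `AmortizationSketch`, and every number in it are illustrative assumptions, not measurements from this PR or values taken from Hudi.

```java
class AmortizationSketch {

    /**
     * Hypothetical linear model: each record contributes a fixed payload and
     * each file contributes a fixed overhead (footer, dictionaries, and so on).
     */
    public static double bytesPerRecord(long records, long files,
                                        double payloadBytesPerRecord,
                                        double overheadBytesPerFile) {
        double totalBytes = records * payloadBytesPerRecord + files * overheadBytesPerFile;
        return totalBytes / records;
    }

    public static void main(String[] args) {
        // Illustrative inputs only: 2000 records, 100 payload bytes each,
        // 20000 overhead bytes per file.
        System.out.println(bytesPerRecord(2000, 1, 100.0, 20000.0));  // one file
        System.out.println(bytesPerRecord(2000, 20, 100.0, 20000.0)); // twenty files
    }
}
```

Under these made-up inputs the single-file layout comes out at 110 bytes per record and the twenty-file layout at 300, which is the direction of the gap the question is about: the fewer records each file holds, the more the fixed per-file cost dominates.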

Contributor

The impact is bounded to the first commit: getWriteConfigWithRecordSizeEstimate returns early once the timeline is non-empty (SparkSampleWritesUtils.java:69), and from the next commit on, AverageRecordSizeEstimator.averageBytesPerRecord recomputes the estimate from real partitioned commit stats, using this config only as the empty-timeline fallback (AverageRecordSizeEstimator.java:70-105). That estimator also subtracts a per-file metadata estimate and skips below-threshold commits (AverageRecordSizeEstimator.java:86,90,93), so the quantity it targets is data bytes per record, not footer overhead, and that is what the single-file sample now measures. A sparse-partition under-estimate therefore self-corrects after the first commit, while a bounded-partition layout would partly reintroduce the footer overhead this change removes.
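
The fallback behaviour described in this reply can be sketched as follows. This is a simplified stand-in, not the code of `AverageRecordSizeEstimator`: the class `RecordSizeFallbackSketch`, the `CommitStats` holder, the parameter names, and the exact placement of the threshold check are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RecordSizeFallbackSketch {

    /** Hypothetical per-commit totals; not a Hudi type. */
    public static final class CommitStats {
        final long totalBytes;
        final long totalRecords;
        final long fileCount;

        public CommitStats(long totalBytes, long totalRecords, long fileCount) {
            this.totalBytes = totalBytes;
            this.totalRecords = totalRecords;
            this.fileCount = fileCount;
        }
    }

    /**
     * Returns data bytes per record from the first commit that is large enough,
     * or the configured fallback (for example the sample-write estimate) when
     * the timeline is empty or no commit qualifies.
     */
    public static long averageBytesPerRecord(List<CommitStats> commitsNewestFirst,
                                             long fallbackEstimate,
                                             long perFileMetadataBytes,
                                             long minDataBytes) {
        for (CommitStats commit : commitsNewestFirst) {
            // Remove the assumed per-file metadata so only data bytes remain.
            long dataBytes = commit.totalBytes - commit.fileCount * perFileMetadataBytes;
            // Skip commits that are empty or too small to give a stable average.
            if (commit.totalRecords > 0 && dataBytes > minDataBytes) {
                return (long) Math.ceil((double) dataBytes / commit.totalRecords);
            }
        }
        return fallbackEstimate;
    }

    public static void main(String[] args) {
        List<CommitStats> timeline = new ArrayList<>();
        // Empty timeline: the sample-write estimate is used as-is.
        System.out.println(averageBytesPerRecord(timeline, 337, 20_000, 100_000));
        // After one real commit (made-up totals), the estimate comes from commit stats.
        timeline.add(new CommitStats(1_000_000, 2_000, 10));
        System.out.println(averageBytesPerRecord(timeline, 337, 20_000, 100_000));
    }
}
```

In this sketch the sample-write value only matters while the list of commits is empty or every commit is below the threshold, which mirrors the "first commit only" bound argued above.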

.collect(Collectors.toList());
if (samples.isEmpty()) {
return emptyRes;
}
@@ -32,12 +32,19 @@
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.streamer.SparkSampleWritesUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -86,7 +93,7 @@ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Excepti
}

@Test
public void overwriteRecordSizeEstimateForEmptyTable() {
public void overwriteRecordSizeEstimateForEmptyTable() throws IOException {
int originalRecordSize = 100;
TypedProperties props = new TypedProperties();
props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
@@ -99,9 +106,70 @@ public void overwriteRecordSizeEstimateForEmptyTable() {
.build();

String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
// dataGen round-robins across the three DEFAULT_PARTITION_PATHS, so the input spans
// multiple source partitions. The sample-writes table must still write non-partitioned.
JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2);
Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig);
assertTrue(writeConfigOpt.isPresent());
assertEquals(779.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
// The 2000 records now land in a single non-partitioned file instead of being diluted
// across the three source partitions, so the per-record estimate is lower and more
// accurate (one parquet footer/dictionary amortized over all records) than the previous
// multi-file value of ~779.
assertEquals(337.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
assertSampleWritesNonPartitioned();
}

@Test
public void sampleWritesAreNonPartitionedEvenForManyPartitionInput() throws IOException {
int recordsPerPartition = 50;
String[] partitionPaths = IntStream.range(0, 20)
.mapToObj(i -> String.format("year=2024/month=01/day=%02d", i + 1))
.toArray(String[]::new);
HoodieTestDataGenerator manyPartitionGen = new HoodieTestDataGenerator(partitionPaths);

TypedProperties props = new TypedProperties();
props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProperties(props)
.forTable("foo")
.withPath(basePath())
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.build();

String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
List<HoodieRecord> allRecords = new ArrayList<>();
for (String partition : partitionPaths) {
allRecords.addAll(manyPartitionGen.generateInsertsForPartition(commitTime, recordsPerPartition, partition));
}
JavaRDD<HoodieRecord> records = jsc().parallelize(allRecords, 4);

Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), writeConfig);
assertTrue(writeConfigOpt.isPresent(), "Sample write should produce a record-size estimate.");
assertSampleWritesNonPartitioned();
}

/**
* Walks the sample-writes folder and fails if any data files were placed under a
* source-partition subdirectory. A non-partitioned sample-writes table places its data
* files directly under each run directory; a partitioned write would fan them out across
* subdirectories named after the source partition paths.
*/
private void assertSampleWritesNonPartitioned() throws IOException {
Path sampleWritesPath = new Path(basePath(), ".hoodie/.aux/.sample_writes");

Contributor

🤖 nit: could you use SAMPLE_WRITES_FOLDER_PATH here instead of the hardcoded string? The constant is already imported in SparkSampleWritesUtils; if the path changes, this assertion would silently walk the wrong directory.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor

🤖 nit: could you build this from the SAMPLE_WRITES_FOLDER_PATH constant instead of hardcoding .hoodie/.aux/.sample_writes? Keeps the test from silently breaking if that path ever changes.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor

+1
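
A sketch of the suggested change, assuming the constant holds a path relative to the table base path (which the hardcoded `.hoodie/.aux/.sample_writes` suggests). To stay self-contained it uses `java.nio.file.Path` and a local stand-in constant rather than Hudi's `HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH` and Hadoop's `Path`; the class `SampleWritesPathSketch` and the base path are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class SampleWritesPathSketch {

    // Stand-in for HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH. The value is
    // assumed from the string hardcoded in the test, not read from Hudi.
    public static final String SAMPLE_WRITES_FOLDER_PATH = ".hoodie/.aux/.sample_writes";

    /** Resolves the sample-writes folder from the single shared constant. */
    public static Path sampleWritesPath(String basePath) {
        return Paths.get(basePath).resolve(SAMPLE_WRITES_FOLDER_PATH);
    }

    public static void main(String[] args) {
        System.out.println(sampleWritesPath("/tmp/hudi_table"));
    }
}
```

In the test itself this would probably reduce to `new Path(basePath(), SAMPLE_WRITES_FOLDER_PATH)` with a static import of the constant, provided the constant is reachable from the test module.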

FileSystem fs = sampleWritesPath.getFileSystem(jsc().hadoopConfiguration());
assertTrue(fs.exists(sampleWritesPath), "Sample-writes folder should exist after a sample write.");
FileStatus[] runs = fs.listStatus(sampleWritesPath);
assertTrue(runs.length > 0, "Sample-writes folder should contain at least one run.");
for (FileStatus run : runs) {
List<String> partitionDirs = new ArrayList<>();
for (FileStatus entry : fs.listStatus(run.getPath())) {
if (entry.isDirectory() && !entry.getPath().getName().equals(".hoodie")) {
partitionDirs.add(entry.getPath().getName());
}
}
assertTrue(partitionDirs.isEmpty(),
"Sample-writes run at " + run.getPath() + " should have no source partition subdirectories, but found: "
+ Arrays.toString(partitionDirs.toArray()));

Contributor

🤖 nit: partitionDirs.toString() gives the same [a, b, ...] output without the unnecessary toArray() roundtrip — could you simplify?

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor

+1
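
A quick self-contained check of the equivalence claimed in this thread. `List.toString()` (inherited by `ArrayList` from `AbstractCollection`) and `Arrays.toString(Object[])` both render elements as `[a, b, ...]`; the class name and the sample directory names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListToStringSketch {

    /** Formats the list the way the test currently does, via an array copy. */
    public static String viaArray(List<String> items) {
        return Arrays.toString(items.toArray());
    }

    /** Formats the list directly, as the review suggests. */
    public static String direct(List<String> items) {
        return items.toString();
    }

    public static void main(String[] args) {
        List<String> partitionDirs = new ArrayList<>(Arrays.asList("day=01", "day=02"));
        System.out.println(viaArray(partitionDirs)); // [day=01, day=02]
        System.out.println(direct(partitionDirs));   // [day=01, day=02]
        System.out.println(viaArray(partitionDirs).equals(direct(partitionDirs))); // true
    }
}
```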

}
}
}