Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -44,6 +45,7 @@
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
Expand Down Expand Up @@ -108,7 +110,11 @@ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option
try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
return recordsOpt.map(records -> {
List<HoodieRecord> samples = records.coalesce(1).take(size);
// Empty partition path so all sampled records write to a single non-partitioned file,
// instead of fanning out into one tiny file per source partition and skewing the estimate.
List<HoodieRecord> samples = records.coalesce(1).take(size).stream()
.map(r -> r.newInstance(new HoodieKey(r.getRecordKey(), "")))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Collapsing the sample to a single non-partitioned file maximizes parquet footer/dictionary amortization, which is why the estimate drops so much (~779 → ~337). But the real target table is partitioned, so its per-record on-disk size sits somewhere between these depending on how many records each partition actually accumulates per commit. Since this estimate drives copyOnWriteRecordSizeEstimate (file bin-packing), could the single-file layout under-estimate and lead to larger-than-target files for sources whose partitions stay sparse? Curious whether you considered keeping a bounded number of partitions rather than fully flattening to one.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impact is bounded to the first commit: getWriteConfigWithRecordSizeEstimate early-returns once the timeline is non-empty (SparkSampleWritesUtils.java:69), and from the next commit on AverageRecordSizeEstimator.averageBytesPerRecord recomputes from real partitioned commit stats, using this config only as the empty-timeline fallback (AverageRecordSizeEstimator.java:70-105). That estimator also subtracts a per-file metadata estimate and skips below-threshold commits (AverageRecordSizeEstimator.java:86,90,93), so the target semantic is data bytes per record, not footer overhead - which is what the single-file sample now measures. A sparse-partition under-estimate self-corrects after the first commit, and a bounded-partition layout would partly reintroduce the footer overhead this removes.

.collect(Collectors.toList());
if (samples.isEmpty()) {
return emptyRes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.streamer.SparkSampleWritesUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -86,7 +93,7 @@ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Excepti
}

@Test
public void overwriteRecordSizeEstimateForEmptyTable() {
void overwriteRecordSizeEstimateForEmptyTable() throws IOException {
int originalRecordSize = 100;
TypedProperties props = new TypedProperties();
props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
Expand All @@ -99,9 +106,63 @@ public void overwriteRecordSizeEstimateForEmptyTable() {
.build();

String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
// Input spans multiple source partitions; the sample-writes table must still be non-partitioned.
JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2);
Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig);
assertTrue(writeConfigOpt.isPresent());
assertEquals(779.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
assertEquals(337.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
assertSampleWritesNonPartitioned();
}

@Test
void sampleWritesAreNonPartitionedEvenForManyPartitionInput() throws IOException {
int recordsPerPartition = 50;
String[] partitionPaths = IntStream.range(0, 20)
.mapToObj(i -> String.format("year=2024/month=01/day=%02d", i + 1))
.toArray(String[]::new);
HoodieTestDataGenerator manyPartitionGen = new HoodieTestDataGenerator(partitionPaths);

TypedProperties props = new TypedProperties();
props.put(HoodieStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProperties(props)
.forTable("foo")
.withPath(basePath())
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.build();

String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
List<HoodieRecord> allRecords = new ArrayList<>();
for (String partition : partitionPaths) {
allRecords.addAll(manyPartitionGen.generateInsertsForPartition(commitTime, recordsPerPartition, partition));
}
JavaRDD<HoodieRecord> records = jsc().parallelize(allRecords, 4);

Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), writeConfig);
assertTrue(writeConfigOpt.isPresent(), "Sample write should produce a record-size estimate.");
assertSampleWritesNonPartitioned();
}

/**
* Fails if the sample-writes folder contains any source-partition subdirectory, i.e. the
* sample write was not flattened into a single non-partitioned file.
*/
private void assertSampleWritesNonPartitioned() throws IOException {
Path sampleWritesPath = new Path(basePath(), ".hoodie/.aux/.sample_writes");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you use SAMPLE_WRITES_FOLDER_PATH here instead of the hardcoded string? The constant is already imported in SparkSampleWritesUtils; if the path changes, this assertion would silently walk the wrong directory.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you build this from the SAMPLE_WRITES_FOLDER_PATH constant instead of hardcoding .hoodie/.aux/.sample_writes? Keeps the test from silently breaking if that path ever changes.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

FileSystem fs = sampleWritesPath.getFileSystem(jsc().hadoopConfiguration());
assertTrue(fs.exists(sampleWritesPath), "Sample-writes folder should exist after a sample write.");
FileStatus[] runs = fs.listStatus(sampleWritesPath);
assertTrue(runs.length > 0, "Sample-writes folder should contain at least one run.");
for (FileStatus run : runs) {
List<String> partitionDirs = new ArrayList<>();
for (FileStatus entry : fs.listStatus(run.getPath())) {
if (entry.isDirectory() && !entry.getPath().getName().equals(".hoodie")) {
partitionDirs.add(entry.getPath().getName());
}
}
assertTrue(partitionDirs.isEmpty(),
"Sample-writes run at " + run.getPath() + " should have no source partition subdirectories, but found: "
+ Arrays.toString(partitionDirs.toArray()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: partitionDirs.toString() gives the same [a, b, ...] output without the unnecessary toArray() roundtrip — could you simplify?

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}
}
}
Loading