fix(streamer): write non-partitioned sample-writes table for record-size estimation #19115
yihua wants to merge 3 commits into
…ize estimation The first-batch sample write now rewrites each record with an empty partition path so the auxiliary sample-writes table is non-partitioned. Previously, when the incoming batch spanned many source partitions, the bulk insert (run with parallelism 1) emitted at least one tiny file per partition path, which slowed the sample write and inflated the per-record size estimate through per-file metadata overhead. Writing a single non-partitioned file yields a faster sample write and a more accurate estimate.
hudi-agent
left a comment
Thanks for working on this! The PR rewrites each sampled record with an empty partition path so the auxiliary sample-writes table is written non-partitioned, making the first-batch record-size estimate faster and avoiding per-source-partition file fan-out. The mechanics look correct — newInstance(HoodieKey) preserves the payload, operation, and ordering value while only swapping the key. One point on estimation semantics worth double-checking in the inline comments. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor suggestions below: preferring the existing SAMPLE_WRITES_FOLDER_PATH constant over a hardcoded path string, and a small simplification in the assertion message.
// partitions fans out into many tiny files even though parallelism is 1, slowing
// the sample write and inflating per-file metadata in the size estimate.
List<HoodieRecord> samples = records.coalesce(1).take(size).stream()
    .map(r -> r.newInstance(new HoodieKey(r.getRecordKey(), "")))
🤖 Collapsing the sample to a single non-partitioned file maximizes parquet footer/dictionary amortization, which is why the estimate drops so much (~779 → ~337). But the real target table is partitioned, so its per-record on-disk size sits somewhere between these depending on how many records each partition actually accumulates per commit. Since this estimate drives copyOnWriteRecordSizeEstimate (file bin-packing), could the single-file layout under-estimate and lead to larger-than-target files for sources whose partitions stay sparse? Curious whether you considered keeping a bounded number of partitions rather than fully flattening to one.
Impact is bounded to the first commit: getWriteConfigWithRecordSizeEstimate early-returns once the timeline is non-empty (SparkSampleWritesUtils.java:69), and from the next commit on AverageRecordSizeEstimator.averageBytesPerRecord recomputes from real partitioned commit stats, using this config only as the empty-timeline fallback (AverageRecordSizeEstimator.java:70-105). That estimator also subtracts a per-file metadata estimate and skips below-threshold commits (AverageRecordSizeEstimator.java:86,90,93), so the target semantic is data bytes per record, not footer overhead - which is what the single-file sample now measures. A sparse-partition under-estimate self-corrects after the first commit, and a bounded-partition layout would partly reintroduce the footer overhead this removes.
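To make the fallback behavior described above concrete, here is a loose sketch, not Hudi's actual AverageRecordSizeEstimator. The class, field, and parameter names (CommitStats, perFileMetadataBytes, minDataBytes, fallbackEstimate) are hypothetical, and the numbers in main are illustrative only. It assumes the estimator walks commits newest first, subtracts a per-file metadata estimate, skips commits whose remaining data bytes fall below a threshold, and uses the configured estimate only when no commit qualifies.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AvgRecordSizeSketch {
    /** Hypothetical per-commit write stats; not Hudi's actual classes. */
    static final class CommitStats {
        final long bytesWritten;
        final long recordsWritten;
        final long filesWritten;

        CommitStats(long bytesWritten, long recordsWritten, long filesWritten) {
            this.bytesWritten = bytesWritten;
            this.recordsWritten = recordsWritten;
            this.filesWritten = filesWritten;
        }
    }

    /**
     * Returns data bytes per record from the first qualifying commit,
     * or the fallback (e.g. the sample-write estimate) if none qualifies.
     */
    static long averageBytesPerRecord(List<CommitStats> commitsNewestFirst,
                                      long perFileMetadataBytes,
                                      long minDataBytes,
                                      long fallbackEstimate) {
        for (CommitStats c : commitsNewestFirst) {
            // Remove the assumed per-file metadata so only data bytes remain.
            long dataBytes = c.bytesWritten - c.filesWritten * perFileMetadataBytes;
            // Skip commits that are empty or too small to be representative.
            if (c.recordsWritten > 0 && dataBytes > minDataBytes) {
                return Math.max(1L, dataBytes / c.recordsWritten);
            }
        }
        return fallbackEstimate;
    }

    public static void main(String[] args) {
        // Empty timeline: only the configured fallback is available.
        System.out.println(averageBytesPerRecord(
            Collections.<CommitStats>emptyList(), 500L, 1000L, 337L));
        // Later commits: the tiny commit is skipped, the larger one is used.
        System.out.println(averageBytesPerRecord(
            Arrays.asList(new CommitStats(600L, 5L, 1L), new CommitStats(10_000L, 90L, 2L)),
            500L, 1000L, 337L));
    }
}
```

Under these assumptions, the sample-write estimate matters only while the timeline is empty, which is the bounded-impact argument made in the reply.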
 * subdirectories named after the source partition paths.
 */
private void assertSampleWritesNonPartitioned() throws IOException {
  Path sampleWritesPath = new Path(basePath(), ".hoodie/.aux/.sample_writes");
🤖 nit: could you use SAMPLE_WRITES_FOLDER_PATH here instead of the hardcoded string? The constant is already imported in SparkSampleWritesUtils; if the path changes, this assertion would silently walk the wrong directory.
}
assertTrue(partitionDirs.isEmpty(),
    "Sample-writes run at " + run.getPath() + " should have no source partition subdirectories, but found: "
        + Arrays.toString(partitionDirs.toArray()));
🤖 nit: partitionDirs.toString() gives the same [a, b, ...] output without the unnecessary toArray() roundtrip — could you simplify?
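A small standalone check of the equivalence claimed in this nit, assuming partitionDirs is a java.util.List (its element type is not shown in the hunk, so String is used here as a stand-in). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class ListToStringDemo {
    // Formatting used in the current assertion message.
    static String viaArray(List<String> dirs) {
        return Arrays.toString(dirs.toArray());
    }

    // Suggested simplification: rely on the list's own toString().
    static String direct(List<String> dirs) {
        return dirs.toString();
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList("a", "b");
        System.out.println(viaArray(dirs));
        System.out.println(direct(dirs));
    }
}
```

Both forms bracket the elements and separate them with a comma and a space, so string concatenation with partitionDirs directly should produce the same message.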
hudi-agent
left a comment
Thanks for working on this! This PR rewrites each sampled record with an empty partition path so the auxiliary sample-writes table is non-partitioned, making the first-batch record-size estimate faster and less skewed by per-file metadata overhead. I traced the record newInstance semantics, the record-based bulk-insert partitioning (which routes by the record key's partition path, confirming the flattening works), and the estimate math — the change looks functionally sound. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One minor maintainability suggestion on a hardcoded path in the test.
cc @yihua
 * sample write was not flattened into a single non-partitioned file.
 */
private void assertSampleWritesNonPartitioned() throws IOException {
  Path sampleWritesPath = new Path(basePath(), ".hoodie/.aux/.sample_writes");
🤖 nit: could you build this from the SAMPLE_WRITES_FOLDER_PATH constant instead of hardcoding .hoodie/.aux/.sample_writes? Keeps the test from silently breaking if that path ever changes.
lokeshj1703
left a comment
@yihua Thanks for working on this! Apart from the agent's minor comments, the changes look good to me.
Describe the issue this Pull Request addresses
When hoodie.streamer.sample.writes.enabled is on, the first batch's sample write estimates the average record size by writing a sample to the auxiliary .hoodie/.aux/.sample_writes table. The sample bulk insert runs with parallelism 1, but it routes each record to a file by the record's partition path. When the incoming batch spans many source partitions, the write fans out into one tiny file per partition, which slows the sample write and inflates the per-record size estimate through per-file metadata overhead.
Summary and Changelog
Rewrite each sampled record with an empty partition path before the sample bulk insert, so the auxiliary sample-writes table is effectively non-partitioned and all sampled records land in a single file. The sample write is faster and the record-size estimate is more accurate (one parquet footer/dictionary amortized over all records instead of one per source partition).
SparkSampleWritesUtils.doSampleWrites: map each record to newInstance(new HoodieKey(recordKey, "")) before the bulk insert.
TestSparkSampleWritesUtils: assert the sample-writes folder has no source-partition subdirectories, and add a test with 20 source partitions. The empty-table estimate adjusts from ~779 to ~337 to reflect the single-file layout.
Impact
No public API or config change. Faster and more accurate first-batch record-size estimation for partitioned sources that use Hudi Streamer sample writes.
Risk Level
low
The change only affects the throwaway sample-writes table used for size estimation. The estimate is derived from write-stat byte and record counts (partition-agnostic), so flattening the layout does not change correctness. Covered by unit tests including a 20-partition case.
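The fan-out and its effect on the estimate can be illustrated with a self-contained sketch. It does not use Hudi classes: Key is a hypothetical stand-in for HoodieKey, the file count assumes exactly one file per distinct partition path (the description says at least one), and the byte figures in main are made-up inputs rather than measurements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SampleFanOutSketch {
    /** Hypothetical stand-in for a record key plus partition path. */
    static final class Key {
        final String recordKey;
        final String partitionPath;

        Key(String recordKey, String partitionPath) {
            this.recordKey = recordKey;
            this.partitionPath = partitionPath;
        }
    }

    // Assumption: the writer emits one file per distinct partition path.
    static long fileCount(List<Key> keys) {
        return keys.stream().map(k -> k.partitionPath).distinct().count();
    }

    // Mirrors the PR's idea: keep the record key, blank out the partition path.
    static List<Key> flatten(List<Key> keys) {
        return keys.stream()
            .map(k -> new Key(k.recordKey, ""))
            .collect(Collectors.toList());
    }

    // Bytes-per-record estimate from totals, as write stats would report them.
    static long estimate(long dataBytesPerRecord, long records,
                         long perFileOverheadBytes, long files) {
        return (dataBytesPerRecord * records + perFileOverheadBytes * files) / records;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
            new Key("k1", "2016/03/15"),
            new Key("k2", "2015/03/16"),
            new Key("k3", "2015/03/17"));
        long partitioned = fileCount(keys);
        long flattened = fileCount(flatten(keys));
        System.out.println("files before=" + partitioned + " after=" + flattened);
        System.out.println("estimate before=" + estimate(100, keys.size(), 50, partitioned)
            + " after=" + estimate(100, keys.size(), 50, flattened));
    }
}
```

The ratio of total bytes to total records is computed the same way in both layouts; only the number of files contributing fixed overhead changes, which is why flattening lowers the estimate without altering how it is derived.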
Documentation Update
none
Contributor's checklist