feat(utilities): add Postgres and Mysql Debezium CDC transformers #19110
rahil-c wants to merge 6 commits into
b7a70a5 to
7d2571b
Extracts Debezium change-event flattening out of the Debezium *source* and into a standalone, source-agnostic Transformer, so any source producing the raw Debezium envelope (Kafka, S3/file CDC logs, ...) can feed it.
This first PR lands the foundation plus the Postgres implementation:
- AbstractDebeziumTransformer: picks before (for deletes) / after, surfaces Debezium metadata columns, optional nested (_debezium_metadata) layout, error-table passthrough, and nullability normalization.
- PostgresDebeziumTransformer: surfaces txId/lsn/xmin, defaults a null _event_lsn to 0 for snapshot rows, and nests metadata by default.
- DebeziumTransformerConfig: hoodie.streamer.transformer.debezium.* configs.
Output columns match DebeziumConstants, so the existing PostgresDebeziumAvroPayload keeps working unchanged.
Tests: 12 unit tests (image selection, flat vs nested layout, per-subclass nested default + override, snapshot LSN defaulting).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
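The image-selection rule in the commit message (take `before` for deletes, `after` otherwise) can be sketched without Spark. This is only an illustration under stated assumptions: the `DebeziumImageSelector` class and its `Map`-based envelope are hypothetical, and the `_change_operation_type` output name is assumed rather than taken from this PR; the real transformer works on a `Dataset<Row>`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, Spark-free illustration of Debezium image selection. */
final class DebeziumImageSelector {
  private DebeziumImageSelector() {}

  /**
   * Flattens one envelope: uses the "before" image for deletes (op == "d"),
   * the "after" image otherwise, and copies the op code to a root-level column.
   */
  @SuppressWarnings("unchecked")
  static Map<String, Object> flatten(Map<String, Object> envelope) {
    String op = (String) envelope.get("op");
    String imageField = "d".equals(op) ? "before" : "after";
    Map<String, Object> image = (Map<String, Object>) envelope.get(imageField);

    Map<String, Object> flat = new LinkedHashMap<>();
    flat.put("_change_operation_type", op); // assumed output column name
    if (image != null) {
      flat.putAll(image); // data columns of the chosen image land at root
    }
    return flat;
  }
}
```

A delete envelope therefore yields the last known row values, which is what lets a downstream payload still see the record key of the deleted row.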
7d2571b to
a365198
Adds MysqlDebeziumTransformer alongside the Postgres one: surfaces the MySQL
binlog metadata (file/pos/row) as _event_bin_file/_event_pos/_event_row and
derives the _event_seq ordering column ("<binlog-suffix>.<pos>") consumed by
the existing MySqlDebeziumAvroPayload. Metadata flat by default; nested layout
supported via the shared config. Adds unit tests (flat + nested + seq).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
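For readers unfamiliar with the `_event_seq` format, here is a minimal sketch of the "<binlog-suffix>.<pos>" derivation. It assumes the suffix is whatever follows the last `.` in the binlog file name (for example `000003` in `mysql-bin.000003`); the `BinlogSeq` class and method name are hypothetical, and the PR itself builds the value with Spark column expressions.

```java
/** Hypothetical helper showing the "<binlog-suffix>.<pos>" shape of _event_seq. */
final class BinlogSeq {
  private BinlogSeq() {}

  /** Assumes the binlog suffix is the text after the last '.' in the file name. */
  static String eventSeq(String binlogFile, long pos) {
    String suffix = binlogFile.substring(binlogFile.lastIndexOf('.') + 1);
    return suffix + "." + pos;
  }
}
```

With these assumptions, `BinlogSeq.eventSeq("mysql-bin.000003", 154L)` returns `"000003.154"`.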
hudi-agent left a comment
Thanks for working on this! This PR extracts the Debezium change-event flattening out of the Postgres/Mysql source classes into standalone transformers and adds an optional nested-metadata layout, with good test coverage. The main thing worth double-checking is whether the flat path should still surface db_schema_source_partition for parity with the existing PostgresDebeziumSource, plus a couple of smaller robustness notes in the inline comments. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A few naming and simplification suggestions below.
// When nested fields are disabled, all metadata fields are at root level
allColumns.addAll(DEFAULT_ROOT_LEVEL_METADATA_COLUMNS);
allColumns.addAll(DEFAULT_NESTED_METADATA_COLUMNS);
allColumns.addAll(typeSpecificMetadataColumns);
🤖 In flat mode the source.schema → db_schema_source_partition column isn't surfaced, but the original PostgresDebeziumSource.processDataset always emitted it at root and the nested branch here still adds it (lines 154-155). So a Postgres pipeline adopting this transformer with nested.fields.enable=false would silently lose db_schema_source_partition — which the *_source_partition naming suggests is often a partition field. Is dropping it in flat mode intended, or should hasSchemaField be honored here too?
Confirmed: PostgresDebeziumSource.processDataset emitted db_schema_source_partition at root for both images, so flat mode here is a parity drop. Two caveats worth weighing: db_schema_source_partition is not in DebeziumConstants.META_COLUMNS and has no in-repo consumer, so the partition-field impact is limited to external pipelines that key on it; and the converse also holds - because Postgres nests by default, db_shard_source_partition (which IS in META_COLUMNS) moves off root too under the default config. If parity is the goal, honor hasSchemaField in the flat branch; otherwise call out the dropped/relocated columns in the config docs.
Not applicable for this PR — this is intentional/pre-existing behavior (the schema column is DB-dependent and only surfaced when nesting is enabled) and out of scope for this change.
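The parity option raised in this thread (honoring `hasSchemaField` in the flat branch too) could look roughly like the following. This is a sketch over plain column names, not the transformer's actual `Column` handling: `MetadataColumnPlan` and its parameters are hypothetical, `_change_operation_type` is an assumed op column name, and the sketch ignores type-specific columns such as `_event_lsn`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plan of which metadata names end up at root level. */
final class MetadataColumnPlan {
  private MetadataColumnPlan() {}

  static List<String> rootColumns(
      boolean nested, boolean hasSchemaField, List<String> commonMetadata) {
    List<String> root = new ArrayList<>();
    root.add("_change_operation_type"); // assumed name of the op column
    if (nested) {
      // Common metadata (and the schema column, if any) live inside the struct.
      root.add("_debezium_metadata");
    } else {
      root.addAll(commonMetadata);
      if (hasSchemaField) {
        // The parity fix discussed above: surface the schema column in flat mode too.
        root.add("db_schema_source_partition");
      }
    }
    return root;
  }
}
```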
    ? dataset.col(DEBEZIUM_METADATA_FIELD + "." + DebeziumConstants.FLATTENED_POS_COL_NAME)
    : dataset.col(DebeziumConstants.FLATTENED_POS_COL_NAME);

return dataset.withColumn(DebeziumConstants.ADDED_SEQ_COL_NAME, functions.concat(
🤖 The legacy generateUniqueSequence UDF validated the binlog coords (fileId == null || trim().isEmpty() || pos == null || pos < 0) and threw a descriptive HoodieReadFromSourceException before building the seq. Here a null file/pos would produce a null _event_seq, which only fails later (and less clearly) in MySqlDebeziumAvroPayload.shouldPickCurrentRecord. Since _event_seq is the ordering/precombine field, might be worth keeping an explicit guard so malformed events fail fast at transform time.
Confirmed: the original MysqlDebeziumSource.generateUniqueSequence threw HoodieReadFromSourceException on file == null || file.trim().isEmpty() || pos == null || pos < 0 before building the key. applySeqNo drops that guard, and since Spark concat returns null when any argument is null, a malformed event yields a null _event_seq - the precombine field - which only surfaces later in MySqlDebeziumAvroPayload. Restoring an equivalent fail-fast check (or a coalesce plus assertion) in applySeqNo keeps the early, descriptive failure.
Not applicable for this PR — this is pre-existing, intentional behavior and out of scope for this change. Open to adding an explicit guard as a follow-up if there's appetite for it.
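If the guard is picked up as a follow-up, the check quoted in this thread is small. A minimal sketch, assuming a plain-Java helper: `BinlogCoordinateGuard` is a hypothetical name, and `IllegalArgumentException` stands in for `HoodieReadFromSourceException` so the example stays dependency-free.

```java
/** Hypothetical fail-fast validation of MySQL binlog coordinates. */
final class BinlogCoordinateGuard {
  private BinlogCoordinateGuard() {}

  /**
   * Mirrors the condition quoted in the review:
   * file == null || file.trim().isEmpty() || pos == null || pos < 0.
   */
  static void requireValid(String file, Long pos) {
    if (file == null || file.trim().isEmpty() || pos == null || pos < 0) {
      // Stand-in for HoodieReadFromSourceException in this sketch.
      throw new IllegalArgumentException(
          "Invalid binlog coordinates: file=" + file + ", pos=" + pos);
    }
  }
}
```

Running a check like this before the sequence is built would turn a null `_event_seq` into an immediate, descriptive failure instead of a later one in the payload.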
// Extract LSN column to root level, keep other metadata nested
for (Column col : typeSpecificMetadataColumns) {
  String colStr = col.toString();
  if (colStr.contains(DebeziumConstants.FLATTENED_LSN_COL_NAME)) {
🤖 Identifying the LSN column via col.toString().contains("_event_lsn") is a little fragile — it depends on how Spark renders the alias in Column.toString() and would also match any future type-specific column whose expression text happens to contain _event_lsn. Since the subclass already builds typeSpecificMetadataColumns and knows which one is the LSN, could it pass that explicitly (e.g. a dedicated constructor param) instead of string-matching?
This works today only because Spark's Alias.toString() embeds the alias name (it renders as source.lsn AS _event_lsn); any change to how the column is constructed or rendered would silently break LSN-to-root routing with no test catching it. Passing the LSN column (or its output name) explicitly from the subclass, as suggested, removes the string match entirely.
Not applicable for this PR — this is pre-existing behavior and out of scope for this change.
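To make the fragility concrete, the sketch below contrasts a substring match on rendered text with an exact match on an explicitly supplied output name. Everything here is hypothetical: `NamedColumn` is a stand-in for a Spark `Column` with an alias, `rendered()` only imitates an "expr AS alias" rendering, and `_event_lsn_tx` is an invented column used to show the false positive.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two ways to route the LSN column to root level. */
final class LsnRoutingSketch {
  private LsnRoutingSketch() {}

  /** Minimal stand-in for an aliased column: source expression plus output name. */
  static final class NamedColumn {
    final String expression;
    final String alias;

    NamedColumn(String expression, String alias) {
      this.expression = expression;
      this.alias = alias;
    }

    /** Imitates an "expr AS alias" rendering, which the string match relies on. */
    String rendered() {
      return expression + " AS " + alias;
    }
  }

  /** Fragile: matches any column whose rendered text contains "_event_lsn". */
  static List<NamedColumn> rootByTextMatch(List<NamedColumn> columns) {
    List<NamedColumn> root = new ArrayList<>();
    for (NamedColumn column : columns) {
      if (column.rendered().contains("_event_lsn")) {
        root.add(column);
      }
    }
    return root;
  }

  /** Sturdier: the subclass names the root-level column and the alias must match exactly. */
  static List<NamedColumn> rootByExplicitName(List<NamedColumn> columns, String rootAlias) {
    List<NamedColumn> root = new ArrayList<>();
    for (NamedColumn column : columns) {
      if (column.alias.equals(rootAlias)) {
        root.add(column);
      }
    }
    return root;
  }
}
```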
private static final List<Column> DEFAULT_ROOT_LEVEL_METADATA_COLUMNS = Arrays.asList(
    new Column(DebeziumConstants.INCOMING_OP_FIELD).alias(DebeziumConstants.FLATTENED_OP_COL_NAME));

private static final List<Column> DEFAULT_NESTED_METADATA_COLUMNS = Arrays.asList(
🤖 nit: DEFAULT_NESTED_METADATA_COLUMNS is a bit misleading — in flat mode these columns are spread directly to root level (line 169), so calling them "nested" implies they only live inside the struct. Something like DEFAULT_COMMON_METADATA_COLUMNS would better capture that their placement varies by mode.
Not applicable for this PR — the naming is pre-existing and out of scope for this change.
allColumns.add(new Column(String.format("%s.*", DATA_FIELD)));

if (ConfigUtils.getBooleanWithAltKeys(props, ERROR_TABLE_ENABLED)) {
🤖 nit: Arrays.stream(...).collect(Collectors.toList()).contains(...) could be simplified to Arrays.asList(rowDataset.columns()).contains(...) — same result with less noise.
Not applicable for this PR — this is pre-existing style and out of scope for this change.
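For reference, the two forms from the nit side by side. `ColumnLookup` is a hypothetical wrapper; `String[] columns` stands in for the array returned by `rowDataset.columns()`.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical wrapper contrasting the two equivalent membership checks. */
final class ColumnLookup {
  private ColumnLookup() {}

  /** Form under review: streams the array into a new list just to call contains. */
  static boolean viaStream(String[] columns, String name) {
    return Arrays.stream(columns).collect(Collectors.toList()).contains(name);
  }

  /** Suggested form: a fixed-size list view over the same array, no copy. */
  static boolean viaAsList(String[] columns, String name) {
    return Arrays.asList(columns).contains(name);
  }
}
```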
 */
private static Dataset<Row> useDefaultValuesForLsnIfNull(Dataset<Row> dataset) {
  if (!Arrays.asList(dataset.columns()).contains(DebeziumConstants.FLATTENED_LSN_COL_NAME)) {
    return dataset;
🤖 nit: useDefaultValuesForLsnIfNull reads more like a boolean flag or predicate ("should we use defaults?") than a transformation action. Something like defaultNullSnapshotLsn or fillMissingSnapshotLsn would more clearly signal that this method mutates the dataset.
Not applicable for this PR — pre-existing naming and out of scope for this change.
 * <p>Subclasses configure the database-specific behavior purely through the constructor; there is
 * no abstract method to implement.
 */
public class AbstractDebeziumTransformer implements Transformer {
This class has only protected constructors, no abstract methods, and a javadoc stating it is meant to be subclassed, yet it is declared as a concrete public class. Every other Abstract* type in the repo is declared abstract (e.g. its sibling AbstractDebeziumAvroPayload), and nothing instantiates this one directly. Declare it public abstract class so the name matches the contract and direct instantiation is prevented at compile time.
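A stripped-down sketch of the suggested shape: an `abstract` base with a protected constructor and no abstract methods, configured entirely by its subclasses. All class names below are hypothetical stand-ins; the per-database defaults (Postgres nested, MySQL flat) are taken from the commit messages in this PR.

```java
/** Hypothetical base: abstract so it cannot be instantiated, yet has no abstract methods. */
abstract class AbstractTransformerSketch {
  private final boolean nestedByDefault;

  /** Subclasses supply database-specific behavior purely through the constructor. */
  protected AbstractTransformerSketch(boolean nestedByDefault) {
    this.nestedByDefault = nestedByDefault;
  }

  boolean nestedByDefault() {
    return nestedByDefault;
  }
}

/** Postgres nests metadata by default, per the PR description. */
final class PostgresTransformerSketch extends AbstractTransformerSketch {
  PostgresTransformerSketch() {
    super(true);
  }
}

/** MySQL keeps metadata flat by default, per the PR description. */
final class MysqlTransformerSketch extends AbstractTransformerSketch {
  MysqlTransformerSketch() {
    super(false);
  }
}
```

With the modifier in place, `new AbstractTransformerSketch(true)` is rejected by the compiler rather than only discouraged by the javadoc.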
…s enabled
When the Debezium transformer nests CDC metadata under the _debezium_metadata struct (hoodie.streamer.transformer.debezium.nested.fields.enable=true), the MySQL ordering columns (_event_bin_file, _event_pos) move into that struct, so the inferred ORDERING_FIELDS must reference the nested path. handlePayloadAdhocConfigs previously hardcoded the flat names, producing a wrong ordering field in nested mode.
- Add shared DebeziumConstants.DEBEZIUM_METADATA_FIELD (referenced by the transformer too).
- Thread a nestedDebeziumMetadataEnabled flag via overloads of inferMergingConfigsForWrites / inferMergingConfigsForV9TableCreation (existing signatures preserved, default false; reader path and other callers unchanged). HoodieStreamer passes the real flag.
- MySQL ordering fields get the _debezium_metadata. prefix when nested; Postgres _event_lsn and the operation-type delete key stay at root (transformer keeps them there), so unchanged.
- Test: TestHoodieTableConfig#testInferMergingConfigsNestedDebeziumOrderingFields.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
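The prefixing rule and the default-false overload described in this commit can be sketched as follows. `DebeziumOrderingFields` and its methods are hypothetical, and joining the two MySQL fields with a comma is an assumption about how ORDERING_FIELDS is encoded; only the column names and the `_debezium_metadata` struct name come from the commit message.

```java
/** Hypothetical resolution of Debezium ordering fields for flat vs nested layouts. */
final class DebeziumOrderingFields {
  static final String DEBEZIUM_METADATA_FIELD = "_debezium_metadata";

  private DebeziumOrderingFields() {}

  /** Existing-style overload: keeps the old behavior by defaulting the flag to false. */
  static String mysqlOrderingFields() {
    return mysqlOrderingFields(false);
  }

  /** MySQL ordering columns move into the struct when nesting is enabled. */
  static String mysqlOrderingFields(boolean nestedDebeziumMetadataEnabled) {
    String prefix = nestedDebeziumMetadataEnabled ? DEBEZIUM_METADATA_FIELD + "." : "";
    // Comma-joined list is an assumption of this sketch.
    return prefix + "_event_bin_file" + "," + prefix + "_event_pos";
  }

  /** Postgres _event_lsn stays at root in both layouts, so the flag has no effect. */
  static String postgresOrderingField(boolean nestedDebeziumMetadataEnabled) {
    return "_event_lsn";
  }
}
```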
@rmahindra @yihua let me know whether we need an integration test or a heavier functional test, or whether the unit tests in this PR suffice.
…a branches
AbstractDebeziumTransformer's error-table corrupt-record passthrough and the schema.nullable.enable=false (nullability-preservation) branch had no test coverage. Add cases for both, plus a focused hudi-common test verifying HoodieTableConfig resolves the Debezium ordering field correctly whether or not nested metadata is enabled (kept in hudi-common's own test tree so its coverage is attributed to the module that owns the logic).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…overloads
The 5-arg overloads of inferMergingConfigsForV9TableCreation and inferMergingConfigsForWrites (kept for existing callers; they delegate to the new 6-arg version with nestedDebeziumMetadataEnabled=false) had no direct test coverage.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…nt metadata-field alias
- AbstractDebeziumTransformer#apply(): the final nullability rebuild only needs to force non-nullable for columns known to be non-nullable in the source row; every other column (including Debezium metadata columns) should end up nullable, regardless of their nullability in the raw envelope schema.
- Remove AbstractDebeziumTransformer.DEBEZIUM_METADATA_FIELD, a redundant delegate to DebeziumConstants.DEBEZIUM_METADATA_FIELD kept around only to avoid touching a few callers; reference DebeziumConstants directly instead.
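The nullability rule in the first bullet reduces to a membership test. A minimal sketch over column names, assuming the set of source-row non-nullable columns is already known: `NullabilityRebuild` and its signature are hypothetical, and the real code rebuilds a Spark schema rather than a map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical name-level view of the final nullability rebuild. */
final class NullabilityRebuild {
  private NullabilityRebuild() {}

  /**
   * Returns output column name -> nullable flag. A column is non-nullable only
   * if it is known to be non-nullable in the source row; everything else,
   * including Debezium metadata columns, ends up nullable.
   */
  static Map<String, Boolean> rebuild(
      List<String> outputColumns, Set<String> nonNullableInSourceRow) {
    Map<String, Boolean> nullableByColumn = new LinkedHashMap<>();
    for (String column : outputColumns) {
      nullableByColumn.put(column, !nonNullableInSourceRow.contains(column));
    }
    return nullableByColumn;
  }
}
```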