From f6bd9daaa643fdd4b7ac0ac527d1afc35f56b532 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 9 Jun 2026 11:28:30 +0000 Subject: [PATCH 1/3] docs: add project context and architecture diagrams for spanner-to-sourcedb --- v2/spanner-to-sourcedb/architecture.dot | 42 ++++ v2/spanner-to-sourcedb/architecture.svg | 226 ++++++++++++++++++++++ v2/spanner-to-sourcedb/project-context.md | 80 ++++++++ 3 files changed, 348 insertions(+) create mode 100644 v2/spanner-to-sourcedb/architecture.dot create mode 100644 v2/spanner-to-sourcedb/architecture.svg create mode 100644 v2/spanner-to-sourcedb/project-context.md diff --git a/v2/spanner-to-sourcedb/architecture.dot b/v2/spanner-to-sourcedb/architecture.dot new file mode 100644 index 0000000000..ec1159fa27 --- /dev/null +++ b/v2/spanner-to-sourcedb/architecture.dot @@ -0,0 +1,42 @@ +digraph G { + node [shape=box, style=filled, color=lightblue]; + + Spanner [label="Cloud Spanner\n(Change Streams)", shape=cylinder, color=lightgrey]; + MetadataDb [label="Spanner Metadata DB\n(Shadow Tables)", shape=cylinder, color=lightgrey]; + SourceDb [label="Source Database\n(MySQL / PostgreSQL / Cassandra)", shape=cylinder, color=lightyellow]; + DLQ [label="GCS Dead Letter Queue\n(DLQ)", shape=cylinder, color=lightpink]; + PubSub [label="Pub/Sub (DLQ Retry)"]; + + SpannerIOReader [label="SpannerIO\nChange Stream Reader"]; + InformationSchema [label="SpannerInformationSchemaProcessor"]; + Filter [label="FilterRecordsFn\n(Forward Migration Filter)"]; + Preprocess [label="PreprocessRecordsFn"]; + Reshuffle [label="Reshuffle\n(Random Key)"]; + + SourceWriter [label="SourceWriterTransform"]; + Processor [label="InputRecordProcessor"]; + DMLGenerator [label="DMLGenerator\n(MySQL, PG, Cassandra)"]; + DAO [label="Source DAOs\n(JdbcDao, CassandraDao)"]; + DLQWriter [label="DLQWriteTransform"]; + DLQReconsumer [label="DLQ Reconsumer\n(PubSub or Continuous)"]; + + Spanner -> SpannerIOReader [label=" CDC Events"]; + MetadataDb -> InformationSchema [label=" DDL / Metatdata"]; + SpannerIOReader -> Reshuffle; + Reshuffle -> Filter; + Filter -> Preprocess; + + DLQReconsumer -> Preprocess [label=" Retryable DLQ Events"]; + PubSub -> DLQReconsumer; + DLQ -> DLQReconsumer; + + Preprocess -> SourceWriter [label=" TrimmedShardedDataChangeRecord"]; + SourceWriter -> Processor; + Processor -> DMLGenerator; + Processor -> MetadataDb [label=" Check/Update Shadow Table (processed_commit_ts)"]; + DMLGenerator -> DAO; + DAO -> SourceDb [label=" Apply Mutations"]; + + SourceWriter -> DLQWriter [label=" Errors / Exceptions"]; + DLQWriter -> DLQ; +} diff --git a/v2/spanner-to-sourcedb/architecture.svg b/v2/spanner-to-sourcedb/architecture.svg new file mode 100644 index 0000000000..ddaac125ea --- /dev/null +++ b/v2/spanner-to-sourcedb/architecture.svg @@ -0,0 +1,226 @@ + + + + + + +G + + + +Spanner + + +Cloud Spanner +(Change Streams) + + + +SpannerIOReader + +SpannerIO +Change Stream Reader + + + +Spanner->SpannerIOReader + + + CDC Events + + + +MetadataDb + + +Spanner Metadata DB +(Shadow Tables) + + + +InformationSchema + +SpannerInformationSchemaProcessor + + + +MetadataDb->InformationSchema + + + DDL / Metatdata + + + +SourceDb + + +Source Database +(MySQL / PostgreSQL / Cassandra) + + + +DLQ + + +GCS Dead Letter Queue +(DLQ) + + + +DLQReconsumer + +DLQ Reconsumer +(PubSub or Continuous) + + + +DLQ->DLQReconsumer + + + + + +PubSub + +Pub/Sub (DLQ Retry) + + + +PubSub->DLQReconsumer + + + + + +Reshuffle + +Reshuffle +(Random Key) + + + +SpannerIOReader->Reshuffle + + + + + +Filter + +FilterRecordsFn +(Forward Migration Filter) + + + +Preprocess + +PreprocessRecordsFn + + + +Filter->Preprocess + + + + + +SourceWriter + +SourceWriterTransform + + + +Preprocess->SourceWriter + + + TrimmedShardedDataChangeRecord + + + +Reshuffle->Filter + + + + + +Processor + +InputRecordProcessor + + + +SourceWriter->Processor + + + + + +DLQWriter + +DLQWriteTransform + + + +SourceWriter->DLQWriter + + + Errors / Exceptions + + + +Processor->MetadataDb + + + Check/Update Shadow Table (processed_commit_ts) + + + +DMLGenerator + +DMLGenerator +(MySQL, PG, Cassandra) + + + +Processor->DMLGenerator + + + + + +DAO + +Source DAOs +(JdbcDao, CassandraDao) + + + +DMLGenerator->DAO + + + + + +DAO->SourceDb + + + Apply Mutations + + + +DLQWriter->DLQ + + + + + +DLQReconsumer->Preprocess + + + Retryable DLQ Events + + + diff --git a/v2/spanner-to-sourcedb/project-context.md b/v2/spanner-to-sourcedb/project-context.md new file mode 100644 index 0000000000..cda5c62ff7 --- /dev/null +++ b/v2/spanner-to-sourcedb/project-context.md @@ -0,0 +1,80 @@ +# Project Context: Spanner to SourceDb (Reverse Replication) + + + +## Overview + +* **Core Intent**: A streaming reverse migration Dataflow pipeline to replicate data from Cloud Spanner back into various Source Databases (MySQL, PostgreSQL, Cassandra). It implements per-primary-key ordering guarantees using shadow tables, allowing for high throughput. +* **Primary Users**: SREs and external customers migrating off Cloud Spanner and executing a "cut-back" to their original source database. +* **Critical SLOs/Guarantees**: + * Per-primary-key ordering guarantee (relaxed from per-shard ordering). +* **Terminology**: + * **Reverse Replication**: Replicating data from Spanner to the Source DB. + * **Change Streams**: Spanner's CDC mechanism. + * **Shadow Table**: Spanner metadata table used to track `processed_commit_ts` per primary key to prevent out-of-order writes. + * **Data Freshness**: Pipeline lag indicator. + * **Cut-back**: The process of shifting application write traffic back to the source. + * **DLQ:** Dead Letter Queue (for failed records). + +## Technical Details + +* **Tech Stack & Versions**: + + * **Languages**: Java 17 + * **Frameworks/Libraries**: Apache Beam 2.73.0, Maven, HikariCP 5.0.1, Spanner Migrations SDK. + * **Key Technologies**: Cloud Spanner, Cloud Storage (GCS) DLQ, MySQL, PostgreSQL, Cassandra. +* **Code Location**: `v2/spanner-to-sourcedb` +* **Data Flow**: Data is captured via Spanner Change Streams -> Filtered/Preprocessed -> Dataflow ensures same-key records are processed by the same thread -> The `processed_commit_ts` is checked/updated in a Spanner shadow table -> Written to Source Shards via JDBC/Cassandra drivers. Failures are written to a GCS Dead Letter Queue (DLQ) for retries. +* **Project Structure (Logical Architecture Mapping)**: + + * `src/main/java/com/google/cloud/teleport/v2/templates`: Main pipeline definition (`SpannerToSourceDb.java`). + * `src/main/java/com/google/cloud/teleport/v2/templates/changestream`: Utilities and convertors for parsing Spanner Change Streams (e.g., `TrimmedShardedDataChangeRecord`). + * `src/main/java/com/google/cloud/teleport/v2/templates/constants`: Project-level constants. + * `src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection`: Connection pool helpers (Hikari for JDBC, Cassandra connections). + * `src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source`: DAOs for Source DB interaction (`JdbcDao.java`, `CassandraDao.java`). + * `src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/spanner`: DAOs for Shadow Table metadata tracking (`SpannerDao.java`). + * `src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml`: DML generators for target databases (`MySQLDMLGenerator`, `PostgreSQLDMLGenerator`, `CassandraDMLGenerator`). + * `src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor`: Processors for handling mapped input records (`InputRecordProcessor`). + * `src/main/java/com/google/cloud/teleport/v2/templates/exceptions`: Custom runtime exceptions. + * `src/main/java/com/google/cloud/teleport/v2/templates/models`: POJOs and AutoValue models. + * `src/main/java/com/google/cloud/teleport/v2/templates/transforms`: Custom Beam transformations (e.g., `SourceWriterTransform`, `SpannerInformationSchemaProcessorTransform`). + * `src/main/java/com/google/cloud/teleport/v2/templates/utils`: Generic pipeline utilities. +* **Build/Run Commands**: + + See the `README_Spanner_to_SourceDb.md` file for instructions on building and running the pipeline. + +## Documentation + +* **Architecture Diagram & Dependency Tree**: [architecture.svg](architecture.svg) (Source: `architecture.dot`). + + * **Rule**: Always keep the `.dot` and `.svg` files in sync. If you modify the architecture, you MUST regenerate the `.svg` from the `.dot` file. + +## AI Agent Tips + +* **Common Tasks**: Adding DML generators for new databases, handling new types of schema overrides, improving unit test coverage, updating DLQ processing logic. +* **Coding Standards & Best Practices**: + * **Parallelism**: Controlled via `maxShardConnections` per shard configuration. + * **Stale Writes**: Any writes with an older timestamp than what is recorded in the shadow table must be explicitly skipped to prevent data corruption. + * **AutoValue**: Use `AutoValue` for POJOs and models. Ensure all required variables are set before building. + * **Beam Paradigms**: Strictly adhere to Apache Beam constructs (`PTransform`, `DoFn`). Use `TupleTag` for handling multiple outputs like DLQ side channels. + * **Serializability**: All variables within `PTransform` and `DoFn` must be serializable. For non-serializable objects (like JDBC Connections, `HikariDataSource`, or `SpannerDao`), mark them as `transient` and initialize them within `@Setup` or `@StartBundle` methods.s + * **Connection Management**: Use the Hikari connection pool (preferred over DBCP2). Always ensure connections are safely returned to the pool (using `try-with-resources` or `try-finally` blocks) to prevent connection leakages. + * **Security**: NEVER log sensitive credentials, connection strings, or customer PII. Use `structured-logging` instead of standard output. + * **Formatting**: Always run `mvn spotless:apply -pl v2/spanner-to-sourcedb -am` before committing to adhere to project formatting standards. +* **Testing Frameworks & Guidelines**: + * **Frameworks**: JUnit 4, AssertJ, and Mockito. + * **Unit Tests**: Use `@RunWith(JUnit4.class)`. Strive for at least 80% coverage. Mock database responses with Mockito to ensure fast, isolated tests. + * **Non-Destructive Refactoring**: Append new dedicated test methods for new functionality. Do not arbitrarily delete or rewrite existing tests unless addressing a breaking API change. + * **100% Branch & Exception Coverage**: Ensure `if/else` paths and caught/thrown exceptions are fully asserted in tests using AssertJ's `assertThatThrownBy` or JUnit's `assertThrows`. +* **Areas to be Careful (Gotchas)**: + * **Integration Tests**: NEVER execute `*IT.java` (Integration) or `*LT.java` (Load) test suites during local development/machine verification. Only execute `*Test.java` (Unit) locally. + * **DML Generation Syntax**: When updating `MySQLDMLGenerator`, `PostgreSQLDMLGenerator`, or `CassandraDMLGenerator`, verify that the generated SQL/CQL syntax is strictly valid for that specific dialect and version. + * **Shadow Table Lock Contention**: Be cautious when adding transactional reads/writes around shadow tables. High throughput updates increase Spanner's load and can create lock contention. Keep transactions fast and localized. + * **Foreign Keys & Retries**: Parent-child ordering relies entirely on retry loops and SpannerIO's straggler handling. Changes to the error tag logic in `SourceWriterFn` or `AssignShardIdFn` can silently break foreign key insertions. Retryable errors expected in the DLQ include Foreign Key violations, Spanner `RESOURCE_EXHAUSTED` (shadow tables), and Source DB transient network issues. + * **Metric Reliability**: Metrics like `success_record_count` may be skewed if worker restarts occur (causing re-processing). Do not rely on them for strict transactional accounting. + * **Resume Strategy**: When resuming a paused job, users must supply a `startTimestamp` matching the previous `DataFreshness` minus a 10-minute buffer to ensure no events are dropped. + * **1:1 Row Mapping Assumption**: The pipeline assumes a single Spanner row does not map to more than one source row. +* **Example PRs**: + * [d1dbadb17](https://github.com/GoogleCloudPlatform/DataflowTemplates/commit/d1dbadb17) - Feature: Adding support for PostgreSQL as source in reverse replication. + * [74e5f1fe1](https://github.com/GoogleCloudPlatform/DataflowTemplates/commit/74e5f1fe1) - Feature/Fix: Fail fast if MySQL destination is read-only. + * [23310bcea](https://github.com/GoogleCloudPlatform/DataflowTemplates/commit/23310bcea) - Bug Fix: Avoid multiple GCS reads for constructing schema mapper. From 1886bf528ccbaae6cb26a68bde1e979135bf658f Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Mon, 15 Jun 2026 17:13:15 +0000 Subject: [PATCH 2/3] review changes --- v2/spanner-to-sourcedb/architecture.dot | 2 +- v2/spanner-to-sourcedb/architecture.svg | 2 +- v2/spanner-to-sourcedb/project-context.md | 40 +++++++++++++++++++++-- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/v2/spanner-to-sourcedb/architecture.dot b/v2/spanner-to-sourcedb/architecture.dot index ec1159fa27..ce353c975c 100644 --- a/v2/spanner-to-sourcedb/architecture.dot +++ b/v2/spanner-to-sourcedb/architecture.dot @@ -21,7 +21,7 @@ digraph G { DLQReconsumer [label="DLQ Reconsumer\n(PubSub or Continuous)"]; Spanner -> SpannerIOReader [label=" CDC Events"]; - MetadataDb -> InformationSchema [label=" DDL / Metatdata"]; + MetadataDb -> InformationSchema [label=" DDL / Metadata"]; SpannerIOReader -> Reshuffle; Reshuffle -> Filter; Filter -> Preprocess; diff --git a/v2/spanner-to-sourcedb/architecture.svg b/v2/spanner-to-sourcedb/architecture.svg index ddaac125ea..0843efe9a5 100644 --- a/v2/spanner-to-sourcedb/architecture.svg +++ b/v2/spanner-to-sourcedb/architecture.svg @@ -50,7 +50,7 @@ MetadataDb->InformationSchema - DDL / Metatdata + DDL / Metadata diff --git a/v2/spanner-to-sourcedb/project-context.md b/v2/spanner-to-sourcedb/project-context.md index cda5c62ff7..e954dedf88 100644 --- a/v2/spanner-to-sourcedb/project-context.md +++ b/v2/spanner-to-sourcedb/project-context.md @@ -10,7 +10,7 @@ * Per-primary-key ordering guarantee (relaxed from per-shard ordering). * **Terminology**: * **Reverse Replication**: Replicating data from Spanner to the Source DB. - * **Change Streams**: Spanner's CDC mechanism. + * **Change Streams**: Spanner's CDC (Change Data Capture) mechanism that captures and emits row-level data modifications (inserts, updates, deletes) in near real-time, providing the source events for the reverse replication pipeline. * **Shadow Table**: Spanner metadata table used to track `processed_commit_ts` per primary key to prevent out-of-order writes. * **Data Freshness**: Pipeline lag indicator. * **Cut-back**: The process of shifting application write traffic back to the source. @@ -48,16 +48,50 @@ * **Architecture Diagram & Dependency Tree**: [architecture.svg](architecture.svg) (Source: `architecture.dot`). * **Rule**: Always keep the `.dot` and `.svg` files in sync. If you modify the architecture, you MUST regenerate the `.svg` from the `.dot` file. +* **User Guide**: [README.md](README.md) contains operational guidelines, prerequisites, troubleshooting, and customization instructions. + +## Sharding Configuration + +* **Sharding and Routing**: Reverse replication uses a shard identifier column per table to route Spanner records to a given source shard. The value of this column corresponds to the `logicalShardId` specified in the shard configuration file. +* **MySQL vs Cassandra Configurations**: + * **MySQL/PostgreSQL**: Uses a JSON array specifying connection details per logical shard. + ```json + [ + { + "logicalShardId": "shard1", + "host": "10.11.12.13", + "user": "root", + "secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard1/versions/latest", + "port": "3306", + "dbName": "db1" + } + ] + ``` + * **Cassandra**: Uses a single HOCON format file since Cassandra inherently handles cluster routing, meaning no `logicalShardId` mapping is required at the Dataflow level. + ```hocon + datastax-java-driver { + basic.contact-points = ["10.244.21.233:9042"] + basic.session-keyspace = "keyspace_name" + basic.load-balancing-policy { + local-datacenter = "datacenter1" + } + advanced.auth-provider { + class = PlainTextAuthProvider + username = "root" + password = "admin" + } + } + ``` ## AI Agent Tips * **Common Tasks**: Adding DML generators for new databases, handling new types of schema overrides, improving unit test coverage, updating DLQ processing logic. * **Coding Standards & Best Practices**: - * **Parallelism**: Controlled via `maxShardConnections` per shard configuration. + * **Parallelism and Connection Limits**: Parallelism for writing to the source database is constrained by the `maxShardConnections` setting in the shard configuration. This acts as a throttling mechanism to ensure Dataflow workers do not overwhelm the target source database with too many concurrent connections. * **Stale Writes**: Any writes with an older timestamp than what is recorded in the shadow table must be explicitly skipped to prevent data corruption. * **AutoValue**: Use `AutoValue` for POJOs and models. Ensure all required variables are set before building. * **Beam Paradigms**: Strictly adhere to Apache Beam constructs (`PTransform`, `DoFn`). Use `TupleTag` for handling multiple outputs like DLQ side channels. - * **Serializability**: All variables within `PTransform` and `DoFn` must be serializable. For non-serializable objects (like JDBC Connections, `HikariDataSource`, or `SpannerDao`), mark them as `transient` and initialize them within `@Setup` or `@StartBundle` methods.s + * **Serializability**: All variables within `PTransform` and `DoFn` must be serializable. For non-serializable objects (like JDBC Connections, `HikariDataSource`, or `SpannerDao`), mark them as `transient` and initialize them within `@Setup` or `@StartBundle` methods. * **Connection Management**: Use the Hikari connection pool (preferred over DBCP2). Always ensure connections are safely returned to the pool (using `try-with-resources` or `try-finally` blocks) to prevent connection leakages. * **Security**: NEVER log sensitive credentials, connection strings, or customer PII. Use `structured-logging` instead of standard output. * **Formatting**: Always run `mvn spotless:apply -pl v2/spanner-to-sourcedb -am` before committing to adhere to project formatting standards. From b36adcde5d456bd213df3d35e7c611e29679291a Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Mon, 15 Jun 2026 18:57:30 +0000 Subject: [PATCH 3/3] reference improvements --- v2/spanner-to-sourcedb/project-context.md | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/v2/spanner-to-sourcedb/project-context.md b/v2/spanner-to-sourcedb/project-context.md index e954dedf88..7db0a105b0 100644 --- a/v2/spanner-to-sourcedb/project-context.md +++ b/v2/spanner-to-sourcedb/project-context.md @@ -1,6 +1,13 @@ # Project Context: Spanner to SourceDb (Reverse Replication) - + ## Overview @@ -100,14 +107,16 @@ * **Unit Tests**: Use `@RunWith(JUnit4.class)`. Strive for at least 80% coverage. Mock database responses with Mockito to ensure fast, isolated tests. * **Non-Destructive Refactoring**: Append new dedicated test methods for new functionality. Do not arbitrarily delete or rewrite existing tests unless addressing a breaking API change. * **100% Branch & Exception Coverage**: Ensure `if/else` paths and caught/thrown exceptions are fully asserted in tests using AssertJ's `assertThatThrownBy` or JUnit's `assertThrows`. -* **Areas to be Careful (Gotchas)**: +* **Core Architectural Decisions**: + * **1:1 Row Mapping Assumption**: The pipeline assumes a single Spanner row does not map to more than one source row. + * **Resume Strategy**: When resuming a paused job, users must supply a `startTimestamp` matching the previous `DataFreshness` minus a 10-minute buffer to ensure no events are dropped. + * **Foreign Keys & Retries**: Parent-child ordering relies entirely on retry loops and SpannerIO's straggler handling. Changes to the error tag logic in `SourceWriterFn` or `AssignShardIdFn` can silently break foreign key insertions. Retryable errors expected in the DLQ include Foreign Key violations, Spanner `RESOURCE_EXHAUSTED` (shadow tables), and Source DB transient network issues. +* **Known Issues & Quirks**: + * **Shadow Table Lock Contention**: Be cautious when adding transactional reads/writes around shadow tables. High throughput updates increase Spanner's load and can create lock contention. Keep transactions fast and localized. * **Integration Tests**: NEVER execute `*IT.java` (Integration) or `*LT.java` (Load) test suites during local development/machine verification. Only execute `*Test.java` (Unit) locally. +* **Lessons Learned & Ah-ha Moments**: * **DML Generation Syntax**: When updating `MySQLDMLGenerator`, `PostgreSQLDMLGenerator`, or `CassandraDMLGenerator`, verify that the generated SQL/CQL syntax is strictly valid for that specific dialect and version. - * **Shadow Table Lock Contention**: Be cautious when adding transactional reads/writes around shadow tables. High throughput updates increase Spanner's load and can create lock contention. Keep transactions fast and localized. - * **Foreign Keys & Retries**: Parent-child ordering relies entirely on retry loops and SpannerIO's straggler handling. Changes to the error tag logic in `SourceWriterFn` or `AssignShardIdFn` can silently break foreign key insertions. Retryable errors expected in the DLQ include Foreign Key violations, Spanner `RESOURCE_EXHAUSTED` (shadow tables), and Source DB transient network issues. * **Metric Reliability**: Metrics like `success_record_count` may be skewed if worker restarts occur (causing re-processing). Do not rely on them for strict transactional accounting. - * **Resume Strategy**: When resuming a paused job, users must supply a `startTimestamp` matching the previous `DataFreshness` minus a 10-minute buffer to ensure no events are dropped. - * **1:1 Row Mapping Assumption**: The pipeline assumes a single Spanner row does not map to more than one source row. * **Example PRs**: * [d1dbadb17](https://github.com/GoogleCloudPlatform/DataflowTemplates/commit/d1dbadb17) - Feature: Adding support for PostgreSQL as source in reverse replication. * [74e5f1fe1](https://github.com/GoogleCloudPlatform/DataflowTemplates/commit/74e5f1fe1) - Feature/Fix: Fail fast if MySQL destination is read-only.