From 5595f80cbd2159fec07c9cafd5ac3e2e42e9b10b Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 9 Jun 2026 12:11:33 +0000 Subject: [PATCH 1/2] Add project context for datastream-to-spanner --- v2/datastream-to-spanner/architecture.dot | 50 ++++ v2/datastream-to-spanner/architecture.svg | 243 ++++++++++++++++++++ v2/datastream-to-spanner/project-context.md | 60 +++++ 3 files changed, 353 insertions(+) create mode 100644 v2/datastream-to-spanner/architecture.dot create mode 100644 v2/datastream-to-spanner/architecture.svg create mode 100644 v2/datastream-to-spanner/project-context.md diff --git a/v2/datastream-to-spanner/architecture.dot b/v2/datastream-to-spanner/architecture.dot new file mode 100644 index 0000000000..b7c316b976 --- /dev/null +++ b/v2/datastream-to-spanner/architecture.dot @@ -0,0 +1,50 @@ +digraph Architecture { + node [shape=box, fontname="Helvetica", style=filled, fillcolor=lightblue]; + + Datastream [label="Source DB\n(via Datastream)", shape=cylinder]; + GCS [label="Google Cloud Storage\n(AVRO/JSON)", shape=folder]; + Spanner [label="Cloud Spanner\n(Destination & Shadow Tables)", shape=cylinder]; + DLQ [label="Dead Letter Queue\n(GCS)", shape=folder]; + FilteredEventsGCS [label="Filtered Events\n(GCS)", shape=folder]; + + subgraph cluster_dataflow { + label="Cloud Dataflow (DataStreamToSpanner)"; + fontname="Helvetica-Bold"; + style=dashed; + color=gray; + + ReadDataStream [label="DataStreamIO\n(Read Events)"]; + ProcessSchema [label="Process Information Schema\n(Read DDL)"]; + ReadDLQ [label="FileBasedDeadLetterQueueReconsumer\n(Read from DLQ)"]; + Reshuffle [label="Reshuffle\n(Random Key)"]; + TransformEvents [label="Apply Transformation to events"]; + WriteSpanner [label="Write events to Cloud Spanner"]; + WriteFiltered [label="Write Filtered Events"]; + WriteDLQRetry [label="Write To DLQ\n(Retryable Errors)"]; + WriteDLQSevere [label="Write To DLQ2\n(Severe Errors)"]; + } + + Datastream -> GCS [label=" Change Events"]; + GCS -> 
ReadDataStream; + Spanner -> ProcessSchema [label=" DDL"]; + ProcessSchema -> TransformEvents [label=" SideInput: DDL View", style=dotted]; + ProcessSchema -> WriteSpanner [label=" SideInput: DDL View", style=dotted]; + DLQ -> ReadDLQ [label=" Failed Events"]; + + ReadDataStream -> Reshuffle; + ReadDLQ -> Reshuffle [label=" Retryable Events"]; + ReadDLQ -> WriteDLQSevere [label=" Permanent Errors"]; + + Reshuffle -> TransformEvents [label=" JSON Records"]; + TransformEvents -> WriteFiltered [label=" Filtered Events"]; + WriteFiltered -> FilteredEventsGCS; + TransformEvents -> WriteSpanner [label=" Transformed Events"]; + TransformEvents -> WriteDLQSevere [label=" Permanent Errors"]; + + WriteSpanner -> Spanner [label=" Spanner Mutations"]; + WriteSpanner -> WriteDLQRetry [label=" Retryable Errors"]; + WriteSpanner -> WriteDLQSevere [label=" Permanent Errors"]; + + WriteDLQRetry -> DLQ; + WriteDLQSevere -> DLQ; +} diff --git a/v2/datastream-to-spanner/architecture.svg b/v2/datastream-to-spanner/architecture.svg new file mode 100644 index 0000000000..9687a82161 --- /dev/null +++ b/v2/datastream-to-spanner/architecture.svg @@ -0,0 +1,243 @@ + + + + + + +Architecture + + +cluster_dataflow + +Cloud Dataflow (DataStreamToSpanner) + + + +Datastream + + +Source DB +(via Datastream) + + + +GCS + +Google Cloud Storage +(AVRO/JSON) + + + +Datastream->GCS + + + Change Events + + + +ReadDataStream + +DataStreamIO +(Read Events) + + + +GCS->ReadDataStream + + + + + +Spanner + + +Cloud Spanner +(Destination & Shadow Tables) + + + +ProcessSchema + +Process Information Schema +(Read DDL) + + + +Spanner->ProcessSchema + + + DDL + + + +DLQ + +Dead Letter Queue +(GCS) + + + +ReadDLQ + +FileBasedDeadLetterQueueReconsumer +(Read from DLQ) + + + +DLQ->ReadDLQ + + + Failed Events + + + +FilteredEventsGCS + +Filtered Events +(GCS) + + + +Reshuffle + +Reshuffle +(Random Key) + + + +ReadDataStream->Reshuffle + + + + + +TransformEvents + +Apply Transformation to events + + + 
+ProcessSchema->TransformEvents + + + SideInput: DDL View + + + +WriteSpanner + +Write events to Cloud Spanner + + + +ProcessSchema->WriteSpanner + + + SideInput: DDL View + + + +ReadDLQ->Reshuffle + + + Retryable Events + + + +WriteDLQSevere + +Write To DLQ2 +(Severe Errors) + + + +ReadDLQ->WriteDLQSevere + + + Permanent Errors + + + +Reshuffle->TransformEvents + + + JSON Records + + + +TransformEvents->WriteSpanner + + + Transformed Events + + + +WriteFiltered + +Write Filtered Events + + + +TransformEvents->WriteFiltered + + + Filtered Events + + + +TransformEvents->WriteDLQSevere + + + Permanent Errors + + + +WriteSpanner->Spanner + + + Spanner Mutations + + + +WriteDLQRetry + +Write To DLQ +(Retryable Errors) + + + +WriteSpanner->WriteDLQRetry + + + Retryable Errors + + + +WriteSpanner->WriteDLQSevere + + + Permanent Errors + + + +WriteFiltered->FilteredEventsGCS + + + + + +WriteDLQRetry->DLQ + + + + + +WriteDLQSevere->DLQ + + + + + diff --git a/v2/datastream-to-spanner/project-context.md b/v2/datastream-to-spanner/project-context.md new file mode 100644 index 0000000000..83cb0e3df1 --- /dev/null +++ b/v2/datastream-to-spanner/project-context.md @@ -0,0 +1,60 @@ +# Project Context: Datastream to Spanner + + + +## Overview + +* **Core Intent:** The Dataflow template is a streaming CDC (Change Data Capture) migration pipeline that moves real-time database changes from Datastream to Cloud Spanner. It's meant to reduce application downtime by applying in-flight changes while bulk data loads happen. +* **Primary Users:** Database administrators, migration engineers, and customers moving databases to Cloud Spanner. +* **Critical SLOs/Guarantees:** Must ensure eventual consistency with the source database by never allowing older events to overwrite newer ones (preserving commit order). +* **Terminology:** + * **Change Event (CE):** A DML change (insert/update/delete) that contains the full row data. 
+ * **Shadow Table:** A companion table created alongside each Spanner destination table to keep track of versioning metadata (like an Oracle SCN) for each primary key. + * **DLQ:** Dead Letter Queue (for failed records). + * **Oracle SCN:** System Change Number. Oracle's version number for committed changes; the pipeline compares it to decide which of two events for the same primary key is newer. + * **Schema Override:** A mechanism to override the schema of the destination database. + +## Technical Details + +* **Tech Stack & Versions:** + * **Languages:** Java 17 + * **Frameworks/Libraries:** Apache Beam, GCP Spanner SDK, Datastream API Client. + * **Key Technologies:** Cloud Spanner, Cloud Dataflow, Datastream, Google Cloud Storage (GCS) +* **Code Location:** `v2/datastream-to-spanner` +* **Data Flow:** Source Database -> Datastream -> Google Cloud Storage (Avro/JSON) -> Cloud Dataflow Pipeline (parses, transforms, schema validates) -> Cloud Spanner. Failed events are sent to a Dead Letter Queue (DLQ) in GCS for recycling/retrying. +* **Project Structure (Logical Architecture Mapping):** + * `v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates`: Core dataflow pipeline logic (`DataStreamToSpanner.java`) and DoFns. + * `v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream`: Datastream mapping, JSON/Avro parsing, and dialect-specific parsing contexts. + * `v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner`: Spanner-specific logic including `SpannerTransactionWriter` and the schema overrides parser. + * `v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/transform`: Custom transformation and processing logic for change events. + * `v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates`: Unit and integration tests. +* **Build/Run Commands:** + See the `README.md` file for instructions on building and running the pipeline.
+ +## Documentation + +* **Architecture Diagram & Dependency Tree:** [architecture.svg](architecture.svg) (Source: `architecture.dot`). + * **Rule:** Always keep the `.dot` and `.svg` files in sync. If you modify the architecture, you MUST regenerate the `.svg` from the `.dot` file (e.g. `dot -Tsvg architecture.dot -o architecture.svg`). + +## AI Agent Tips + +* **Common Tasks:** Adding new dialect support for Datastream, improving retry logic for the DLQ, adding transformations or metrics for the Dataflow pipeline. +* **Coding Standards & Best Practices:** + * Individual CEs are processed separately for parallel scaling, rather than grouping them into the original source transactions. Consistency is managed using lateness checks on the Shadow Tables. + * **Avoid Serial Processing:** Do not attempt to group events by transaction or serially order them. The approach relies on parallel workers, taking advantage of Cloud Dataflow's scale. + * **Avoid GroupBy:** Do not use `GroupByKey` or internal worker state to filter stale events before writing. It doesn't scale well and complicates state recovery. Always use Shadow Tables for the lateness check. + * Because CE writes are idempotent and protected by version checks, the template relies heavily on automatic retries for failed events, reducing the complexity of handling referential integrity (e.g., when a child arrives before a parent). + * **Referential Integrity:** For foreign keys and interleaved tables, rely purely on the retry mechanisms. A child event arriving before its parent will fail, but will eventually be written when it is retried after the parent succeeds. + * Schema migration must be done *prior* to starting this pipeline; it does not process or replicate DDL events. +* **Testing Frameworks & Guidelines:** + * **Frameworks:** JUnit 4, Mockito for mocking dependencies. + * **Rules:** Ensure adequate unit-test coverage for new logic. Integration tests should be placed in the respective `*IT.java` classes with robust wait conditions.
+* **Areas to be Careful (Gotchas):** + * Lateness checks on Shadow Tables are critical; bugs here can lead to data inconsistency. + * DLQ retry logic (both `retryDLQ` and `retryAllDLQ` modes) handles data integrity on errors. Modifying it must be done carefully to prevent infinite loops or skipped events. + * **Fatal Errors:** Unexpected/fatal errors (like type conversion failures) should not be endlessly retried. Ensure any new exceptions are properly routed to the severe DLQ bucket. + * **Version Overflow:** Be mindful of edge cases in version ordering (e.g. if the Oracle SCN exceeds limits and restarts at zero). Ensure comparisons in `ChangeEventSequence` remain robust against edge case overflows. + * **Data Size Limits:** Datastream enforces a 3MB size limit per Change Event. Ensure no individual rows exceed this. +* **Example PRs:** + * [PR #3035](https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/3035) - [datastream-to-spanner] Unable to convert field timestamp to long + * [PR #2867](https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/2867) - changed mysql event ordering in datastream to spanner From a1147144732f8b3aa7310108f64f600b47475dab Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Wed, 10 Jun 2026 04:43:11 +0000 Subject: [PATCH 2/2] Clarify Datastream/Dataflow boundary --- v2/datastream-to-spanner/project-context.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/datastream-to-spanner/project-context.md b/v2/datastream-to-spanner/project-context.md index 83cb0e3df1..776f1003b7 100644 --- a/v2/datastream-to-spanner/project-context.md +++ b/v2/datastream-to-spanner/project-context.md @@ -4,7 +4,7 @@ ## Overview -* **Core Intent:** The Dataflow template is a streaming CDC (Change Data Capture) migration pipeline that moves real-time database changes from Datastream to Cloud Spanner. It's meant to reduce application downtime by applying in-flight changes while bulk data loads happen. 
+* **Core Intent:** The Dataflow template is a streaming CDC (Change Data Capture) migration pipeline that applies real-time database changes to Cloud Spanner. **Important distinction:** This template does *not* read directly from the source database or pipe data to Datastream. Instead, Datastream independently reads from the source DB and writes the change events as Avro (or JSON) files to a GCS bucket. This Dataflow template then consumes those files from GCS, converts the Avro records into internal JSON representations, and applies the changes to Spanner. It's meant to reduce application downtime by applying in-flight changes while bulk data loads happen. * **Primary Users:** Database administrators, migration engineers, and customers moving databases to Cloud Spanner. * **Critical SLOs/Guarantees:** Must ensure eventual consistency with the source database by never allowing older events to overwrite newer ones (preserving commit order). * **Terminology:** @@ -21,7 +21,7 @@ * **Frameworks/Libraries:** Apache Beam, GCP Spanner SDK, Datastream API Client. * **Key Technologies:** Cloud Spanner, Cloud Dataflow, Datastream, Google Cloud Storage (GCS) * **Code Location:** `v2/datastream-to-spanner` -* **Data Flow:** Source Database -> Datastream -> Google Cloud Storage (Avro/JSON) -> Cloud Dataflow Pipeline (parses, transforms, schema validates) -> Cloud Spanner. Failed events are sent to a Dead Letter Queue (DLQ) in GCS for recycling/retrying. +* **Data Flow:** Source Database -> Datastream -> Google Cloud Storage (Avro/JSON) -> Cloud Dataflow Pipeline (consumes Avro/JSON from GCS, converts Avro to JSON, transforms, validates against the schema) -> Cloud Spanner. Failed events are sent to a Dead Letter Queue (DLQ) in GCS so they can be retried. * **Project Structure (Logical Architecture Mapping):** * `v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates`: Core dataflow pipeline logic (`DataStreamToSpanner.java`) and DoFns.
* `v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream`: Datastream mapping, JSON/Avro parsing, and dialect-specific parsing contexts.