Add sharded force replication workflow #10528
Introduces a parallel ShardedForceReplicationWorkflow alongside the existing ForceReplicationWorkflow variants. The new variant routes each execution by destination history shard, packs batches across shards, and gates on per-shard exclusivity so a single hot shard can't dominate the apply pipeline. Defaults are unchanged: the legacy workflow stays the default, and callers opt into the sharded variant by starting a workflow of type "force-replication-sharded" on the new MigrationShardedActivityTQ.
Wired as a second WorkerComponent in migration.Module: dedicated workflow + activity workers polling primitives.MigrationShardedActivityTQ. The shared *activities struct picks up adminClient (from a local admin client) and sdkClientFactory fields so the new ReplicateBatch activity can drive both the via-frontend inject path and the mid-flight ReleaseShards signal without relying on per-workflow plumbing.
Side effect: the legacy *activities builder on main never populated adminClient, so the existing generateMigrationTaskViaFrontend code path (activities.go GenerateLastHistoryReplicationTasks call) would have nil-pointer'd if that dynamic-config flag were enabled. The new ClientBean-based wiring in newActivitiesFromParams populates adminClient for both the legacy and sharded *activities instances, fixing that latent NPE for the legacy via-frontend path as well.
worker.go's upgrade-hack pass registers each component's activities on
the default worker before the dedicated worker (see the TODO at
worker.go:82). The legacy and sharded WorkerComponents share the
*activities method set, so whichever runs second hits the SDK's
already-registered check and panics on CountWorkflow / ListWorkflows /
etc.
Both call sites now use activity.RegisterOptions{DisableAlreadyRegisteredCheck:
true} — the default worker isn't dispatched to by either workflow (both
have dedicated activity workers), so winner-takes-all on those duplicate
registrations is harmless.
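The duplicate-registration behavior described above can be modeled with a toy registry. This is a minimal sketch, not the Temporal SDK: the `registry` type, `register` method, and `panics` helper are hypothetical, and only the option name `DisableAlreadyRegisteredCheck` comes from the commit message. In this toy the later registration overwrites the earlier one; the commit message does not say which side wins in the real SDK.

```go
package main

import "fmt"

// registerOptions mirrors the shape of the option named in the commit
// message. It is a toy stand-in, not the SDK type.
type registerOptions struct {
	DisableAlreadyRegisteredCheck bool
}

// registry is a hypothetical model of one worker's activity registry.
type registry struct {
	owners map[string]string // activity name -> component that registered it
}

func newRegistry() *registry {
	return &registry{owners: make(map[string]string)}
}

// register records names for a component. With the check enabled, a
// duplicate name panics; with it disabled, the later registration
// overwrites the earlier one (an assumption of this sketch).
func (r *registry) register(component string, names []string, opts registerOptions) {
	for _, n := range names {
		if _, dup := r.owners[n]; dup && !opts.DisableAlreadyRegisteredCheck {
			panic(fmt.Sprintf("activity %q already registered", n))
		}
		r.owners[n] = component
	}
}

// panics reports whether fn panicked.
func panics(fn func()) (p bool) {
	defer func() { p = recover() != nil }()
	fn()
	return false
}

func main() {
	shared := []string{"CountWorkflow", "ListWorkflows"}

	// Check enabled: the second component to register trips the check.
	strict := newRegistry()
	strict.register("legacy", shared, registerOptions{})
	fmt.Println("second strict registration panics:",
		panics(func() { strict.register("sharded", shared, registerOptions{}) }))

	// Check disabled at both call sites: both registrations succeed.
	lenient := newRegistry()
	opts := registerOptions{DisableAlreadyRegisteredCheck: true}
	lenient.register("legacy", shared, opts)
	lenient.register("sharded", shared, opts)
	fmt.Println("owner of CountWorkflow:", lenient.owners["CountWorkflow"])
}
```

Both call sites need the option because either component may run second, depending on registration order.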
Pull request overview
This PR introduces a new sharded variant of the force-replication workflow (“force-replication-sharded”) that routes work by destination history shard, packs batches across shards, and enforces per-shard exclusivity to avoid hot-shard domination and head-of-line blocking. It also updates migration worker wiring to run the sharded workflow/activity on a dedicated task queue while keeping the legacy workflow as the default.
Changes:
- Add ShardedForceReplicationWorkflow plus supporting sharded activity (ReplicateBatch) and shared sharded payload/types for deterministic packing, draining, and resume-on-CAN behavior.
- Wire a second migration worker component polling primitives.MigrationShardedActivityTQ, and update *activities construction to reliably populate adminClient (fixing a latent nil-pointer path).
- Add unit + functional tests covering the sharded workflow path, plus supporting test hooks/metrics/task-queue definitions.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/xdc/user_data_replication_test.go | Adds an XDC test that exercises task-queue user-data replication via the sharded force-replication workflow. |
| tests/xdc/failover_test.go | Adds sharded variants of existing force-migration tests (closed workflow + reset workflow). |
| service/worker/migration/sharded_workflow.go | Implements the sharded force-replication workflow with per-shard packing, draining, CAN carry-over, and status query compatibility. |
| service/worker/migration/sharded_workflow_test.go | Adds workflow-unit tests for packing, resume payloads, CAN/drain plumbing, and error propagation. |
| service/worker/migration/sharded_types.go | Introduces sharded payload wire format (nested by shard/BID), params, activity req/resp types, and signal payloads. |
| service/worker/migration/sharded_types_test.go | Adds unit tests for JSON tuple encoding and deterministic flattening. |
| service/worker/migration/sharded_activity.go | Implements the sharded ReplicateBatch activity (inject + verify, drain mode, shard-release signaling, per-shard no-progress backstop). |
| service/worker/migration/sharded_activity_test.go | Adds activity-unit tests for inject/verify, skip paths, heartbeat inject resume, and stuck-shard behavior. |
| service/worker/migration/fx.go | Adds sharded worker component + dedicated TQ wiring; refactors activities construction to use ClientBean-based local admin client + SDK client factory. |
| service/worker/migration/force_replication_workflow_test.go | Extends heartbeat test interceptor to record new sharded activity heartbeat details. |
| service/worker/migration/activities.go | Extends activities with sdkClientFactory for sharded mid-flight signaling. |
| service/worker/migration/activities_test.go | Extends test setup with a mocked client factory used by sharded activity tests. |
| common/primitives/task_queues.go | Adds MigrationShardedActivityTQ. |
| common/metrics/metric_defs.go | Adds per-exec sharded force-replication metrics (latencies/counters). |
Fetch target shard count via Describe call rather than passing as a param or defaulting to source side's shard count.
If we exit due to an error, ensure that an operator can restart the workflow without missing any executions from previous pages that had not yet been handled. There were a few edge cases where some might be dropped.
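One way to picture the restart guarantee above: the resume state carries both the page cursor and any executions that were listed but not yet handled. The sketch below is an illustration under that assumption. The `checkpoint` type, `run` function, and `errTransient` value are hypothetical and do not reflect the PR's actual resume payload.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical failure used only by this sketch.
var errTransient = errors.New("transient failure")

// checkpoint is a hypothetical resume payload: the page cursor plus the
// executions that were read from earlier pages but not yet handled.
type checkpoint struct {
	NextPage  int
	Unhandled []string
}

// run processes pages starting from cp. If handle fails, the returned
// checkpoint still contains the failed execution and everything queued
// behind it, so a restart from that checkpoint does not skip them.
func run(pages [][]string, cp checkpoint, handle func(string) error) (checkpoint, error) {
	queue := append([]string(nil), cp.Unhandled...)
	next := cp.NextPage
	for {
		for len(queue) > 0 {
			if err := handle(queue[0]); err != nil {
				return checkpoint{NextPage: next, Unhandled: queue}, err
			}
			queue = queue[1:]
		}
		if next >= len(pages) {
			return checkpoint{NextPage: next}, nil
		}
		// The cursor advances as soon as a page is read, which is why the
		// unhandled remainder must travel with it.
		queue = append(queue, pages[next]...)
		next++
	}
}

func main() {
	pages := [][]string{{"a", "b"}, {"c", "d"}}
	failOnce := true
	handle := func(id string) error {
		if id == "b" && failOnce {
			failOnce = false
			return errTransient
		}
		fmt.Println("handled", id)
		return nil
	}
	cp, err := run(pages, checkpoint{}, handle)
	fmt.Printf("stopped: %v, checkpoint: %+v\n", err, cp)
	cp, err = run(pages, cp, handle)
	fmt.Printf("finished: err=%v, checkpoint: %+v\n", err, cp)
}
```

Saving only the cursor would resume at page 1 and silently drop "b", which is the kind of edge case the commit note refers to.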
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Reviewed by Cursor Bugbot for commit 9b77454.

What changed?
Introduces a parallel ShardedForceReplicationWorkflow alongside the existing ForceReplicationWorkflow variants. The new variant routes each execution by destination history shard, packs batches across shards, and enforces per-shard exclusivity — at most one in-flight batch per target shard, carrying at most MaxExecsPerShard execs for that shard — so the per-shard in-flight backlog on the destination's apply queue is bounded. As an indirect benefit, no single hot shard can dominate any one batch's inject burst. Defaults are unchanged — the legacy workflow stays the default; callers opt into the sharded variant by starting a workflow of type "force-replication-sharded" on the new MigrationShardedActivityTQ.
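The per-shard exclusivity rule can be sketched as a small packer: take at most MaxExecsPerShard executions from each shard that has no batch in flight, then mark the shard busy until it is released. This is a minimal sketch under assumptions, not the PR's implementation. The names `shardFor`, `packBatch`, `releaseShards`, and `shardBatch` are hypothetical, and FNV-1a stands in for whatever routing hash the server actually uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type execution struct {
	NamespaceID string
	WorkflowID  string
}

type shardBatch struct {
	Shard int32
	Execs []execution
}

// shardFor maps an execution to a 1-based shard ID. FNV-1a is a stand-in
// hash chosen for this sketch; the real routing function may differ.
func shardFor(e execution, shardCount int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(e.NamespaceID + "_" + e.WorkflowID))
	return int32(h.Sum32()%uint32(shardCount)) + 1
}

// packBatch takes up to maxExecsPerShard pending executions from every
// shard that is not busy and marks those shards busy. Shards are visited
// in sorted order so the result is deterministic.
func packBatch(pending map[int32][]execution, busy map[int32]bool, maxExecsPerShard int) []shardBatch {
	if maxExecsPerShard <= 0 {
		return nil
	}
	shards := make([]int32, 0, len(pending))
	for s := range pending {
		shards = append(shards, s)
	}
	sort.Slice(shards, func(i, j int) bool { return shards[i] < shards[j] })

	var batch []shardBatch
	for _, s := range shards {
		queue := pending[s]
		if busy[s] || len(queue) == 0 {
			continue
		}
		n := maxExecsPerShard
		if len(queue) < n {
			n = len(queue)
		}
		batch = append(batch, shardBatch{Shard: s, Execs: queue[:n]})
		if n == len(queue) {
			delete(pending, s)
		} else {
			pending[s] = queue[n:]
		}
		busy[s] = true
	}
	return batch
}

// releaseShards clears the busy mark so the next batch may use the shards.
func releaseShards(busy map[int32]bool, shards ...int32) {
	for _, s := range shards {
		delete(busy, s)
	}
}

func main() {
	const shardCount = 4
	pending := make(map[int32][]execution)
	for i := 0; i < 10; i++ {
		e := execution{NamespaceID: "ns", WorkflowID: fmt.Sprintf("wf-%d", i)}
		s := shardFor(e, shardCount)
		pending[s] = append(pending[s], e)
	}
	busy := make(map[int32]bool)
	for _, b := range packBatch(pending, busy, 2) {
		fmt.Printf("shard %d: %d execs in flight\n", b.Shard, len(b.Execs))
	}
}
```

Because a busy shard is skipped rather than waited on, a slow shard holds back only its own queue while the other shards keep packing.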
Wired as a second WorkerComponent in migration.Module: dedicated workflow + activity workers polling primitives.MigrationShardedActivityTQ. The shared *activities struct picks up adminClient (from a local admin client) and sdkClientFactory fields so the new ReplicateBatch activity can drive both the via-frontend inject path and the mid-flight ReleaseShards signal without relying on per-workflow plumbing.
Side effect: the legacy *activities builder on main never populated adminClient, so the existing generateMigrationTaskViaFrontend code path (activities.go GenerateLastHistoryReplicationTasks call) would have nil-pointer'd if that dynamic-config flag were enabled. The new ClientBean-based wiring in newActivitiesFromParams populates adminClient for both the legacy and sharded *activities instances, fixing that latent NPE for the legacy via-frontend path as well.
Why?
Handles per-shard back-pressure to avoid overloading shards (or making overloaded shards worse), and avoids head-of-line blocking behind a slow shard so that we can make progress on healthy shards quickly.
How did you test it?
Potential risks
RPS limits are set differently and scale per shard. They are high because the workflow provides per-shard back-pressure, but we may need to lower them or add a global RPS flag.
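To make the risk concrete: with a per-shard limit, the aggregate rate grows with the number of shards that have work in flight, and a global flag would clamp it. The sketch below is arithmetic only. The function `effectiveRPS` and its parameters are hypothetical and do not correspond to any existing config.

```go
package main

import "fmt"

// effectiveRPS returns the aggregate rate when each active shard may use
// perShardRPS. A globalCap greater than zero clamps the total; zero means
// no global cap (the hypothetical flag is unset).
func effectiveRPS(perShardRPS float64, activeShards int, globalCap float64) float64 {
	total := perShardRPS * float64(activeShards)
	if globalCap > 0 && total > globalCap {
		return globalCap
	}
	return total
}

func main() {
	fmt.Println(effectiveRPS(50, 4, 0))   // no global cap
	fmt.Println(effectiveRPS(50, 4, 120)) // clamped by the global cap
}
```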