Skip to content
21 changes: 21 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,27 @@ var (
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")

// Sharded force replication. The sharded ReplicateBatch activity runs
// many executions per invocation, so per-exec timing is the meaningful
// granularity — the batch-level *_tasks_latency timers above scale with
// BatchSize and aren't comparable across configurations.
GenerateReplicationTaskLatency = NewTimerDef("generate_replication_task_latency")
VerifyReplicationTaskLatency = NewTimerDef("verify_replication_task_latency")
// VerifyReplicationTaskBusy counts verify attempts where the passive
// cluster returned RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW — the cache
// lock is held while history is being applied. A sign of progress that
// doesn't reset the per-shard no-progress timer.
VerifyReplicationTaskBusy = NewCounterDef("verify_replication_task_busy")
// VerifyReplicationTaskPending counts verify attempts where
// DescribeMutableState succeeded but the workflowVerifier saw the target
// lagging the source. A high pending vs. success ratio means verify is
// polling faster than apply can catch up.
VerifyReplicationTaskPending = NewCounterDef("verify_replication_task_pending")
// ReplicatedWorkflowCount accumulates verified-exec counts across each
// ReplicateBatch activity return. Emitted from the workflow so the
// counter is monotonic across activity retries.
ReplicatedWorkflowCount = NewCounterDef("replicated_workflow_count")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
NamespaceReplicationDLQAckLevelGauge = NewGaugeDef("namespace_dlq_ack_level")
Expand Down
1 change: 1 addition & 0 deletions common/primitives/task_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
internalTaskQueuePerNSPrefix = "temporal-sys-per-ns-"

MigrationActivityTQ = "temporal-sys-migration-activity-tq"
MigrationShardedActivityTQ = "temporal-sys-migration-sharded-activity-tq"
AddSearchAttributesActivityTQ = "temporal-sys-add-search-attributes-activity-tq"
DeleteNamespaceActivityTQ = "temporal-sys-delete-namespace-activity-tq"
DLQActivityTQ = "temporal-sys-dlq-activity-tq"
Expand Down
43 changes: 37 additions & 6 deletions service/worker/migration/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/sdk"
workercommon "go.temporal.io/server/service/worker/common"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -71,12 +72,11 @@ type (
}

// verifyReplicationTasksRequest is the request payload handed to the
// VerifyReplicationTasks activity. The force-replication workflow code fills
// VerifyInterval from its VerifyIntervalInSeconds parameter and passes the
// batch of executions to check in Executions.
verifyReplicationTasksRequest struct {
Namespace string
NamespaceID string
TargetClusterEndpoint string
TargetClusterName string
VerifyInterval time.Duration `validate:"gte=0"`
Executions []*ExecutionInfo
Namespace string
NamespaceID string
TargetClusterName string
VerifyInterval time.Duration `validate:"gte=0"`
Executions []*ExecutionInfo
}

verifyReplicationTasksResponse struct {
Expand All @@ -92,6 +92,14 @@ type (
NamespaceID string
}

// DescribeTargetClusterRequest is the input to the DescribeTargetCluster
// activity.
DescribeTargetClusterRequest struct {
// TargetClusterName is the name used to look up the remote cluster's
// admin client.
TargetClusterName string
}

// DescribeTargetClusterResponse is the result of the DescribeTargetCluster
// activity.
DescribeTargetClusterResponse struct {
// ShardCount is the history shard count reported by the target cluster's
// DescribeCluster response.
ShardCount int32
}

// ReplicationStatus is the result type of the GetMaxReplicationTaskIDs
// activity.
ReplicationStatus struct {
// MaxReplicationTaskIds holds the max replication task id per shard
// (presumably keyed by shard ID — confirm against the producer).
MaxReplicationTaskIds map[int32]int64
}
Expand Down Expand Up @@ -129,6 +137,12 @@ type (
enableHistoryRateLimiter dynamicconfig.BoolPropertyFn
workflowVerifier WorkflowVerifier
chasmRegistry *chasm.Registry
// sdkClientFactory resolves the system SDK client lazily for the
// sharded ReplicateBatch activity's mid-flight ReleaseShards signal.
// Eager resolution at fx-wire time tries to dial the frontend before
// it's listening; the factory's internal sync.Once guarantees a
// single dial on first use.
sdkClientFactory sdk.ClientFactory
}

shardStatus struct {
Expand Down Expand Up @@ -187,6 +201,23 @@ func (a *activities) GetMetadata(_ context.Context, request MetadataRequest) (*M
}, nil
}

// DescribeTargetCluster reports how many history shards the target cluster
// runs, as returned by that cluster's admin DescribeCluster RPC.
//
// The admin client is resolved by req.TargetClusterName, so the target must
// already be registered in this (source) cluster's cluster metadata under
// that name. Force replication requires that registration anyway, because
// the source generates replication tasks against the target.
//
// Errors from the client lookup and from the RPC are returned unchanged.
func (a *activities) DescribeTargetCluster(ctx context.Context, req DescribeTargetClusterRequest) (*DescribeTargetClusterResponse, error) {
	adminClient, lookupErr := a.clientBean.GetRemoteAdminClient(req.TargetClusterName)
	if lookupErr != nil {
		return nil, lookupErr
	}

	clusterInfo, rpcErr := adminClient.DescribeCluster(ctx, new(adminservice.DescribeClusterRequest))
	if rpcErr != nil {
		return nil, rpcErr
	}

	out := new(DescribeTargetClusterResponse)
	out.ShardCount = clusterInfo.GetHistoryShardCount()
	return out, nil
}

// GetMaxReplicationTaskIDs returns max replication task id per shard
func (a *activities) GetMaxReplicationTaskIDs(ctx context.Context) (*ReplicationStatus, error) {
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)
Expand Down
1 change: 1 addition & 0 deletions service/worker/migration/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *activitiesSuite) SetupTest() {
s.mockNamespaceReplicationQueue = persistence.NewMockNamespaceReplicationQueue(s.controller)
s.mockNamespaceRegistry = namespace.NewMockRegistry(s.controller)
s.mockClientBean = client.NewMockBean(s.controller)
s.mockClientFactory = client.NewMockFactory(s.controller)

s.mockFrontendClient = workflowservicemock.NewMockWorkflowServiceClient(s.controller)
s.mockAdminClient = adminservicemock.NewMockAdminServiceClient(s.controller)
Expand Down
38 changes: 23 additions & 15 deletions service/worker/migration/force_replication_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type (

// Used for verifying workflow executions were replicated successfully on target cluster.
EnableVerification bool
TargetClusterEndpoint string
TargetClusterName string
VerifyIntervalInSeconds int `validate:"gte=0"`

Expand Down Expand Up @@ -87,6 +86,17 @@ type (
ReplicatedWorkflowCount int64
ReplicatedWorkflowCountPerSecond float64
PageTokenForRestart []byte

// Sharded-workflow-only recovery bundle: feed these three
// fields back into a fresh ShardedForceReplicationWorkflow's
// NextPageToken / ResumeShards / RecoveredBuckets params to
// resume from a failed run without missing executions. Left
// zero by the legacy ForceReplicationWorkflow variants —
// their PageTokenForRestart is the start-of-run token and
// already covers all in-flight execs at restart cost.
RecoveryNextPageToken []byte
RecoveryResumeShards []ResumeShard
RecoveryBuckets BatchPayload
}
)

Expand Down Expand Up @@ -342,8 +352,8 @@ func validateAndSetForceReplicationParams(ctx workflow.Context, params *ForceRep
return temporal.NewNonRetryableApplicationError("InvalidArgument: Namespace is required", "InvalidArgument", nil)
}

if params.EnableVerification && len(params.TargetClusterEndpoint) == 0 && len(params.TargetClusterName) == 0 {
return temporal.NewNonRetryableApplicationError("InvalidArgument: TargetClusterEndpoint or TargetClusterName is required with verification enabled", "InvalidArgument", nil)
if params.EnableVerification && len(params.TargetClusterName) == 0 {
return temporal.NewNonRetryableApplicationError("InvalidArgument: TargetClusterName is required with verification enabled", "InvalidArgument", nil)
Comment thread
robholland marked this conversation as resolved.
}

if params.ConcurrentActivityCount <= 0 {
Expand Down Expand Up @@ -512,12 +522,11 @@ func enqueueReplicationTasks(ctx workflow.Context, executionsCh workflow.Channel
actx,
a.VerifyReplicationTasks,
&verifyReplicationTasksRequest{
TargetClusterEndpoint: params.TargetClusterEndpoint,
TargetClusterName: params.TargetClusterName,
Namespace: params.Namespace,
NamespaceID: namespaceID,
Executions: migrationExecutions,
VerifyInterval: time.Duration(params.VerifyIntervalInSeconds) * time.Second,
TargetClusterName: params.TargetClusterName,
Namespace: params.Namespace,
NamespaceID: namespaceID,
Executions: migrationExecutions,
VerifyInterval: time.Duration(params.VerifyIntervalInSeconds) * time.Second,
})

pendingVerifyTasks++
Expand Down Expand Up @@ -615,12 +624,11 @@ func enqueueReplicationTasksLocal(
lactx,
a.VerifyReplicationTasks,
&verifyReplicationTasksRequest{
TargetClusterEndpoint: params.TargetClusterEndpoint,
TargetClusterName: params.TargetClusterName,
Namespace: params.Namespace,
NamespaceID: namespaceID,
Executions: executions,
VerifyInterval: time.Duration(params.VerifyIntervalInSeconds) * time.Second,
TargetClusterName: params.TargetClusterName,
Namespace: params.Namespace,
NamespaceID: namespaceID,
Executions: executions,
VerifyInterval: time.Duration(params.VerifyIntervalInSeconds) * time.Second,
})

pendingVerifyTasks++
Expand Down
16 changes: 9 additions & 7 deletions service/worker/migration/force_replication_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *ForceReplicationWorkflowTestSuite) TestForceReplicationWorkflow() {
ListWorkflowsPageSize: 1,
PageCountPerExecution: 4,
EnableVerification: true,
TargetClusterEndpoint: "test-target",
TargetClusterName: "test-target",
})

s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -167,8 +167,7 @@ func (s *ForceReplicationWorkflowTestSuite) TestContinueAsNew() {
PageCountPerExecution: testMaxPageCountPerExecution,
NextPageToken: []byte("fake-page-token-2"),
EnableVerification: true,
TargetClusterEndpoint: "test-target",
TargetClusterName: "",
TargetClusterName: "test-target",
VerifyIntervalInSeconds: defaultVerifyIntervalInSeconds,
LastCloseTime: closeTime,
LastStartTime: startTime,
Expand All @@ -194,7 +193,7 @@ func (s *ForceReplicationWorkflowTestSuite) TestContinueAsNew() {
ListWorkflowsPageSize: 1,
PageCountPerExecution: testMaxPageCountPerExecution,
EnableVerification: true,
TargetClusterEndpoint: "test-target",
TargetClusterName: "test-target",
NextPageToken: []byte("fake-initial-page-token"),
},
expectContinueAsNew,
Expand Down Expand Up @@ -295,7 +294,7 @@ func (s *ForceReplicationWorkflowTestSuite) TestInvalidInput() {
// Empty namespace
},
{
// Empty TargetClusterEndpoint
// Empty TargetClusterName
Namespace: uuid.NewString(),
EnableVerification: true,
},
Expand Down Expand Up @@ -438,7 +437,7 @@ func (s *ForceReplicationWorkflowTestSuite) TestGenerateReplicationTaskNonRetrya
ListWorkflowsPageSize: 1,
PageCountPerExecution: 4,
EnableVerification: true,
TargetClusterEndpoint: "test-target",
TargetClusterName: "test-target",
})

s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -495,7 +494,7 @@ func (s *ForceReplicationWorkflowTestSuite) TestVerifyReplicationTaskNonRetryabl
ListWorkflowsPageSize: 1,
PageCountPerExecution: 4,
EnableVerification: true,
TargetClusterEndpoint: "test-target",
TargetClusterName: "test-target",
})

s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -705,6 +704,7 @@ type heartbeatRecordingInterceptor struct {
seedRecordedHeartbeats []seedReplicationQueueWithUserDataEntriesHeartbeatDetails
replicationRecordedHeartbeats []replicationTasksHeartbeatDetails
generateReplicationRecordedHeartbeats []int
replicateBatchRecordedHeartbeats []replicateBatchHeartbeat
T *testing.T
}

Expand All @@ -725,6 +725,8 @@ func (i *heartbeatRecordingInterceptor) RecordHeartbeat(ctx context.Context, det
i.replicationRecordedHeartbeats = append(i.replicationRecordedHeartbeats, d)
} else if d, ok := details[0].(int); ok {
i.generateReplicationRecordedHeartbeats = append(i.generateReplicationRecordedHeartbeats, d)
} else if d, ok := details[0].(replicateBatchHeartbeat); ok {
i.replicateBatchRecordedHeartbeats = append(i.replicateBatchRecordedHeartbeats, d)
} else {
assert.Fail(i.T, "invalid heartbeat details")
}
Expand Down
Loading
Loading