Skip to content
21 changes: 21 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,27 @@ var (
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")

// Sharded force replication. The sharded ReplicateBatch activity runs
// many executions per invocation, so per-exec timing is the meaningful
// granularity — the batch-level *_tasks_latency timers above scale with
// BatchSize and aren't comparable across configurations.
GenerateReplicationTaskLatency = NewTimerDef("generate_replication_task_latency")
VerifyReplicationTaskLatency = NewTimerDef("verify_replication_task_latency")
// VerifyReplicationTaskBusy counts verify attempts where the passive
// cluster returned RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW — the cache
// lock is held while history is being applied. A sign of progress that
// doesn't reset the per-shard no-progress timer.
VerifyReplicationTaskBusy = NewCounterDef("verify_replication_task_busy")
// VerifyReplicationTaskPending counts verify attempts where
// DescribeMutableState succeeded but the workflowVerifier saw the target
// lagging the source. A high pending vs. success ratio means verify is
// polling faster than apply can catch up.
VerifyReplicationTaskPending = NewCounterDef("verify_replication_task_pending")
// ReplicatedWorkflowCount accumulates verified-exec counts across each
// ReplicateBatch activity return. Emitted from the workflow so the
// counter is monotonic across activity retries.
ReplicatedWorkflowCount = NewCounterDef("replicated_workflow_count")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
NamespaceReplicationDLQAckLevelGauge = NewGaugeDef("namespace_dlq_ack_level")
Expand Down
1 change: 1 addition & 0 deletions common/primitives/task_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
internalTaskQueuePerNSPrefix = "temporal-sys-per-ns-"

MigrationActivityTQ = "temporal-sys-migration-activity-tq"
MigrationShardedActivityTQ = "temporal-sys-migration-sharded-activity-tq"
AddSearchAttributesActivityTQ = "temporal-sys-add-search-attributes-activity-tq"
DeleteNamespaceActivityTQ = "temporal-sys-delete-namespace-activity-tq"
DLQActivityTQ = "temporal-sys-dlq-activity-tq"
Expand Down
7 changes: 7 additions & 0 deletions service/worker/migration/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/sdk"
workercommon "go.temporal.io/server/service/worker/common"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -129,6 +130,12 @@ type (
enableHistoryRateLimiter dynamicconfig.BoolPropertyFn
workflowVerifier WorkflowVerifier
chasmRegistry *chasm.Registry
// sdkClientFactory resolves the system SDK client lazily for the
// sharded ReplicateBatch activity's mid-flight ReleaseShards signal.
// Eager resolution at fx-wire time tries to dial the frontend before
// it's listening; the factory's internal sync.Once guarantees a
// single dial on first use.
sdkClientFactory sdk.ClientFactory
}

shardStatus struct {
Expand Down
1 change: 1 addition & 0 deletions service/worker/migration/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *activitiesSuite) SetupTest() {
s.mockNamespaceReplicationQueue = persistence.NewMockNamespaceReplicationQueue(s.controller)
s.mockNamespaceRegistry = namespace.NewMockRegistry(s.controller)
s.mockClientBean = client.NewMockBean(s.controller)
s.mockClientFactory = client.NewMockFactory(s.controller)

s.mockFrontendClient = workflowservicemock.NewMockWorkflowServiceClient(s.controller)
s.mockAdminClient = adminservicemock.NewMockAdminServiceClient(s.controller)
Expand Down
3 changes: 3 additions & 0 deletions service/worker/migration/force_replication_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ type heartbeatRecordingInterceptor struct {
seedRecordedHeartbeats []seedReplicationQueueWithUserDataEntriesHeartbeatDetails
replicationRecordedHeartbeats []replicationTasksHeartbeatDetails
generateReplicationRecordedHeartbeats []int
replicateBatchRecordedHeartbeats []replicateBatchHeartbeat
T *testing.T
}

Expand All @@ -725,6 +726,8 @@ func (i *heartbeatRecordingInterceptor) RecordHeartbeat(ctx context.Context, det
i.replicationRecordedHeartbeats = append(i.replicationRecordedHeartbeats, d)
} else if d, ok := details[0].(int); ok {
i.generateReplicationRecordedHeartbeats = append(i.generateReplicationRecordedHeartbeats, d)
} else if d, ok := details[0].(replicateBatchHeartbeat); ok {
i.replicateBatchRecordedHeartbeats = append(i.replicateBatchRecordedHeartbeats, d)
} else {
assert.Fail(i.T, "invalid heartbeat details")
}
Expand Down
158 changes: 135 additions & 23 deletions service/worker/migration/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package migration

import (
"context"
"fmt"

"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/activity"
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/chasm"
serverClient "go.temporal.io/server/client"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
Expand All @@ -18,6 +21,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
workercommon "go.temporal.io/server/service/worker/common"
"go.uber.org/fx"
)
Expand All @@ -32,13 +36,15 @@ type (
FrontendClient workflowservice.WorkflowServiceClient
ClientFactory serverClient.Factory
ClientBean serverClient.Bean
ClusterMetadata cluster.Metadata
NamespaceReplicationQueue persistence.NamespaceReplicationQueue
TaskManager persistence.TaskManager
Logger log.Logger
MetricsHandler metrics.Handler
DynamicCollection *dynamicconfig.Collection
WorkflowVerifier WorkflowVerifier
ChasmRegistry *chasm.Registry
SDKClientFactory sdk.ClientFactory
}

fxResult struct {
Expand All @@ -48,21 +54,49 @@ type (

// replicationWorkerComponent is the worker component built by NewResult.
// It embeds the fx-injected initParams and carries the *activities value
// that NewResult constructs for it.
replicationWorkerComponent struct {
initParams
// activities is the receiver handed to the registry in RegisterActivities.
activities *activities
}

// shardedWorkerComponent registers the sharded force-replication
// workflow and the ReplicateBatch activity on their dedicated task queue
// (primitives.MigrationShardedActivityTQ). It holds its own *activities
// instance, built separately by NewShardedResult, so its activity
// registration is isolated from the default-TQ worker — sharded inject
// paths don't accidentally land on the legacy MigrationActivityTQ.
shardedWorkerComponent struct {
// activities is the receiver handed to the registry in RegisterActivities.
activities *activities
}
)

// Module is the fx option set for the migration worker package. It
// provides the legacy force-replication component (NewResult), the
// sharded component (NewShardedResult), and the WorkflowVerifier
// returned by workflowVerifierProvider.
var Module = fx.Options(
	fx.Provide(
		NewResult,
		NewShardedResult,
		workflowVerifierProvider,
	),
)

func NewResult(params initParams) fxResult {
component := &replicationWorkerComponent{
initParams: params,
func NewResult(params initParams) (fxResult, error) {
a, err := newActivitiesFromParams(params, forceReplicationWorkflowName)
if err != nil {
return fxResult{}, err
}
return fxResult{
Component: component,
Component: &replicationWorkerComponent{
initParams: params,
activities: a,
},
}, nil
}

// NewShardedResult builds the WorkerComponent for sharded force
// replication. It creates a dedicated *activities instance, tagged with
// shardedForceReplicationWorkflowName, so that registration against the
// sharded task queue doesn't bleed into the legacy worker. An error from
// newActivitiesFromParams is returned unchanged alongside a zero fxResult.
func NewShardedResult(params initParams) (fxResult, error) {
	shardedActivities, err := newActivitiesFromParams(params, shardedForceReplicationWorkflowName)
	if err != nil {
		return fxResult{}, err
	}

	component := &shardedWorkerComponent{activities: shardedActivities}
	return fxResult{Component: component}, nil
}

func (wc *replicationWorkerComponent) RegisterWorkflow(registry sdkworker.Registry) {
Expand All @@ -80,7 +114,16 @@ func (wc *replicationWorkerComponent) DedicatedWorkflowWorkerOptions() *workerco
}

// RegisterActivities registers the component's *activities receiver on
// the supplied registry.
func (wc *replicationWorkerComponent) RegisterActivities(registry sdkworker.Registry) {
	// DisableAlreadyRegisteredCheck because the sharded WorkerComponent
	// shares the *activities method set. When both components register
	// on the default worker (per the worker.go upgrade-hack pass), the
	// second reflection-based registration would otherwise panic on
	// every method name; with the check disabled only one registration
	// per activity name is retained. The default worker isn't dispatched
	// to by either workflow — both have dedicated activity workers — so
	// it does not matter which component's registration is kept.
	registry.RegisterActivityWithOptions(wc.activities, activity.RegisterOptions{
		DisableAlreadyRegisteredCheck: true,
	})
}

func (wc *replicationWorkerComponent) DedicatedActivityWorkerOptions() *workercommon.DedicatedWorkerOptions {
Expand All @@ -92,6 +135,56 @@ func (wc *replicationWorkerComponent) DedicatedActivityWorkerOptions() *workerco
}
}

// RegisterWorkflow registers the two workflows served by the sharded
// component, each under its explicit workflow type name.
func (sc *shardedWorkerComponent) RegisterWorkflow(registry sdkworker.Registry) {
	shardedOpts := workflow.RegisterOptions{Name: shardedForceReplicationWorkflowName}
	registry.RegisterWorkflowWithOptions(ShardedForceReplicationWorkflow, shardedOpts)

	// NOTE(review): if another component registers
	// ForceTaskQueueUserDataReplicationWorkflow on the same registry,
	// confirm that the duplicate registration is tolerated.
	userDataOpts := workflow.RegisterOptions{Name: forceTaskQueueUserDataReplicationWorkflow}
	registry.RegisterWorkflowWithOptions(ForceTaskQueueUserDataReplicationWorkflow, userDataOpts)
}

// DedicatedWorkflowWorkerOptions returns the options for the dedicated
// workflow worker on primitives.MigrationShardedActivityTQ.
func (sc *shardedWorkerComponent) DedicatedWorkflowWorkerOptions() *workercommon.DedicatedWorkerOptions {
	// The workflow and its activities deliberately share one task queue:
	// a plain ExecuteActivity (no explicit TaskQueue) then routes to our
	// dedicated activity worker instead of the default-TQ worker. Without
	// a dedicated workflow worker the workflow would run on
	// default-worker-tq while its activities piled up, unscheduled, on
	// the separate dedicated activity worker's queue.
	//
	// LocalActivityWorkerOnly is essential. A worker normally polls for
	// both workflow and activity tasks on its task queue, but the
	// activity worker is a separate sdkworker.Worker polling this same
	// queue and is the one holding the activity registrations. If this
	// worker also polled for activities it would race for those tasks
	// and fail them with ActivityNotRegisteredError,
	// "Supported types: []".
	workerOpts := sdkworker.Options{
		LocalActivityWorkerOnly: true,
	}
	return &workercommon.DedicatedWorkerOptions{
		TaskQueue: primitives.MigrationShardedActivityTQ,
		Options:   workerOpts,
	}
}

// RegisterActivities registers the sharded component's *activities
// receiver on the supplied registry.
func (sc *shardedWorkerComponent) RegisterActivities(registry sdkworker.Registry) {
	// Same reasoning as replicationWorkerComponent.RegisterActivities:
	// the two components expose an identical *activities method set, so
	// a second registration on the default worker would panic unless the
	// already-registered check is turned off.
	opts := activity.RegisterOptions{DisableAlreadyRegisteredCheck: true}
	registry.RegisterActivityWithOptions(sc.activities, opts)
}

// DedicatedActivityWorkerOptions returns the options for the dedicated
// activity worker on primitives.MigrationShardedActivityTQ. Activities
// run under a background context whose caller type is set to
// headers.CallerTypePreemptable.
func (sc *shardedWorkerComponent) DedicatedActivityWorkerOptions() *workercommon.DedicatedWorkerOptions {
	preemptableCtx := headers.SetCallerType(context.Background(), headers.CallerTypePreemptable)

	workerOpts := sdkworker.Options{
		BackgroundActivityContext: preemptableCtx,
	}
	return &workercommon.DedicatedWorkerOptions{
		TaskQueue: primitives.MigrationShardedActivityTQ,
		Options:   workerOpts,
	}
}

func workflowVerifierProvider() WorkflowVerifier {
return func(
_ context.Context,
Expand All @@ -108,23 +201,42 @@ func workflowVerifierProvider() WorkflowVerifier {
}
}

func (wc *replicationWorkerComponent) activities() *activities {
return &activities{
HistoryShardCount: wc.PersistenceConfig.NumHistoryShards,
executionManager: wc.ExecutionManager,
NamespaceRegistry: wc.NamespaceRegistry,
HistoryClient: wc.HistoryClient,
frontendClient: wc.FrontendClient,
clientFactory: wc.ClientFactory,
clientBean: wc.ClientBean,
namespaceReplicationQueue: wc.NamespaceReplicationQueue,
taskManager: wc.TaskManager,
Logger: wc.Logger,
MetricsHandler: wc.MetricsHandler,
forceReplicationMetricsHandler: wc.MetricsHandler.WithTags(metrics.WorkflowTypeTag(forceReplicationWorkflowName)),
generateMigrationTaskViaFrontend: dynamicconfig.WorkerGenerateMigrationTaskViaFrontend.Get(wc.DynamicCollection),
enableHistoryRateLimiter: dynamicconfig.WorkerEnableHistoryRateLimiter.Get(wc.DynamicCollection),
workflowVerifier: wc.WorkflowVerifier,
chasmRegistry: wc.ChasmRegistry,
// newActivitiesFromParams builds the shared *activities struct from the
// fx params. workflowTypeName tags the forceReplicationMetricsHandler so
// the legacy and sharded variants emit force-replication metrics under
// distinct workflow_type tags.
//
// adminClient is the local admin client cached by ClientBean at startup.
// Routing through the bean (rather than constructing a fresh wrapper via
// NewLocalAdminClientWithTimeout) reuses the same retry+metric wrapper
// every other consumer in the process sees, and guarantees adminClient
// is non-nil so the inject and verify paths can use it without nil
// guarding. A lookup failure indicates ClusterMetadata is misconfigured;
// surfacing it as an fx error fails app start cleanly rather than mid-run.
func newActivitiesFromParams(params initParams, workflowTypeName string) (*activities, error) {
localCluster := params.ClusterMetadata.GetCurrentClusterName()
localAdmin, err := params.ClientBean.GetRemoteAdminClient(localCluster)
if err != nil {
return nil, fmt.Errorf("migration: local admin client missing from ClientBean for cluster %q: %w", localCluster, err)
}
return &activities{
HistoryShardCount: params.PersistenceConfig.NumHistoryShards,
executionManager: params.ExecutionManager,
NamespaceRegistry: params.NamespaceRegistry,
HistoryClient: params.HistoryClient,
frontendClient: params.FrontendClient,
adminClient: localAdmin,
clientFactory: params.ClientFactory,
clientBean: params.ClientBean,
namespaceReplicationQueue: params.NamespaceReplicationQueue,
taskManager: params.TaskManager,
Logger: params.Logger,
MetricsHandler: params.MetricsHandler,
forceReplicationMetricsHandler: params.MetricsHandler.WithTags(metrics.WorkflowTypeTag(workflowTypeName)),
generateMigrationTaskViaFrontend: dynamicconfig.WorkerGenerateMigrationTaskViaFrontend.Get(params.DynamicCollection),
enableHistoryRateLimiter: dynamicconfig.WorkerEnableHistoryRateLimiter.Get(params.DynamicCollection),
workflowVerifier: params.WorkflowVerifier,
chasmRegistry: params.ChasmRegistry,
sdkClientFactory: params.SDKClientFactory,
}, nil
}
Loading
Loading