From 3172e8ad42f4aac3a112605ca5df9098a269a72c Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 4 Jun 2026 23:44:23 -0700 Subject: [PATCH 1/3] Global RPS Control for Admin Batch Operation --- common/dynamicconfig/constants.go | 16 ++++++++++-- common/log/tag/values.go | 1 + service/worker/batcher/activities.go | 23 +++++++++------- service/worker/batcher/fx.go | 39 ++++++++++++++++++++-------- 4 files changed, 56 insertions(+), 23 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index b71722ef123..2c4a9e77202 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -3227,12 +3227,24 @@ When enabled, the scavenger will delete completed workflow execution data that a BatcherRPS = NewNamespaceIntSetting( "worker.batcherRPS", 50, - `BatcherRPS controls number the rps of batch operations`, + `BatcherRPS controls number the rps of one batch operation`, ) BatcherConcurrency = NewNamespaceIntSetting( "worker.batcherConcurrency", 5, - `BatcherConcurrency controls the concurrency of one batch operation`, + `BatcherConcurrency controls the concurrency of one batch or admin batch operation`, + ) + AdminBatcherHostRPS = NewGlobalIntSetting( + "worker.adminBatcherHostRPS", + 100, + `AdminBatcherHostRPS controls number the rps of all admin batch operations per host`, + ) + AdminBatcherGlobalRPS = NewGlobalIntSetting( + "worker.adminBatcherGlobalRPS", + 0, + `AdminBatcherGlobalRPS controls number the rps of all admin batch operations across all worker hosts. +The configured value will be divided by the number of worker hosts to get the per host rps limit. +0 means no global limit and each host will use AdminBatcherHostRPS.`, ) WorkerParentCloseMaxConcurrentActivityExecutionSize = NewGlobalIntSetting( "worker.ParentCloseMaxConcurrentActivityExecutionSize", diff --git a/common/log/tag/values.go b/common/log/tag/values.go index 9427d0c720c..3a3838e4c87 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -121,6 +121,7 @@ var ( ComponentESVisibilityManager = component("es-visibility-manager") ComponentArchiver = component("archiver") ComponentBatcher = component("batcher") + ComponentAdminBatcher = component("admin-batcher") ComponentWorker = component("worker") ComponentWorkerManager = component("worker-manager") ComponentPerNSWorkerManager = component("perns-worker-manager") diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index d6e72f8ccda..512597fe5f5 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -49,7 +49,6 @@ var ( type batchProcessorConfig struct { namespace string adjustedQuery string - rps dynamicconfig.IntPropertyFnWithNamespaceFilter concurrency int initialPageToken []byte initialExecutions []*commonpb.WorkflowExecution @@ -158,14 +157,12 @@ func (a *activities) processWorkflowsWithProactiveFetching( ctx context.Context, config batchProcessorConfig, startWorkerProcessor batchWorkerProcessor, + rateLimiter quotas.RateLimiter, sdkClient sdkclient.Client, metricsHandler metrics.Handler, logger log.Logger, hbd HeartBeatDetails, ) (HeartBeatDetails, error) { - rateLimiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { - return float64(config.rps(config.namespace)) - }) concurrency := int(math.Max(1, float64(config.concurrency))) @@ -264,10 +261,11 @@ func (a *activities) processWorkflowsWithProactiveFetching( type activities struct { activityDeps - namespace namespace.Name - namespaceID namespace.ID - rps dynamicconfig.IntPropertyFnWithNamespaceFilter - concurrency dynamicconfig.IntPropertyFnWithNamespaceFilter + namespace namespace.Name + namespaceID namespace.ID + rps dynamicconfig.IntPropertyFnWithNamespaceFilter + concurrency dynamicconfig.IntPropertyFnWithNamespaceFilter + adminBatcherHostRateLimiter quotas.RateLimiter } // checkNamespace validates that batchParams targets the worker's own namespace. @@ -319,6 +317,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams var visibilityQuery string var executions []*commonpb.WorkflowExecution + // Admin batch uses the host level rate limiter which applies across all namespaces and all admin batch workflows. + rateLimiter := a.adminBatcherHostRateLimiter + if batchParams.AdminRequest != nil { ctx = headers.SetCallerType(ctx, headers.CallerTypePreemptable) adminReq := batchParams.AdminRequest @@ -327,6 +328,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams } else { visibilityQuery = a.adjustQueryBatchTypeEnum(batchParams.Request.VisibilityQuery, batchParams.BatchType) executions = batchParams.Request.Executions + rateLimiter = quotas.NewDefaultOutgoingRateLimiter(func() float64 { + return float64(a.rps(ns)) + }) } if startOver { @@ -353,7 +357,6 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams config := batchProcessorConfig{ namespace: ns, adjustedQuery: visibilityQuery, - rps: a.rps, concurrency: a.getOperationConcurrency(int(batchParams.Concurrency)), initialPageToken: hbd.PageToken, initialExecutions: executions, @@ -373,7 +376,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams a.startTaskProcessor(ctx, batchParams, ns, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger) } - return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) + return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, rateLimiter, sdkClient, metricsHandler, logger, hbd) } func (a *activities) getActivityLogger(ctx context.Context) log.Logger { diff --git a/service/worker/batcher/fx.go b/service/worker/batcher/fx.go index 9259d84c422..9d7a9f5aef6 100644 --- a/service/worker/batcher/fx.go +++ b/service/worker/batcher/fx.go @@ -7,8 +7,12 @@ import ( "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/quotas" + "go.temporal.io/server/common/quotas/calculator" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/sdk" workercommon "go.temporal.io/server/service/worker/common" @@ -25,9 +29,10 @@ const ( type ( workerComponent struct { - activityDeps activityDeps - dc *dynamicconfig.Collection - enabledFeature dynamicconfig.BoolPropertyFnWithNamespaceFilter + activityDeps activityDeps + dc *dynamicconfig.Collection + adminBatcherHostRateLimiter quotas.RateLimiter + enabledFeature dynamicconfig.BoolPropertyFnWithNamespaceFilter } activityDeps struct { @@ -52,13 +57,24 @@ var Module = fx.Options( func NewResult( dc *dynamicconfig.Collection, + serviceResolver membership.ServiceResolver, params activityDeps, ) fxResult { + adminBatcherHostRateFn := calculator.NewLoggedCalculator( + calculator.ClusterAwareQuotaCalculator{ + MemberCounter: serviceResolver, + PerInstanceQuota: dynamicconfig.AdminBatcherHostRPS.Get(dc), + GlobalQuota: dynamicconfig.AdminBatcherGlobalRPS.Get(dc), + }, + log.With(params.Logger, tag.ComponentAdminBatcher, tag.ScopeHost), + ).GetQuota + return fxResult{ Component: &workerComponent{ - activityDeps: params, - dc: dc, - enabledFeature: dynamicconfig.EnableBatcherNamespace.Get(dc), + activityDeps: params, + dc: dc, + enabledFeature: dynamicconfig.EnableBatcherNamespace.Get(dc), + adminBatcherHostRateLimiter: quotas.NewDefaultOutgoingRateLimiter(adminBatcherHostRateFn), }, } } @@ -83,10 +99,11 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities { return &activities{ - activityDeps: s.activityDeps, - namespace: name, - namespaceID: id, - rps: dynamicconfig.BatcherRPS.Get(s.dc), - concurrency: dynamicconfig.BatcherConcurrency.Get(s.dc), + activityDeps: s.activityDeps, + namespace: name, + namespaceID: id, + rps: dynamicconfig.BatcherRPS.Get(s.dc), + concurrency: dynamicconfig.BatcherConcurrency.Get(s.dc), + adminBatcherHostRateLimiter: s.adminBatcherHostRateLimiter, } } From 502e9e6d071ce5f4e4d31fe1d81c55e1cae36624 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 5 Jun 2026 01:24:16 -0700 Subject: [PATCH 2/3] use RequestRateLimiter, allow fx injection --- service/worker/batcher/activities.go | 33 +++++---- .../batcher/activities_namespace_test.go | 4 +- service/worker/batcher/activities_test.go | 8 +-- service/worker/batcher/fx.go | 70 +++++++++++-------- 4 files changed, 64 insertions(+), 51 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 512597fe5f5..df50f5ae2c6 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -43,6 +43,10 @@ const ( var ( errNamespaceMismatch = errors.New("namespace mismatch") + + batchQuotaRequest = quotas.Request{ + Token: 1, + } ) // batchProcessorConfig holds the configuration for batch processing @@ -59,7 +63,7 @@ type batchWorkerProcessor func( ctx context.Context, taskCh chan task, respCh chan taskResponse, - rateLimiter quotas.RateLimiter, + rateLimiter quotas.RequestRateLimiter, sdkClient sdkclient.Client, frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, @@ -157,7 +161,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( ctx context.Context, config batchProcessorConfig, startWorkerProcessor batchWorkerProcessor, - rateLimiter quotas.RateLimiter, + rateLimiter quotas.RequestRateLimiter, sdkClient sdkclient.Client, metricsHandler metrics.Handler, logger log.Logger, @@ -261,11 +265,10 @@ func (a *activities) processWorkflowsWithProactiveFetching( type activities struct { activityDeps - namespace namespace.Name - namespaceID namespace.ID - rps dynamicconfig.IntPropertyFnWithNamespaceFilter - concurrency dynamicconfig.IntPropertyFnWithNamespaceFilter - adminBatcherHostRateLimiter quotas.RateLimiter + namespace namespace.Name + namespaceID namespace.ID + rps dynamicconfig.IntPropertyFnWithNamespaceFilter + concurrency dynamicconfig.IntPropertyFnWithNamespaceFilter } // checkNamespace validates that batchParams targets the worker's own namespace. @@ -318,7 +321,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams var executions []*commonpb.WorkflowExecution // Admin batch uses the host level rate limiter which applies across all namespaces and all admin batch workflows. - rateLimiter := a.adminBatcherHostRateLimiter + rateLimiter := quotas.RequestRateLimiter(a.AdminBatcherRateLimiter) if batchParams.AdminRequest != nil { ctx = headers.SetCallerType(ctx, headers.CallerTypePreemptable) @@ -328,9 +331,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams } else { visibilityQuery = a.adjustQueryBatchTypeEnum(batchParams.Request.VisibilityQuery, batchParams.BatchType) executions = batchParams.Request.Executions - rateLimiter = quotas.NewDefaultOutgoingRateLimiter(func() float64 { + rateLimiter = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return float64(a.rps(ns)) - }) + })) } if startOver { @@ -367,7 +370,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams ctx context.Context, taskCh chan task, respCh chan taskResponse, - rateLimiter quotas.RateLimiter, + rateLimiter quotas.RequestRateLimiter, sdkClient sdkclient.Client, frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, @@ -423,7 +426,7 @@ func (a *activities) startTaskProcessor( namespace string, taskCh chan task, respCh chan taskResponse, - limiter quotas.RateLimiter, + limiter quotas.RequestRateLimiter, sdkClient sdkclient.Client, frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, @@ -676,7 +679,7 @@ func (a *activities) processAdminTask( ctx context.Context, batchOperation *batchspb.BatchOperationInput, task task, - limiter quotas.RateLimiter, + limiter quotas.RequestRateLimiter, ) error { adminReq := batchOperation.AdminRequest switch adminReq.Operation.(type) { @@ -704,11 +707,11 @@ func (a *activities) processAdminTask( func processTask( ctx context.Context, - limiter quotas.RateLimiter, + limiter quotas.RequestRateLimiter, task task, procFn func(*workflowpb.WorkflowExecutionInfo) error, ) error { - err := limiter.Wait(ctx) + err := limiter.Wait(ctx, batchQuotaRequest) if err != nil { return err } diff --git a/service/worker/batcher/activities_namespace_test.go b/service/worker/batcher/activities_namespace_test.go index 3698a722381..000f1b6bfa8 100644 --- a/service/worker/batcher/activities_namespace_test.go +++ b/service/worker/batcher/activities_namespace_test.go @@ -142,8 +142,8 @@ func TestStartTaskProcessor_UsesWorkerBoundNamespaceForSignal(t *testing.T) { go func() { defer close(done) a.startTaskProcessor(ctx, batchOp, ns, taskCh, respCh, - quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 1e9 }), nil, mockFE, - metrics.NoopMetricsHandler, log.NewTestLogger()) + quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 1e9 })), + nil, mockFE, metrics.NoopMetricsHandler, log.NewTestLogger()) }() <-respCh diff --git a/service/worker/batcher/activities_test.go b/service/worker/batcher/activities_test.go index ea4d92c0c96..c08874767bb 100644 --- a/service/worker/batcher/activities_test.go +++ b/service/worker/batcher/activities_test.go @@ -451,7 +451,7 @@ func (s *activitiesSuite) TestProcessAdminTask_RefreshWorkflowTasks() { }, } - limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }) + limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })) // Expect RefreshWorkflowTasks to be called with correct parameters mockHistoryClient.EXPECT().RefreshWorkflowTasks(gomock.Any(), gomock.Any()).DoAndReturn( @@ -497,7 +497,7 @@ func (s *activitiesSuite) TestProcessAdminTask_RefreshWorkflowTasks_Error() { }, } - limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }) + limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })) expectedErr := errors.New("refresh failed") // Use gomock.Any() for context since it's modified with CallerTypePreemptable header @@ -617,7 +617,7 @@ func (s *activitiesSuite) TestStartTaskProcessor_SignalUsesWorkerNamespace() { taskCh := make(chan task, 1) respCh := make(chan taskResponse, 1) - limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }) + limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })) // The signal must be executed with the worker's trusted namespace, not the user-supplied one. s.mockFrontendClient.EXPECT(). @@ -657,7 +657,7 @@ func (s *activitiesSuite) TestProcessAdminTask_UnknownOperation() { }, } - limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }) + limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })) err := a.processAdminTask(ctx, batchOperation, testTask, limiter) s.Require().Error(err) diff --git a/service/worker/batcher/fx.go b/service/worker/batcher/fx.go index 9d7a9f5aef6..71b5aaeef4f 100644 --- a/service/worker/batcher/fx.go +++ b/service/worker/batcher/fx.go @@ -28,21 +28,23 @@ const ( ) type ( + AdminBatcherRateLimiter quotas.RequestRateLimiter + workerComponent struct { - activityDeps activityDeps - dc *dynamicconfig.Collection - adminBatcherHostRateLimiter quotas.RateLimiter - enabledFeature dynamicconfig.BoolPropertyFnWithNamespaceFilter + activityDeps activityDeps + dc *dynamicconfig.Collection + enabledFeature dynamicconfig.BoolPropertyFnWithNamespaceFilter } activityDeps struct { fx.In - MetricsHandler metrics.Handler - Logger log.Logger - ClientFactory sdk.ClientFactory - FrontendClient workflowservice.WorkflowServiceClient - AdminClient adminservice.AdminServiceClient - HistoryClient resource.HistoryClient + MetricsHandler metrics.Handler + Logger log.Logger + ClientFactory sdk.ClientFactory + FrontendClient workflowservice.WorkflowServiceClient + AdminClient adminservice.AdminServiceClient + HistoryClient resource.HistoryClient + AdminBatcherRateLimiter AdminBatcherRateLimiter } fxResult struct { @@ -52,29 +54,38 @@ type ( ) var Module = fx.Options( + fx.Provide(AdminBatcherRateLimiterProvider), fx.Provide(NewResult), ) -func NewResult( +func AdminBatcherRateLimiterProvider( dc *dynamicconfig.Collection, serviceResolver membership.ServiceResolver, + logger log.Logger, +) AdminBatcherRateLimiter { + return quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter( + calculator.NewLoggedCalculator( + calculator.ClusterAwareQuotaCalculator{ + MemberCounter: serviceResolver, + PerInstanceQuota: dynamicconfig.AdminBatcherHostRPS.Get(dc), + GlobalQuota: dynamicconfig.AdminBatcherGlobalRPS.Get(dc), + }, + log.With(logger, tag.ComponentAdminBatcher, tag.ScopeHost), + ).GetQuota, + ), + ) +} + +func NewResult( + dc *dynamicconfig.Collection, params activityDeps, ) fxResult { - adminBatcherHostRateFn := calculator.NewLoggedCalculator( - calculator.ClusterAwareQuotaCalculator{ - MemberCounter: serviceResolver, - PerInstanceQuota: dynamicconfig.AdminBatcherHostRPS.Get(dc), - GlobalQuota: dynamicconfig.AdminBatcherGlobalRPS.Get(dc), - }, - log.With(params.Logger, tag.ComponentAdminBatcher, tag.ScopeHost), - ).GetQuota - return fxResult{ Component: &workerComponent{ - activityDeps: params, - dc: dc, - enabledFeature: dynamicconfig.EnableBatcherNamespace.Get(dc), - adminBatcherHostRateLimiter: quotas.NewDefaultOutgoingRateLimiter(adminBatcherHostRateFn), + activityDeps: params, + dc: dc, + enabledFeature: dynamicconfig.EnableBatcherNamespace.Get(dc), }, } } @@ -99,11 +110,10 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities { return &activities{ - activityDeps: s.activityDeps, - namespace: name, - namespaceID: id, - rps: dynamicconfig.BatcherRPS.Get(s.dc), - concurrency: dynamicconfig.BatcherConcurrency.Get(s.dc), - adminBatcherHostRateLimiter: s.adminBatcherHostRateLimiter, + activityDeps: s.activityDeps, + namespace: name, + namespaceID: id, + rps: dynamicconfig.BatcherRPS.Get(s.dc), + concurrency: dynamicconfig.BatcherConcurrency.Get(s.dc), } } From a7dad562b2ef2ed0bb393f9e41c6a2a5e1dab437 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 5 Jun 2026 10:58:58 -0700 Subject: [PATCH 3/3] fix typo --- common/dynamicconfig/constants.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 2c4a9e77202..42631c07a2b 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -3237,12 +3237,12 @@ When enabled, the scavenger will delete completed workflow execution data that a AdminBatcherHostRPS = NewGlobalIntSetting( "worker.adminBatcherHostRPS", 100, - `AdminBatcherHostRPS controls number the rps of all admin batch operations per host`, + `AdminBatcherHostRPS controls the rps of all admin batch operations per host`, ) AdminBatcherGlobalRPS = NewGlobalIntSetting( "worker.adminBatcherGlobalRPS", 0, - `AdminBatcherGlobalRPS controls number the rps of all admin batch operations across all worker hosts. + `AdminBatcherGlobalRPS controls the rps of all admin batch operations across all worker hosts. The configured value will be divided by the number of worker hosts to get the per host rps limit. 0 means no global limit and each host will use AdminBatcherHostRPS.`, )