Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3227,12 +3227,24 @@ When enabled, the scavenger will delete completed workflow execution data that a
BatcherRPS = NewNamespaceIntSetting(
"worker.batcherRPS",
50,
`BatcherRPS controls number the rps of batch operations`,
`BatcherRPS controls number the rps of one batch operation`,
)
BatcherConcurrency = NewNamespaceIntSetting(
"worker.batcherConcurrency",
5,
`BatcherConcurrency controls the concurrency of one batch operation`,
`BatcherConcurrency controls the concurrency of one batch or admin batch operation`,
)
AdminBatcherHostRPS = NewGlobalIntSetting(
"worker.adminBatcherHostRPS",
100,
`AdminBatcherHostRPS controls number the rps of all admin batch operations per host`,
Comment thread
yycptt marked this conversation as resolved.
Outdated
)
AdminBatcherGlobalRPS = NewGlobalIntSetting(
"worker.adminBatcherGlobalRPS",
0,
`AdminBatcherGlobalRPS controls number the rps of all admin batch operations across all worker hosts.
Comment thread
yycptt marked this conversation as resolved.
Outdated
The configured value will be divided by the number of worker hosts to get the per host rps limit.
0 means no global limit and each host will use AdminBatcherHostRPS.`,
)
WorkerParentCloseMaxConcurrentActivityExecutionSize = NewGlobalIntSetting(
"worker.ParentCloseMaxConcurrentActivityExecutionSize",
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var (
ComponentESVisibilityManager = component("es-visibility-manager")
ComponentArchiver = component("archiver")
ComponentBatcher = component("batcher")
ComponentAdminBatcher = component("admin-batcher")
ComponentWorker = component("worker")
ComponentWorkerManager = component("worker-manager")
ComponentPerNSWorkerManager = component("perns-worker-manager")
Expand Down
23 changes: 13 additions & 10 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ var (
type batchProcessorConfig struct {
namespace string
adjustedQuery string
rps dynamicconfig.IntPropertyFnWithNamespaceFilter
concurrency int
initialPageToken []byte
initialExecutions []*commonpb.WorkflowExecution
Expand Down Expand Up @@ -158,14 +157,12 @@ func (a *activities) processWorkflowsWithProactiveFetching(
ctx context.Context,
config batchProcessorConfig,
startWorkerProcessor batchWorkerProcessor,
rateLimiter quotas.RateLimiter,
sdkClient sdkclient.Client,
metricsHandler metrics.Handler,
logger log.Logger,
hbd HeartBeatDetails,
) (HeartBeatDetails, error) {
rateLimiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 {
return float64(config.rps(config.namespace))
})

concurrency := int(math.Max(1, float64(config.concurrency)))

Expand Down Expand Up @@ -264,10 +261,11 @@ func (a *activities) processWorkflowsWithProactiveFetching(

type activities struct {
activityDeps
namespace namespace.Name
namespaceID namespace.ID
rps dynamicconfig.IntPropertyFnWithNamespaceFilter
concurrency dynamicconfig.IntPropertyFnWithNamespaceFilter
namespace namespace.Name
namespaceID namespace.ID
rps dynamicconfig.IntPropertyFnWithNamespaceFilter
concurrency dynamicconfig.IntPropertyFnWithNamespaceFilter
adminBatcherHostRateLimiter quotas.RateLimiter
}

// checkNamespace validates that batchParams targets the worker's own namespace.
Expand Down Expand Up @@ -319,6 +317,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
var visibilityQuery string
var executions []*commonpb.WorkflowExecution

// Admin batch uses the host level rate limiter which applies across all namespaces and all admin batch workflows.
rateLimiter := a.adminBatcherHostRateLimiter

if batchParams.AdminRequest != nil {
ctx = headers.SetCallerType(ctx, headers.CallerTypePreemptable)
adminReq := batchParams.AdminRequest
Expand All @@ -327,6 +328,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
} else {
visibilityQuery = a.adjustQueryBatchTypeEnum(batchParams.Request.VisibilityQuery, batchParams.BatchType)
executions = batchParams.Request.Executions
rateLimiter = quotas.NewDefaultOutgoingRateLimiter(func() float64 {
return float64(a.rps(ns))
})
}

if startOver {
Expand All @@ -353,7 +357,6 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
config := batchProcessorConfig{
namespace: ns,
adjustedQuery: visibilityQuery,
rps: a.rps,
concurrency: a.getOperationConcurrency(int(batchParams.Concurrency)),
initialPageToken: hbd.PageToken,
initialExecutions: executions,
Expand All @@ -373,7 +376,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
a.startTaskProcessor(ctx, batchParams, ns, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger)
}

return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd)
return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, rateLimiter, sdkClient, metricsHandler, logger, hbd)
}

func (a *activities) getActivityLogger(ctx context.Context) log.Logger {
Expand Down
39 changes: 28 additions & 11 deletions service/worker/batcher/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/quotas/calculator"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
workercommon "go.temporal.io/server/service/worker/common"
Expand All @@ -25,9 +29,10 @@ const (

type (
workerComponent struct {
activityDeps activityDeps
dc *dynamicconfig.Collection
enabledFeature dynamicconfig.BoolPropertyFnWithNamespaceFilter
activityDeps activityDeps
dc *dynamicconfig.Collection
adminBatcherHostRateLimiter quotas.RateLimiter
enabledFeature dynamicconfig.BoolPropertyFnWithNamespaceFilter
}

activityDeps struct {
Expand All @@ -52,13 +57,24 @@ var Module = fx.Options(

func NewResult(
dc *dynamicconfig.Collection,
serviceResolver membership.ServiceResolver,
params activityDeps,
) fxResult {
adminBatcherHostRateFn := calculator.NewLoggedCalculator(
calculator.ClusterAwareQuotaCalculator{
MemberCounter: serviceResolver,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this pay attention to the per-ns worker count? you can get the number from the workercommon.RegistrationDetails passed to Register, and this worker's share is Multiplicity / TotalWorkers

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm not really? this is limit across all namespaces and all admin batch jobs.

Copy link
Copy Markdown
Member Author

@yycptt yycptt Jun 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I will check if I can just use the distributed rate limit here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, I misunderstood the scope

PerInstanceQuota: dynamicconfig.AdminBatcherHostRPS.Get(dc),
GlobalQuota: dynamicconfig.AdminBatcherGlobalRPS.Get(dc),
},
log.With(params.Logger, tag.ComponentAdminBatcher, tag.ScopeHost),
).GetQuota

return fxResult{
Component: &workerComponent{
activityDeps: params,
dc: dc,
enabledFeature: dynamicconfig.EnableBatcherNamespace.Get(dc),
activityDeps: params,
dc: dc,
enabledFeature: dynamicconfig.EnableBatcherNamespace.Get(dc),
adminBatcherHostRateLimiter: quotas.NewDefaultOutgoingRateLimiter(adminBatcherHostRateFn),
},
}
}
Expand All @@ -83,10 +99,11 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na

func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities {
return &activities{
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
rps: dynamicconfig.BatcherRPS.Get(s.dc),
concurrency: dynamicconfig.BatcherConcurrency.Get(s.dc),
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
rps: dynamicconfig.BatcherRPS.Get(s.dc),
concurrency: dynamicconfig.BatcherConcurrency.Get(s.dc),
adminBatcherHostRateLimiter: s.adminBatcherHostRateLimiter,
}
}
Loading