Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3227,12 +3227,24 @@ When enabled, the scavenger will delete completed workflow execution data that a
BatcherRPS = NewNamespaceIntSetting(
"worker.batcherRPS",
50,
`BatcherRPS controls number the rps of batch operations`,
`BatcherRPS controls number the rps of one batch operation`,
)
BatcherConcurrency = NewNamespaceIntSetting(
"worker.batcherConcurrency",
5,
`BatcherConcurrency controls the concurrency of one batch operation`,
`BatcherConcurrency controls the concurrency of one batch or admin batch operation`,
)
AdminBatcherHostRPS = NewGlobalIntSetting(
"worker.adminBatcherHostRPS",
100,
`AdminBatcherHostRPS controls the rps of all admin batch operations per host`,
)
AdminBatcherGlobalRPS = NewGlobalIntSetting(
"worker.adminBatcherGlobalRPS",
0,
`AdminBatcherGlobalRPS controls the rps of all admin batch operations across all worker hosts.
The configured value will be divided by the number of worker hosts to get the per host rps limit.
0 means no global limit and each host will use AdminBatcherHostRPS.`,
)
WorkerParentCloseMaxConcurrentActivityExecutionSize = NewGlobalIntSetting(
"worker.ParentCloseMaxConcurrentActivityExecutionSize",
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var (
ComponentESVisibilityManager = component("es-visibility-manager")
ComponentArchiver = component("archiver")
ComponentBatcher = component("batcher")
ComponentAdminBatcher = component("admin-batcher")
ComponentWorker = component("worker")
ComponentWorkerManager = component("worker-manager")
ComponentPerNSWorkerManager = component("perns-worker-manager")
Expand Down
30 changes: 18 additions & 12 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ const (

var (
// errNamespaceMismatch is the sentinel error reported for a namespace mismatch.
errNamespaceMismatch = errors.New("namespace mismatch")

// batchQuotaRequest is the quota request passed to the rate limiter's Wait
// call in processTask. It is shared by all callers and has Token set to 1.
batchQuotaRequest = quotas.Request{
Token: 1,
}
)

// batchProcessorConfig holds the configuration for batch processing
type batchProcessorConfig struct {
namespace string
adjustedQuery string
rps dynamicconfig.IntPropertyFnWithNamespaceFilter
concurrency int
initialPageToken []byte
initialExecutions []*commonpb.WorkflowExecution
Expand All @@ -60,7 +63,7 @@ type batchWorkerProcessor func(
ctx context.Context,
taskCh chan task,
respCh chan taskResponse,
rateLimiter quotas.RateLimiter,
rateLimiter quotas.RequestRateLimiter,
sdkClient sdkclient.Client,
frontendClient workflowservice.WorkflowServiceClient,
metricsHandler metrics.Handler,
Expand Down Expand Up @@ -158,14 +161,12 @@ func (a *activities) processWorkflowsWithProactiveFetching(
ctx context.Context,
config batchProcessorConfig,
startWorkerProcessor batchWorkerProcessor,
rateLimiter quotas.RequestRateLimiter,
sdkClient sdkclient.Client,
metricsHandler metrics.Handler,
logger log.Logger,
hbd HeartBeatDetails,
) (HeartBeatDetails, error) {
rateLimiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 {
return float64(config.rps(config.namespace))
})

concurrency := int(math.Max(1, float64(config.concurrency)))

Expand Down Expand Up @@ -319,6 +320,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
var visibilityQuery string
var executions []*commonpb.WorkflowExecution

// Admin batch uses the host level rate limiter which applies across all namespaces and all admin batch workflows.
rateLimiter := quotas.RequestRateLimiter(a.AdminBatcherRateLimiter)

if batchParams.AdminRequest != nil {
ctx = headers.SetCallerType(ctx, headers.CallerTypePreemptable)
adminReq := batchParams.AdminRequest
Expand All @@ -327,6 +331,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
} else {
visibilityQuery = a.adjustQueryBatchTypeEnum(batchParams.Request.VisibilityQuery, batchParams.BatchType)
executions = batchParams.Request.Executions
rateLimiter = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 {
return float64(a.rps(ns))
}))
}

if startOver {
Expand All @@ -353,7 +360,6 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
config := batchProcessorConfig{
namespace: ns,
adjustedQuery: visibilityQuery,
rps: a.rps,
concurrency: a.getOperationConcurrency(int(batchParams.Concurrency)),
initialPageToken: hbd.PageToken,
initialExecutions: executions,
Expand All @@ -364,7 +370,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
ctx context.Context,
taskCh chan task,
respCh chan taskResponse,
rateLimiter quotas.RateLimiter,
rateLimiter quotas.RequestRateLimiter,
sdkClient sdkclient.Client,
frontendClient workflowservice.WorkflowServiceClient,
metricsHandler metrics.Handler,
Expand All @@ -373,7 +379,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams
a.startTaskProcessor(ctx, batchParams, ns, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger)
}

return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd)
return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, rateLimiter, sdkClient, metricsHandler, logger, hbd)
}

func (a *activities) getActivityLogger(ctx context.Context) log.Logger {
Expand Down Expand Up @@ -420,7 +426,7 @@ func (a *activities) startTaskProcessor(
namespace string,
taskCh chan task,
respCh chan taskResponse,
limiter quotas.RateLimiter,
limiter quotas.RequestRateLimiter,
sdkClient sdkclient.Client,
frontendClient workflowservice.WorkflowServiceClient,
metricsHandler metrics.Handler,
Expand Down Expand Up @@ -673,7 +679,7 @@ func (a *activities) processAdminTask(
ctx context.Context,
batchOperation *batchspb.BatchOperationInput,
task task,
limiter quotas.RateLimiter,
limiter quotas.RequestRateLimiter,
) error {
adminReq := batchOperation.AdminRequest
switch adminReq.Operation.(type) {
Expand Down Expand Up @@ -701,11 +707,11 @@ func (a *activities) processAdminTask(

func processTask(
ctx context.Context,
limiter quotas.RateLimiter,
limiter quotas.RequestRateLimiter,
task task,
procFn func(*workflowpb.WorkflowExecutionInfo) error,
) error {
err := limiter.Wait(ctx)
err := limiter.Wait(ctx, batchQuotaRequest)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions service/worker/batcher/activities_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func TestStartTaskProcessor_UsesWorkerBoundNamespaceForSignal(t *testing.T) {
go func() {
defer close(done)
a.startTaskProcessor(ctx, batchOp, ns, taskCh, respCh,
quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 1e9 }), nil, mockFE,
metrics.NoopMetricsHandler, log.NewTestLogger())
quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 1e9 })),
nil, mockFE, metrics.NoopMetricsHandler, log.NewTestLogger())
}()

<-respCh
Expand Down
8 changes: 4 additions & 4 deletions service/worker/batcher/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (s *activitiesSuite) TestProcessAdminTask_RefreshWorkflowTasks() {
},
}

limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })
limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }))

// Expect RefreshWorkflowTasks to be called with correct parameters
mockHistoryClient.EXPECT().RefreshWorkflowTasks(gomock.Any(), gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -497,7 +497,7 @@ func (s *activitiesSuite) TestProcessAdminTask_RefreshWorkflowTasks_Error() {
},
}

limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })
limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }))

expectedErr := errors.New("refresh failed")
// Use gomock.Any() for context since it's modified with CallerTypePreemptable header
Expand Down Expand Up @@ -617,7 +617,7 @@ func (s *activitiesSuite) TestStartTaskProcessor_SignalUsesWorkerNamespace() {

taskCh := make(chan task, 1)
respCh := make(chan taskResponse, 1)
limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })
limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }))

// The signal must be executed with the worker's trusted namespace, not the user-supplied one.
s.mockFrontendClient.EXPECT().
Expand Down Expand Up @@ -657,7 +657,7 @@ func (s *activitiesSuite) TestProcessAdminTask_UnknownOperation() {
},
}

limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 })
limiter := quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100 }))

err := a.processAdminTask(ctx, batchOperation, testTask, limiter)
s.Require().Error(err)
Expand Down
39 changes: 33 additions & 6 deletions service/worker/batcher/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/quotas/calculator"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
workercommon "go.temporal.io/server/service/worker/common"
Expand All @@ -24,6 +28,8 @@ const (
)

type (
AdminBatcherRateLimiter quotas.RequestRateLimiter
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our internal implementation only implements RequestRateLimiter interface, not RateLimiter.


workerComponent struct {
activityDeps activityDeps
dc *dynamicconfig.Collection
Expand All @@ -32,12 +38,13 @@ type (

activityDeps struct {
fx.In
MetricsHandler metrics.Handler
Logger log.Logger
ClientFactory sdk.ClientFactory
FrontendClient workflowservice.WorkflowServiceClient
AdminClient adminservice.AdminServiceClient
HistoryClient resource.HistoryClient
MetricsHandler metrics.Handler
Logger log.Logger
ClientFactory sdk.ClientFactory
FrontendClient workflowservice.WorkflowServiceClient
AdminClient adminservice.AdminServiceClient
HistoryClient resource.HistoryClient
AdminBatcherRateLimiter AdminBatcherRateLimiter
}

fxResult struct {
Expand All @@ -47,9 +54,29 @@ type (
)

var Module = fx.Options(
fx.Provide(AdminBatcherRateLimiterProvider),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows us to inject a different implementation in cloud.

fx.Provide(NewResult),
)

// AdminBatcherRateLimiterProvider builds the rate limiter injected into the
// batcher activities for admin batch operations.
//
// The rate comes from a ClusterAwareQuotaCalculator configured with the
// AdminBatcherHostRPS (per-instance) and AdminBatcherGlobalRPS (global)
// dynamic config settings, using serviceResolver as the member counter. The
// calculator is wrapped in a logged calculator whose logger is tagged with the
// admin-batcher component and host scope, and its GetQuota method is used as
// the rate function of the underlying outgoing rate limiter.
func AdminBatcherRateLimiterProvider(
	dc *dynamicconfig.Collection,
	serviceResolver membership.ServiceResolver,
	logger log.Logger,
) AdminBatcherRateLimiter {
	// Quota source: per-host and global limits combined with cluster membership.
	quotaCalculator := calculator.ClusterAwareQuotaCalculator{
		MemberCounter:    serviceResolver,
		PerInstanceQuota: dynamicconfig.AdminBatcherHostRPS.Get(dc),
		GlobalQuota:      dynamicconfig.AdminBatcherGlobalRPS.Get(dc),
	}

	// Tag the logger so quota log lines are attributable to the admin batcher.
	quotaLogger := log.With(logger, tag.ComponentAdminBatcher, tag.ScopeHost)
	loggedCalculator := calculator.NewLoggedCalculator(quotaCalculator, quotaLogger)

	// Adapt the plain rate limiter to the RequestRateLimiter interface.
	outgoingLimiter := quotas.NewDefaultOutgoingRateLimiter(loggedCalculator.GetQuota)
	return quotas.NewRequestRateLimiterAdapter(outgoingLimiter)
}

func NewResult(
dc *dynamicconfig.Collection,
params activityDeps,
Expand Down
Loading