diff --git a/chasm/lib/scheduler/invoker.go b/chasm/lib/scheduler/invoker.go index 41f51c69f0a..2a12684ecdb 100644 --- a/chasm/lib/scheduler/invoker.go +++ b/chasm/lib/scheduler/invoker.go @@ -65,10 +65,12 @@ type processBufferResult struct { discardStarts []*schedulespb.BufferedStart // Number of buffered starts dropped due to overlap policy during processing. - overlapSkipped int64 + overlapSkipped int64 + overlapSkippedByPolicy map[enumspb.ScheduleOverlapPolicy]int64 - // Nunmber of buffered starts dropped from missing the catchup window. - missedCatchupWindow int64 + // Number of buffered starts dropped from missing the catchup window, + // bucketed by whether a running action contributed to the miss. + missedCatchupByActionRunning map[bool]int64 } // recordProcessBufferResult updates the Invoker's internal state based on result, as well as the @@ -268,8 +270,10 @@ func (i *Invoker) recordCompletedAction( // Update DesiredTime on the first pending start for metrics. DesiredTime is used // to drive action latency between buffered starts (the time it takes between - // completing one start and kicking off the next). We set that on the first start - // pending execution. + // completing one start and kicking off the next). It also signals in processBuffer + // that this start was blocked behind a running action: if DesiredTime (the previous + // action's CloseTime) is past the start's catchup deadline, the previous action's + // duration caused the miss. idx := slices.IndexFunc(i.BufferedStarts, func(start *schedulespb.BufferedStart) bool { return start.Attempt == 0 }) diff --git a/chasm/lib/scheduler/invoker_tasks.go b/chasm/lib/scheduler/invoker_tasks.go index b62ee8a1c81..4069bc3cfc8 100644 --- a/chasm/lib/scheduler/invoker_tasks.go +++ b/chasm/lib/scheduler/invoker_tasks.go @@ -402,10 +402,25 @@ func (h *InvokerProcessBufferTaskHandler) Execute( result := h.processBuffer(ctx, invoker, scheduler) // Update Scheduler metadata. 
+ var totalMissedCatchup int64 + for _, count := range result.missedCatchupByActionRunning { + totalMissedCatchup += count + } scheduler.recordActionResult(&schedulerActionResult{ overlapSkipped: result.overlapSkipped, - missedCatchupWindow: result.missedCatchupWindow, + missedCatchupWindow: totalMissedCatchup, }) + for overlapPolicy, count := range result.overlapSkippedByPolicy { + newTaggedMetricsHandler(h.metricsHandler, scheduler).WithTags( + metrics.StringTag(metrics.ScheduleOverlapPolicyTag, overlapPolicy.String()), + ).Counter(metrics.ScheduleOverlapSkipped.Name()).Record(count) + } + for actionRunning, count := range result.missedCatchupByActionRunning { + newTaggedMetricsHandler(h.metricsHandler, scheduler).WithTags( + metrics.StringTag(metrics.ScheduleMissedReasonTag, metrics.ScheduleMissedReasonBufferExpired), + metrics.StringTag(metrics.ScheduleActionRunningTag, fmt.Sprintf("%t", actionRunning)), + ).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(count) + } // Update internal state and create new tasks. invoker.recordProcessBufferResult(ctx, &result) @@ -423,6 +438,7 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer( ) (result processBufferResult) { runningWorkflows := invoker.runningWorkflowExecutions() isRunning := len(runningWorkflows) > 0 + result.missedCatchupByActionRunning = make(map[bool]int64) // Processing completely ignores any BufferedStart that's already executing/backing off. pendingBufferedStarts := util.FilterSlice(invoker.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool { @@ -447,13 +463,28 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer( // Update result metrics. result.overlapSkipped = action.OverlapSkipped + result.overlapSkippedByPolicy = action.OverlapSkippedByPolicy // Add starting workflows to result, trim others. Catchup-window expiry is // checked before useScheduledAction so that a start past its catchup // window doesn't consume a LimitedActions slot. 
for _, start := range readyStarts { - if ctx.Now(invoker).After(h.startWorkflowDeadline(ctx, scheduler, start)) { - result.missedCatchupWindow++ + deadline := h.startWorkflowDeadline(ctx, scheduler, start) + if ctx.Now(invoker).After(deadline) { + // Action was buffered in time but expired before execution + // (e.g., due to overlap deferral, retries, or system delay). + // Only emit the metric if the schedule would have run this + // start -- skip paused or action-exhausted schedules. + if start.Manual || scheduler.useScheduledAction(false) { + // Determine if a running action contributed: either one is still + // running, or the previous action's CloseTime (stored in DesiredTime) + // was already past this start's deadline. + // Note: if no prior action completed, DesiredTime is zero-valued, + // so After(deadline) is false, correctly yielding actionRunning=false. + actionRunning := isRunning || + start.GetDesiredTime().AsTime().After(deadline) + result.missedCatchupByActionRunning[actionRunning]++ + } result.discardStarts = append(result.discardStarts, start) continue } @@ -608,6 +639,10 @@ func (h *InvokerExecuteTaskHandler) startWorkflow( metricsHandler. Timer(metrics.ScheduleActionDelay.Name()). Record(actualStartTime.Sub(desiredTime.AsTime())) + // Record total delay from original schedule time, including any overlap policy wait. + metricsHandler. + Timer(metrics.ScheduleActionE2EDelay.Name()). 
+			Record(actualStartTime.Sub(start.ActualTime.AsTime()))
 	}
 	return nil
 }
diff --git a/chasm/lib/scheduler/scheduler.go b/chasm/lib/scheduler/scheduler.go
index 0de9856704d..b042b029248 100644
--- a/chasm/lib/scheduler/scheduler.go
+++ b/chasm/lib/scheduler/scheduler.go
@@ -19,6 +19,7 @@ import (
 	"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
 	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/contextutil"
+	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/payload"
 	"go.temporal.io/server/common/primitives/timestamp"
 	"go.temporal.io/server/common/util"
@@ -568,6 +569,16 @@ func (s *Scheduler) HandleNexusCompletion(
 		return nil
 	}
 
+	// Record how long the callback took to arrive after the action completed.
+	// Check the proto for nil: AsTime() on an unset timestamp yields the Unix epoch,
+	// never a zero time.Time. Use ctx.Now as the time source; clamp for clock skew.
+	if closeTime := info.GetCloseTime(); closeTime != nil {
+		latency := max(0, ctx.Now(s).Sub(closeTime.AsTime()))
+		newTaggedMetricsHandler(ctx.MetricsHandler(), s).
+			Timer(metrics.ScheduleCallbackLatency.Name()).
+			Record(latency)
+	}
+
 	// Handle last completed/failed status and payloads.
 	//
 	// TODO - also record payload sizes once we have metrics wired into CHASM context.
diff --git a/chasm/lib/scheduler/spec_processor.go b/chasm/lib/scheduler/spec_processor.go index 61585781b80..f3b60d48e11 100644 --- a/chasm/lib/scheduler/spec_processor.go +++ b/chasm/lib/scheduler/spec_processor.go @@ -123,6 +123,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange( var err error var bufferedStarts []*schedulespb.BufferedStart var droppedCount int64 + recordedGenerateLatency := false limitReached := false for next, err = s.NextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.NextTime(scheduler, next.Next) { lastAction = next.Next @@ -138,21 +139,28 @@ func (s *SpecProcessorImpl) ProcessTimeRange( continue } + // Record generate latency only for the first action in the batch to + // avoid inflating the metric when catching up over a large time range. + if !manual && !recordedGenerateLatency { + metricsHandler.Timer(metrics.ScheduleGenerateLatency.Name()). + Record(end.Sub(next.Next)) + recordedGenerateLatency = true + } + if !manual && end.Sub(next.Next) > catchupWindow { s.logger.Info("Schedule missed catchup window", tag.Time("now", end), tag.Time("time", next.Next)) - metricsHandler.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1) + // Action's nominal time was already past the catchup window when + // the generator processed the time range. It was never buffered. + metricsHandler.WithTags( + metrics.StringTag(metrics.ScheduleMissedReasonTag, metrics.ScheduleMissedReasonNotBuffered), + ).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1) scheduler.Info.MissedCatchupWindow++ continue } - if !manual { - metricsHandler.Timer(metrics.ScheduleGenerateLatency.Name()). 
- Record(end.Sub(next.Next)) - } - if limitReached { droppedCount++ continue diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index d4bddc730d3..1c129a842cc 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -617,6 +617,11 @@ const ( ScheduleBackendChasm = "chasm" ScheduleBackendLegacy = "legacy" ScheduleBackendWorkflow = "workflow" + ScheduleOverlapPolicyTag = "schedule_overlap_policy" + ScheduleMissedReasonTag = "reason" + ScheduleMissedReasonNotBuffered = "not_buffered" + ScheduleMissedReasonBufferExpired = "buffer_expired" + ScheduleActionRunningTag = "action_running" ScheduleMigrationDirectionTag = "schedule_migration_direction" ScheduleMigrationDirectionToChasm = "to_chasm" ScheduleMigrationDirectionToWorkflow = "to_workflow" @@ -1446,6 +1451,10 @@ var ( "schedule_action_delay", WithDescription("Delay between when scheduled actions should/actually happen"), ) + ScheduleActionE2EDelay = NewTimerDef( + "schedule_action_e2e_delay", + WithDescription("End-to-end delay between the action's original schedule time and when it was actually started, including overlap policy wait"), + ) ScheduleGenerateLatency = NewTimerDef( "schedule_generate_latency", WithDescription("Delay between when a scheduled action was due and when the generator buffered it"), @@ -1466,6 +1475,14 @@ var ( "schedule_migration_failed", WithDescription("The number of times a schedule migration fails"), ) + ScheduleOverlapSkipped = NewCounterDef( + "schedule_overlap_skipped", + WithDescription("The number of schedule actions skipped due to overlap policy"), + ) + ScheduleCallbackLatency = NewTimerDef( + "schedule_callback_latency", + WithDescription("Latency between a scheduled action completing and the scheduler receiving the completion callback"), + ) // Worker Versioning WorkerDeploymentCreated = NewCounterDef("worker_deployment_created") diff --git a/service/worker/scheduler/buffer.go b/service/worker/scheduler/buffer.go index 
ddea9f2a83c..d2a20a2ecdb 100644 --- a/service/worker/scheduler/buffer.go +++ b/service/worker/scheduler/buffer.go @@ -22,7 +22,8 @@ type ( NeedCancel bool NeedTerminate bool // Stats - OverlapSkipped int64 + OverlapSkipped int64 + OverlapSkippedByPolicy map[enumspb.ScheduleOverlapPolicy]int64 } ) @@ -40,6 +41,7 @@ func ProcessBuffer[T Overlappable]( // affect them. var action ProcessBufferResult[T] + action.OverlapSkippedByPolicy = make(map[enumspb.ScheduleOverlapPolicy]int64) var zeroVal T for _, start := range buffer { @@ -64,12 +66,14 @@ func ProcessBuffer[T Overlappable]( case enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: // just skip action.OverlapSkipped++ + action.OverlapSkippedByPolicy[overlapPolicy]++ case enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE: // allow one (the first one) in the buffer if len(action.NewBuffer) == 0 { action.NewBuffer = append(action.NewBuffer, start) } else { action.OverlapSkipped++ + action.OverlapSkippedByPolicy[overlapPolicy]++ } case enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL: // always add to buffer diff --git a/service/worker/scheduler/buffer_test.go b/service/worker/scheduler/buffer_test.go index 85750e5cc9b..af48ac60624 100644 --- a/service/worker/scheduler/buffer_test.go +++ b/service/worker/scheduler/buffer_test.go @@ -51,6 +51,10 @@ func (s *processBufferSuite) TestProcessSkipRunning() { s.Empty(action.NewBuffer) s.False(action.NeedCancel) s.False(action.NeedTerminate) + s.Equal(int64(3), action.OverlapSkipped) + s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{ + enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: 3, + }, action.OverlapSkippedByPolicy) } func (s *processBufferSuite) TestProcessSkipNotRunning() { @@ -71,6 +75,10 @@ func (s *processBufferSuite) TestProcessBufferOneRunning() { s.Equal([]int{3}, jobIds(action.NewBuffer)) s.False(action.NeedCancel) s.False(action.NeedTerminate) + s.Equal(int64(2), action.OverlapSkipped) + s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{ + enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE: 2, + }, 
action.OverlapSkippedByPolicy) } func (s *processBufferSuite) TestProcessBufferOneNotRunning() { @@ -168,4 +176,28 @@ func (s *processBufferSuite) TestProcessWithResolve() { s.False(action.NeedTerminate) } -// TODO: add test cases for mixed policies +func (s *processBufferSuite) TestProcessMixedPoliciesTracksSkippedByResolvedPolicy() { + buffer := []*job{ + {3, enumspb.SCHEDULE_OVERLAP_POLICY_SKIP}, + {5, enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE}, + {7, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED}, + {9, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED}, + } + resolve := func(policy enumspb.ScheduleOverlapPolicy) enumspb.ScheduleOverlapPolicy { + if policy == enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED { + return enumspb.SCHEDULE_OVERLAP_POLICY_SKIP + } + return policy + } + + action := ProcessBuffer(buffer, true, resolve) + s.Empty(action.OverlappingStarts) + s.Nil(action.NonOverlappingStart) + s.Equal([]int{5}, jobIds(action.NewBuffer)) + s.False(action.NeedCancel) + s.False(action.NeedTerminate) + s.Equal(int64(3), action.OverlapSkipped) + s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{ + enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: 3, + }, action.OverlapSkippedByPolicy) +} diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index d237197e679..b647c415d7b 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -685,6 +685,7 @@ func (s *scheduler) processTimeRange( } lastAction := start + recordedGenerateLatency := false var next GetNextTimeResult for next = s.getNextTime(start); !(next.Next.IsZero() || next.Next.After(end)); next = s.getNextTime(next.Next) { if !s.hasMinVersion(BatchAndCacheTimeQueries) && !s.canTakeScheduledAction(manual, false) { @@ -696,9 +697,21 @@ func (s *scheduler) processTimeRange( // hasMinVersion because this condition couldn't happen in previous versions. 
continue } + // Record generate latency only for the first action in the batch to + // avoid inflating the metric when catching up over a large time range. + if !manual && !recordedGenerateLatency { + s.metrics.Timer(metrics.ScheduleGenerateLatency.Name()).Record(end.Sub(next.Next)) + recordedGenerateLatency = true + } if !manual && end.Sub(next.Next) > catchupWindow { s.logger.Warn("Schedule missed catchup window", "now", end, "time", next.Next) - s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1) + // Action's nominal time was already past the catchup window when + // the scheduler woke up. It was never buffered for execution. + // action_running is not included: the action was never a candidate + // for execution, so whether something is running is irrelevant. + s.metrics.WithTags(map[string]string{ + metrics.ScheduleMissedReasonTag: metrics.ScheduleMissedReasonNotBuffered, + }).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1) s.Info.MissedCatchupWindow++ continue } @@ -921,7 +934,11 @@ func (s *scheduler) processWatcherResult(id string, f workflow.Future, long bool } } - // Update desired time of next start if it's buffered. This is used for metrics only. + // Update desired time of next start if it's buffered. This is used to + // compute ScheduleActionDelay (time between completing one action and + // starting the next). The legacy path doesn't use deferred starts + // (Attempt == -1) like CHASM, so BufferedStarts[0] is always the next + // pending start. 
if long && len(s.State.BufferedStarts) > 0 { s.State.BufferedStarts[0].DesiredTime = res.CloseTime } @@ -1354,6 +1371,11 @@ func (s *scheduler) processBuffer() bool { s.State.BufferedStarts = action.NewBuffer s.Info.OverlapSkipped += action.OverlapSkipped + for overlapPolicy, count := range action.OverlapSkippedByPolicy { + s.metrics.WithTags(map[string]string{ + metrics.ScheduleOverlapPolicyTag: overlapPolicy.String(), + }).Counter(metrics.ScheduleOverlapSkipped.Name()).Inc(count) + } // Try starting whatever we're supposed to start now allStarts := action.OverlappingStarts @@ -1501,6 +1523,8 @@ func (s *scheduler) startWorkflow( // record metric only for _scheduled_ actions, not trigger/backfill, otherwise it's not meaningful desiredTime := cmp.Or(start.DesiredTime, start.ActualTime) s.metrics.Timer(metrics.ScheduleActionDelay.Name()).Record(res.RealStartTime.AsTime().Sub(desiredTime.AsTime())) + // Record total delay from original schedule time, including any overlap policy wait. + s.metrics.Timer(metrics.ScheduleActionE2EDelay.Name()).Record(res.RealStartTime.AsTime().Sub(start.ActualTime.AsTime())) } actionResult := &schedulepb.ScheduleActionResult{