Skip to content
14 changes: 9 additions & 5 deletions chasm/lib/scheduler/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ type processBufferResult struct {
discardStarts []*schedulespb.BufferedStart

// Number of buffered starts dropped due to overlap policy during processing.
overlapSkipped int64
overlapSkipped int64
overlapSkippedByPolicy map[enumspb.ScheduleOverlapPolicy]int64

// Number of buffered starts dropped from missing the catchup window.
missedCatchupWindow int64
// Number of buffered starts dropped from missing the catchup window,
// bucketed by whether a running action contributed to the miss.
missedCatchupByActionRunning map[bool]int64
}

// recordProcessBufferResult updates the Invoker's internal state based on result, as well as the
Expand Down Expand Up @@ -236,8 +238,10 @@ func (i *Invoker) recordCompletedAction(

// Update DesiredTime on the first pending start for metrics. DesiredTime is used
// to drive action latency between buffered starts (the time it takes between
// completing one start and kicking off the next). We set that on the first start
// pending execution.
// completing one start and kicking off the next). It also signals in processBuffer
// that this start was blocked behind a running action: if DesiredTime (the previous
// action's CloseTime) is past the start's catchup deadline, the previous action's
// duration caused the miss.
idx := slices.IndexFunc(i.BufferedStarts, func(start *schedulespb.BufferedStart) bool {
return start.Attempt == 0
})
Expand Down
38 changes: 34 additions & 4 deletions chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,25 @@ func (h *InvokerProcessBufferTaskHandler) Execute(
result := h.processBuffer(ctx, invoker, scheduler)

// Update Scheduler metadata.
var totalMissedCatchup int64
for _, count := range result.missedCatchupByActionRunning {
totalMissedCatchup += count
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, isn't this just?

totalMissedCatchup := len(result.missedCatchupByActionRunning)

}
scheduler.recordActionResult(&schedulerActionResult{
overlapSkipped: result.overlapSkipped,
missedCatchupWindow: result.missedCatchupWindow,
missedCatchupWindow: totalMissedCatchup,
})
for overlapPolicy, count := range result.overlapSkippedByPolicy {
newTaggedMetricsHandler(h.metricsHandler, scheduler).WithTags(
metrics.StringTag(metrics.ScheduleOverlapPolicyTag, overlapPolicy.String()),
).Counter(metrics.ScheduleOverlapSkipped.Name()).Record(count)
}
for actionRunning, count := range result.missedCatchupByActionRunning {
newTaggedMetricsHandler(h.metricsHandler, scheduler).WithTags(
metrics.StringTag(metrics.ScheduleMissedReasonTag, metrics.ScheduleMissedReasonBufferExpired),
metrics.StringTag(metrics.ScheduleActionRunningTag, fmt.Sprintf("%t", actionRunning)),
).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(count)
}

// Update internal state and create new tasks.
invoker.recordProcessBufferResult(ctx, &result)
Expand All @@ -422,6 +437,7 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer(
) (result processBufferResult) {
runningWorkflows := invoker.runningWorkflowExecutions()
isRunning := len(runningWorkflows) > 0
result.missedCatchupByActionRunning = make(map[bool]int64)

// Processing completely ignores any BufferedStart that's already executing/backing off.
pendingBufferedStarts := util.FilterSlice(invoker.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool {
Expand All @@ -446,6 +462,7 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer(

// Update result metrics.
result.overlapSkipped = action.OverlapSkipped
result.overlapSkippedByPolicy = action.OverlapSkippedByPolicy

// Add starting workflows to result, trim others.
for _, start := range readyStarts {
Expand All @@ -456,9 +473,18 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer(
continue
}

if ctx.Now(invoker).After(h.startWorkflowDeadline(ctx, scheduler, start)) {
// Drop expired starts.
result.missedCatchupWindow++
deadline := h.startWorkflowDeadline(ctx, scheduler, start)
if ctx.Now(invoker).After(deadline) {
// Action was buffered in time but expired before execution
// (e.g., due to overlap deferral, retries, or system delay).
// Determine if a running action contributed: either one is still
// running, or the previous action's CloseTime (stored in DesiredTime)
// was already past this start's deadline.
// Note: if no prior action completed, DesiredTime is zero-valued,
// so After(deadline) is false, correctly yielding actionRunning=false.
actionRunning := isRunning ||
start.GetDesiredTime().AsTime().After(deadline)
result.missedCatchupByActionRunning[actionRunning]++
result.discardStarts = append(result.discardStarts, start)
continue
}
Expand Down Expand Up @@ -607,6 +633,10 @@ func (h *InvokerExecuteTaskHandler) startWorkflow(
metricsHandler.
Timer(metrics.ScheduleActionDelay.Name()).
Record(actualStartTime.Sub(desiredTime.AsTime()))
// Record total delay from original schedule time, including any overlap policy wait.
metricsHandler.
Timer(metrics.ScheduleActionE2EDelay.Name()).
Record(actualStartTime.Sub(start.ActualTime.AsTime()))
}

return &schedulepb.ScheduleActionResult{
Expand Down
11 changes: 11 additions & 0 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/contextutil"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/util"
Expand Down Expand Up @@ -561,6 +562,16 @@ func (s *Scheduler) HandleNexusCompletion(
return nil
}

// Record how long it took for the callback to arrive after the action completed.
// Use ctx.Now instead of time.Since to use a consistent time source across nodes,
// and clamp to zero in case of clock skew.
//
// CloseTime is checked for nil before conversion: AsTime on a nil Timestamp
// returns the Unix epoch (1970-01-01), which time.Time.IsZero does not report
// as zero, so an IsZero check alone would record a decades-long latency for a
// completion that carries no CloseTime.
if closeTimestamp := info.GetCloseTime(); closeTimestamp != nil && !closeTimestamp.AsTime().IsZero() {
	latency := max(0, ctx.Now(s).Sub(closeTimestamp.AsTime()))
	newTaggedMetricsHandler(ctx.MetricsHandler(), s).
		Timer(metrics.ScheduleCallbackLatency.Name()).
		Record(latency)
}
Comment thread
chaptersix marked this conversation as resolved.

// Handle last completed/failed status and payloads.
//
// TODO - also record payload sizes once we have metrics wired into CHASM context.
Expand Down
20 changes: 14 additions & 6 deletions chasm/lib/scheduler/spec_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange(
var err error
var bufferedStarts []*schedulespb.BufferedStart
var droppedCount int64
recordedGenerateLatency := false
limitReached := false
for next, err = s.NextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.NextTime(scheduler, next.Next) {
lastAction = next.Next
Expand All @@ -138,21 +139,28 @@ func (s *SpecProcessorImpl) ProcessTimeRange(
continue
}

// Record generate latency only for the first action in the batch to
// avoid inflating the metric when catching up over a large time range.
if !manual && !recordedGenerateLatency {
metricsHandler.Timer(metrics.ScheduleGenerateLatency.Name()).
Record(end.Sub(next.Next))
recordedGenerateLatency = true
}

if !manual && end.Sub(next.Next) > catchupWindow {
s.logger.Info("Schedule missed catchup window",
tag.Time("now", end),
tag.Time("time", next.Next))
metricsHandler.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1)
// Action's nominal time was already past the catchup window when
// the generator processed the time range. It was never buffered.
metricsHandler.WithTags(
metrics.StringTag(metrics.ScheduleMissedReasonTag, metrics.ScheduleMissedReasonNotBuffered),
).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1)

scheduler.Info.MissedCatchupWindow++
continue
}

if !manual {
metricsHandler.Timer(metrics.ScheduleGenerateLatency.Name()).
Record(end.Sub(next.Next))
}

if limitReached {
droppedCount++
continue
Expand Down
17 changes: 17 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,11 @@ const (
ScheduleBackendChasm = "chasm"
ScheduleBackendLegacy = "legacy"
ScheduleBackendWorkflow = "workflow"
ScheduleOverlapPolicyTag = "schedule_overlap_policy"
ScheduleMissedReasonTag = "reason"
ScheduleMissedReasonNotBuffered = "not_buffered"
ScheduleMissedReasonBufferExpired = "buffer_expired"
ScheduleActionRunningTag = "action_running"
ScheduleMigrationDirectionTag = "schedule_migration_direction"
ScheduleMigrationDirectionToChasm = "to_chasm"
ScheduleMigrationDirectionToWorkflow = "to_workflow"
Expand Down Expand Up @@ -1434,6 +1439,10 @@ var (
"schedule_action_delay",
WithDescription("Delay between when scheduled actions should/actually happen"),
)
ScheduleActionE2EDelay = NewTimerDef(
"schedule_action_e2e_delay",
WithDescription("End-to-end delay between the action's original schedule time and when it was actually started, including overlap policy wait"),
)
ScheduleGenerateLatency = NewTimerDef(
"schedule_generate_latency",
WithDescription("Delay between when a scheduled action was due and when the generator buffered it"),
Expand All @@ -1454,6 +1463,14 @@ var (
"schedule_migration_failed",
WithDescription("The number of times a schedule migration fails"),
)
ScheduleOverlapSkipped = NewCounterDef(
"schedule_overlap_skipped",
WithDescription("The number of schedule actions skipped due to overlap policy"),
)
ScheduleCallbackLatency = NewTimerDef(
"schedule_callback_latency",
WithDescription("Latency between a scheduled action completing and the scheduler receiving the completion callback"),
)

// Worker Versioning
WorkerDeploymentCreated = NewCounterDef("worker_deployment_created")
Expand Down
6 changes: 5 additions & 1 deletion service/worker/scheduler/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type (
NeedCancel bool
NeedTerminate bool
// Stats
OverlapSkipped int64
OverlapSkipped int64
OverlapSkippedByPolicy map[enumspb.ScheduleOverlapPolicy]int64
}
)

Expand All @@ -40,6 +41,7 @@ func ProcessBuffer[T Overlappable](
// affect them.

var action ProcessBufferResult[T]
action.OverlapSkippedByPolicy = make(map[enumspb.ScheduleOverlapPolicy]int64)
var zeroVal T

for _, start := range buffer {
Expand All @@ -64,12 +66,14 @@ func ProcessBuffer[T Overlappable](
case enumspb.SCHEDULE_OVERLAP_POLICY_SKIP:
// just skip
action.OverlapSkipped++
action.OverlapSkippedByPolicy[overlapPolicy]++
case enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE:
// allow one (the first one) in the buffer
if len(action.NewBuffer) == 0 {
action.NewBuffer = append(action.NewBuffer, start)
} else {
action.OverlapSkipped++
action.OverlapSkippedByPolicy[overlapPolicy]++
}
case enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL:
// always add to buffer
Expand Down
34 changes: 33 additions & 1 deletion service/worker/scheduler/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (s *processBufferSuite) TestProcessSkipRunning() {
s.Empty(action.NewBuffer)
s.False(action.NeedCancel)
s.False(action.NeedTerminate)
s.Equal(int64(3), action.OverlapSkipped)
s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{
enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: 3,
}, action.OverlapSkippedByPolicy)
}

func (s *processBufferSuite) TestProcessSkipNotRunning() {
Expand All @@ -71,6 +75,10 @@ func (s *processBufferSuite) TestProcessBufferOneRunning() {
s.Equal([]int{3}, jobIds(action.NewBuffer))
s.False(action.NeedCancel)
s.False(action.NeedTerminate)
s.Equal(int64(2), action.OverlapSkipped)
s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{
enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE: 2,
}, action.OverlapSkippedByPolicy)
}

func (s *processBufferSuite) TestProcessBufferOneNotRunning() {
Expand Down Expand Up @@ -168,4 +176,28 @@ func (s *processBufferSuite) TestProcessWithResolve() {
s.False(action.NeedTerminate)
}

// TODO: add test cases for mixed policies
// TestProcessMixedPoliciesTracksSkippedByResolvedPolicy runs ProcessBuffer over
// a buffer mixing SKIP, BUFFER_ONE, and UNSPECIFIED entries, with true as the
// second argument and a resolver that maps UNSPECIFIED to SKIP. It expects only
// job 5 to remain in the new buffer and all three skips to be attributed to the
// resolved SKIP policy.
func (s *processBufferSuite) TestProcessMixedPoliciesTracksSkippedByResolvedPolicy() {
	// Resolver: UNSPECIFIED falls back to SKIP, every other policy is kept as-is.
	unspecifiedToSkip := func(p enumspb.ScheduleOverlapPolicy) enumspb.ScheduleOverlapPolicy {
		switch p {
		case enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED:
			return enumspb.SCHEDULE_OVERLAP_POLICY_SKIP
		default:
			return p
		}
	}
	pending := []*job{
		{3, enumspb.SCHEDULE_OVERLAP_POLICY_SKIP},
		{5, enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE},
		{7, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED},
		{9, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED},
	}
	wantSkippedByPolicy := map[enumspb.ScheduleOverlapPolicy]int64{
		enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: 3,
	}

	got := ProcessBuffer(pending, true, unspecifiedToSkip)

	s.Empty(got.OverlappingStarts)
	s.Nil(got.NonOverlappingStart)
	s.Equal([]int{5}, jobIds(got.NewBuffer))
	s.False(got.NeedCancel)
	s.False(got.NeedTerminate)
	s.Equal(int64(3), got.OverlapSkipped)
	s.Equal(wantSkippedByPolicy, got.OverlapSkippedByPolicy)
}
29 changes: 27 additions & 2 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@

// SchedulerWorkflow runs the scheduler workflow with a spec builder obtained
// from NewSpecBuilder and with CHASM migration disabled. It delegates to
// schedulerWorkflowWithSpecBuilder and returns that function's error.
func SchedulerWorkflow(ctx workflow.Context, args *schedulespb.StartScheduleArgs) error {
	const enableCHASMMigration = false
	specBuilder := NewSpecBuilder()
	return schedulerWorkflowWithSpecBuilder(ctx, args, specBuilder, enableCHASMMigration)
}

Check failure on line 228 in service/worker/scheduler/workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

go.temporal.io/server/service/worker/scheduler.SchedulerWorkflow is non-deterministic, reason: calls non-deterministic function go.temporal.io/server/service/worker/scheduler.NewSpecBuilder

Check failure on line 228 in service/worker/scheduler/workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

go.temporal.io/server/service/worker/scheduler.SchedulerWorkflow is non-deterministic, reason: calls non-deterministic function go.temporal.io/server/service/worker/scheduler.schedulerWorkflowWithSpecBuilder

func schedulerWorkflowWithSpecBuilder(ctx workflow.Context, args *schedulespb.StartScheduleArgs, specBuilder *SpecBuilder, enableCHASMMigration bool) error {
scheduler := &scheduler{
StartScheduleArgs: args,

Check failure on line 232 in service/worker/scheduler/workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

go.temporal.io/server/service/worker/scheduler.schedulerWorkflowWithSpecBuilder is non-deterministic, reason: calls non-deterministic function (*go.temporal.io/server/service/worker/scheduler.scheduler).run
ctx: ctx,
a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", args.State.Namespace, "schedule-id", args.State.ScheduleId),
Expand Down Expand Up @@ -678,6 +678,7 @@
}

lastAction := start
recordedGenerateLatency := false
var next GetNextTimeResult
for next = s.getNextTime(start); !(next.Next.IsZero() || next.Next.After(end)); next = s.getNextTime(next.Next) {
if !s.hasMinVersion(BatchAndCacheTimeQueries) && !s.canTakeScheduledAction(manual, false) {
Expand All @@ -689,9 +690,22 @@
// hasMinVersion because this condition couldn't happen in previous versions.
continue
}
// Record generate latency only for the first action in the batch to
// avoid inflating the metric when catching up over a large time range.
if !manual && !recordedGenerateLatency {
s.metrics.Timer(metrics.ScheduleGenerateLatency.Name()).Record(end.Sub(next.Next))
recordedGenerateLatency = true
}
if !manual && end.Sub(next.Next) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", end, "time", next.Next)
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1)
// Action's nominal time was already past the catchup window when
// the scheduler woke up. It was never buffered for execution.
// Note: action_running is not included here because
// running action state has not been refreshed yet at this point
// (refresh happens later in processBuffer).
s.metrics.WithTags(map[string]string{
metrics.ScheduleMissedReasonTag: metrics.ScheduleMissedReasonNotBuffered,
}).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1)
s.Info.MissedCatchupWindow++
continue
}
Expand Down Expand Up @@ -914,7 +928,11 @@
}
}

// Update desired time of next start if it's buffered. This is used for metrics only.
// Update desired time of next start if it's buffered. This is used to
// compute ScheduleActionDelay (time between completing one action and
// starting the next). The legacy path doesn't use deferred starts
// (Attempt == -1) like CHASM, so BufferedStarts[0] is always the next
// pending start.
if long && len(s.State.BufferedStarts) > 0 {
s.State.BufferedStarts[0].DesiredTime = res.CloseTime
}
Expand Down Expand Up @@ -1346,6 +1364,11 @@

s.State.BufferedStarts = action.NewBuffer
s.Info.OverlapSkipped += action.OverlapSkipped
for overlapPolicy, count := range action.OverlapSkippedByPolicy {
s.metrics.WithTags(map[string]string{
metrics.ScheduleOverlapPolicyTag: overlapPolicy.String(),
Comment thread
davidporter-id-au marked this conversation as resolved.
}).Counter(metrics.ScheduleOverlapSkipped.Name()).Inc(count)
}

// Try starting whatever we're supposed to start now
allStarts := action.OverlappingStarts
Expand Down Expand Up @@ -1493,6 +1516,8 @@
// record metric only for _scheduled_ actions, not trigger/backfill, otherwise it's not meaningful
desiredTime := cmp.Or(start.DesiredTime, start.ActualTime)
s.metrics.Timer(metrics.ScheduleActionDelay.Name()).Record(res.RealStartTime.AsTime().Sub(desiredTime.AsTime()))
// Record total delay from original schedule time, including any overlap policy wait.
s.metrics.Timer(metrics.ScheduleActionE2EDelay.Name()).Record(res.RealStartTime.AsTime().Sub(start.ActualTime.AsTime()))
}

actionResult := &schedulepb.ScheduleActionResult{
Expand Down
Loading