Skip to content
7 changes: 5 additions & 2 deletions chasm/lib/scheduler/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ type processBufferResult struct {
discardStarts []*schedulespb.BufferedStart

// Number of buffered starts dropped due to overlap policy during processing.
overlapSkipped int64
overlapSkipped int64
overlapSkippedByPolicy map[enumspb.ScheduleOverlapPolicy]int64

// Nunmber of buffered starts dropped from missing the catchup window.
// Number of buffered starts dropped from missing the catchup window.
missedCatchupWindow int64
// Whether an action was running when the buffer was processed.
actionRunning bool
}

// recordProcessBufferResult updates the Invoker's internal state based on result, as well as the
Expand Down
23 changes: 22 additions & 1 deletion chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -405,6 +406,19 @@ func (h *InvokerProcessBufferTaskHandler) Execute(
overlapSkipped: result.overlapSkipped,
missedCatchupWindow: result.missedCatchupWindow,
})
if result.overlapSkipped > 0 {
for overlapPolicy, count := range result.overlapSkippedByPolicy {
newTaggedMetricsHandler(h.metricsHandler, scheduler).WithTags(
metrics.StringTag(metrics.ScheduleOverlapPolicyTag, overlapPolicy.String()),
).Counter(metrics.ScheduleOverlapSkipped.Name()).Record(count)
}
}
if result.missedCatchupWindow > 0 {
newTaggedMetricsHandler(h.metricsHandler, scheduler).WithTags(
metrics.StringTag(metrics.ScheduleMissedReasonTag, metrics.ScheduleMissedReasonBufferExpired),
metrics.StringTag(metrics.ScheduleActionRunningTag, strconv.FormatBool(result.actionRunning)),
).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(result.missedCatchupWindow)
}

// Update internal state and create new tasks.
invoker.recordProcessBufferResult(ctx, &result)
Expand All @@ -422,6 +436,7 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer(
) (result processBufferResult) {
runningWorkflows := invoker.runningWorkflowExecutions()
isRunning := len(runningWorkflows) > 0
result.actionRunning = isRunning

// Processing completely ignores any BufferedStart that's already executing/backing off.
pendingBufferedStarts := util.FilterSlice(invoker.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool {
Expand All @@ -446,6 +461,7 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer(

// Update result metrics.
result.overlapSkipped = action.OverlapSkipped
result.overlapSkippedByPolicy = action.OverlapSkippedByPolicy

// Add starting workflows to result, trim others.
for _, start := range readyStarts {
Expand All @@ -457,7 +473,8 @@ func (h *InvokerProcessBufferTaskHandler) processBuffer(
}

if ctx.Now(invoker).After(h.startWorkflowDeadline(ctx, scheduler, start)) {
// Drop expired starts.
// Action was buffered in time but expired before execution
// (e.g., due to overlap deferral, retries, or system delay).
result.missedCatchupWindow++
result.discardStarts = append(result.discardStarts, start)
continue
Expand Down Expand Up @@ -607,6 +624,10 @@ func (h *InvokerExecuteTaskHandler) startWorkflow(
metricsHandler.
Timer(metrics.ScheduleActionDelay.Name()).
Record(actualStartTime.Sub(desiredTime.AsTime()))
// Record total delay from original schedule time, including any overlap policy wait.
metricsHandler.
Timer(metrics.ScheduleActionStartToCloseDelay.Name()).
Record(actualStartTime.Sub(start.ActualTime.AsTime()))
}

return &schedulepb.ScheduleActionResult{
Expand Down
8 changes: 8 additions & 0 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/contextutil"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/util"
Expand Down Expand Up @@ -561,6 +562,13 @@ func (s *Scheduler) HandleNexusCompletion(
return nil
}

// Record how long it took for the callback to arrive after the action completed.
if closeTime := info.GetCloseTime().AsTime(); !closeTime.IsZero() {
newTaggedMetricsHandler(ctx.MetricsHandler(), s).
Timer(metrics.ScheduleCallbackLatency.Name()).
Record(time.Since(closeTime))
}
Comment thread
chaptersix marked this conversation as resolved.

// Handle last completed/failed status and payloads.
//
// TODO - also record payload sizes once we have metrics wired into CHASM context.
Expand Down
20 changes: 14 additions & 6 deletions chasm/lib/scheduler/spec_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange(
var err error
var bufferedStarts []*schedulespb.BufferedStart
var droppedCount int64
recordedGenerateLatency := false
limitReached := false
for next, err = s.NextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.NextTime(scheduler, next.Next) {
lastAction = next.Next
Expand All @@ -138,21 +139,28 @@ func (s *SpecProcessorImpl) ProcessTimeRange(
continue
}

// Record generate latency only for the first action in the batch to
// avoid inflating the metric when catching up over a large time range.
if !manual && !recordedGenerateLatency {
metricsHandler.Timer(metrics.ScheduleGenerateLatency.Name()).
Record(end.Sub(next.Next))
recordedGenerateLatency = true
}

if !manual && end.Sub(next.Next) > catchupWindow {
s.logger.Info("Schedule missed catchup window",
tag.Time("now", end),
tag.Time("time", next.Next))
metricsHandler.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1)
// Action's nominal time was already past the catchup window when
// the generator processed the time range. It was never buffered.
metricsHandler.WithTags(
metrics.StringTag(metrics.ScheduleMissedReasonTag, metrics.ScheduleMissedReasonNotBuffered),
).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1)

scheduler.Info.MissedCatchupWindow++
continue
}

if !manual {
metricsHandler.Timer(metrics.ScheduleGenerateLatency.Name()).
Record(end.Sub(next.Next))
}

if limitReached {
droppedCount++
continue
Expand Down
17 changes: 17 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,11 @@ const (
ScheduleBackendChasm = "chasm"
ScheduleBackendLegacy = "legacy"
ScheduleBackendWorkflow = "workflow"
ScheduleOverlapPolicyTag = "schedule_overlap_policy"
ScheduleMissedReasonTag = "reason"
ScheduleMissedReasonNotBuffered = "not_buffered"
ScheduleMissedReasonBufferExpired = "buffer_expired"
ScheduleActionRunningTag = "action_running"
ScheduleMigrationDirectionTag = "schedule_migration_direction"
ScheduleMigrationDirectionToChasm = "to_chasm"
ScheduleMigrationDirectionToWorkflow = "to_workflow"
Expand Down Expand Up @@ -1434,6 +1439,10 @@ var (
"schedule_action_delay",
WithDescription("Delay between when scheduled actions should/actually happen"),
)
ScheduleActionStartToCloseDelay = NewTimerDef(
"schedule_action_start_to_close_delay",
WithDescription("Delay between the action's original schedule time and when it was actually started, including overlap policy wait"),
)
Comment thread
chaptersix marked this conversation as resolved.
Outdated
ScheduleGenerateLatency = NewTimerDef(
"schedule_generate_latency",
WithDescription("Delay between when a scheduled action was due and when the generator buffered it"),
Expand All @@ -1454,6 +1463,14 @@ var (
"schedule_migration_failed",
WithDescription("The number of times a schedule migration fails"),
)
ScheduleOverlapSkipped = NewCounterDef(
"schedule_overlap_skipped",
WithDescription("The number of schedule actions skipped due to overlap policy"),
)
ScheduleCallbackLatency = NewTimerDef(
"schedule_callback_latency",
WithDescription("Latency between a scheduled action completing and the scheduler receiving the completion callback"),
)

Comment thread
chaptersix marked this conversation as resolved.
Outdated
// Worker Versioning
WorkerDeploymentCreated = NewCounterDef("worker_deployment_created")
Expand Down
6 changes: 5 additions & 1 deletion service/worker/scheduler/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type (
NeedCancel bool
NeedTerminate bool
// Stats
OverlapSkipped int64
OverlapSkipped int64
OverlapSkippedByPolicy map[enumspb.ScheduleOverlapPolicy]int64
}
)

Expand All @@ -40,6 +41,7 @@ func ProcessBuffer[T Overlappable](
// affect them.

var action ProcessBufferResult[T]
action.OverlapSkippedByPolicy = make(map[enumspb.ScheduleOverlapPolicy]int64)
var zeroVal T

for _, start := range buffer {
Expand All @@ -64,12 +66,14 @@ func ProcessBuffer[T Overlappable](
case enumspb.SCHEDULE_OVERLAP_POLICY_SKIP:
// just skip
action.OverlapSkipped++
action.OverlapSkippedByPolicy[overlapPolicy]++
case enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE:
// allow one (the first one) in the buffer
if len(action.NewBuffer) == 0 {
action.NewBuffer = append(action.NewBuffer, start)
} else {
action.OverlapSkipped++
action.OverlapSkippedByPolicy[overlapPolicy]++
}
case enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL:
// always add to buffer
Expand Down
34 changes: 33 additions & 1 deletion service/worker/scheduler/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (s *processBufferSuite) TestProcessSkipRunning() {
s.Empty(action.NewBuffer)
s.False(action.NeedCancel)
s.False(action.NeedTerminate)
s.Equal(int64(3), action.OverlapSkipped)
s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{
enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: 3,
}, action.OverlapSkippedByPolicy)
}

func (s *processBufferSuite) TestProcessSkipNotRunning() {
Expand All @@ -71,6 +75,10 @@ func (s *processBufferSuite) TestProcessBufferOneRunning() {
s.Equal([]int{3}, jobIds(action.NewBuffer))
s.False(action.NeedCancel)
s.False(action.NeedTerminate)
s.Equal(int64(2), action.OverlapSkipped)
s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{
enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE: 2,
}, action.OverlapSkippedByPolicy)
}

func (s *processBufferSuite) TestProcessBufferOneNotRunning() {
Expand Down Expand Up @@ -168,4 +176,28 @@ func (s *processBufferSuite) TestProcessWithResolve() {
s.False(action.NeedTerminate)
}

// TODO: add test cases for mixed policies
func (s *processBufferSuite) TestProcessMixedPoliciesTracksSkippedByResolvedPolicy() {
buffer := []*job{
{3, enumspb.SCHEDULE_OVERLAP_POLICY_SKIP},
{5, enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE},
{7, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED},
{9, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED},
}
resolve := func(policy enumspb.ScheduleOverlapPolicy) enumspb.ScheduleOverlapPolicy {
if policy == enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED {
return enumspb.SCHEDULE_OVERLAP_POLICY_SKIP
}
return policy
}

action := ProcessBuffer(buffer, true, resolve)
s.Empty(action.OverlappingStarts)
s.Nil(action.NonOverlappingStart)
s.Equal([]int{5}, jobIds(action.NewBuffer))
s.False(action.NeedCancel)
s.False(action.NeedTerminate)
s.Equal(int64(3), action.OverlapSkipped)
s.Equal(map[enumspb.ScheduleOverlapPolicy]int64{
enumspb.SCHEDULE_OVERLAP_POLICY_SKIP: 3,
}, action.OverlapSkippedByPolicy)
}
23 changes: 22 additions & 1 deletion service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@

func SchedulerWorkflow(ctx workflow.Context, args *schedulespb.StartScheduleArgs) error {
return schedulerWorkflowWithSpecBuilder(ctx, args, NewSpecBuilder(), false)
}

Check failure on line 228 in service/worker/scheduler/workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

go.temporal.io/server/service/worker/scheduler.SchedulerWorkflow is non-deterministic, reason: calls non-deterministic function go.temporal.io/server/service/worker/scheduler.NewSpecBuilder

Check failure on line 228 in service/worker/scheduler/workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

go.temporal.io/server/service/worker/scheduler.SchedulerWorkflow is non-deterministic, reason: calls non-deterministic function go.temporal.io/server/service/worker/scheduler.schedulerWorkflowWithSpecBuilder

func schedulerWorkflowWithSpecBuilder(ctx workflow.Context, args *schedulespb.StartScheduleArgs, specBuilder *SpecBuilder, enableCHASMMigration bool) error {
scheduler := &scheduler{
StartScheduleArgs: args,

Check failure on line 232 in service/worker/scheduler/workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

go.temporal.io/server/service/worker/scheduler.schedulerWorkflowWithSpecBuilder is non-deterministic, reason: calls non-deterministic function (*go.temporal.io/server/service/worker/scheduler.scheduler).run
ctx: ctx,
a: nil,
logger: sdklog.With(workflow.GetLogger(ctx), "wf-namespace", args.State.Namespace, "schedule-id", args.State.ScheduleId),
Expand Down Expand Up @@ -678,6 +678,7 @@
}

lastAction := start
recordedGenerateLatency := false
var next GetNextTimeResult
for next = s.getNextTime(start); !(next.Next.IsZero() || next.Next.After(end)); next = s.getNextTime(next.Next) {
if !s.hasMinVersion(BatchAndCacheTimeQueries) && !s.canTakeScheduledAction(manual, false) {
Expand All @@ -689,9 +690,20 @@
// hasMinVersion because this condition couldn't happen in previous versions.
continue
}
if !manual && !recordedGenerateLatency {
s.metrics.Timer(metrics.ScheduleGenerateLatency.Name()).Record(end.Sub(next.Next))
recordedGenerateLatency = true
}
if !manual && end.Sub(next.Next) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", end, "time", next.Next)
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1)
// Action's nominal time was already past the catchup window when
// the scheduler woke up. It was never buffered for execution.
// Note: action_running is not included here because
// running action state has not been refreshed yet at this point
// (refresh happens later in processBuffer).
s.metrics.WithTags(map[string]string{
metrics.ScheduleMissedReasonTag: metrics.ScheduleMissedReasonNotBuffered,
}).Counter(metrics.ScheduleMissedCatchupWindow.Name()).Inc(1)
s.Info.MissedCatchupWindow++
continue
}
Expand Down Expand Up @@ -1346,6 +1358,13 @@

s.State.BufferedStarts = action.NewBuffer
s.Info.OverlapSkipped += action.OverlapSkipped
if action.OverlapSkipped > 0 {
for overlapPolicy, count := range action.OverlapSkippedByPolicy {
s.metrics.WithTags(map[string]string{
metrics.ScheduleOverlapPolicyTag: overlapPolicy.String(),
}).Counter(metrics.ScheduleOverlapSkipped.Name()).Inc(count)
}
}

// Try starting whatever we're supposed to start now
allStarts := action.OverlappingStarts
Expand Down Expand Up @@ -1493,6 +1512,8 @@
// record metric only for _scheduled_ actions, not trigger/backfill, otherwise it's not meaningful
desiredTime := cmp.Or(start.DesiredTime, start.ActualTime)
s.metrics.Timer(metrics.ScheduleActionDelay.Name()).Record(res.RealStartTime.AsTime().Sub(desiredTime.AsTime()))
// Record total delay from original schedule time, including any overlap policy wait.
s.metrics.Timer(metrics.ScheduleActionStartToCloseDelay.Name()).Record(res.RealStartTime.AsTime().Sub(start.ActualTime.AsTime()))
}

actionResult := &schedulepb.ScheduleActionResult{
Expand Down
Loading