diff --git a/chasm/lib/scheduler/backfiller.go b/chasm/lib/scheduler/backfiller.go index 9693b999892..e132596730d 100644 --- a/chasm/lib/scheduler/backfiller.go +++ b/chasm/lib/scheduler/backfiller.go @@ -1,6 +1,7 @@ package scheduler import ( + "fmt" "time" schedulespb "go.temporal.io/server/api/schedule/v1" @@ -45,6 +46,7 @@ func addBackfiller( scheduler.Backfillers = make(chasm.Map[string, *Backfiller]) } scheduler.Backfillers[id] = chasm.NewComponentField(ctx, backfiller) + scheduler.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("added backfiller: %s", id)) return backfiller } @@ -60,6 +62,8 @@ func newBackfillerWithState(ctx chasm.MutableContext, state *schedulerpb.Backfil // scheduleTask schedules a BackfillerTask at the given time. func (b *Backfiller) scheduleTask(ctx chasm.MutableContext, scheduledTime time.Time) { + b.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("scheduled backfillerTask for %s", scheduledTime.Format(time.RFC3339))) ctx.AddTask(b, chasm.TaskAttributes{ ScheduledTime: scheduledTime, }, &schedulerpb.BackfillerTask{}) diff --git a/chasm/lib/scheduler/backfiller_tasks.go b/chasm/lib/scheduler/backfiller_tasks.go index dd1484c18a8..525c595db44 100644 --- a/chasm/lib/scheduler/backfiller_tasks.go +++ b/chasm/lib/scheduler/backfiller_tasks.go @@ -63,9 +63,10 @@ func (b *BackfillerTaskHandler) Execute( scheduler := backfiller.Scheduler.Get(ctx) logger := newTaggedLogger(b.baseLogger, scheduler) - invoker := scheduler.Invoker.Get(ctx) + backfiller.EventLog.Get(ctx).LogEvent(ctx, "backfillerTask executed") + // If the buffer is already full, don't move the watermark at all, just back off // and retry. tweakables := b.config.Tweakables(scheduler.Namespace) diff --git a/chasm/lib/scheduler/generator.go b/chasm/lib/scheduler/generator.go index 5f771e49152..5b49162d0ec 100644 --- a/chasm/lib/scheduler/generator.go +++ b/chasm/lib/scheduler/generator.go @@ -1,6 +1,7 @@ package scheduler import ( + "fmt" "time" "go.temporal.io/server/chasm" @@ -49,6 +50,8 @@ func (g *Generator) Generate(ctx chasm.MutableContext) { // scheduleTask schedules a GeneratorTask at the given time. func (g *Generator) scheduleTask(ctx chasm.MutableContext, scheduledTime time.Time) { + g.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("scheduled generatorTask for %s", scheduledTime.Format(time.RFC3339))) ctx.AddTask(g, chasm.TaskAttributes{ ScheduledTime: scheduledTime, }, &schedulerpb.GeneratorTask{}) diff --git a/chasm/lib/scheduler/generator_tasks.go b/chasm/lib/scheduler/generator_tasks.go index ddc9dfcfb7b..222851e49e7 100644 --- a/chasm/lib/scheduler/generator_tasks.go +++ b/chasm/lib/scheduler/generator_tasks.go @@ -2,6 +2,7 @@ package scheduler import ( "fmt" + "time" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" @@ -59,6 +60,8 @@ func (g *GeneratorTaskHandler) Execute( now := ctx.Now(generator) + generator.EventLog.Get(ctx).LogEvent(ctx, "generatorTask executed") + // If we have no last processed time, this is a new schedule. if generator.LastProcessedTime == nil { createdAt := timestamppb.New(now) @@ -103,6 +106,8 @@ func (g *GeneratorTaskHandler) Execute( // Emit metrics and update state for any dropped actions. if result.DroppedCount > 0 { + generator.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("buffer overrun, dropped %d actions", result.DroppedCount)) logger.Warn("Buffer overrun, dropping actions", tag.Int64("dropped-count", result.DroppedCount)) metricsHandler.Counter(metrics.ScheduleBufferOverruns.Name()).Record(result.DroppedCount) @@ -134,6 +139,8 @@ func (g *GeneratorTaskHandler) Execute( // customer can describe/modify/restart the schedule. // // Once the idle timer expires, we close the component. + generator.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("scheduled idle task for %s", idleExpiration.Format(time.RFC3339))) ctx.AddTask(scheduler, chasm.TaskAttributes{ ScheduledTime: idleExpiration, }, &schedulerpb.SchedulerIdleTask{ diff --git a/chasm/lib/scheduler/invoker.go b/chasm/lib/scheduler/invoker.go index 9c568a8b3a7..41f51c69f0a 100644 --- a/chasm/lib/scheduler/invoker.go +++ b/chasm/lib/scheduler/invoker.go @@ -85,6 +85,8 @@ func (i *Invoker) recordProcessBufferResult(ctx chasm.MutableContext, result *pr // Drop discarded starts, and update requested starts for execution. var starts []*schedulespb.BufferedStart + readiedStarts := 0 + deferredStarts := 0 for _, start := range i.GetBufferedStarts() { if discards[start.RequestId] { continue @@ -93,17 +95,24 @@ func (i *Invoker) recordProcessBufferResult(ctx chasm.MutableContext, result *pr // Starts ready for execution are set to their first attempt. if ready[start.RequestId] && start.Attempt < 1 { start.Attempt = 1 + readiedStarts++ } else if start.Attempt == 0 { // Start was processed but deferred (e.g., BUFFER_ONE policy with running workflow). // Mark as deferred (-1) to distinguish from newly-enqueued starts so addTasks // won't schedule an immediate ProcessBuffer task for them - they wait on // recordCompletedAction to re-enable. start.Attempt = -1 + deferredStarts++ } starts = append(starts, start) } + if readiedStarts > 0 || deferredStarts > 0 { + i.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("recordProcessBufferResult readied %d starts, deferred %d starts", readiedStarts, deferredStarts)) + } + // Update internal state. i.BufferedStarts = starts i.CancelWorkflows = append(i.GetCancelWorkflows(), result.cancelWorkflows...) @@ -170,13 +179,18 @@ func (i *Invoker) recordExecuteResult(ctx chasm.MutableContext, result *executeR } // Remove failed (non-retryable) starts from the buffer. + removedStarts := 0 + retriedStarts := 0 i.BufferedStarts = slices.DeleteFunc(i.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool { + removedStarts++ return failed[start.RequestId] }) i.CancelWorkflows = slices.DeleteFunc(i.GetCancelWorkflows(), func(we *commonpb.WorkflowExecution) bool { + removedStarts++ return canceled[we.RunId] }) i.TerminateWorkflows = slices.DeleteFunc(i.GetTerminateWorkflows(), func(we *commonpb.WorkflowExecution) bool { + removedStarts++ return terminated[we.RunId] }) @@ -197,9 +211,16 @@ func (i *Invoker) recordExecuteResult(ctx chasm.MutableContext, result *executeR if retry, ok := retryable[start.RequestId]; ok { start.Attempt++ start.BackoffTime = retry.GetBackoffTime() + retriedStarts++ } } + i.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("recordExecuteResult kicked off %d starts, removed %d starts, retried %d starts", + newlyStarted, + removedStarts, + retriedStarts)) + i.addTasks(ctx) return newlyStarted, droppedDuplicates } @@ -225,6 +246,8 @@ func (i *Invoker) recordCompletedAction( completed *schedulespb.CompletedResult, requestID string, ) (scheduleTime time.Time) { + i.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("recording completed action: %s", requestID)) + // Find the BufferedStart and mark it as completed. for _, start := range i.BufferedStarts { if start.GetRequestId() == requestID { @@ -270,10 +293,13 @@ func (i *Invoker) addTasks(ctx chasm.MutableContext) { // If we have Attempt = 0 starts, generate a ProcessBufferTask immediately. If we // have starts that are backing off, add a timer task for the earliest backoff time. if i.hasUnprocessedStarts() { + i.EventLog.Get(ctx).LogEvent(ctx, "scheduled processBufferTask immediately") ctx.AddTask(i, chasm.TaskAttributes{ ScheduledTime: chasm.TaskScheduledTimeImmediate, }, &schedulerpb.InvokerProcessBufferTask{}) } else if deadline := i.nextBackoffDeadline(); !deadline.IsZero() { + i.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("scheduled processBufferTask for %s", deadline.Format(time.RFC3339))) ctx.AddTask(i, chasm.TaskAttributes{ ScheduledTime: deadline, }, &schedulerpb.InvokerProcessBufferTask{}) @@ -284,6 +310,7 @@ func (i *Invoker) addTasks(ctx chasm.MutableContext) { if len(i.GetCancelWorkflows()) > 0 || len(i.GetTerminateWorkflows()) > 0 || len(i.getEligibleBufferedStarts()) > 0 { + i.EventLog.Get(ctx).LogEvent(ctx, "scheduled executeTask") ctx.AddTask(i, chasm.TaskAttributes{}, &schedulerpb.InvokerExecuteTask{}) } } diff --git a/chasm/lib/scheduler/invoker_tasks.go b/chasm/lib/scheduler/invoker_tasks.go index 84b3b1f935b..b62ee8a1c81 100644 --- a/chasm/lib/scheduler/invoker_tasks.go +++ b/chasm/lib/scheduler/invoker_tasks.go @@ -390,6 +390,8 @@ func (h *InvokerProcessBufferTaskHandler) Execute( ) error { scheduler := invoker.Scheduler.Get(ctx) + invoker.EventLog.Get(ctx).LogEvent(ctx, "processBufferTask executed") + // Make sure we have something to start. executionInfo := scheduler.Schedule.GetAction().GetStartWorkflow() if executionInfo == nil { diff --git a/chasm/lib/scheduler/scheduler.go b/chasm/lib/scheduler/scheduler.go index c774461de07..0de9856704d 100644 --- a/chasm/lib/scheduler/scheduler.go +++ b/chasm/lib/scheduler/scheduler.go @@ -168,6 +168,7 @@ func NewSentinel( Info: &schedulepb.ScheduleInfo{}, }, cacheConflictToken: scheduler.InitialConflictToken, + EventLog: chasm.NewComponentField(ctx, NewEventLog(ctx)), } now := ctx.Now(s) s.Info.CreateTime = timestamppb.New(now) @@ -787,6 +788,8 @@ func (s *Scheduler) MigrateToWorkflow( s.Schedule.State.Paused = true s.Schedule.State.Notes = "paused for migration to workflow-backed scheduler" + s.EventLog.Get(ctx).LogEvent(ctx, "started migration to V1") + // Schedule a side-effect task to export state and start the V1 workflow. ctx.AddTask(s, chasm.TaskAttributes{}, &schedulerpb.SchedulerMigrateToWorkflowTask{}) @@ -843,6 +846,7 @@ func (s *Scheduler) Update( s.Info.UpdateTime = timestamppb.New(ctx.Now(s)) s.updateConflictToken() + s.EventLog.Get(ctx).LogEvent(ctx, "updated via API") // Since the spec may have been updated, kick off the generator. s.Generator.Get(ctx).Generate(ctx) @@ -869,6 +873,7 @@ func (s *Scheduler) Patch( if req.FrontendRequest.Patch.Pause != "" { s.Schedule.State.Paused = true s.Schedule.State.Notes = req.FrontendRequest.Patch.Pause + s.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("paused via API: %s", req.FrontendRequest.Patch.Pause)) } if req.FrontendRequest.Patch.Unpause != "" { if s.WorkflowMigration != nil { @@ -876,6 +881,7 @@ func (s *Scheduler) Patch( } s.Schedule.State.Paused = false s.Schedule.State.Notes = req.FrontendRequest.Patch.Unpause + s.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("unpaused via API: %s", req.FrontendRequest.Patch.Unpause)) } if err := s.handlePatch(ctx, req.FrontendRequest.Patch); err != nil { diff --git a/chasm/lib/scheduler/scheduler_tasks.go b/chasm/lib/scheduler/scheduler_tasks.go index 7e84fed84cc..e8688f231de 100644 --- a/chasm/lib/scheduler/scheduler_tasks.go +++ b/chasm/lib/scheduler/scheduler_tasks.go @@ -52,6 +52,7 @@ func (r *SchedulerIdleTaskHandler) Execute( _ chasm.TaskAttributes, _ *schedulerpb.SchedulerIdleTask, ) error { + scheduler.EventLog.Get(ctx).LogEvent(ctx, "schedule closed from idle timer") scheduler.Closed = true return nil } @@ -189,6 +190,9 @@ func (r *SchedulerCallbacksTaskHandler) Execute( } } + s.EventLog.Get(ctx).LogEvent(ctx, + fmt.Sprintf("attached callbacks to %d already-running workflow(s)", len(results))) + // Now that running workflow state has been refreshed, scheduler tasks can be // fired. invoker.addTasks(ctx)