Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chasm/lib/scheduler/backfiller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"time"

schedulespb "go.temporal.io/server/api/schedule/v1"
Expand Down Expand Up @@ -45,6 +46,7 @@ func addBackfiller(
scheduler.Backfillers = make(chasm.Map[string, *Backfiller])
}
scheduler.Backfillers[id] = chasm.NewComponentField(ctx, backfiller)
scheduler.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("added backfiller: %s", id))

return backfiller
}
Expand All @@ -60,6 +62,8 @@ func newBackfillerWithState(ctx chasm.MutableContext, state *schedulerpb.Backfil

// scheduleTask schedules a BackfillerTask at the given time.
func (b *Backfiller) scheduleTask(ctx chasm.MutableContext, scheduledTime time.Time) {
b.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("scheduled backfillerTask for %s", scheduledTime.Format(time.RFC3339)))
ctx.AddTask(b, chasm.TaskAttributes{
ScheduledTime: scheduledTime,
}, &schedulerpb.BackfillerTask{})
Expand Down
3 changes: 2 additions & 1 deletion chasm/lib/scheduler/backfiller_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ func (b *BackfillerTaskHandler) Execute(

scheduler := backfiller.Scheduler.Get(ctx)
logger := newTaggedLogger(b.baseLogger, scheduler)

invoker := scheduler.Invoker.Get(ctx)

backfiller.EventLog.Get(ctx).LogEvent(ctx, "backfillerTask executed")

// If the buffer is already full, don't move the watermark at all, just back off
// and retry.
tweakables := b.config.Tweakables(scheduler.Namespace)
Expand Down
3 changes: 3 additions & 0 deletions chasm/lib/scheduler/generator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"time"

"go.temporal.io/server/chasm"
Expand Down Expand Up @@ -49,6 +50,8 @@ func (g *Generator) Generate(ctx chasm.MutableContext) {

// scheduleTask schedules a GeneratorTask at the given time.
func (g *Generator) scheduleTask(ctx chasm.MutableContext, scheduledTime time.Time) {
g.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("scheduled generatorTask for %s", scheduledTime.Format(time.RFC3339)))
ctx.AddTask(g, chasm.TaskAttributes{
ScheduledTime: scheduledTime,
}, &schedulerpb.GeneratorTask{})
Expand Down
7 changes: 7 additions & 0 deletions chasm/lib/scheduler/generator_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduler

import (
"fmt"
"time"

"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
Expand Down Expand Up @@ -59,6 +60,8 @@ func (g *GeneratorTaskHandler) Execute(

now := ctx.Now(generator)

generator.EventLog.Get(ctx).LogEvent(ctx, "generatorTask executed")

// If we have no last processed time, this is a new schedule.
if generator.LastProcessedTime == nil {
createdAt := timestamppb.New(now)
Expand Down Expand Up @@ -103,6 +106,8 @@ func (g *GeneratorTaskHandler) Execute(

// Emit metrics and update state for any dropped actions.
if result.DroppedCount > 0 {
generator.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("buffer overrun, dropped %d actions", result.DroppedCount))
logger.Warn("Buffer overrun, dropping actions",
tag.Int64("dropped-count", result.DroppedCount))
metricsHandler.Counter(metrics.ScheduleBufferOverruns.Name()).Record(result.DroppedCount)
Expand Down Expand Up @@ -134,6 +139,8 @@ func (g *GeneratorTaskHandler) Execute(
// customer can describe/modify/restart the schedule.
//
// Once the idle timer expires, we close the component.
generator.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("scheduled idle task for %s", idleExpiration.Format(time.RFC3339)))
ctx.AddTask(scheduler, chasm.TaskAttributes{
ScheduledTime: idleExpiration,
}, &schedulerpb.SchedulerIdleTask{
Expand Down
27 changes: 27 additions & 0 deletions chasm/lib/scheduler/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func (i *Invoker) recordProcessBufferResult(ctx chasm.MutableContext, result *pr

// Drop discarded starts, and update requested starts for execution.
var starts []*schedulespb.BufferedStart
readiedStarts := 0
deferredStarts := 0
for _, start := range i.GetBufferedStarts() {
if discards[start.RequestId] {
continue
Expand All @@ -93,17 +95,24 @@ func (i *Invoker) recordProcessBufferResult(ctx chasm.MutableContext, result *pr
// Starts ready for execution are set to their first attempt.
if ready[start.RequestId] && start.Attempt < 1 {
start.Attempt = 1
readiedStarts++
} else if start.Attempt == 0 {
// Start was processed but deferred (e.g., BUFFER_ONE policy with running workflow).
// Mark as deferred (-1) to distinguish from newly-enqueued starts so addTasks
// won't schedule an immediate ProcessBuffer task for them - they wait on
// recordCompletedAction to re-enable.
start.Attempt = -1
deferredStarts++
}

starts = append(starts, start)
}

if readiedStarts > 0 || deferredStarts > 0 {
i.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("recordProcessBufferResult readied %d starts, deferred %d starts", readiedStarts, deferredStarts))
}

// Update internal state.
i.BufferedStarts = starts
i.CancelWorkflows = append(i.GetCancelWorkflows(), result.cancelWorkflows...)
Expand Down Expand Up @@ -170,13 +179,18 @@ func (i *Invoker) recordExecuteResult(ctx chasm.MutableContext, result *executeR
}

// Remove failed (non-retryable) starts from the buffer.
removedStarts := 0
retriedStarts := 0
i.BufferedStarts = slices.DeleteFunc(i.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool {
removedStarts++
return failed[start.RequestId]
})
i.CancelWorkflows = slices.DeleteFunc(i.GetCancelWorkflows(), func(we *commonpb.WorkflowExecution) bool {
removedStarts++
return canceled[we.RunId]
})
i.TerminateWorkflows = slices.DeleteFunc(i.GetTerminateWorkflows(), func(we *commonpb.WorkflowExecution) bool {
removedStarts++
return terminated[we.RunId]
})

Expand All @@ -197,9 +211,16 @@ func (i *Invoker) recordExecuteResult(ctx chasm.MutableContext, result *executeR
if retry, ok := retryable[start.RequestId]; ok {
start.Attempt++
start.BackoffTime = retry.GetBackoffTime()
retriedStarts++
}
}

i.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("recordExecuteResult kicked off %d starts, removed %d starts, retried %d starts",
newlyStarted,
removedStarts,
retriedStarts))

i.addTasks(ctx)
return newlyStarted, droppedDuplicates
}
Expand All @@ -225,6 +246,8 @@ func (i *Invoker) recordCompletedAction(
completed *schedulespb.CompletedResult,
requestID string,
) (scheduleTime time.Time) {
i.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("recording completed action: %s", requestID))

// Find the BufferedStart and mark it as completed.
for _, start := range i.BufferedStarts {
if start.GetRequestId() == requestID {
Expand Down Expand Up @@ -270,10 +293,13 @@ func (i *Invoker) addTasks(ctx chasm.MutableContext) {
// If we have Attempt = 0 starts, generate a ProcessBufferTask immediately. If we
// have starts that are backing off, add a timer task for the earliest backoff time.
if i.hasUnprocessedStarts() {
i.EventLog.Get(ctx).LogEvent(ctx, "scheduled processBufferTask immediately")
ctx.AddTask(i, chasm.TaskAttributes{
ScheduledTime: chasm.TaskScheduledTimeImmediate,
}, &schedulerpb.InvokerProcessBufferTask{})
} else if deadline := i.nextBackoffDeadline(); !deadline.IsZero() {
i.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("scheduled processBufferTask for %s", deadline.Format(time.RFC3339)))
ctx.AddTask(i, chasm.TaskAttributes{
ScheduledTime: deadline,
}, &schedulerpb.InvokerProcessBufferTask{})
Expand All @@ -284,6 +310,7 @@ func (i *Invoker) addTasks(ctx chasm.MutableContext) {
if len(i.GetCancelWorkflows()) > 0 ||
len(i.GetTerminateWorkflows()) > 0 ||
len(i.getEligibleBufferedStarts()) > 0 {
i.EventLog.Get(ctx).LogEvent(ctx, "scheduled executeTask")
ctx.AddTask(i, chasm.TaskAttributes{}, &schedulerpb.InvokerExecuteTask{})
}
}
Expand Down
2 changes: 2 additions & 0 deletions chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ func (h *InvokerProcessBufferTaskHandler) Execute(
) error {
scheduler := invoker.Scheduler.Get(ctx)

invoker.EventLog.Get(ctx).LogEvent(ctx, "processBufferTask executed")

// Make sure we have something to start.
executionInfo := scheduler.Schedule.GetAction().GetStartWorkflow()
if executionInfo == nil {
Expand Down
6 changes: 6 additions & 0 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func NewSentinel(
Info: &schedulepb.ScheduleInfo{},
},
cacheConflictToken: scheduler.InitialConflictToken,
EventLog: chasm.NewComponentField(ctx, NewEventLog(ctx)),
}
now := ctx.Now(s)
s.Info.CreateTime = timestamppb.New(now)
Expand Down Expand Up @@ -787,6 +788,8 @@ func (s *Scheduler) MigrateToWorkflow(
s.Schedule.State.Paused = true
s.Schedule.State.Notes = "paused for migration to workflow-backed scheduler"

s.EventLog.Get(ctx).LogEvent(ctx, "started migration to V1")

// Schedule a side-effect task to export state and start the V1 workflow.
ctx.AddTask(s, chasm.TaskAttributes{}, &schedulerpb.SchedulerMigrateToWorkflowTask{})

Expand Down Expand Up @@ -843,6 +846,7 @@ func (s *Scheduler) Update(

s.Info.UpdateTime = timestamppb.New(ctx.Now(s))
s.updateConflictToken()
s.EventLog.Get(ctx).LogEvent(ctx, "updated via API")

// Since the spec may have been updated, kick off the generator.
s.Generator.Get(ctx).Generate(ctx)
Expand All @@ -869,13 +873,15 @@ func (s *Scheduler) Patch(
if req.FrontendRequest.Patch.Pause != "" {
s.Schedule.State.Paused = true
s.Schedule.State.Notes = req.FrontendRequest.Patch.Pause
s.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("paused via API: %s", req.FrontendRequest.Patch.Pause))
}
if req.FrontendRequest.Patch.Unpause != "" {
if s.WorkflowMigration != nil {
return nil, ErrMigrationPending
}
s.Schedule.State.Paused = false
s.Schedule.State.Notes = req.FrontendRequest.Patch.Unpause
s.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("unpaused via API: %s", req.FrontendRequest.Patch.Unpause))
}

if err := s.handlePatch(ctx, req.FrontendRequest.Patch); err != nil {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also does generate a log entry, by virtue of each backfiller added generating a log entry.

Expand Down
4 changes: 4 additions & 0 deletions chasm/lib/scheduler/scheduler_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (r *SchedulerIdleTaskHandler) Execute(
_ chasm.TaskAttributes,
_ *schedulerpb.SchedulerIdleTask,
) error {
scheduler.EventLog.Get(ctx).LogEvent(ctx, "schedule closed from idle timer")
scheduler.Closed = true
return nil
}
Expand Down Expand Up @@ -189,6 +190,9 @@ func (r *SchedulerCallbacksTaskHandler) Execute(
}
}

s.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("attached callbacks to %d already-running workflow(s)", len(results)))

// Now that running workflow state has been refreshed, scheduler tasks can be
// fired.
invoker.addTasks(ctx)
Expand Down
Loading