Skip to content
Open
4 changes: 4 additions & 0 deletions chasm/lib/scheduler/backfiller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"time"

schedulespb "go.temporal.io/server/api/schedule/v1"
Expand Down Expand Up @@ -45,6 +46,7 @@ func addBackfiller(
scheduler.Backfillers = make(chasm.Map[string, *Backfiller])
}
scheduler.Backfillers[id] = chasm.NewComponentField(ctx, backfiller)
scheduler.EventLog.Get(ctx).LogEvent(ctx, fmt.Sprintf("added backfiller: %s", id))

return backfiller
}
Expand All @@ -60,6 +62,8 @@ func newBackfillerWithState(ctx chasm.MutableContext, state *schedulerpb.Backfil

// scheduleTask schedules a BackfillerTask at the given time.
func (b *Backfiller) scheduleTask(ctx chasm.MutableContext, scheduledTime time.Time) {
b.EventLog.Get(ctx).LogEvent(ctx,
fmt.Sprintf("scheduled backfillerTask for %s", scheduledTime.Format(time.RFC3339)))
ctx.AddTask(b, chasm.TaskAttributes{
ScheduledTime: scheduledTime,
}, &schedulerpb.BackfillerTask{})
Expand Down
14 changes: 8 additions & 6 deletions chasm/lib/scheduler/backfiller_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@ func NewBackfillerTaskHandler(opts BackfillerTaskHandlerOptions) *BackfillerTask
}

func (b *BackfillerTaskHandler) Validate(
ctx chasm.Context,
_ chasm.Context,
backfiller *Backfiller,
attrs chasm.TaskAttributes,
_ *schedulerpb.BackfillerTask,
) (bool, error) {
return validateTaskHighWaterMark(
backfiller.GetLastProcessedTime(),
attrs.ScheduledTime,
)
return validateTaskHighWaterMark(backfiller.GetLastProcessedTime(), attrs.ScheduledTime)
}

func (b *BackfillerTaskHandler) Execute(
Expand All @@ -66,9 +63,10 @@ func (b *BackfillerTaskHandler) Execute(

scheduler := backfiller.Scheduler.Get(ctx)
logger := newTaggedLogger(b.baseLogger, scheduler)

invoker := scheduler.Invoker.Get(ctx)

backfiller.EventLog.Get(ctx).LogEvent(ctx, "backfillerTask executed")

// If the buffer is already full, don't move the watermark at all, just back off
// and retry.
tweakables := b.config.Tweakables(scheduler.Namespace)
Expand Down Expand Up @@ -110,6 +108,10 @@ func (b *BackfillerTaskHandler) Execute(
logger.Debug("backfill complete, deleting Backfiller",
tag.String("backfill-id", backfiller.GetBackfillId()))
delete(scheduler.Backfillers, backfiller.GetBackfillId())

// Revive the Generator so it can re-evaluate idle/close eligibility now
// that this backfiller is gone.
scheduler.Generator.Get(ctx).Generate(ctx)
return nil
}

Expand Down
25 changes: 25 additions & 0 deletions chasm/lib/scheduler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"time"

"go.temporal.io/server/chasm"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
)
Expand All @@ -28,6 +29,30 @@ type (
}
)

// tweakablesCtxKey keys the namespace-filtered Tweakables accessor that scheduler
// components read via the CHASM context (registered in Library.Components).
type tweakablesCtxKeyType struct{}

var tweakablesCtxKey = tweakablesCtxKeyType{}

// tweakablesFromContext returns the scheduler Tweakables for the context's namespace,
// falling back to DefaultTweakables when no config is registered.
func tweakablesFromContext(ctx chasm.Context) Tweakables {
if fn, ok := ctx.Value(tweakablesCtxKey).(dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables]); ok && fn != nil {
return fn(ctx.NamespaceEntry().Name().String())
}
return DefaultTweakables
}

// contextValues builds the CHASM context values exposed to scheduler components.
func (c *Config) contextValues() map[any]any {
var tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables]
if c != nil {
tweakables = c.Tweakables
}
return map[any]any{tweakablesCtxKey: tweakables}
}

var (
CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting(
"scheduler.tweakables",
Expand Down
11 changes: 7 additions & 4 deletions chasm/lib/scheduler/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ func (e *EventLog) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
return chasm.LifecycleStateRunning
}

// LogEvent appends an event with the given message. Messages longer than
// maxMessageLen bytes are truncated at a UTF-8 rune boundary; once the log
// has more than maxEntries entries, the earliest entries are dropped.
func (e *EventLog) LogEvent(ctx chasm.MutableContext, msg string, maxEntries, maxMessageLen int) {
// LogEvent appends an event with the given message. Messages longer than the
// configured maximum length are truncated at a UTF-8 rune boundary; once the
// log exceeds the configured maximum entries, the earliest entries are dropped.
func (e *EventLog) LogEvent(ctx chasm.MutableContext, msg string) {
tw := tweakablesFromContext(ctx)
maxEntries, maxMessageLen := tw.EventLogMaxEntries, tw.EventLogMaxMessageLen

if len(msg) > maxMessageLen {
// Back off to the nearest UTF-8 rune boundary so we don't split a
// multibyte rune.
Expand Down
34 changes: 27 additions & 7 deletions chasm/lib/scheduler/eventlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/stretchr/testify/require"
enumspb "go.temporal.io/api/enums/v1"
schedulepb "go.temporal.io/api/schedule/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/scheduler"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
testEventLogMaxEntries = 30
testEventLogMaxMessageLen = 1000
var (
testEventLogMaxEntries = scheduler.DefaultTweakables.EventLogMaxEntries
testEventLogMaxMessageLen = scheduler.DefaultTweakables.EventLogMaxMessageLen
)

func TestEventLog_Accumulates(t *testing.T) {
Expand All @@ -24,7 +26,7 @@ func TestEventLog_Accumulates(t *testing.T) {

messages := []string{"first", "second", "third"}
for _, m := range messages {
eventLog.LogEvent(ctx, m, testEventLogMaxEntries, testEventLogMaxMessageLen)
eventLog.LogEvent(ctx, m)
}

require.Len(t, eventLog.Events, len(messages))
Expand All @@ -40,7 +42,7 @@ func TestEventLog_TruncatesLongMessages(t *testing.T) {
eventLog.Events = nil

long := strings.Repeat("x", testEventLogMaxMessageLen+50)
eventLog.LogEvent(ctx, long, testEventLogMaxEntries, testEventLogMaxMessageLen)
eventLog.LogEvent(ctx, long)

require.Len(t, eventLog.Events, 1)
require.Len(t, eventLog.Events[0].Message, testEventLogMaxMessageLen)
Expand All @@ -55,7 +57,7 @@ func TestEventLog_DropsEarliestWhenFull(t *testing.T) {
const overflow = 5
total := testEventLogMaxEntries + overflow
for i := range total {
eventLog.LogEvent(ctx, fmt.Sprintf("event-%d", i), testEventLogMaxEntries, testEventLogMaxMessageLen)
eventLog.LogEvent(ctx, fmt.Sprintf("event-%d", i))
}

require.Len(t, eventLog.Events, testEventLogMaxEntries)
Expand All @@ -65,6 +67,24 @@ func TestEventLog_DropsEarliestWhenFull(t *testing.T) {
require.Equal(t, fmt.Sprintf("event-%d", total-1), eventLog.Events[len(eventLog.Events)-1].Message)
}

// TestEventLog_NoConfigFallsBackToDefaults checks that LogEvent applies the
// default retention limits instead of panicking when no config is reachable via
// the context, as in tdbg's registration-only setup.
func TestEventLog_NoConfigFallsBackToDefaults(t *testing.T) {
ctx := &chasm.MockMutableContext{}
eventLog := scheduler.NewEventLog(ctx)

require.NotPanics(t, func() {
eventLog.LogEvent(ctx, "logged without a registered config")
})
require.Len(t, eventLog.Events, 1)

long := strings.Repeat("x", scheduler.DefaultTweakables.EventLogMaxMessageLen+50)
eventLog.LogEvent(ctx, long)
require.Len(t, eventLog.Events, 2)
require.Len(t, eventLog.Events[1].Message, scheduler.DefaultTweakables.EventLogMaxMessageLen)
}

func TestEventLog_EachComponentHasOwn(t *testing.T) {
sched, ctx, _ := setupSchedulerForTest(t)

Expand All @@ -86,7 +106,7 @@ func TestEventLog_EachComponentHasOwn(t *testing.T) {
sched.Generator.Get(ctx).EventLog.Get(ctx).Events = nil
backfiller.EventLog.Get(ctx).Events = nil

sched.EventLog.Get(ctx).LogEvent(ctx, "scheduler-event", testEventLogMaxEntries, testEventLogMaxMessageLen)
sched.EventLog.Get(ctx).LogEvent(ctx, "scheduler-event")
require.Len(t, sched.EventLog.Get(ctx).Events, 1)
require.Empty(t, sched.Generator.Get(ctx).EventLog.Get(ctx).Events)
require.Empty(t, backfiller.EventLog.Get(ctx).Events)
Expand Down
13 changes: 13 additions & 0 deletions chasm/lib/scheduler/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,16 @@ func (s *Scheduler) RecordCompletedAction(
func (i *Invoker) RunningWorkflowID(requestID string) string {
return i.runningWorkflowID(requestID)
}

// RecordExecuteResult exposes recordExecuteResult so tests can pin the
// per-RequestId idempotency guard against concurrent ExecuteTasks.
func (i *Invoker) RecordExecuteResult(
ctx chasm.MutableContext,
completed []*schedulespb.BufferedStart,
retryable []*schedulespb.BufferedStart,
) (newlyStarted, droppedDuplicates int) {
return i.recordExecuteResult(ctx, &executeResult{
CompletedStarts: completed,
RetryableStarts: retryable,
})
}
78 changes: 46 additions & 32 deletions chasm/lib/scheduler/gen/schedulerpb/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading