From ee840faea28abd23db9d4e3e88df70ae70e7123d Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 3 Jun 2026 12:26:58 -0400 Subject: [PATCH 1/3] DLQ CHASM pure task if valid after execution, add unit test verifications to test framework --- chasm/chasmtest/task_helpers.go | 15 +++- chasm/chasmtest/task_helpers_test.go | 119 +++++++++++++++++++++++++++ chasm/errors.go | 27 ++++++ chasm/tree.go | 15 ++-- chasm/tree_test.go | 60 ++++++++++++-- 5 files changed, 222 insertions(+), 14 deletions(-) create mode 100644 chasm/chasmtest/task_helpers_test.go diff --git a/chasm/chasmtest/task_helpers.go b/chasm/chasmtest/task_helpers.go index a8e6d3c6ec9..9d342ccec48 100644 --- a/chasm/chasmtest/task_helpers.go +++ b/chasm/chasmtest/task_helpers.go @@ -10,6 +10,8 @@ import ( // ExecutePureTask validates and executes a pure task atomically via [Engine.UpdateComponent]. // It returns taskDropped set to true if [chasm.PureTaskHandler.Validate] returns (false, nil), // indicating the task is no longer relevant and was not executed. +// After a successful execution, Validate must return (false, nil), otherwise +// the helper returns an error because the task would remain runnable. // // The component ref is resolved automatically — no separate [Engine.ReadComponent] call to // obtain a ref is needed. Pass the component pointer directly. @@ -48,7 +50,18 @@ func ExecutePureTask[C chasm.Component, T any]( taskDropped = true return nil } - return handler.Execute(mutableCtx, typedC, attrs, task) + if err = handler.Execute(mutableCtx, typedC, attrs, task); err != nil { + return err + } + + valid, err = handler.Validate(mutableCtx, typedC, attrs, task) + if err != nil { + return err + } + if valid { + return chasm.NewTaskNotInvalidatedError("pure", fmt.Sprintf("task_type=%T", task)) + } + return nil }, ) return taskDropped, err diff --git a/chasm/chasmtest/task_helpers_test.go b/chasm/chasmtest/task_helpers_test.go new file mode 100644 index 00000000000..cd3e096ce41 --- /dev/null +++ b/chasm/chasmtest/task_helpers_test.go @@ -0,0 +1,119 @@ +package chasmtest + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/server/chasm" + chasmtests "go.temporal.io/server/chasm/lib/tests" + "go.temporal.io/server/chasm/lib/tests/gen/testspb/v1" + "go.temporal.io/server/common/log" +) + +type noOpPureTTLTaskHandler struct { + chasm.PureTaskHandlerBase +} + +func (h *noOpPureTTLTaskHandler) Execute( + _ chasm.MutableContext, + _ *chasmtests.PayloadStore, + _ chasm.TaskAttributes, + _ *testspb.TestPayloadTTLPureTask, +) error { + return nil +} + +func (h *noOpPureTTLTaskHandler) Validate( + _ chasm.Context, + store *chasmtests.PayloadStore, + _ chasm.TaskAttributes, + task *testspb.TestPayloadTTLPureTask, +) (bool, error) { + _, ok := store.State.ExpirationTimes[task.PayloadKey] + return ok, nil +} + +func TestExecutePureTaskRequiresPostExecutionInvalidation(t *testing.T) { + engine, store, attrs := newPayloadStoreWithExpiringPayload(t, "pure-task-payload") + + task := &testspb.TestPayloadTTLPureTask{PayloadKey: "pure-task-payload"} + taskDropped, err := ExecutePureTask( + context.Background(), + engine, + store, + &chasmtests.PayloadTTLPureTaskHandler{}, + attrs, + task, + ) + require.NoError(t, err) + require.False(t, taskDropped) + + engine, store, attrs = newPayloadStoreWithExpiringPayload(t, "still-valid-pure-task-payload") + task = &testspb.TestPayloadTTLPureTask{PayloadKey: "still-valid-pure-task-payload"} + taskDropped, err = ExecutePureTask( + context.Background(), + engine, + store, + &noOpPureTTLTaskHandler{}, + attrs, + task, + ) + require.ErrorContains(t, err, "CHASM pure task remained valid after successful execution") + var taskNotInvalidatedErr *chasm.TaskNotInvalidatedError + require.ErrorAs(t, err, &taskNotInvalidatedErr) + require.True(t, taskNotInvalidatedErr.IsTerminalTaskError()) + require.False(t, taskDropped) +} + +func newPayloadStoreWithExpiringPayload( + t *testing.T, + payloadKey string, +) (*Engine, *chasmtests.PayloadStore, chasm.TaskAttributes) { + t.Helper() + + registry := chasm.NewRegistry(log.NewNoopLogger()) + require.NoError(t, registry.Register(&chasm.CoreLibrary{})) + require.NoError(t, registry.Register(chasmtests.Library)) + + engine := NewEngine(t, registry) + ctx := chasm.NewEngineContext(context.Background(), engine) + executionKey := chasm.ExecutionKey{ + NamespaceID: "namespace-id", + BusinessID: payloadKey, + } + + var store *chasmtests.PayloadStore + _, err := chasm.StartExecution( + ctx, + executionKey, + func(mutableContext chasm.MutableContext, _ any) (*chasmtests.PayloadStore, error) { + var err error + store, err = chasmtests.NewPayloadStore(mutableContext) + return store, err + }, + nil, + ) + require.NoError(t, err) + require.NotNil(t, store) + + _, _, err = chasm.UpdateComponent( + ctx, + chasm.NewComponentRef[*chasmtests.PayloadStore](executionKey), + (*chasmtests.PayloadStore).AddPayload, + chasmtests.AddPayloadRequest{ + PayloadKey: payloadKey, + Payload: &commonpb.Payload{ + Data: []byte("payload"), + }, + TTL: time.Hour, + }, + ) + require.NoError(t, err) + + return engine, store, chasm.TaskAttributes{ + ScheduledTime: time.Now().Add(2 * time.Hour), + } +} diff --git a/chasm/errors.go b/chasm/errors.go index f3eace657ba..a6718c6683d 100644 --- a/chasm/errors.go +++ b/chasm/errors.go @@ -19,3 +19,30 @@ func NewExecutionAlreadyStartedErr( func (e *ExecutionAlreadyStartedError) Error() string { return e.Message } + +// TaskNotInvalidatedError indicates a CHASM task executed successfully but its +// validator still reports the task as valid. Retrying such a task can loop +// forever, so queue executors should treat it as terminal and send it to DLQ +// when task DLQ is enabled. +type TaskNotInvalidatedError struct { + TaskKind string + TaskInfo string +} + +func NewTaskNotInvalidatedError( + taskKind string, + taskInfo string, +) *TaskNotInvalidatedError { + return &TaskNotInvalidatedError{ + TaskKind: taskKind, + TaskInfo: taskInfo, + } +} + +func (e *TaskNotInvalidatedError) Error() string { + return "CHASM " + e.TaskKind + " task remained valid after successful execution: " + e.TaskInfo +} + +func (e *TaskNotInvalidatedError) IsTerminalTaskError() bool { + return true +} diff --git a/chasm/tree.go b/chasm/tree.go index 0b8207adac5..48cf05af570 100644 --- a/chasm/tree.go +++ b/chasm/tree.go @@ -3188,12 +3188,15 @@ func (n *Node) ExecutePureTask( return true, execErr } - // TODO - a task validator must succeed validation after a task executes - // successfully (without error), otherwise it will generate an infinite loop. - // Check for this case by marking the in-memory task as having executed, which the - // CloseTransaction method will check against. - // - // See: https://github.com/temporalio/temporal/pull/7701#discussion_r2072026993 + if !taskAttributes.IsImmediate() { + valid, err = n.validateTask(validationContext, taskAttributes, taskInstance) + if err != nil { + return true, err + } + if valid { + return true, NewTaskNotInvalidatedError("pure", fmt.Sprintf("task_type=%s", registrableTask.fqType())) + } + } return true, nil } diff --git a/chasm/tree_test.go b/chasm/tree_test.go index c47fbe7f8c6..ad744ca107d 100644 --- a/chasm/tree_test.go +++ b/chasm/tree_test.go @@ -3159,11 +3159,12 @@ func (s *nodeSuite) TestExecuteImmediatePureTask() { }, ) - // One valid task, one invalid task + // One valid task, one invalid task. s.testLibrary.mockPureTaskHandler.EXPECT(). - Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1) - s.testLibrary.mockPureTaskHandler.EXPECT(). - Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(true, nil).Times(1) + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()). + DoAndReturn(func(_ Context, _ any, _ TaskAttributes, task *TestPureTask) (bool, error) { + return string(task.Payload.Data) != "root-task-payload", nil + }).Times(2) s.testLibrary.mockPureTaskHandler.EXPECT(). Execute( gomock.AssignableToTypeOf(&mutableCtx{}), @@ -3374,7 +3375,7 @@ func (s *nodeSuite) TestExecutePureTask() { }, } - taskAttributes := TaskAttributes{} + taskAttributes := TaskAttributes{ScheduledTime: s.timeSource.Now()} pureTask := &TestPureTask{ Payload: &commonpb.Payload{ Data: []byte("some-random-data"), @@ -3400,11 +3401,31 @@ func (s *nodeSuite) TestExecutePureTask() { s.testLibrary.mockPureTaskHandler.EXPECT(). Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(retValue, errValue).Times(1) } + type validateResult struct { + valid bool + err error + } + expectValidateSequence := func(results ...validateResult) { + calls := make([]*gomock.Call, 0, len(results)) + for _, result := range results { + calls = append(calls, s.testLibrary.mockPureTaskHandler.EXPECT(). + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()). + Return(result.valid, result.err).Times(1)) + } + orderedCalls := make([]any, 0, len(calls)) + for _, call := range calls { + orderedCalls = append(orderedCalls, call) + } + gomock.InOrder(orderedCalls...) + } - // Succeed task execution and validation (happy case). + // Succeed task execution and post-execution validation reports the task is invalid. root.setValueState(valueStateSynced) expectExecute(nil) - expectValidate(true, nil) + expectValidateSequence( + validateResult{valid: true}, + validateResult{valid: false}, + ) executed, err := root.ExecutePureTask(ctx, taskAttributes, pureTask) s.NoError(err) s.True(executed) @@ -3420,6 +3441,31 @@ func (s *nodeSuite) TestExecutePureTask() { s.ErrorIs(expectedErr, err) s.Equal(valueStateNeedSyncStructure, root.valueState) + // Succeed execution, but post-execution validation still returns valid. + root.setValueState(valueStateSynced) + expectExecute(nil) + expectValidateSequence( + validateResult{valid: true}, + validateResult{valid: true}, + ) + _, err = root.ExecutePureTask(ctx, taskAttributes, pureTask) + s.ErrorContains(err, "CHASM pure task remained valid after successful execution") + var taskNotInvalidatedErr *TaskNotInvalidatedError + s.ErrorAs(err, &taskNotInvalidatedErr) + s.True(taskNotInvalidatedErr.IsTerminalTaskError()) + s.Equal(valueStateNeedSyncStructure, root.valueState) + + // Succeed execution, but post-execution validation errors. + root.setValueState(valueStateSynced) + expectExecute(nil) + expectValidateSequence( + validateResult{valid: true}, + validateResult{valid: false, err: expectedErr}, + ) + _, err = root.ExecutePureTask(ctx, taskAttributes, pureTask) + s.ErrorIs(expectedErr, err) + s.Equal(valueStateNeedSyncStructure, root.valueState) + // Fail task validation (no execution occurs). root.setValueState(valueStateSynced) expectValidate(false, nil) From 7db9bc3213976146e0eb0a6b33882cd20cf33be0 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 5 Jun 2026 11:10:56 -0400 Subject: [PATCH 2/3] Fix linter issues --- chasm/tree_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/chasm/tree_test.go b/chasm/tree_test.go index ad744ca107d..e06e1c5d3a8 100644 --- a/chasm/tree_test.go +++ b/chasm/tree_test.go @@ -3160,11 +3160,12 @@ func (s *nodeSuite) TestExecuteImmediatePureTask() { ) // One valid task, one invalid task. - s.testLibrary.mockPureTaskHandler.EXPECT(). - Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()). - DoAndReturn(func(_ Context, _ any, _ TaskAttributes, task *TestPureTask) (bool, error) { - return string(task.Payload.Data) != "root-task-payload", nil - }).Times(2) + gomock.InOrder( + s.testLibrary.mockPureTaskHandler.EXPECT(). + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1), + s.testLibrary.mockPureTaskHandler.EXPECT(). + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(true, nil).Times(1), + ) s.testLibrary.mockPureTaskHandler.EXPECT(). Execute( gomock.AssignableToTypeOf(&mutableCtx{}), From e60bd604086d3458586940f0ef699c03e97b0996 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 5 Jun 2026 15:30:39 -0400 Subject: [PATCH 3/3] Remove immediate pure task check --- chasm/lib/scheduler/backfiller_tasks.go | 3 ++ chasm/lib/scheduler/generator.go | 1 + chasm/lib/scheduler/generator_tasks.go | 3 ++ chasm/lib/scheduler/invoker_tasks.go | 6 +++ chasm/tree.go | 14 +++---- chasm/tree_test.go | 56 +++++++++++++++++++++---- 6 files changed, 67 insertions(+), 16 deletions(-) diff --git a/chasm/lib/scheduler/backfiller_tasks.go b/chasm/lib/scheduler/backfiller_tasks.go index fa5380b1fa3..4c84ea06cc7 100644 --- a/chasm/lib/scheduler/backfiller_tasks.go +++ b/chasm/lib/scheduler/backfiller_tasks.go @@ -50,6 +50,9 @@ func (b *BackfillerTaskHandler) Validate( attrs chasm.TaskAttributes, _ *schedulerpb.BackfillerTask, ) (bool, error) { + if attrs.IsImmediate() { + return backfiller.GetAttempt() == 0, nil + } return validateTaskHighWaterMark( backfiller.GetLastProcessedTime(), attrs.ScheduledTime, diff --git a/chasm/lib/scheduler/generator.go b/chasm/lib/scheduler/generator.go index 028792f884a..9c3f3b30e3e 100644 --- a/chasm/lib/scheduler/generator.go +++ b/chasm/lib/scheduler/generator.go @@ -41,6 +41,7 @@ func newGeneratorWithState(ctx chasm.MutableContext, state *schedulerpb.Generato // Generate immediately kicks off a new GeneratorTask. Used after updating the // schedule specification. func (g *Generator) Generate(ctx chasm.MutableContext) { + g.FutureActionTimes = nil g.scheduleTask(ctx, chasm.TaskScheduledTimeImmediate) } diff --git a/chasm/lib/scheduler/generator_tasks.go b/chasm/lib/scheduler/generator_tasks.go index b88e96952eb..4b955a98233 100644 --- a/chasm/lib/scheduler/generator_tasks.go +++ b/chasm/lib/scheduler/generator_tasks.go @@ -156,6 +156,9 @@ func (g *GeneratorTaskHandler) Validate( attrs chasm.TaskAttributes, _ *schedulerpb.GeneratorTask, ) (bool, error) { + if attrs.IsImmediate() { + return generator.GetFutureActionTimes() == nil, nil + } return validateTaskHighWaterMark( generator.GetLastProcessedTime(), attrs.ScheduledTime, diff --git a/chasm/lib/scheduler/invoker_tasks.go b/chasm/lib/scheduler/invoker_tasks.go index 19d5bc17309..f09ae84e40c 100644 --- a/chasm/lib/scheduler/invoker_tasks.go +++ b/chasm/lib/scheduler/invoker_tasks.go @@ -377,6 +377,12 @@ func (h *InvokerProcessBufferTaskHandler) Validate( attrs chasm.TaskAttributes, _ *schedulerpb.InvokerProcessBufferTask, ) (bool, error) { + if attrs.IsImmediate() { + lastProcessedTime := invoker.GetLastProcessedTime() + return lastProcessedTime == nil || + (lastProcessedTime.GetSeconds() == 0 && lastProcessedTime.GetNanos() == 0) || + lastProcessedTime.AsTime().Before(ctx.Now(invoker)), nil + } return validateTaskHighWaterMark( invoker.GetLastProcessedTime(), attrs.ScheduledTime, diff --git a/chasm/tree.go b/chasm/tree.go index 48cf05af570..585b88662bc 100644 --- a/chasm/tree.go +++ b/chasm/tree.go @@ -3188,14 +3188,12 @@ func (n *Node) ExecutePureTask( return true, execErr } - if !taskAttributes.IsImmediate() { - valid, err = n.validateTask(validationContext, taskAttributes, taskInstance) - if err != nil { - return true, err - } - if valid { - return true, NewTaskNotInvalidatedError("pure", fmt.Sprintf("task_type=%s", registrableTask.fqType())) - } + valid, err = n.validateTask(validationContext, taskAttributes, taskInstance) + if err != nil { + return true, err + } + if valid { + return true, NewTaskNotInvalidatedError("pure", fmt.Sprintf("task_type=%s", registrableTask.fqType())) } return true, nil diff --git a/chasm/tree_test.go b/chasm/tree_test.go index e06e1c5d3a8..3b5baf6b1f4 100644 --- a/chasm/tree_test.go +++ b/chasm/tree_test.go @@ -3159,20 +3159,22 @@ func (s *nodeSuite) TestExecuteImmediatePureTask() { }, ) - // One valid task, one invalid task. + // One invalid task, one valid task that invalidates after execution. gomock.InOrder( s.testLibrary.mockPureTaskHandler.EXPECT(). Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1), s.testLibrary.mockPureTaskHandler.EXPECT(). Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(true, nil).Times(1), + s.testLibrary.mockPureTaskHandler.EXPECT(). + Execute( + gomock.AssignableToTypeOf(&mutableCtx{}), + gomock.Any(), + gomock.Eq(taskAttributes), + gomock.Any(), + ).Return(nil).Times(1), + s.testLibrary.mockPureTaskHandler.EXPECT(). + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1), ) - s.testLibrary.mockPureTaskHandler.EXPECT(). - Execute( - gomock.AssignableToTypeOf(&mutableCtx{}), - gomock.Any(), - gomock.Eq(taskAttributes), - gomock.Any(), - ).Return(nil).Times(1) mutations, err = root.CloseTransaction() s.NoError(err) @@ -3184,6 +3186,44 @@ func (s *nodeSuite) TestExecuteImmediatePureTask() { s.Equal(tasks.MaximumKey.FireTime, s.nodeBackend.LastDeletePureTaskCall()) } +func (s *nodeSuite) TestExecuteImmediatePureTaskRequiresPostExecutionInvalidation() { + root := s.testComponentTree() + + _, err := root.CloseTransaction() + s.NoError(err) + + mutableContext := NewMutableContext(context.Background(), root) + component, err := root.Component(mutableContext, ComponentRef{}) + s.NoError(err) + testComponent := component.(*TestComponent) + + taskAttributes := TaskAttributes{ScheduledTime: TaskScheduledTimeImmediate} + pureTask := &TestPureTask{ + Payload: &commonpb.Payload{Data: []byte("root-task-payload")}, + } + mutableContext.AddTask(testComponent, taskAttributes, pureTask) + + gomock.InOrder( + s.testLibrary.mockPureTaskHandler.EXPECT(). + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Eq(pureTask)).Return(true, nil).Times(1), + s.testLibrary.mockPureTaskHandler.EXPECT(). + Execute( + gomock.AssignableToTypeOf(&mutableCtx{}), + gomock.Any(), + gomock.Eq(taskAttributes), + gomock.Eq(pureTask), + ).Return(nil).Times(1), + s.testLibrary.mockPureTaskHandler.EXPECT(). + Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Eq(pureTask)).Return(true, nil).Times(1), + ) + + _, err = root.CloseTransaction() + s.ErrorContains(err, "CHASM pure task remained valid after successful execution") + var taskNotInvalidatedErr *TaskNotInvalidatedError + s.ErrorAs(err, &taskNotInvalidatedErr) + s.True(taskNotInvalidatedErr.IsTerminalTaskError()) +} + func (s *nodeSuite) TestEachPureTask() { now := s.timeSource.Now()