Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion chasm/chasmtest/task_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
// ExecutePureTask validates and executes a pure task atomically via [Engine.UpdateComponent].
// It returns taskDropped set to true if [chasm.PureTaskHandler.Validate] returns (false, nil),
// indicating the task is no longer relevant and was not executed.
// After a successful execution, Validate must return (false, nil), otherwise
// the helper returns an error because the task would remain runnable.
//
// The component ref is resolved automatically — no separate [Engine.ReadComponent] call to
// obtain a ref is needed. Pass the component pointer directly.
Expand Down Expand Up @@ -48,7 +50,18 @@ func ExecutePureTask[C chasm.Component, T any](
taskDropped = true
return nil
}
return handler.Execute(mutableCtx, typedC, attrs, task)
if err = handler.Execute(mutableCtx, typedC, attrs, task); err != nil {
return err
}

valid, err = handler.Validate(mutableCtx, typedC, attrs, task)
if err != nil {
return err
}
if valid {
return chasm.NewTaskNotInvalidatedError("pure", fmt.Sprintf("task_type=%T", task))
}
return nil
},
)
return taskDropped, err
Expand Down
119 changes: 119 additions & 0 deletions chasm/chasmtest/task_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package chasmtest

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/server/chasm"
chasmtests "go.temporal.io/server/chasm/lib/tests"
"go.temporal.io/server/chasm/lib/tests/gen/testspb/v1"
"go.temporal.io/server/common/log"
)

type noOpPureTTLTaskHandler struct {
chasm.PureTaskHandlerBase
}

func (h *noOpPureTTLTaskHandler) Execute(
_ chasm.MutableContext,
_ *chasmtests.PayloadStore,
_ chasm.TaskAttributes,
_ *testspb.TestPayloadTTLPureTask,
) error {
return nil
}

func (h *noOpPureTTLTaskHandler) Validate(
_ chasm.Context,
store *chasmtests.PayloadStore,
_ chasm.TaskAttributes,
task *testspb.TestPayloadTTLPureTask,
) (bool, error) {
_, ok := store.State.ExpirationTimes[task.PayloadKey]
return ok, nil
}

func TestExecutePureTaskRequiresPostExecutionInvalidation(t *testing.T) {
engine, store, attrs := newPayloadStoreWithExpiringPayload(t, "pure-task-payload")

task := &testspb.TestPayloadTTLPureTask{PayloadKey: "pure-task-payload"}
taskDropped, err := ExecutePureTask(
context.Background(),
engine,
store,
&chasmtests.PayloadTTLPureTaskHandler{},
attrs,
task,
)
require.NoError(t, err)
require.False(t, taskDropped)

engine, store, attrs = newPayloadStoreWithExpiringPayload(t, "still-valid-pure-task-payload")
task = &testspb.TestPayloadTTLPureTask{PayloadKey: "still-valid-pure-task-payload"}
taskDropped, err = ExecutePureTask(
context.Background(),
engine,
store,
&noOpPureTTLTaskHandler{},
attrs,
task,
)
require.ErrorContains(t, err, "CHASM pure task remained valid after successful execution")
var taskNotInvalidatedErr *chasm.TaskNotInvalidatedError
require.ErrorAs(t, err, &taskNotInvalidatedErr)
require.True(t, taskNotInvalidatedErr.IsTerminalTaskError())
require.False(t, taskDropped)
}

func newPayloadStoreWithExpiringPayload(
t *testing.T,
payloadKey string,
) (*Engine, *chasmtests.PayloadStore, chasm.TaskAttributes) {
t.Helper()

registry := chasm.NewRegistry(log.NewNoopLogger())
require.NoError(t, registry.Register(&chasm.CoreLibrary{}))
require.NoError(t, registry.Register(chasmtests.Library))

engine := NewEngine(t, registry)
ctx := chasm.NewEngineContext(context.Background(), engine)
executionKey := chasm.ExecutionKey{
NamespaceID: "namespace-id",
BusinessID: payloadKey,
}

var store *chasmtests.PayloadStore
_, err := chasm.StartExecution(
ctx,
executionKey,
func(mutableContext chasm.MutableContext, _ any) (*chasmtests.PayloadStore, error) {
var err error
store, err = chasmtests.NewPayloadStore(mutableContext)
return store, err
},
nil,
)
require.NoError(t, err)
require.NotNil(t, store)

_, _, err = chasm.UpdateComponent(
ctx,
chasm.NewComponentRef[*chasmtests.PayloadStore](executionKey),
(*chasmtests.PayloadStore).AddPayload,
chasmtests.AddPayloadRequest{
PayloadKey: payloadKey,
Payload: &commonpb.Payload{
Data: []byte("payload"),
},
TTL: time.Hour,
},
)
require.NoError(t, err)

return engine, store, chasm.TaskAttributes{
ScheduledTime: time.Now().Add(2 * time.Hour),
}
}
27 changes: 27 additions & 0 deletions chasm/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,30 @@ func NewExecutionAlreadyStartedErr(
func (e *ExecutionAlreadyStartedError) Error() string {
return e.Message
}

// TaskNotInvalidatedError indicates a CHASM task executed successfully but its
// validator still reports the task as valid. Retrying such a task can loop
// forever, so queue executors should treat it as terminal and send it to DLQ
// when task DLQ is enabled.
type TaskNotInvalidatedError struct {
TaskKind string
TaskInfo string
}

func NewTaskNotInvalidatedError(
taskKind string,
taskInfo string,
) *TaskNotInvalidatedError {
return &TaskNotInvalidatedError{
TaskKind: taskKind,
TaskInfo: taskInfo,
}
}

func (e *TaskNotInvalidatedError) Error() string {
return "CHASM " + e.TaskKind + " task remained valid after successful execution: " + e.TaskInfo
}

func (e *TaskNotInvalidatedError) IsTerminalTaskError() bool {
return true
}
3 changes: 3 additions & 0 deletions chasm/lib/scheduler/backfiller_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (b *BackfillerTaskHandler) Validate(
attrs chasm.TaskAttributes,
_ *schedulerpb.BackfillerTask,
) (bool, error) {
if attrs.IsImmediate() {
return backfiller.GetAttempt() == 0, nil
}
return validateTaskHighWaterMark(
backfiller.GetLastProcessedTime(),
attrs.ScheduledTime,
Expand Down
1 change: 1 addition & 0 deletions chasm/lib/scheduler/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func newGeneratorWithState(ctx chasm.MutableContext, state *schedulerpb.Generato
// Generate immediately kicks off a new GeneratorTask. Used after updating the
// schedule specification.
func (g *Generator) Generate(ctx chasm.MutableContext) {
g.FutureActionTimes = nil
g.scheduleTask(ctx, chasm.TaskScheduledTimeImmediate)
}

Expand Down
3 changes: 3 additions & 0 deletions chasm/lib/scheduler/generator_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ func (g *GeneratorTaskHandler) Validate(
attrs chasm.TaskAttributes,
_ *schedulerpb.GeneratorTask,
) (bool, error) {
if attrs.IsImmediate() {
return generator.GetFutureActionTimes() == nil, nil
}
return validateTaskHighWaterMark(
generator.GetLastProcessedTime(),
attrs.ScheduledTime,
Expand Down
6 changes: 6 additions & 0 deletions chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,12 @@ func (h *InvokerProcessBufferTaskHandler) Validate(
attrs chasm.TaskAttributes,
_ *schedulerpb.InvokerProcessBufferTask,
) (bool, error) {
if attrs.IsImmediate() {
lastProcessedTime := invoker.GetLastProcessedTime()
return lastProcessedTime == nil ||
(lastProcessedTime.GetSeconds() == 0 && lastProcessedTime.GetNanos() == 0) ||
lastProcessedTime.AsTime().Before(ctx.Now(invoker)), nil
}
return validateTaskHighWaterMark(
invoker.GetLastProcessedTime(),
attrs.ScheduledTime,
Expand Down
13 changes: 7 additions & 6 deletions chasm/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -3188,12 +3188,13 @@ func (n *Node) ExecutePureTask(
return true, execErr
}

// TODO - a task validator must succeed validation after a task executes
// successfully (without error), otherwise it will generate an infinite loop.
// Check for this case by marking the in-memory task as having executed, which the
// CloseTransaction method will check against.
//
// See: https://github.com/temporalio/temporal/pull/7701#discussion_r2072026993
valid, err = n.validateTask(validationContext, taskAttributes, taskInstance)
if err != nil {
return true, err
}
if valid {
return true, NewTaskNotInvalidatedError("pure", fmt.Sprintf("task_type=%s", registrableTask.fqType()))
}

return true, nil
}
Expand Down
117 changes: 102 additions & 15 deletions chasm/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3159,18 +3159,22 @@ func (s *nodeSuite) TestExecuteImmediatePureTask() {
},
)

// One valid task, one invalid task
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1)
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(true, nil).Times(1)
s.testLibrary.mockPureTaskHandler.EXPECT().
Execute(
gomock.AssignableToTypeOf(&mutableCtx{}),
gomock.Any(),
gomock.Eq(taskAttributes),
gomock.Any(),
).Return(nil).Times(1)
// One invalid task, one valid task that invalidates after execution.
gomock.InOrder(
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1),
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(true, nil).Times(1),
s.testLibrary.mockPureTaskHandler.EXPECT().
Execute(
gomock.AssignableToTypeOf(&mutableCtx{}),
gomock.Any(),
gomock.Eq(taskAttributes),
gomock.Any(),
).Return(nil).Times(1),
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(false, nil).Times(1),
)

mutations, err = root.CloseTransaction()
s.NoError(err)
Expand All @@ -3182,6 +3186,44 @@ func (s *nodeSuite) TestExecuteImmediatePureTask() {
s.Equal(tasks.MaximumKey.FireTime, s.nodeBackend.LastDeletePureTaskCall())
}

func (s *nodeSuite) TestExecuteImmediatePureTaskRequiresPostExecutionInvalidation() {
root := s.testComponentTree()

_, err := root.CloseTransaction()
s.NoError(err)

mutableContext := NewMutableContext(context.Background(), root)
component, err := root.Component(mutableContext, ComponentRef{})
s.NoError(err)
testComponent := component.(*TestComponent)

taskAttributes := TaskAttributes{ScheduledTime: TaskScheduledTimeImmediate}
pureTask := &TestPureTask{
Payload: &commonpb.Payload{Data: []byte("root-task-payload")},
}
mutableContext.AddTask(testComponent, taskAttributes, pureTask)

gomock.InOrder(
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Eq(pureTask)).Return(true, nil).Times(1),
s.testLibrary.mockPureTaskHandler.EXPECT().
Execute(
gomock.AssignableToTypeOf(&mutableCtx{}),
gomock.Any(),
gomock.Eq(taskAttributes),
gomock.Eq(pureTask),
).Return(nil).Times(1),
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Eq(pureTask)).Return(true, nil).Times(1),
)

_, err = root.CloseTransaction()
s.ErrorContains(err, "CHASM pure task remained valid after successful execution")
var taskNotInvalidatedErr *TaskNotInvalidatedError
s.ErrorAs(err, &taskNotInvalidatedErr)
s.True(taskNotInvalidatedErr.IsTerminalTaskError())
}

func (s *nodeSuite) TestEachPureTask() {
now := s.timeSource.Now()

Expand Down Expand Up @@ -3374,7 +3416,7 @@ func (s *nodeSuite) TestExecutePureTask() {
},
}

taskAttributes := TaskAttributes{}
taskAttributes := TaskAttributes{ScheduledTime: s.timeSource.Now()}
pureTask := &TestPureTask{
Payload: &commonpb.Payload{
Data: []byte("some-random-data"),
Expand All @@ -3400,11 +3442,31 @@ func (s *nodeSuite) TestExecutePureTask() {
s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).Return(retValue, errValue).Times(1)
}
type validateResult struct {
valid bool
err error
}
expectValidateSequence := func(results ...validateResult) {
calls := make([]*gomock.Call, 0, len(results))
for _, result := range results {
calls = append(calls, s.testLibrary.mockPureTaskHandler.EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Eq(taskAttributes), gomock.Any()).
Return(result.valid, result.err).Times(1))
}
orderedCalls := make([]any, 0, len(calls))
for _, call := range calls {
orderedCalls = append(orderedCalls, call)
}
gomock.InOrder(orderedCalls...)
}

// Succeed task execution and validation (happy case).
// Succeed task execution and post-execution validation reports the task is invalid.
root.setValueState(valueStateSynced)
expectExecute(nil)
expectValidate(true, nil)
expectValidateSequence(
validateResult{valid: true},
validateResult{valid: false},
)
executed, err := root.ExecutePureTask(ctx, taskAttributes, pureTask)
s.NoError(err)
s.True(executed)
Expand All @@ -3420,6 +3482,31 @@ func (s *nodeSuite) TestExecutePureTask() {
s.ErrorIs(expectedErr, err)
s.Equal(valueStateNeedSyncStructure, root.valueState)

// Succeed execution, but post-execution validation still returns valid.
root.setValueState(valueStateSynced)
expectExecute(nil)
expectValidateSequence(
validateResult{valid: true},
validateResult{valid: true},
)
_, err = root.ExecutePureTask(ctx, taskAttributes, pureTask)
s.ErrorContains(err, "CHASM pure task remained valid after successful execution")
var taskNotInvalidatedErr *TaskNotInvalidatedError
s.ErrorAs(err, &taskNotInvalidatedErr)
s.True(taskNotInvalidatedErr.IsTerminalTaskError())
s.Equal(valueStateNeedSyncStructure, root.valueState)

// Succeed execution, but post-execution validation errors.
root.setValueState(valueStateSynced)
expectExecute(nil)
expectValidateSequence(
validateResult{valid: true},
validateResult{valid: false, err: expectedErr},
)
_, err = root.ExecutePureTask(ctx, taskAttributes, pureTask)
s.ErrorIs(expectedErr, err)
s.Equal(valueStateNeedSyncStructure, root.valueState)

// Fail task validation (no execution occurs).
root.setValueState(valueStateSynced)
expectValidate(false, nil)
Expand Down