Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5093,8 +5093,8 @@ func (wh *WorkflowHandler) listSchedulesChasm(
return nil, err
}

schedules := make([]*schedulepb.ScheduleListEntry, len(resp.Executions))
for i, ex := range resp.Executions {
schedules := make([]*schedulepb.ScheduleListEntry, 0, len(resp.Executions))
for _, ex := range resp.Executions {
// Schedules returned may have either a Memo (V1) or a ChasmMemo (V2) containing
// the ScheduleListInfo. Both must be handled, as migration means a mix of
// versions can be returned.
Expand All @@ -5110,15 +5110,29 @@ func (wh *WorkflowHandler) listSchedulesChasm(
workflowID := ex.BusinessID
scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix) // needed for V1 schedules, not CHASM

schedules[i] = &schedulepb.ScheduleListEntry{
// Skip entries that carry no decodable ScheduleListInfo. This happens during
// a CHASM->V1 migration: the migrated V1 workflow is Running under the
// scheduler namespace division (so it matches the query) before it has
// written its TemporalScheduleInfo memo, so neither the V2 nor V1 decode
// above yields a spec. Emitting such an entry with a nil Info crashes clients
// that dereference the spec, so omit it until the info memo is populated.
if listInfo == nil {
wh.logger.Warn("Dropping schedule list entry with no decodable ScheduleInfo memo (expected transiently while a recently-migrated scheduler workflow has not yet written its info memo)",
tag.WorkflowNamespace(namespaceName.String()),
tag.ScheduleID(scheduleID),
tag.WorkflowID(workflowID))
continue
}

schedules = append(schedules, &schedulepb.ScheduleListEntry{
ScheduleId: scheduleID,
Memo: customMemo,
// cleanScheduleSearchAttributes is only needed for V1 schedules
SearchAttributes: wh.cleanScheduleSearchAttributes(&commonpb.SearchAttributes{
IndexedFields: ex.CustomSearchAttributes,
}),
Info: listInfo,
}
})
}

return &workflowservice.ListSchedulesResponse{
Expand Down Expand Up @@ -5148,19 +5162,36 @@ func (wh *WorkflowHandler) listSchedulesWorkflow(
return nil, err
}

schedules := make([]*schedulepb.ScheduleListEntry, len(persistenceResp.Executions))
for i, ex := range persistenceResp.Executions {
schedules := make([]*schedulepb.ScheduleListEntry, 0, len(persistenceResp.Executions))
for _, ex := range persistenceResp.Executions {
memo := ex.GetMemo()
info := wh.decodeScheduleListInfo(memo)
memo = wh.cleanScheduleMemo(memo)
workflowID := ex.GetExecution().GetWorkflowId()
scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix)
schedules[i] = &schedulepb.ScheduleListEntry{

// Skip entries with no decodable ScheduleListInfo. A scheduler workflow is
// Running under the scheduler namespace division (so it matches the query)
// from the moment it is started, but only writes its TemporalScheduleInfo
// memo once it executes its first iteration (see updateMemoAndSearchAttributes
// in the scheduler workflow). This window is especially visible right after a
// CHASM->V1 migration, which starts the V1 workflow with only the custom memo.
// Emitting such an entry with a nil Info crashes clients that dereference the
// spec, so omit it until the info memo is populated.
if info == nil {
wh.logger.Warn("Dropping schedule list entry with no decodable ScheduleInfo memo (expected transiently while a recently-migrated scheduler workflow has not yet written its info memo)",
tag.WorkflowNamespace(namespaceName.String()),
tag.ScheduleID(scheduleID),
tag.WorkflowID(workflowID))
continue
}

memo = wh.cleanScheduleMemo(memo)
schedules = append(schedules, &schedulepb.ScheduleListEntry{
ScheduleId: scheduleID,
Memo: memo,
SearchAttributes: wh.cleanScheduleSearchAttributes(ex.GetSearchAttributes()),
Info: info,
}
})
}

return &workflowservice.ListSchedulesResponse{
Expand Down
141 changes: 141 additions & 0 deletions tests/schedule_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tests
import (
"encoding/binary"
"errors"
"fmt"
"strings"
"testing"
"time"
Expand All @@ -24,8 +25,11 @@ import (
schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/searchattribute/sadefs"
"go.temporal.io/server/common/testing/await"
"go.temporal.io/server/common/testing/parallelsuite"
"go.temporal.io/server/service/worker/dummy"
Expand Down Expand Up @@ -1533,6 +1537,143 @@ func (s *ScheduleMigrationTestSuite) TestDeleteScheduleContextMetadata() {
})
}

// TestListSchedulesSkipsEntriesWithoutInfo checks that ListSchedules leaves out
// any entry whose ScheduleListInfo cannot be decoded, instead of returning it
// with a nil Info/Spec.
//
// Background: a scheduler workflow matches the list query as soon as it is
// started (it is Running under the scheduler namespace division), but its
// TemporalScheduleInfo memo is only written when it executes its first
// iteration. Until then the entry decodes to a nil Info, which crashes clients
// that dereference the spec. The window is most noticeable right after a
// CHASM->V1 migration, because the V1 workflow is started with only the custom
// memo.
//
// Both list paths are exercised:
//   - CHASM enabled  -> listSchedulesChasm
//   - CHASM disabled -> listSchedulesWorkflow (the path reached after migrating
//     back to V1 and turning CHASM routing/creation off)
func (s *ScheduleMigrationTestSuite) TestListSchedulesSkipsEntriesWithoutInfo() {
	type listPathCase struct {
		name        string
		chasmRouted bool
	}
	cases := []listPathCase{
		{name: "ChasmListPath", chasmRouted: true},
		{name: "V1ListPath", chasmRouted: false},
	}

	// Shared polling budget for both visibility waits below.
	const (
		pollTimeout  = 30 * time.Second
		pollInterval = 500 * time.Millisecond
	)

	for idx := range cases {
		tc := cases[idx]
		s.Run(tc.name, func(s *ScheduleMigrationTestSuite) {
			env := testcore.NewEnv(
				s.T(),
				testcore.WithWorkerService("scheduler operations"),
				testcore.WithDynamicConfig(dynamicconfig.EnableChasm, true),
				testcore.WithDynamicConfig(dynamicconfig.EnableCHASMSchedulerRouting, tc.chasmRouted),
				testcore.WithDynamicConfig(dynamicconfig.EnableCHASMSchedulerCreation, tc.chasmRouted),
			)

			ctx := testcore.NewContext()
			nsName := env.Namespace().String()

			realSID := testcore.RandomizeStr("real-sched")
			noInfoSID := testcore.RandomizeStr("no-info-sched")
			noInfoWorkflowID := scheduler.WorkflowIDPrefix + noInfoSID

			// Step 1: create a genuine schedule. It is expected to be listed, with
			// a spec attached.
			scheduledWorkflow := &workflowpb.NewWorkflowExecutionInfo{
				WorkflowId:   testcore.RandomizeStr("wid"),
				WorkflowType: &commonpb.WorkflowType{Name: testcore.RandomizeStr("wt")},
				TaskQueue: &taskqueuepb.TaskQueue{
					Name: testcore.RandomizeStr("tq"),
					Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
				},
			}
			createReq := &workflowservice.CreateScheduleRequest{
				Namespace:  nsName,
				ScheduleId: realSID,
				Schedule: &schedulepb.Schedule{
					Spec: &schedulepb.ScheduleSpec{
						Interval: []*schedulepb.IntervalSpec{
							{Interval: durationpb.New(time.Hour)},
						},
					},
					Action: &schedulepb.ScheduleAction{
						Action: &schedulepb.ScheduleAction_StartWorkflow{
							StartWorkflow: scheduledWorkflow,
						},
					},
				},
				Identity:  "test",
				RequestId: testcore.RandomizeStr("req"),
			}
			_, err := env.FrontendClient().CreateSchedule(ctx, createReq)
			s.NoError(err)

			// Step 2: reproduce the mid-migration window. Start a workflow tagged with
			// the scheduler namespace division but carrying no TemporalScheduleInfo
			// memo. It uses the dummy workflow type, which has no worker to advance
			// or complete it, so it remains Running and never writes the info memo;
			// its list entry therefore decodes to a nil Info.
			divisionAttrs := &commonpb.SearchAttributes{}
			searchattribute.AddSearchAttribute(
				&divisionAttrs,
				sadefs.TemporalNamespaceDivision,
				payload.EncodeString(scheduler.NamespaceDivision),
			)
			startReq := &workflowservice.StartWorkflowExecutionRequest{
				Namespace:                nsName,
				WorkflowId:               noInfoWorkflowID,
				WorkflowType:             &commonpb.WorkflowType{Name: dummy.DummyWFTypeName},
				TaskQueue:                &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue},
				SearchAttributes:         divisionAttrs,
				Identity:                 "test",
				RequestId:                testcore.RandomizeStr("req"),
				WorkflowIdReusePolicy:    enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
				WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
			}
			historyReq := common.CreateHistoryStartWorkflowRequest(
				env.NamespaceID().String(),
				startReq,
				nil, nil, time.Now().UTC(),
			)
			_, err = env.GetTestCluster().HistoryClient().StartWorkflowExecution(ctx, historyReq)
			s.NoError(err)

			// Step 3: block until raw visibility reports the info-less workflow.
			// Without this wait the final assertions could pass spuriously on an
			// unfixed handler simply because the offending record had not shown up
			// yet. The namespace-division clause is mandatory because
			// ListWorkflowExecutions hides workflows that belong to a namespace
			// division (such as scheduler workflows) by default.
			noInfoQuery := fmt.Sprintf("WorkflowId = '%s' AND %s = '%s'",
				noInfoWorkflowID, sadefs.TemporalNamespaceDivision, scheduler.NamespaceDivision)
			noInfoVisible := func() bool {
				wfResp, wfErr := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
					Namespace: nsName,
					Query:     noInfoQuery,
				})
				if wfErr != nil {
					return false
				}
				return len(wfResp.GetExecutions()) == 1
			}
			s.Eventually(noInfoVisible, pollTimeout, pollInterval, "info-less workflow never became visible")

			// Step 4: poll ListSchedules until the real schedule is listed, keeping
			// the last response for the checks that follow.
			var listResp *workflowservice.ListSchedulesResponse
			realScheduleListed := func() bool {
				listResp, err = env.FrontendClient().ListSchedules(ctx, &workflowservice.ListSchedulesRequest{
					Namespace:       nsName,
					MaximumPageSize: 10,
				})
				if err != nil {
					return false
				}
				for _, entry := range listResp.GetSchedules() {
					if entry.GetScheduleId() == realSID {
						return true
					}
				}
				return false
			}
			s.Eventually(realScheduleListed, pollTimeout, pollInterval)

			// Step 5: the listing must include the real schedule, must omit the
			// info-less one, and every returned entry must carry a non-nil spec.
			listed := listResp.GetSchedules()
			ids := make([]string, 0, len(listed))
			for _, entry := range listed {
				sid := entry.GetScheduleId()
				ids = append(ids, sid)
				s.NotNil(entry.GetInfo(), "schedule %q has nil Info", sid)
				s.NotNil(entry.GetInfo().GetSpec(), "schedule %q has nil Spec", sid)
			}
			s.Contains(ids, realSID)
			s.NotContains(ids, noInfoSID, "entry without a decodable ScheduleInfo must be skipped")
		})
	}
}

// TestPatchScheduleContextMetadata verifies that PatchSchedule propagates the
// correct context metadata for CHASM and V1 schedules.
func (s *ScheduleMigrationTestSuite) TestPatchScheduleContextMetadata() {
Expand Down
Loading