From 79bdd53601ec619bdff0fa585131e624c2fe5df0 Mon Sep 17 00:00:00 2001 From: David Porter Date: Thu, 4 Jun 2026 23:30:37 -0700 Subject: [PATCH] add a list guard for drop case --- service/frontend/workflow_handler.go | 49 ++++++++-- tests/schedule_migration_test.go | 141 +++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 9 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 99644b9b822..ee7c82117b6 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -5093,8 +5093,8 @@ func (wh *WorkflowHandler) listSchedulesChasm( return nil, err } - schedules := make([]*schedulepb.ScheduleListEntry, len(resp.Executions)) - for i, ex := range resp.Executions { + schedules := make([]*schedulepb.ScheduleListEntry, 0, len(resp.Executions)) + for _, ex := range resp.Executions { // Schedules returned may have either a Memo (V1) or a ChasmMemo (V2) containing // the ScheduleListInfo. Both must be handled, as migration means a mix of // versions can be returned. @@ -5110,7 +5110,21 @@ func (wh *WorkflowHandler) listSchedulesChasm( workflowID := ex.BusinessID scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix) // needed for V1 schedules, not CHASM - schedules[i] = &schedulepb.ScheduleListEntry{ + // Skip entries that carry no decodable ScheduleListInfo. This happens during + // a CHASM->V1 migration: the migrated V1 workflow is Running under the + // scheduler namespace division (so it matches the query) before it has + // written its TemporalScheduleInfo memo, so neither the V2 nor V1 decode + // above yields a spec. Emitting such an entry with a nil Info crashes clients + // that dereference the spec, so omit it until the info memo is populated. + if listInfo == nil { + wh.logger.Warn("Dropping schedule list entry with no decodable ScheduleInfo memo (expected transiently while a recently-migrated scheduler workflow has not yet written its info memo)", + tag.WorkflowNamespace(namespaceName.String()), + tag.ScheduleID(scheduleID), + tag.WorkflowID(workflowID)) + continue + } + + schedules = append(schedules, &schedulepb.ScheduleListEntry{ ScheduleId: scheduleID, Memo: customMemo, // cleanScheduleSearchAttributes is only needed for V1 schedules @@ -5118,7 +5132,7 @@ func (wh *WorkflowHandler) listSchedulesChasm( IndexedFields: ex.CustomSearchAttributes, }), Info: listInfo, - } + }) } return &workflowservice.ListSchedulesResponse{ @@ -5148,19 +5162,36 @@ func (wh *WorkflowHandler) listSchedulesWorkflow( return nil, err } - schedules := make([]*schedulepb.ScheduleListEntry, len(persistenceResp.Executions)) - for i, ex := range persistenceResp.Executions { + schedules := make([]*schedulepb.ScheduleListEntry, 0, len(persistenceResp.Executions)) + for _, ex := range persistenceResp.Executions { memo := ex.GetMemo() info := wh.decodeScheduleListInfo(memo) - memo = wh.cleanScheduleMemo(memo) workflowID := ex.GetExecution().GetWorkflowId() scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix) - schedules[i] = &schedulepb.ScheduleListEntry{ + + // Skip entries with no decodable ScheduleListInfo. A scheduler workflow is + // Running under the scheduler namespace division (so it matches the query) + // from the moment it is started, but only writes its TemporalScheduleInfo + // memo once it executes its first iteration (see updateMemoAndSearchAttributes + // in the scheduler workflow). This window is especially visible right after a + // CHASM->V1 migration, which starts the V1 workflow with only the custom memo. + // Emitting such an entry with a nil Info crashes clients that dereference the + // spec, so omit it until the info memo is populated. + if info == nil { + wh.logger.Warn("Dropping schedule list entry with no decodable ScheduleInfo memo (expected transiently while a recently-migrated scheduler workflow has not yet written its info memo)", + tag.WorkflowNamespace(namespaceName.String()), + tag.ScheduleID(scheduleID), + tag.WorkflowID(workflowID)) + continue + } + + memo = wh.cleanScheduleMemo(memo) + schedules = append(schedules, &schedulepb.ScheduleListEntry{ ScheduleId: scheduleID, Memo: memo, SearchAttributes: wh.cleanScheduleSearchAttributes(ex.GetSearchAttributes()), Info: info, - } + }) } return &workflowservice.ListSchedulesResponse{ diff --git a/tests/schedule_migration_test.go b/tests/schedule_migration_test.go index 5bc46c0a375..52207f252a4 100644 --- a/tests/schedule_migration_test.go +++ b/tests/schedule_migration_test.go @@ -3,6 +3,7 @@ package tests import ( "encoding/binary" "errors" + "fmt" "strings" "testing" "time" @@ -24,8 +25,11 @@ import ( schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/payload" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/sdk" + "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/await" "go.temporal.io/server/common/testing/parallelsuite" "go.temporal.io/server/service/worker/dummy" @@ -1533,6 +1537,143 @@ func (s *ScheduleMigrationTestSuite) TestDeleteScheduleContextMetadata() { }) } +// TestListSchedulesSkipsEntriesWithoutInfo verifies that ListSchedules does not +// emit list entries whose ScheduleListInfo cannot be decoded (nil Info/Spec). +// +// A scheduler workflow is Running under the scheduler namespace division (so it +// matches the list query) from the moment it is started, but only writes its +// TemporalScheduleInfo memo once it executes its first iteration. In that window +// the entry decodes to a nil Info, which crashes clients that dereference the +// spec. This is especially visible right after a CHASM->V1 migration, which +// starts the V1 workflow with only the custom memo. The handler must skip those +// entries on both list paths: +// - CHASM enabled -> listSchedulesChasm +// - CHASM disabled -> listSchedulesWorkflow (the path hit after migrating back +// to V1 and turning CHASM routing/creation off) +func (s *ScheduleMigrationTestSuite) TestListSchedulesSkipsEntriesWithoutInfo() { + testCases := []struct { + name string + chasmRouted bool + }{ + {name: "ChasmListPath", chasmRouted: true}, + {name: "V1ListPath", chasmRouted: false}, + } + + for _, tc := range testCases { + s.Run(tc.name, func(s *ScheduleMigrationTestSuite) { + env := testcore.NewEnv( + s.T(), + testcore.WithWorkerService("scheduler operations"), + testcore.WithDynamicConfig(dynamicconfig.EnableChasm, true), + testcore.WithDynamicConfig(dynamicconfig.EnableCHASMSchedulerRouting, tc.chasmRouted), + testcore.WithDynamicConfig(dynamicconfig.EnableCHASMSchedulerCreation, tc.chasmRouted), + ) + + ctx := testcore.NewContext() + nsName := env.Namespace().String() + + realSID := testcore.RandomizeStr("real-sched") + noInfoSID := testcore.RandomizeStr("no-info-sched") + + // A real schedule, which should appear in ListSchedules with a spec. + _, err := env.FrontendClient().CreateSchedule(ctx, &workflowservice.CreateScheduleRequest{ + Namespace: nsName, + ScheduleId: realSID, + Schedule: &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{ + Interval: []*schedulepb.IntervalSpec{ + {Interval: durationpb.New(1 * time.Hour)}, + }, + }, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: testcore.RandomizeStr("wid"), + WorkflowType: &commonpb.WorkflowType{Name: testcore.RandomizeStr("wt")}, + TaskQueue: &taskqueuepb.TaskQueue{Name: testcore.RandomizeStr("tq"), Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + }, + }, + }, + }, + Identity: "test", + RequestId: testcore.RandomizeStr("req"), + }) + s.NoError(err) + + // Simulate the mid-migration window: a Running workflow under the scheduler + // namespace division whose memo lacks the TemporalScheduleInfo block. A dummy + // workflow type has no worker to advance/complete it, so it stays Running and + // never writes the info memo. Its list entry therefore decodes to a nil Info. + sa := &commonpb.SearchAttributes{} + searchattribute.AddSearchAttribute(&sa, sadefs.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision)) + _, err = env.GetTestCluster().HistoryClient().StartWorkflowExecution( + ctx, + common.CreateHistoryStartWorkflowRequest( + env.NamespaceID().String(), + &workflowservice.StartWorkflowExecutionRequest{ + Namespace: nsName, + WorkflowId: scheduler.WorkflowIDPrefix + noInfoSID, + WorkflowType: &commonpb.WorkflowType{Name: dummy.DummyWFTypeName}, + TaskQueue: &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue}, + SearchAttributes: sa, + Identity: "test", + RequestId: testcore.RandomizeStr("req"), + WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + }, + nil, nil, time.Now().UTC(), + ), + ) + s.NoError(err) + + // Wait until the info-less workflow's visibility record has propagated, so + // that an unfixed handler would actually surface it (otherwise the assertion + // below could pass spuriously because the buggy entry simply hadn't appeared + // yet). We query raw visibility for the workflow ID directly. + noInfoWorkflowID := scheduler.WorkflowIDPrefix + noInfoSID + // The namespace-division clause is required: ListWorkflowExecutions hides + // workflows in a namespace division (such as scheduler workflows) by default. + noInfoQuery := fmt.Sprintf("WorkflowId = '%s' AND %s = '%s'", + noInfoWorkflowID, sadefs.TemporalNamespaceDivision, scheduler.NamespaceDivision) + s.Eventually(func() bool { + wfResp, wfErr := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: nsName, + Query: noInfoQuery, + }) + return wfErr == nil && len(wfResp.GetExecutions()) == 1 + }, 30*time.Second, 500*time.Millisecond, "info-less workflow never became visible") + + // Once the real schedule is visible, the listing must contain it, must omit + // the info-less entry, and every returned entry must have a non-nil spec. + var listResp *workflowservice.ListSchedulesResponse + s.Eventually(func() bool { + listResp, err = env.FrontendClient().ListSchedules(ctx, &workflowservice.ListSchedulesRequest{ + Namespace: nsName, + MaximumPageSize: 10, + }) + if err != nil { + return false + } + for _, e := range listResp.GetSchedules() { + if e.GetScheduleId() == realSID { + return true + } + } + return false + }, 30*time.Second, 500*time.Millisecond) + + ids := make([]string, 0, len(listResp.GetSchedules())) + for _, e := range listResp.GetSchedules() { + ids = append(ids, e.GetScheduleId()) + s.NotNil(e.GetInfo(), "schedule %q has nil Info", e.GetScheduleId()) + s.NotNil(e.GetInfo().GetSpec(), "schedule %q has nil Spec", e.GetScheduleId()) + } + s.Contains(ids, realSID) + s.NotContains(ids, noInfoSID, "entry without a decodable ScheduleInfo must be skipped") + }) + } +} + // TestPatchScheduleContextMetadata verifies that PatchSchedule propagates the // correct context metadata for CHASM and V1 schedules. func (s *ScheduleMigrationTestSuite) TestPatchScheduleContextMetadata() {