diff --git a/service/matching/workers/registry_impl.go b/service/matching/workers/registry_impl.go index ea607ea60ff..d1b5a1a8ee8 100644 --- a/service/matching/workers/registry_impl.go +++ b/service/matching/workers/registry_impl.go @@ -20,13 +20,14 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives" "go.uber.org/fx" + "google.golang.org/protobuf/types/known/timestamppb" ) // listWorkersPageToken is the cursor for paginating ListWorkers results. type listWorkersPageToken struct { - // LastWorkerInstanceKey is the WorkerInstanceKey of the last worker returned in the previous page. - // The next page will return workers with keys > this value. - LastWorkerInstanceKey string `json:"l"` + LastTaskQueue string `json:"t,omitempty"` + LastStartTimestamp int64 `json:"s,omitempty"` // UnixNano + LastWorkerInstanceKey string `json:"k,omitempty"` } type ( @@ -443,9 +444,29 @@ func (m *registryImpl) CountWorkers(nsID namespace.ID, query string, includeSyst return int64(len(workers)), nil } +// compareWorkers defines the sort order for ListWorkers pagination: +// (TaskQueue asc, StartTime asc, WorkerInstanceKey asc). +// TaskQueue groups workers by what they poll. StartTime provides a stable, +// human-meaningful order within each group (oldest first). WorkerInstanceKey +// is a unique tiebreaker for pagination correctness. +func compareWorkers(a, b *workerpb.WorkerHeartbeat) int { + if c := strings.Compare(a.GetTaskQueue(), b.GetTaskQueue()); c != 0 { + return c + } + aTime := a.GetStartTime().AsTime().UnixNano() + bTime := b.GetStartTime().AsTime().UnixNano() + if aTime != bTime { + if aTime < bTime { + return -1 + } + return 1 + } + return strings.Compare(a.GetWorkerInstanceKey(), b.GetWorkerInstanceKey()) +} + // paginateWorkers applies cursor-based pagination to a list of workers. -// Workers are sorted by WorkerInstanceKey for deterministic ordering. -// Returns the paginated slice and a token for the next page (nil if no more pages). +// Workers are sorted by (TaskQueue, StartTime asc, WorkerInstanceKey) for stable, +// deterministic ordering. Returns the paginated slice and a token for the next page. func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPageToken []byte) (ListWorkersResponse, error) { if len(workers) == 0 { return ListWorkersResponse{Workers: workers}, nil @@ -456,34 +477,30 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage return ListWorkersResponse{Workers: workers}, nil } - // Sort by WorkerInstanceKey for deterministic pagination - slices.SortFunc(workers, func(a, b *workerpb.WorkerHeartbeat) int { - return strings.Compare(a.WorkerInstanceKey, b.WorkerInstanceKey) - }) + slices.SortFunc(workers, compareWorkers) - // Decode page token to find the cursor - var cursor string + // Decode page token and find the starting index using binary search (O(log n)). + // Stale tokens from a previous sort order will have zero-value fields, causing + // the cursor to sort before all workers — the client gets page 1 again, which + // is acceptable during the narrow rolling-deploy window. + startIdx := 0 if len(nextPageToken) > 0 { var token listWorkersPageToken if err := json.Unmarshal(nextPageToken, &token); err != nil { return ListWorkersResponse{}, serviceerror.NewInvalidArgument("invalid next_page_token") } - cursor = token.LastWorkerInstanceKey - } - - // Find the starting index using binary search (O(log n)) - startIdx := 0 - if cursor != "" { - // BinarySearchFunc returns the index where cursor would be inserted. - // We want the first worker with key > cursor. - startIdx, _ = slices.BinarySearchFunc(workers, cursor, func(worker *workerpb.WorkerHeartbeat, target string) int { - return strings.Compare(worker.WorkerInstanceKey, target) + cursor := &workerpb.WorkerHeartbeat{ + TaskQueue: token.LastTaskQueue, + StartTime: timestamppb.New(time.Unix(0, token.LastStartTimestamp)), + WorkerInstanceKey: token.LastWorkerInstanceKey, + } + startIdx, _ = slices.BinarySearchFunc(workers, cursor, func(w, c *workerpb.WorkerHeartbeat) int { + return compareWorkers(w, c) }) - // If exact match found, move past it to get first key > cursor - if startIdx < len(workers) && workers[startIdx].WorkerInstanceKey == cursor { + // Move past the cursor itself to get the first entry after it + if startIdx < len(workers) && compareWorkers(workers[startIdx], cursor) == 0 { startIdx++ } - // If we've gone past the end, return empty if startIdx >= len(workers) { return ListWorkersResponse{}, nil } @@ -500,8 +517,11 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage // Generate next page token if there are more results var newNextPageToken []byte if endIdx < len(workers) { + last := result[len(result)-1] token := listWorkersPageToken{ - LastWorkerInstanceKey: result[len(result)-1].WorkerInstanceKey, + LastTaskQueue: last.GetTaskQueue(), + LastStartTimestamp: last.GetStartTime().AsTime().UnixNano(), + LastWorkerInstanceKey: last.GetWorkerInstanceKey(), } newNextPageToken, _ = json.Marshal(token) } diff --git a/service/matching/workers/registry_test.go b/service/matching/workers/registry_test.go index 7319d8e0033..418eeb42b60 100644 --- a/service/matching/workers/registry_test.go +++ b/service/matching/workers/registry_test.go @@ -12,6 +12,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" "go.temporal.io/server/common/namespace" + "google.golang.org/protobuf/types/known/timestamppb" ) // Test helper config defaults @@ -433,96 +434,96 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) { func TestRegistryImpl_ListWorkersPagination(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) - // Add 5 workers in non-sorted order to verify sorting works + baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + + // Add 5 workers across two task queues with varying start times. + // Expected sort order: (TaskQueue asc, StartTime asc, WorkerInstanceKey asc) + // queue-a: worker-a2 (t+1), worker-a1 (t+2) + // queue-b: worker-b2 (t+0), worker-b3 (t+0, key tiebreak), worker-b1 (t+3) r.upsertHeartbeats("ns1", "ns1_name", nil /* principal */, []*workerpb.WorkerHeartbeat{ - {WorkerInstanceKey: "worker-c"}, - {WorkerInstanceKey: "worker-a"}, - {WorkerInstanceKey: "worker-e"}, - {WorkerInstanceKey: "worker-b"}, - {WorkerInstanceKey: "worker-d"}, + {WorkerInstanceKey: "worker-a1", TaskQueue: "queue-a", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))}, + {WorkerInstanceKey: "worker-b1", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime.Add(3 * time.Second))}, + {WorkerInstanceKey: "worker-a2", TaskQueue: "queue-a", StartTime: timestamppb.New(baseTime.Add(1 * time.Second))}, + {WorkerInstanceKey: "worker-b2", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime)}, + {WorkerInstanceKey: "worker-b3", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime)}, }) - // Test page size of 2 t.Run("first page", func(t *testing.T) { resp, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2}) require.NoError(t, err) - assert.Len(t, resp.Workers, 2) - assert.Equal(t, "worker-a", resp.Workers[0].WorkerInstanceKey) - assert.Equal(t, "worker-b", resp.Workers[1].WorkerInstanceKey) - assert.NotNil(t, resp.NextPageToken, "should have next page token") + require.Len(t, resp.Workers, 2) + require.Equal(t, "worker-a2", resp.Workers[0].WorkerInstanceKey) + require.Equal(t, "worker-a1", resp.Workers[1].WorkerInstanceKey) + require.NotNil(t, resp.NextPageToken) }) - // Test second page t.Run("second page", func(t *testing.T) { - // Get first page to get the token resp1, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2}) - resp2, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp1.NextPageToken}) require.NoError(t, err) - assert.Len(t, resp2.Workers, 2) - assert.Equal(t, "worker-c", resp2.Workers[0].WorkerInstanceKey) - assert.Equal(t, "worker-d", resp2.Workers[1].WorkerInstanceKey) - assert.NotNil(t, resp2.NextPageToken, "should have next page token") + require.Len(t, resp2.Workers, 2) + require.Equal(t, "worker-b2", resp2.Workers[0].WorkerInstanceKey) + require.Equal(t, "worker-b3", resp2.Workers[1].WorkerInstanceKey) + require.NotNil(t, resp2.NextPageToken) }) - // Test last page t.Run("last page", func(t *testing.T) { - // Get first two pages resp1, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2}) resp2, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp1.NextPageToken}) - resp3, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp2.NextPageToken}) require.NoError(t, err) - assert.Len(t, resp3.Workers, 1) - assert.Equal(t, "worker-e", resp3.Workers[0].WorkerInstanceKey) - assert.Nil(t, resp3.NextPageToken, "should not have next page token on last page") + require.Len(t, resp3.Workers, 1) + require.Equal(t, "worker-b1", resp3.Workers[0].WorkerInstanceKey) + require.Nil(t, resp3.NextPageToken) }) } func TestRegistryImpl_ListWorkersPaginationWithDeletedCursor(t *testing.T) { - // Test that pagination continues correctly even if the cursor item is deleted - // between pagination requests. + baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) t.Run("cursor item deleted", func(t *testing.T) { - // Simulate: page 1 returned workers a, b with cursor "b" - // Before page 2, worker "b" is evicted - // Page 2 should continue from "c" (first key > "b") + // Page 1 returned workers a, b with cursor at "worker-b". + // Before page 2, worker-b is evicted. + // Page 2 should continue from worker-c (first entry after cursor). workers := []*workerpb.WorkerHeartbeat{ - {WorkerInstanceKey: "worker-a"}, + {WorkerInstanceKey: "worker-a", TaskQueue: "q", StartTime: timestamppb.New(baseTime)}, // worker-b was deleted - {WorkerInstanceKey: "worker-c"}, - {WorkerInstanceKey: "worker-d"}, + {WorkerInstanceKey: "worker-c", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))}, + {WorkerInstanceKey: "worker-d", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(3 * time.Second))}, } - // Create a token pointing to the deleted "worker-b" - token, _ := json.Marshal(listWorkersPageToken{LastWorkerInstanceKey: "worker-b"}) + token, _ := json.Marshal(listWorkersPageToken{ + LastTaskQueue: "q", + LastStartTimestamp: baseTime.Add(1 * time.Second).UnixNano(), + LastWorkerInstanceKey: "worker-b", + }) resp, err := paginateWorkers(workers, 2, token) require.NoError(t, err) - assert.Len(t, resp.Workers, 2) - // Should start from "worker-c" (first key > "worker-b") - assert.Equal(t, "worker-c", resp.Workers[0].WorkerInstanceKey) - assert.Equal(t, "worker-d", resp.Workers[1].WorkerInstanceKey) + require.Len(t, resp.Workers, 2) + require.Equal(t, "worker-c", resp.Workers[0].WorkerInstanceKey) + require.Equal(t, "worker-d", resp.Workers[1].WorkerInstanceKey) }) t.Run("cursor at end deleted", func(t *testing.T) { - // Simulate: cursor points to "worker-d" which was the last item - // Before next request, "worker-d" is evicted - // Should return empty (no more results) + // Cursor points to worker-d which was the last item. After eviction, + // no workers sort after the cursor, so result should be empty. workers := []*workerpb.WorkerHeartbeat{ - {WorkerInstanceKey: "worker-a"}, - {WorkerInstanceKey: "worker-b"}, - {WorkerInstanceKey: "worker-c"}, - // worker-d was deleted + {WorkerInstanceKey: "worker-a", TaskQueue: "q", StartTime: timestamppb.New(baseTime)}, + {WorkerInstanceKey: "worker-b", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(1 * time.Second))}, + {WorkerInstanceKey: "worker-c", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))}, } - // Create a token pointing to the deleted "worker-d" - token, _ := json.Marshal(listWorkersPageToken{LastWorkerInstanceKey: "worker-d"}) + token, _ := json.Marshal(listWorkersPageToken{ + LastTaskQueue: "q", + LastStartTimestamp: baseTime.Add(3 * time.Second).UnixNano(), + LastWorkerInstanceKey: "worker-d", + }) resp, err := paginateWorkers(workers, 2, token) require.NoError(t, err) - assert.Empty(t, resp.Workers, "should return empty when cursor is past all remaining workers") - assert.Nil(t, resp.NextPageToken) + require.Empty(t, resp.Workers) + require.Nil(t, resp.NextPageToken) }) }