From 3af1cd81af9fb03571bab2cfab91a368a806b187 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Thu, 4 Jun 2026 18:42:29 -0700 Subject: [PATCH 1/3] Sort ListWorkers by (TaskQueue, StartTime, WorkerInstanceKey) Replaces the previous WorkerInstanceKey-only sort with a composite key that groups workers by task queue and orders by start time (oldest first) within each group. This provides stable ordering across page refreshes since both TaskQueue and StartTime are immutable for a worker's lifetime. Co-Authored-By: Claude Opus 4.6 --- service/matching/workers/registry_impl.go | 79 +++++++++++------- service/matching/workers/registry_test.go | 99 ++++++++++++----------- 2 files changed, 100 insertions(+), 78 deletions(-) diff --git a/service/matching/workers/registry_impl.go b/service/matching/workers/registry_impl.go index ea607ea60ff..68146f83446 100644 --- a/service/matching/workers/registry_impl.go +++ b/service/matching/workers/registry_impl.go @@ -10,6 +10,8 @@ import ( "sync/atomic" "time" + "google.golang.org/protobuf/types/known/timestamppb" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -23,10 +25,12 @@ import ( ) // listWorkersPageToken is the cursor for paginating ListWorkers results. +// The cursor contains all sort-key fields of the last returned worker so that +// the next page can resume from the correct position. type listWorkersPageToken struct { - // LastWorkerInstanceKey is the WorkerInstanceKey of the last worker returned in the previous page. - // The next page will return workers with keys > this value. - LastWorkerInstanceKey string `json:"l"` + LastTaskQueue string `json:"t"` + LastStartTimestamp int64 `json:"s"` // UnixNano + LastWorkerInstanceKey string `json:"k"` } type ( @@ -443,9 +447,29 @@ func (m *registryImpl) CountWorkers(nsID namespace.ID, query string, includeSyst return int64(len(workers)), nil } +// compareWorkers defines the sort order for ListWorkers pagination: +// (TaskQueue asc, StartTime asc, WorkerInstanceKey asc). +// TaskQueue groups workers by what they poll. StartTime provides a stable, +// human-meaningful order within each group (oldest first). WorkerInstanceKey +// is a unique tiebreaker for pagination correctness. +func compareWorkers(a, b *workerpb.WorkerHeartbeat) int { + if c := strings.Compare(a.GetTaskQueue(), b.GetTaskQueue()); c != 0 { + return c + } + aTime := a.GetStartTime().AsTime().UnixNano() + bTime := b.GetStartTime().AsTime().UnixNano() + if aTime != bTime { + if aTime < bTime { + return -1 + } + return 1 + } + return strings.Compare(a.GetWorkerInstanceKey(), b.GetWorkerInstanceKey()) +} + // paginateWorkers applies cursor-based pagination to a list of workers. -// Workers are sorted by WorkerInstanceKey for deterministic ordering. -// Returns the paginated slice and a token for the next page (nil if no more pages). +// Workers are sorted by (TaskQueue, StartTime asc, WorkerInstanceKey) for stable, +// deterministic ordering. Returns the paginated slice and a token for the next page. func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPageToken []byte) (ListWorkersResponse, error) { if len(workers) == 0 { return ListWorkersResponse{Workers: workers}, nil @@ -456,36 +480,30 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage return ListWorkersResponse{Workers: workers}, nil } - // Sort by WorkerInstanceKey for deterministic pagination - slices.SortFunc(workers, func(a, b *workerpb.WorkerHeartbeat) int { - return strings.Compare(a.WorkerInstanceKey, b.WorkerInstanceKey) - }) + slices.SortFunc(workers, compareWorkers) // Decode page token to find the cursor - var cursor string + startIdx := 0 if len(nextPageToken) > 0 { var token listWorkersPageToken if err := json.Unmarshal(nextPageToken, &token); err != nil { return ListWorkersResponse{}, serviceerror.NewInvalidArgument("invalid next_page_token") } - cursor = token.LastWorkerInstanceKey - } - - // Find the starting index using binary search (O(log n)) - startIdx := 0 - if cursor != "" { - // BinarySearchFunc returns the index where cursor would be inserted. - // We want the first worker with key > cursor. - startIdx, _ = slices.BinarySearchFunc(workers, cursor, func(worker *workerpb.WorkerHeartbeat, target string) int { - return strings.Compare(worker.WorkerInstanceKey, target) - }) - // If exact match found, move past it to get first key > cursor - if startIdx < len(workers) && workers[startIdx].WorkerInstanceKey == cursor { - startIdx++ - } - // If we've gone past the end, return empty - if startIdx >= len(workers) { - return ListWorkersResponse{}, nil + // Linear scan to find first worker after the cursor. Binary search is not + // worth the complexity for a 3-field composite key on an in-memory list. + for i, w := range workers { + if compareWorkers(w, &workerpb.WorkerHeartbeat{ + TaskQueue: token.LastTaskQueue, + StartTime: timestamppb.New(time.Unix(0, token.LastStartTimestamp)), + WorkerInstanceKey: token.LastWorkerInstanceKey, + }) > 0 { + startIdx = i + break + } + if i == len(workers)-1 { + // Cursor is past all workers + return ListWorkersResponse{}, nil + } } } @@ -500,8 +518,11 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage // Generate next page token if there are more results var newNextPageToken []byte if endIdx < len(workers) { + last := result[len(result)-1] token := listWorkersPageToken{ - LastWorkerInstanceKey: result[len(result)-1].WorkerInstanceKey, + LastTaskQueue: last.GetTaskQueue(), + LastStartTimestamp: last.GetStartTime().AsTime().UnixNano(), + LastWorkerInstanceKey: last.GetWorkerInstanceKey(), } newNextPageToken, _ = json.Marshal(token) } diff --git a/service/matching/workers/registry_test.go b/service/matching/workers/registry_test.go index 7319d8e0033..7ec7c8d1dc0 100644 --- a/service/matching/workers/registry_test.go +++ b/service/matching/workers/registry_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" workerpb "go.temporal.io/api/worker/v1" + "google.golang.org/protobuf/types/known/timestamppb" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" @@ -433,96 +434,96 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) { func TestRegistryImpl_ListWorkersPagination(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) - // Add 5 workers in non-sorted order to verify sorting works + baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + + // Add 5 workers across two task queues with varying start times. + // Expected sort order: (TaskQueue asc, StartTime asc, WorkerInstanceKey asc) + // queue-a: worker-a2 (t+1), worker-a1 (t+2) + // queue-b: worker-b2 (t+0), worker-b3 (t+0, key tiebreak), worker-b1 (t+3) r.upsertHeartbeats("ns1", "ns1_name", nil /* principal */, []*workerpb.WorkerHeartbeat{ - {WorkerInstanceKey: "worker-c"}, - {WorkerInstanceKey: "worker-a"}, - {WorkerInstanceKey: "worker-e"}, - {WorkerInstanceKey: "worker-b"}, - {WorkerInstanceKey: "worker-d"}, + {WorkerInstanceKey: "worker-a1", TaskQueue: "queue-a", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))}, + {WorkerInstanceKey: "worker-b1", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime.Add(3 * time.Second))}, + {WorkerInstanceKey: "worker-a2", TaskQueue: "queue-a", StartTime: timestamppb.New(baseTime.Add(1 * time.Second))}, + {WorkerInstanceKey: "worker-b2", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime)}, + {WorkerInstanceKey: "worker-b3", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime)}, }) - // Test page size of 2 t.Run("first page", func(t *testing.T) { resp, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2}) require.NoError(t, err) - assert.Len(t, resp.Workers, 2) - assert.Equal(t, "worker-a", resp.Workers[0].WorkerInstanceKey) - assert.Equal(t, "worker-b", resp.Workers[1].WorkerInstanceKey) - assert.NotNil(t, resp.NextPageToken, "should have next page token") + require.Len(t, resp.Workers, 2) + require.Equal(t, "worker-a2", resp.Workers[0].WorkerInstanceKey) + require.Equal(t, "worker-a1", resp.Workers[1].WorkerInstanceKey) + require.NotNil(t, resp.NextPageToken) }) - // Test second page t.Run("second page", func(t *testing.T) { - // Get first page to get the token resp1, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2}) - resp2, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp1.NextPageToken}) require.NoError(t, err) - assert.Len(t, resp2.Workers, 2) - assert.Equal(t, "worker-c", resp2.Workers[0].WorkerInstanceKey) - assert.Equal(t, "worker-d", resp2.Workers[1].WorkerInstanceKey) - assert.NotNil(t, resp2.NextPageToken, "should have next page token") + require.Len(t, resp2.Workers, 2) + require.Equal(t, "worker-b2", resp2.Workers[0].WorkerInstanceKey) + require.Equal(t, "worker-b3", resp2.Workers[1].WorkerInstanceKey) + require.NotNil(t, resp2.NextPageToken) }) - // Test last page t.Run("last page", func(t *testing.T) { - // Get first two pages resp1, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2}) resp2, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp1.NextPageToken}) - resp3, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp2.NextPageToken}) require.NoError(t, err) - assert.Len(t, resp3.Workers, 1) - assert.Equal(t, "worker-e", resp3.Workers[0].WorkerInstanceKey) - assert.Nil(t, resp3.NextPageToken, "should not have next page token on last page") + require.Len(t, resp3.Workers, 1) + require.Equal(t, "worker-b1", resp3.Workers[0].WorkerInstanceKey) + require.Nil(t, resp3.NextPageToken) }) } func TestRegistryImpl_ListWorkersPaginationWithDeletedCursor(t *testing.T) { - // Test that pagination continues correctly even if the cursor item is deleted - // between pagination requests. + baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) t.Run("cursor item deleted", func(t *testing.T) { - // Simulate: page 1 returned workers a, b with cursor "b" - // Before page 2, worker "b" is evicted - // Page 2 should continue from "c" (first key > "b") + // Page 1 returned workers a, b with cursor at "worker-b". + // Before page 2, worker-b is evicted. + // Page 2 should continue from worker-c (first entry after cursor). workers := []*workerpb.WorkerHeartbeat{ - {WorkerInstanceKey: "worker-a"}, + {WorkerInstanceKey: "worker-a", TaskQueue: "q", StartTime: timestamppb.New(baseTime)}, // worker-b was deleted - {WorkerInstanceKey: "worker-c"}, - {WorkerInstanceKey: "worker-d"}, + {WorkerInstanceKey: "worker-c", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))}, + {WorkerInstanceKey: "worker-d", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(3 * time.Second))}, } - // Create a token pointing to the deleted "worker-b" - token, _ := json.Marshal(listWorkersPageToken{LastWorkerInstanceKey: "worker-b"}) + token, _ := json.Marshal(listWorkersPageToken{ + LastTaskQueue: "q", + LastStartTimestamp: baseTime.Add(1 * time.Second).UnixNano(), + LastWorkerInstanceKey: "worker-b", + }) resp, err := paginateWorkers(workers, 2, token) require.NoError(t, err) - assert.Len(t, resp.Workers, 2) - // Should start from "worker-c" (first key > "worker-b") - assert.Equal(t, "worker-c", resp.Workers[0].WorkerInstanceKey) - assert.Equal(t, "worker-d", resp.Workers[1].WorkerInstanceKey) + require.Len(t, resp.Workers, 2) + require.Equal(t, "worker-c", resp.Workers[0].WorkerInstanceKey) + require.Equal(t, "worker-d", resp.Workers[1].WorkerInstanceKey) }) t.Run("cursor at end deleted", func(t *testing.T) { - // Simulate: cursor points to "worker-d" which was the last item - // Before next request, "worker-d" is evicted - // Should return empty (no more results) + // Cursor points to worker-d which was the last item. After eviction, + // no workers sort after the cursor, so result should be empty. workers := []*workerpb.WorkerHeartbeat{ - {WorkerInstanceKey: "worker-a"}, - {WorkerInstanceKey: "worker-b"}, - {WorkerInstanceKey: "worker-c"}, - // worker-d was deleted + {WorkerInstanceKey: "worker-a", TaskQueue: "q", StartTime: timestamppb.New(baseTime)}, + {WorkerInstanceKey: "worker-b", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(1 * time.Second))}, + {WorkerInstanceKey: "worker-c", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))}, } - // Create a token pointing to the deleted "worker-d" - token, _ := json.Marshal(listWorkersPageToken{LastWorkerInstanceKey: "worker-d"}) + token, _ := json.Marshal(listWorkersPageToken{ + LastTaskQueue: "q", + LastStartTimestamp: baseTime.Add(3 * time.Second).UnixNano(), + LastWorkerInstanceKey: "worker-d", + }) resp, err := paginateWorkers(workers, 2, token) require.NoError(t, err) - assert.Empty(t, resp.Workers, "should return empty when cursor is past all remaining workers") - assert.Nil(t, resp.NextPageToken) + require.Empty(t, resp.Workers) + require.Nil(t, resp.NextPageToken) }) } From 1a4908f10ef50f4e29975b5f7732d19d3156361c Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Thu, 4 Jun 2026 18:43:00 -0700 Subject: [PATCH 2/3] Fix import ordering and struct alignment Co-Authored-By: Claude Opus 4.6 --- service/matching/workers/registry_impl.go | 9 ++++----- service/matching/workers/registry_test.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/service/matching/workers/registry_impl.go b/service/matching/workers/registry_impl.go index 68146f83446..638b9d76c8f 100644 --- a/service/matching/workers/registry_impl.go +++ b/service/matching/workers/registry_impl.go @@ -10,8 +10,6 @@ import ( "sync/atomic" "time" - "google.golang.org/protobuf/types/known/timestamppb" - commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -22,15 +20,16 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives" "go.uber.org/fx" + "google.golang.org/protobuf/types/known/timestamppb" ) // listWorkersPageToken is the cursor for paginating ListWorkers results. // The cursor contains all sort-key fields of the last returned worker so that // the next page can resume from the correct position. type listWorkersPageToken struct { - LastTaskQueue string `json:"t"` - LastStartTimestamp int64 `json:"s"` // UnixNano - LastWorkerInstanceKey string `json:"k"` + LastTaskQueue string `json:"t"` + LastStartTimestamp int64 `json:"s"` // UnixNano + LastWorkerInstanceKey string `json:"k"` } type ( diff --git a/service/matching/workers/registry_test.go b/service/matching/workers/registry_test.go index 7ec7c8d1dc0..418eeb42b60 100644 --- a/service/matching/workers/registry_test.go +++ b/service/matching/workers/registry_test.go @@ -8,11 +8,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" workerpb "go.temporal.io/api/worker/v1" - "google.golang.org/protobuf/types/known/timestamppb" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" "go.temporal.io/server/common/namespace" + "google.golang.org/protobuf/types/known/timestamppb" ) // Test helper config defaults From f85a4ccbf7a8f5a21df2756ba2a5e4b58600f889 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Thu, 4 Jun 2026 19:04:30 -0700 Subject: [PATCH 3/3] Simplify page token: remove versioning, accept graceful degradation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stale tokens from the old sort order will have zero-value fields, causing the cursor to land at position 0 — the client gets page 1 again. This is acceptable during the narrow rolling-deploy window. Co-Authored-By: Claude Opus 4.6 --- service/matching/workers/registry_impl.go | 42 +++++++++++------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/service/matching/workers/registry_impl.go b/service/matching/workers/registry_impl.go index 638b9d76c8f..d1b5a1a8ee8 100644 --- a/service/matching/workers/registry_impl.go +++ b/service/matching/workers/registry_impl.go @@ -24,12 +24,10 @@ import ( ) // listWorkersPageToken is the cursor for paginating ListWorkers results. -// The cursor contains all sort-key fields of the last returned worker so that -// the next page can resume from the correct position. type listWorkersPageToken struct { - LastTaskQueue string `json:"t"` - LastStartTimestamp int64 `json:"s"` // UnixNano - LastWorkerInstanceKey string `json:"k"` + LastTaskQueue string `json:"t,omitempty"` + LastStartTimestamp int64 `json:"s,omitempty"` // UnixNano + LastWorkerInstanceKey string `json:"k,omitempty"` } type ( @@ -481,28 +479,30 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage slices.SortFunc(workers, compareWorkers) - // Decode page token to find the cursor + // Decode page token and find the starting index using binary search (O(log n)). + // Stale tokens from a previous sort order will have zero-value fields, causing + // the cursor to sort before all workers — the client gets page 1 again, which + // is acceptable during the narrow rolling-deploy window. startIdx := 0 if len(nextPageToken) > 0 { var token listWorkersPageToken if err := json.Unmarshal(nextPageToken, &token); err != nil { return ListWorkersResponse{}, serviceerror.NewInvalidArgument("invalid next_page_token") } - // Linear scan to find first worker after the cursor. Binary search is not - // worth the complexity for a 3-field composite key on an in-memory list. - for i, w := range workers { - if compareWorkers(w, &workerpb.WorkerHeartbeat{ - TaskQueue: token.LastTaskQueue, - StartTime: timestamppb.New(time.Unix(0, token.LastStartTimestamp)), - WorkerInstanceKey: token.LastWorkerInstanceKey, - }) > 0 { - startIdx = i - break - } - if i == len(workers)-1 { - // Cursor is past all workers - return ListWorkersResponse{}, nil - } + cursor := &workerpb.WorkerHeartbeat{ + TaskQueue: token.LastTaskQueue, + StartTime: timestamppb.New(time.Unix(0, token.LastStartTimestamp)), + WorkerInstanceKey: token.LastWorkerInstanceKey, + } + startIdx, _ = slices.BinarySearchFunc(workers, cursor, func(w, c *workerpb.WorkerHeartbeat) int { + return compareWorkers(w, c) + }) + // Move past the cursor itself to get the first entry after it + if startIdx < len(workers) && compareWorkers(workers[startIdx], cursor) == 0 { + startIdx++ + } + if startIdx >= len(workers) { + return ListWorkersResponse{}, nil } }