Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions service/matching/workers/registry_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives"
"go.uber.org/fx"
"google.golang.org/protobuf/types/known/timestamppb"
)

// listWorkersPageToken is the cursor for paginating ListWorkers results.
// It is encoded with encoding/json to produce the next-page token and decoded
// again when the token is presented for the following page.
// NOTE(review): this view shows two LastWorkerInstanceKey declarations (json
// keys "l" and "k"); this looks like the before/after lines of a diff rendered
// together — confirm that only one of them exists in the real file.
type listWorkersPageToken struct {
// LastWorkerInstanceKey is the WorkerInstanceKey of the last worker returned in the previous page.
// The next page will return workers with keys > this value.
LastWorkerInstanceKey string `json:"l"`
// LastTaskQueue is the TaskQueue of the last worker returned in the previous page.
LastTaskQueue string `json:"t,omitempty"`
// LastStartTimestamp is the StartTime of the last worker returned in the
// previous page, expressed as nanoseconds since the Unix epoch.
LastStartTimestamp int64 `json:"s,omitempty"` // UnixNano
// LastWorkerInstanceKey is the WorkerInstanceKey of the last worker returned in the previous page.
LastWorkerInstanceKey string `json:"k,omitempty"`
}

type (
Expand Down Expand Up @@ -443,9 +444,29 @@ func (m *registryImpl) CountWorkers(nsID namespace.ID, query string, includeSyst
return int64(len(workers)), nil
}

// compareWorkers is the ordering used by ListWorkers pagination. Two worker
// heartbeats are compared key by key, each ascending:
//
//  1. TaskQueue — keeps workers polling the same queue together.
//  2. StartTime — oldest first within a queue, compared as Unix nanoseconds.
//  3. WorkerInstanceKey — final tiebreaker so the order is total.
//
// It returns -1 when a sorts before b, 1 when a sorts after b, and 0 when all
// three keys are equal.
func compareWorkers(a, b *workerpb.WorkerHeartbeat) int {
	if byQueue := strings.Compare(a.GetTaskQueue(), b.GetTaskQueue()); byQueue != 0 {
		return byQueue
	}

	// Compare start times as int64 nanoseconds, the same representation the
	// page token carries, so a cursor rebuilt from a token compares
	// consistently with the worker it was taken from.
	startA := a.GetStartTime().AsTime().UnixNano()
	startB := b.GetStartTime().AsTime().UnixNano()
	switch {
	case startA < startB:
		return -1
	case startA > startB:
		return 1
	}

	return strings.Compare(a.GetWorkerInstanceKey(), b.GetWorkerInstanceKey())
}

// paginateWorkers applies cursor-based pagination to a list of workers.
// Workers are sorted by WorkerInstanceKey for deterministic ordering.
// Returns the paginated slice and a token for the next page (nil if no more pages).
// Workers are sorted by (TaskQueue, StartTime asc, WorkerInstanceKey) for stable,
// deterministic ordering. Returns the paginated slice and a token for the next page.
func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPageToken []byte) (ListWorkersResponse, error) {
if len(workers) == 0 {
return ListWorkersResponse{Workers: workers}, nil
Expand All @@ -456,34 +477,30 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage
return ListWorkersResponse{Workers: workers}, nil
}

// Sort by WorkerInstanceKey for deterministic pagination
slices.SortFunc(workers, func(a, b *workerpb.WorkerHeartbeat) int {
return strings.Compare(a.WorkerInstanceKey, b.WorkerInstanceKey)
})
slices.SortFunc(workers, compareWorkers)

// Decode page token to find the cursor
var cursor string
// Decode page token and find the starting index using binary search (O(log n)).
// Stale tokens from a previous sort order will have zero-value fields, causing
// the cursor to sort before all workers — the client gets page 1 again, which
// is acceptable during the narrow rolling-deploy window.
startIdx := 0
if len(nextPageToken) > 0 {
var token listWorkersPageToken
if err := json.Unmarshal(nextPageToken, &token); err != nil {
return ListWorkersResponse{}, serviceerror.NewInvalidArgument("invalid next_page_token")
}
cursor = token.LastWorkerInstanceKey
}

// Find the starting index using binary search (O(log n))
startIdx := 0
if cursor != "" {
// BinarySearchFunc returns the index where cursor would be inserted.
// We want the first worker with key > cursor.
startIdx, _ = slices.BinarySearchFunc(workers, cursor, func(worker *workerpb.WorkerHeartbeat, target string) int {
return strings.Compare(worker.WorkerInstanceKey, target)
cursor := &workerpb.WorkerHeartbeat{
TaskQueue: token.LastTaskQueue,
StartTime: timestamppb.New(time.Unix(0, token.LastStartTimestamp)),
WorkerInstanceKey: token.LastWorkerInstanceKey,
}
startIdx, _ = slices.BinarySearchFunc(workers, cursor, func(w, c *workerpb.WorkerHeartbeat) int {
return compareWorkers(w, c)
})
// If exact match found, move past it to get first key > cursor
if startIdx < len(workers) && workers[startIdx].WorkerInstanceKey == cursor {
// Move past the cursor itself to get the first entry after it
if startIdx < len(workers) && compareWorkers(workers[startIdx], cursor) == 0 {
startIdx++
}
// If we've gone past the end, return empty
if startIdx >= len(workers) {
return ListWorkersResponse{}, nil
}
Expand All @@ -500,8 +517,11 @@ func paginateWorkers(workers []*workerpb.WorkerHeartbeat, pageSize int, nextPage
// Generate next page token if there are more results
var newNextPageToken []byte
if endIdx < len(workers) {
last := result[len(result)-1]
token := listWorkersPageToken{
LastWorkerInstanceKey: result[len(result)-1].WorkerInstanceKey,
LastTaskQueue: last.GetTaskQueue(),
LastStartTimestamp: last.GetStartTime().AsTime().UnixNano(),
LastWorkerInstanceKey: last.GetWorkerInstanceKey(),
}
newNextPageToken, _ = json.Marshal(token)
}
Expand Down
99 changes: 50 additions & 49 deletions service/matching/workers/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/metrics/metricstest"
"go.temporal.io/server/common/namespace"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Test helper config defaults
Expand Down Expand Up @@ -433,96 +434,96 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) {
// TestRegistryImpl_ListWorkersPagination upserts worker heartbeats into namespace
// "ns1" and walks ListWorkers with PageSize 2, checking the workers returned on
// the first, second, and last page and that only the last page has a nil
// NextPageToken.
// NOTE(review): this view appears to interleave the before/after lines of a
// diff (two fixture sets and two sets of assertions per subtest) — confirm
// which lines exist in the real file.
func TestRegistryImpl_ListWorkersPagination(t *testing.T) {
r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))

// Add 5 workers in non-sorted order to verify sorting works
baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)

// Add 5 workers across two task queues with varying start times.
// Expected sort order: (TaskQueue asc, StartTime asc, WorkerInstanceKey asc)
// queue-a: worker-a2 (t+1), worker-a1 (t+2)
// queue-b: worker-b2 (t+0), worker-b3 (t+0, key tiebreak), worker-b1 (t+3)
r.upsertHeartbeats("ns1", "ns1_name", nil /* principal */, []*workerpb.WorkerHeartbeat{
{WorkerInstanceKey: "worker-c"},
{WorkerInstanceKey: "worker-a"},
{WorkerInstanceKey: "worker-e"},
{WorkerInstanceKey: "worker-b"},
{WorkerInstanceKey: "worker-d"},
{WorkerInstanceKey: "worker-a1", TaskQueue: "queue-a", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))},
{WorkerInstanceKey: "worker-b1", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime.Add(3 * time.Second))},
{WorkerInstanceKey: "worker-a2", TaskQueue: "queue-a", StartTime: timestamppb.New(baseTime.Add(1 * time.Second))},
{WorkerInstanceKey: "worker-b2", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime)},
{WorkerInstanceKey: "worker-b3", TaskQueue: "queue-b", StartTime: timestamppb.New(baseTime)},
})

// Test page size of 2
t.Run("first page", func(t *testing.T) {
resp, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2})
require.NoError(t, err)
assert.Len(t, resp.Workers, 2)
assert.Equal(t, "worker-a", resp.Workers[0].WorkerInstanceKey)
assert.Equal(t, "worker-b", resp.Workers[1].WorkerInstanceKey)
assert.NotNil(t, resp.NextPageToken, "should have next page token")
require.Len(t, resp.Workers, 2)
require.Equal(t, "worker-a2", resp.Workers[0].WorkerInstanceKey)
require.Equal(t, "worker-a1", resp.Workers[1].WorkerInstanceKey)
require.NotNil(t, resp.NextPageToken)
})

// Test second page
t.Run("second page", func(t *testing.T) {
// Get first page to get the token
resp1, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2})

resp2, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp1.NextPageToken})
require.NoError(t, err)
assert.Len(t, resp2.Workers, 2)
assert.Equal(t, "worker-c", resp2.Workers[0].WorkerInstanceKey)
assert.Equal(t, "worker-d", resp2.Workers[1].WorkerInstanceKey)
assert.NotNil(t, resp2.NextPageToken, "should have next page token")
require.Len(t, resp2.Workers, 2)
require.Equal(t, "worker-b2", resp2.Workers[0].WorkerInstanceKey)
require.Equal(t, "worker-b3", resp2.Workers[1].WorkerInstanceKey)
require.NotNil(t, resp2.NextPageToken)
})

// Test last page
t.Run("last page", func(t *testing.T) {
// Get first two pages
resp1, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2})
resp2, _ := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp1.NextPageToken})

resp3, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 2, NextPageToken: resp2.NextPageToken})
require.NoError(t, err)
assert.Len(t, resp3.Workers, 1)
assert.Equal(t, "worker-e", resp3.Workers[0].WorkerInstanceKey)
assert.Nil(t, resp3.NextPageToken, "should not have next page token on last page")
require.Len(t, resp3.Workers, 1)
require.Equal(t, "worker-b1", resp3.Workers[0].WorkerInstanceKey)
require.Nil(t, resp3.NextPageToken)
})
}

// TestRegistryImpl_ListWorkersPaginationWithDeletedCursor calls paginateWorkers
// directly with a hand-built page token whose worker is absent from the input
// slice, and checks that the page resumes at the first worker after the cursor
// position, or is empty with a nil NextPageToken when nothing sorts after it.
// NOTE(review): this view appears to interleave the before/after lines of a
// diff (duplicate fixtures, token constructions, and assertions) — confirm
// which lines exist in the real file.
func TestRegistryImpl_ListWorkersPaginationWithDeletedCursor(t *testing.T) {
// Test that pagination continues correctly even if the cursor item is deleted
// between pagination requests.
baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)

t.Run("cursor item deleted", func(t *testing.T) {
// Simulate: page 1 returned workers a, b with cursor "b"
// Before page 2, worker "b" is evicted
// Page 2 should continue from "c" (first key > "b")
// Page 1 returned workers a, b with cursor at "worker-b".
// Before page 2, worker-b is evicted.
// Page 2 should continue from worker-c (first entry after cursor).
workers := []*workerpb.WorkerHeartbeat{
{WorkerInstanceKey: "worker-a"},
{WorkerInstanceKey: "worker-a", TaskQueue: "q", StartTime: timestamppb.New(baseTime)},
// worker-b was deleted
{WorkerInstanceKey: "worker-c"},
{WorkerInstanceKey: "worker-d"},
{WorkerInstanceKey: "worker-c", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))},
{WorkerInstanceKey: "worker-d", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(3 * time.Second))},
}

// Create a token pointing to the deleted "worker-b"
token, _ := json.Marshal(listWorkersPageToken{LastWorkerInstanceKey: "worker-b"})
token, _ := json.Marshal(listWorkersPageToken{
LastTaskQueue: "q",
LastStartTimestamp: baseTime.Add(1 * time.Second).UnixNano(),
LastWorkerInstanceKey: "worker-b",
})

resp, err := paginateWorkers(workers, 2, token)
require.NoError(t, err)
assert.Len(t, resp.Workers, 2)
// Should start from "worker-c" (first key > "worker-b")
assert.Equal(t, "worker-c", resp.Workers[0].WorkerInstanceKey)
assert.Equal(t, "worker-d", resp.Workers[1].WorkerInstanceKey)
require.Len(t, resp.Workers, 2)
require.Equal(t, "worker-c", resp.Workers[0].WorkerInstanceKey)
require.Equal(t, "worker-d", resp.Workers[1].WorkerInstanceKey)
})

t.Run("cursor at end deleted", func(t *testing.T) {
// Simulate: cursor points to "worker-d" which was the last item
// Before next request, "worker-d" is evicted
// Should return empty (no more results)
// Cursor points to worker-d which was the last item. After eviction,
// no workers sort after the cursor, so result should be empty.
workers := []*workerpb.WorkerHeartbeat{
{WorkerInstanceKey: "worker-a"},
{WorkerInstanceKey: "worker-b"},
{WorkerInstanceKey: "worker-c"},
// worker-d was deleted
{WorkerInstanceKey: "worker-a", TaskQueue: "q", StartTime: timestamppb.New(baseTime)},
{WorkerInstanceKey: "worker-b", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(1 * time.Second))},
{WorkerInstanceKey: "worker-c", TaskQueue: "q", StartTime: timestamppb.New(baseTime.Add(2 * time.Second))},
}

// Create a token pointing to the deleted "worker-d"
token, _ := json.Marshal(listWorkersPageToken{LastWorkerInstanceKey: "worker-d"})
token, _ := json.Marshal(listWorkersPageToken{
LastTaskQueue: "q",
LastStartTimestamp: baseTime.Add(3 * time.Second).UnixNano(),
LastWorkerInstanceKey: "worker-d",
})

resp, err := paginateWorkers(workers, 2, token)
require.NoError(t, err)
assert.Empty(t, resp.Workers, "should return empty when cursor is past all remaining workers")
assert.Nil(t, resp.NextPageToken)
require.Empty(t, resp.Workers)
require.Nil(t, resp.NextPageToken)
})
}

Expand Down
Loading