Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 110 additions & 58 deletions packages/dds/task-manager/src/taskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import {
AttachState,
type ReadOnlyInfo,
} from "@fluidframework/container-definitions/internal";
import { assert } from "@fluidframework/core-utils/internal";
import {
DoublyLinkedList,
type ListNode,
assert,
} from "@fluidframework/core-utils/internal";
import type {
IChannelAttributes,
IFluidDataStoreRuntime,
Expand Down Expand Up @@ -91,7 +95,14 @@ export class TaskManagerClass
* Mapping of taskId to a queue of clientIds that are waiting on the task. Maintains the consensus state of the
* queue, even if we know we've submitted an op that should eventually modify the queue.
*/
private readonly taskQueues = new Map<string, string[]>();
private readonly taskQueues = new Map<string, DoublyLinkedList<string>>();

/**
* Side-lookup map paralleling {@link TaskManagerClass.taskQueues}: for each taskId, maps clientId to the
* {@link ListNode} that holds that clientId in the corresponding queue. Enables O(1) lookup and removal of a
* client from a queue without scanning the list. Must be kept symmetric with `taskQueues` at every mutation.
*/
private readonly taskQueueNodes = new Map<string, Map<string, ListNode<string>>>();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: The new taskQueueNodes: Map<string, Map<string, ListNode<string>>> must stay byte-for-byte symmetric with taskQueues across seven mutation sites: addClientToQueue, removeClientFromQueue, the "complete" op handler, the isDetached() shortcut in subscribeToTask, replacePlaceholderInAllQueues, scrubClientsNotInQuorum, and loadCore. The invariant is enforced only by two runtime asserts at the iteration sites (around lines 856-858 and 883-885) and by a one-line comment on the field declaration saying "Must be kept symmetric with taskQueues at every mutation." Every future mutation site is a silent-corruption hazard — the PR's own risk section names this "the main long-term hazard … there is no encapsulating type that enforces this — every future mutation site must update both."

Two independent design proposals converged on the same mitigation, and it lines up with ChumpChief's stated preference (#25607, 2025-10-09: "I've started preferring merging these internal emitters to a single internalEvents member") for consolidating parallel state holders.

Suggested shape. Add a small private helper inside the package — e.g. IndexedClientQueue wrapping DoublyLinkedList<string> + Map<string, ListNode<string>> — exposing has / push / delete / first / last / length / replace / [Symbol.iterator]. Replace the two parallel maps on TaskManagerClass with a single Map<string, IndexedClientQueue>. Do the analogous wrap for latestPendingOps keyed by messageId — that composes cleanly with closing the O(p) reSubmitCore lookup. The two side-map asserts retire; the invariant becomes unrepresentable. No public API change; behavior preserved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: Resolved on sha 8cc348d. The new IndexedList<K, V> (taskManager.ts:278-330) is exactly the wrapper shape this thread proposed — single private encapsulation of DoublyLinkedList + side-index Map, with has / push / insertAfter / deleteByKey / removeNode / first / last / length / getNode / [Symbol.iterator] / map. taskQueues is now Map<string, IndexedList<string, string>> and latestPendingOps is Map<string, IndexedList<number, IPendingOp>>; the parallel taskQueueNodes field and its two iteration-site asserts are gone. The list/index-sync invariant is structural rather than convention-driven. Safe to mark resolved.


// opWatcher emits for every op on this data store. This is just a repackaging of processMessagesCore into events.
private readonly opWatcher: EventEmitter = new EventEmitter();
Expand All @@ -112,7 +123,7 @@ export class TaskManagerClass
/**
* Tracks the most recent pending op for a given task
*/
private readonly latestPendingOps = new Map<string, IPendingOp[]>();
private readonly latestPendingOps = new Map<string, DoublyLinkedList<IPendingOp>>();

/**
* Tracks tasks that this client is currently subscribed to.
Expand Down Expand Up @@ -153,12 +164,12 @@ export class TaskManagerClass
if (local) {
const latestPendingOps = this.latestPendingOps.get(taskId);
assert(latestPendingOps !== undefined, 0xc3c /* No pending ops for task */);
const pendingOp = latestPendingOps.shift();
const pendingOpNode = latestPendingOps.shift();
assert(
pendingOp !== undefined && pendingOp.messageId === messageId,
pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId,
0xc3d /* Unexpected op */,
);
assert(pendingOp.type === "volunteer", 0x07c /* "Unexpected op type" */);
assert(pendingOpNode.data.type === "volunteer", 0x07c /* "Unexpected op type" */);
if (latestPendingOps.length === 0) {
this.latestPendingOps.delete(taskId);
}
Expand All @@ -174,12 +185,12 @@ export class TaskManagerClass
if (local) {
const latestPendingOps = this.latestPendingOps.get(taskId);
assert(latestPendingOps !== undefined, 0xc3e /* No pending ops for task */);
const pendingOp = latestPendingOps.shift();
const pendingOpNode = latestPendingOps.shift();
assert(
pendingOp !== undefined && pendingOp.messageId === messageId,
pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId,
0xc3f /* Unexpected op */,
);
assert(pendingOp.type === "abandon", 0x07e /* "Unexpected op type" */);
assert(pendingOpNode.data.type === "abandon", 0x07e /* "Unexpected op type" */);
if (latestPendingOps.length === 0) {
this.latestPendingOps.delete(taskId);
}
Expand All @@ -196,18 +207,19 @@ export class TaskManagerClass
if (local) {
const latestPendingOps = this.latestPendingOps.get(taskId);
assert(latestPendingOps !== undefined, 0xc40 /* No pending ops for task */);
const pendingOp = latestPendingOps.shift();
const pendingOpNode = latestPendingOps.shift();
assert(
pendingOp !== undefined && pendingOp.messageId === messageId,
pendingOpNode !== undefined && pendingOpNode.data.messageId === messageId,
0xc41 /* Unexpected op */,
);
assert(pendingOp.type === "complete", 0x401 /* Unexpected op type */);
assert(pendingOpNode.data.type === "complete", 0x401 /* Unexpected op type */);
if (latestPendingOps.length === 0) {
this.latestPendingOps.delete(taskId);
}
}

this.taskQueues.delete(taskId);
this.taskQueueNodes.delete(taskId);
this.completedWatcher.emit("completed", taskId, messageId);
this.emit("completed", taskId);
},
Expand Down Expand Up @@ -237,7 +249,7 @@ export class TaskManagerClass
this.connectionWatcher.on("disconnect", () => {
// Emit "lost" for any tasks we were assigned to.
for (const [taskId, clientQueue] of this.taskQueues.entries()) {
if (this.isAttached() && clientQueue[0] === this.clientId) {
if (this.isAttached() && clientQueue.first?.data === this.clientId) {
this.emit("lost", taskId);
}
}
Expand All @@ -260,7 +272,7 @@ export class TaskManagerClass
this.submitLocalMessage(op, pendingOp.messageId);
const latestPendingOps = this.latestPendingOps.get(taskId);
if (latestPendingOps === undefined) {
this.latestPendingOps.set(taskId, [pendingOp]);
this.latestPendingOps.set(taskId, new DoublyLinkedList<IPendingOp>([pendingOp]));
} else {
latestPendingOps.push(pendingOp);
}
Expand All @@ -278,7 +290,7 @@ export class TaskManagerClass
this.submitLocalMessage(op, pendingOp.messageId);
const latestPendingOps = this.latestPendingOps.get(taskId);
if (latestPendingOps === undefined) {
this.latestPendingOps.set(taskId, [pendingOp]);
this.latestPendingOps.set(taskId, new DoublyLinkedList<IPendingOp>([pendingOp]));
} else {
latestPendingOps.push(pendingOp);
}
Expand All @@ -297,7 +309,7 @@ export class TaskManagerClass
this.submitLocalMessage(op, pendingOp.messageId);
const latestPendingOps = this.latestPendingOps.get(taskId);
if (latestPendingOps === undefined) {
this.latestPendingOps.set(taskId, [pendingOp]);
this.latestPendingOps.set(taskId, new DoublyLinkedList<IPendingOp>([pendingOp]));
} else {
latestPendingOps.push(pendingOp);
}
Expand Down Expand Up @@ -560,7 +572,7 @@ export class TaskManagerClass
return false;
}

const currentAssignee = this.taskQueues.get(taskId)?.[0];
const currentAssignee = this.taskQueues.get(taskId)?.first?.data;
return currentAssignee !== undefined && currentAssignee === this.clientId;
}

Expand All @@ -572,7 +584,7 @@ export class TaskManagerClass
return false;
}

return this.taskQueues.get(taskId)?.includes(this.clientId) ?? false;
return this.taskQueueNodes.get(taskId)?.has(this.clientId) ?? false;
}

/**
Expand All @@ -594,6 +606,7 @@ export class TaskManagerClass
// we are attached. Additionally, we don't need to check if we are connected while detached.
if (this.isDetached()) {
this.taskQueues.delete(taskId);
this.taskQueueNodes.delete(taskId);
this.completedWatcher.emit("completed", taskId);
this.emit("completed", taskId);
return;
Expand Down Expand Up @@ -634,13 +647,12 @@ export class TaskManagerClass
}

// Only include tasks if there are clients in the queue.
const filteredMap = new Map<string, string[]>();
const content: [string, string[]][] = [];
for (const [taskId, queue] of this.taskQueues) {
if (queue.length > 0) {
filteredMap.set(taskId, queue);
content.push([taskId, [...queue.map((node) => node.data)]]);
}
}
const content = [...filteredMap.entries()];
return createSingleBlobSummary(snapshotFileName, JSON.stringify(content));
}

Expand All @@ -650,7 +662,14 @@ export class TaskManagerClass
protected async loadCore(storage: IChannelStorageService): Promise<void> {
const content = await readAndParse<[string, string[]][]>(storage, snapshotFileName);
for (const [taskId, clientIdQueue] of content) {
this.taskQueues.set(taskId, clientIdQueue);
const list = new DoublyLinkedList<string>();
const nodeMap = new Map<string, ListNode<string>>();
for (const clientId of clientIdQueue) {
const range = list.push(clientId);
nodeMap.set(clientId, range.last);
}
this.taskQueues.set(taskId, list);
this.taskQueueNodes.set(taskId, nodeMap);
}
this.scrubClientsNotInQuorum();
}
Expand Down Expand Up @@ -681,15 +700,16 @@ export class TaskManagerClass
assertIsTaskManagerOperation(content);
const pendingOps = this.latestPendingOps.get(content.taskId);
assert(pendingOps !== undefined, 0xc42 /* No pending ops for task on resubmit attempt */);
const pendingOpIndex = pendingOps.findIndex(
(op) => op.messageId === localOpMetadata && op.type === content.type,
const pendingOpNode = pendingOps.find(
(node) =>
node.data.messageId === localOpMetadata && node.data.type === content.type,
);
assert(pendingOpIndex !== -1, 0xc43 /* Could not match pending op on resubmit attempt */);
pendingOps.splice(pendingOpIndex, 1);
if (
content.type === "volunteer" &&
pendingOps[pendingOps.length - 1]?.type !== "abandon"
) {
assert(
pendingOpNode !== undefined,
0xc43 /* Could not match pending op on resubmit attempt */,
);
pendingOps.remove(pendingOpNode);
if (content.type === "volunteer" && pendingOps.last?.data.type !== "abandon") {
this.submitVolunteerOp(content.taskId);
}
if (pendingOps.length === 0) {
Expand Down Expand Up @@ -763,21 +783,27 @@ export class TaskManagerClass
) {
// Create the queue if it doesn't exist, and push the client on the back.
let clientQueue = this.taskQueues.get(taskId);
let clientNodes = this.taskQueueNodes.get(taskId);
if (clientQueue === undefined) {
clientQueue = [];
clientQueue = new DoublyLinkedList<string>();
this.taskQueues.set(taskId, clientQueue);
}
if (clientNodes === undefined) {
clientNodes = new Map<string, ListNode<string>>();
this.taskQueueNodes.set(taskId, clientNodes);
}

if (clientQueue.includes(clientId)) {
if (clientNodes.has(clientId)) {
// We shouldn't re-add the client if it's already in the queue.
// This may be possible in scenarios where a client was added
// while detached.
return;
}

const oldLockHolder = clientQueue[0];
clientQueue.push(clientId);
const newLockHolder = clientQueue[0];
const oldLockHolder = clientQueue.first?.data;
const range = clientQueue.push(clientId);
clientNodes.set(clientId, range.last);
const newLockHolder = clientQueue.first?.data;
if (newLockHolder !== oldLockHolder) {
this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder);
}
Expand All @@ -789,18 +815,21 @@ export class TaskManagerClass
if (clientQueue === undefined) {
return;
}
const clientNodes = this.taskQueueNodes.get(taskId);

const oldLockHolder =
clientId === placeholderClientId ? placeholderClientId : clientQueue[0];
const clientIdIndex = clientQueue.indexOf(clientId);
if (clientIdIndex !== -1) {
clientQueue.splice(clientIdIndex, 1);
clientId === placeholderClientId ? placeholderClientId : clientQueue.first?.data;
const nodeToRemove = clientNodes?.get(clientId);
if (nodeToRemove !== undefined) {
clientQueue.remove(nodeToRemove);
clientNodes?.delete(clientId);
// Clean up the queue if there are no more clients in it.
if (clientQueue.length === 0) {
this.taskQueues.delete(taskId);
this.taskQueueNodes.delete(taskId);
}
}
const newLockHolder = clientQueue[0];
const newLockHolder = clientQueue.first?.data;
if (newLockHolder !== oldLockHolder) {
this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder);
}
Expand All @@ -821,14 +850,25 @@ export class TaskManagerClass
this.runtime.clientId !== undefined,
0x475 /* this.runtime.clientId should be defined */,
);
for (const clientQueue of this.taskQueues.values()) {
const clientIdIndex = clientQueue.indexOf(placeholderClientId);
if (clientIdIndex !== -1) {
if (clientQueue.includes(this.runtime.clientId)) {
const realClientId = this.runtime.clientId;
for (const [taskId, clientQueue] of this.taskQueues) {
const clientNodes = this.taskQueueNodes.get(taskId);
assert(
clientNodes !== undefined,
"taskQueueNodes side map missing entry for taskId",
);
const placeholderNode = clientNodes.get(placeholderClientId);
if (placeholderNode !== undefined) {
if (clientNodes.has(realClientId)) {
// If the real clientId is already in the queue, just remove the placeholder.
clientQueue.splice(clientIdIndex, 1);
clientQueue.remove(placeholderNode);
clientNodes.delete(placeholderClientId);
} else {
clientQueue[clientIdIndex] = this.runtime.clientId;
// Insert the real clientId at the placeholder's position, then remove the placeholder.
const range = clientQueue.insertAfter(placeholderNode, realClientId);
clientNodes.set(realClientId, range.last);
clientQueue.remove(placeholderNode);
clientNodes.delete(placeholderClientId);
}
}
}
Expand All @@ -839,14 +879,28 @@ export class TaskManagerClass
private scrubClientsNotInQuorum(): void {
const quorum = this.runtime.getQuorum();
for (const [taskId, clientQueue] of this.taskQueues) {
const filteredClientQueue = clientQueue.filter(
(clientId) => quorum.getMember(clientId) !== undefined,
const clientNodes = this.taskQueueNodes.get(taskId);
assert(
clientNodes !== undefined,
"taskQueueNodes side map missing entry for taskId",
);
if (clientQueue.length !== filteredClientQueue.length) {
if (filteredClientQueue.length === 0) {
let removed = false;
// Walk by collecting removable nodes first to avoid mutating during iteration.
const toRemove: ListNode<string>[] = [];
for (const node of clientQueue) {
if (quorum.getMember(node.data) === undefined) {
toRemove.push(node);
}
}
for (const node of toRemove) {
clientQueue.remove(node);
clientNodes.delete(node.data);
removed = true;
}
if (removed) {
if (clientQueue.length === 0) {
this.taskQueues.delete(taskId);
} else {
this.taskQueues.set(taskId, filteredClientQueue);
this.taskQueueNodes.delete(taskId);
}
this.queueWatcher.emit("queueChange", taskId);
}
Expand All @@ -858,13 +912,10 @@ export class TaskManagerClass
* for the latest pending ops.
*/
private queuedOptimistically(taskId: string): boolean {
const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false;
const inQueue = this.taskQueueNodes.get(taskId)?.has(this.clientId) ?? false;
const latestPendingOps = this.latestPendingOps.get(taskId);

const latestPendingOp =
latestPendingOps !== undefined && latestPendingOps.length > 0
? latestPendingOps[latestPendingOps.length - 1]
: undefined;
const latestPendingOp = latestPendingOps?.last?.data;
const isPendingVolunteer = latestPendingOp?.type === "volunteer";
const isPendingAbandonOrComplete =
latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete";
Expand Down Expand Up @@ -904,7 +955,8 @@ export class TaskManagerClass
const pendingOpToRollback = latestPendingOps.pop();
assert(
// eslint-disable-next-line @typescript-eslint/prefer-optional-chain -- using ?. could change behavior
pendingOpToRollback !== undefined && pendingOpToRollback.messageId === localOpMetadata,
pendingOpToRollback !== undefined &&
pendingOpToRollback.data.messageId === localOpMetadata,
0xc47 /* pending op mismatch */,
);
if (latestPendingOps.length === 0) {
Expand Down
Loading