18 changes: 18 additions & 0 deletions .changeset/bunched-resubmit-dispatch.md
@@ -0,0 +1,18 @@
---
"@fluidframework/runtime-definitions": minor
"@fluidframework/datastore-definitions": minor
"@fluidframework/container-runtime": minor
"@fluidframework/datastore": minor
"__section": feature
---
Extend bunched dispatch from `processMessages` to `reSubmit`

The container runtime now bunches contiguous resubmit entries that target the same distributed data structure (DDS) when replaying a pending batch, mirroring the existing bunched dispatch for inbound `processMessages`. A batch of N consecutive ops targeting the same channel now makes one round trip through `ChannelCollection → FluidDataStoreContext → FluidDataStoreRuntime → IChannelContext → IDeltaHandler` rather than N.
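To make the bunching rule concrete, here is a minimal sketch of a contiguous-bunch helper. It assumes the same five-argument call shape that `forEachContiguousBunch` has at its call sites in this PR; the name `forEachContiguousBunchSketch`, the body, and the sample ops are illustrative assumptions, since the real helper's implementation is not part of this diff.

```typescript
// Hypothetical stand-in for the shared helper; only the call shape is taken from the PR.
function forEachContiguousBunchSketch<TItem, TKey, TValue>(
	items: readonly TItem[],
	getKey: (item: TItem) => TKey,
	getValue: (item: TItem) => TValue,
	onBunch: (key: TKey, bunch: TValue[]) => void,
	keysEqual: (a: TKey, b: TKey) => boolean,
): void {
	let current: { key: TKey; values: TValue[] } | undefined;
	for (const item of items) {
		const key = getKey(item);
		if (current === undefined || !keysEqual(current.key, key)) {
			// The key changed (or this is the first item): flush the previous run, if any.
			if (current !== undefined) {
				onBunch(current.key, current.values);
			}
			current = { key, values: [] };
		}
		current.values.push(getValue(item));
	}
	// Flush the trailing run; nothing is dispatched for an empty input.
	if (current !== undefined) {
		onBunch(current.key, current.values);
	}
}

// Four ops, where the third targets a different address (sample data, assumed).
const ops = [
	{ address: "a", type: "op", n: 1 },
	{ address: "a", type: "op", n: 2 },
	{ address: "b", type: "op", n: 3 },
	{ address: "a", type: "op", n: 4 },
];
const dispatched: string[] = [];
forEachContiguousBunchSketch(
	ops,
	(op) => ({ address: op.address, type: op.type }),
	(op) => op.n,
	(key, bunch) => {
		dispatched.push(`${key.address}:${bunch.join(",")}`);
	},
	(a, b) => a.address === b.address && a.type === b.type,
);
// dispatched is ["a:1,2", "b:3", "a:4"]
```

Only adjacent entries with equal keys are merged, so the final op for `a` forms its own bunch and the original op order is preserved.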

New API surface on `@legacy @beta`:

- `IRuntimeResubmitMessage` and `IRuntimeResubmitMessageCollection` (`@fluidframework/runtime-definitions`): the bunched envelope, with a shared `squash` flag.
- Optional `IFluidDataStoreChannel.reSubmitMessages(type, collection)` (`@fluidframework/runtime-definitions`): opt-in bunched form alongside the existing `reSubmit`.
- Optional `IDeltaHandler.reSubmitMessages(collection)` (`@fluidframework/datastore-definitions`): opt-in bunched form alongside the existing `reSubmit`.

DDSes that do not implement `reSubmitMessages` automatically fall back to per-message `reSubmit` calls. `SharedObject`-derived DDSes get a default implementation that loops on the existing `reSubmitCore` / `reSubmitSquashedCore` paths. They may override it to take advantage of seeing the full run together. Non-`FluidDataStoreOp` runtime message types (Attach, Alias, GC, etc.) continue to use the existing single-op `reSubmit` path.
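The opt-in and fallback behavior described above can be sketched as follows. The interfaces are local stand-ins, not the real `IDeltaHandler` or `IRuntimeResubmitMessageCollection`: the field names `squash`, `messages`, `contents`, and `localOpMetadata` follow the diff, while `DeltaHandlerSketch`, `dispatchResubmit`, and the two sample handlers are assumptions made for illustration.

```typescript
// Local stand-ins for the PR's types (assumed shapes, not the published API).
interface ResubmitMessageSketch {
	contents: unknown;
	localOpMetadata: unknown;
}
interface ResubmitCollectionSketch {
	squash: boolean;
	messages: readonly ResubmitMessageSketch[];
}
interface DeltaHandlerSketch {
	reSubmit(contents: unknown, localOpMetadata: unknown, squash: boolean): void;
	// Optional bunched form: handlers opt in by defining it.
	reSubmitMessages?(collection: ResubmitCollectionSketch): void;
}

// Hypothetical dispatcher: prefer the bunched form, otherwise loop per message.
function dispatchResubmit(
	handler: DeltaHandlerSketch,
	collection: ResubmitCollectionSketch,
): void {
	if (handler.reSubmitMessages !== undefined) {
		handler.reSubmitMessages(collection);
		return;
	}
	for (const { contents, localOpMetadata } of collection.messages) {
		handler.reSubmit(contents, localOpMetadata, collection.squash);
	}
}

const log: string[] = [];
const legacyHandler: DeltaHandlerSketch = {
	reSubmit: (contents) => {
		log.push(`legacy:${String(contents)}`);
	},
};
const bunchedHandler: DeltaHandlerSketch = {
	reSubmit: (contents) => {
		log.push(`single:${String(contents)}`);
	},
	reSubmitMessages: (bunch) => {
		log.push(`bunched:${bunch.messages.length}`);
	},
};
const sampleCollection: ResubmitCollectionSketch = {
	squash: false,
	messages: [
		{ contents: "x", localOpMetadata: undefined },
		{ contents: "y", localOpMetadata: undefined },
	],
};
dispatchResubmit(legacyHandler, sampleCollection);
dispatchResubmit(bunchedHandler, sampleCollection);
// log is ["legacy:x", "legacy:y", "bunched:2"]
```

The legacy handler receives one call per message, while the opted-in handler receives the whole run in a single call, which is the N-to-one reduction the changeset describes.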
10 changes: 10 additions & 0 deletions packages/dds/shared-object-base/src/sharedObject.ts
@@ -35,6 +35,7 @@ import {
 	totalBlobSizePropertyName,
 	type IRuntimeMessageCollection,
 	type IRuntimeMessagesContent,
+	type IRuntimeResubmitMessageCollection,
 } from "@fluidframework/runtime-definitions/internal";
 import {
 	toDeltaManagerInternal,
@@ -550,6 +551,15 @@ export abstract class SharedObjectCore<
 			reSubmit: (content: unknown, localOpMetadata: unknown, squash: boolean) => {
 				this.reSubmit(content, localOpMetadata, squash);
 			},
+			reSubmitMessages: (collection: IRuntimeResubmitMessageCollection) => {
+				// Default implementation iterates the bunch and dispatches each entry through the
+				// existing single-op reSubmit dispatcher (reSubmitCore vs reSubmitSquashed). DDSes
+				// that can take advantage of seeing the contiguous run together may override the
+				// IDeltaHandler attached here.
+				for (const { contents, localOpMetadata } of collection.messages) {
+					this.reSubmit(contents, localOpMetadata, collection.squash);
+				}
+			},
 			applyStashedOp: (content: unknown): void => {
 				this.applyStashedOp(parseHandles(content, this.serializer));
 			},
114 changes: 76 additions & 38 deletions packages/runtime/container-runtime/src/channelCollection.ts
@@ -66,6 +66,7 @@ import {
 	create404Response,
 	createResponseError,
 	encodeCompactIdToString,
+	forEachContiguousBunch,
 	isSerializedHandle,
 	processAttachMessageGCData,
 	responseToException,
@@ -830,6 +831,48 @@ export class ChannelCollection
 		context.reSubmit(envelope.contents, localOpMetadata, squash);
 	};

+	/**
+	 * Resubmit a contiguous run of FluidDataStoreOp entries. Entries are bunched by
+	 * `(address, FluidDataStoreMessage.type)` and forwarded to each data store context in a single
+	 * {@link FluidDataStoreContext.reSubmitMessages} call per bunch, mirroring the inbound
+	 * {@link ChannelCollection.processChannelMessages} bunching pattern.
+	 */
+	public readonly reSubmitContainerMessages = (
+		entries: readonly {
+			envelope: IEnvelope<FluidDataStoreMessage>;
+			localOpMetadata: unknown;
+		}[],
+		squash: boolean,
+	): void => {
+		forEachContiguousBunch(
+			entries,
+			(e) => ({ address: e.envelope.address, type: e.envelope.contents.type }),
+			(e) => ({
+				contents: e.envelope.contents.content,
+				localOpMetadata: e.localOpMetadata,
+			}),
+			(key, messages) => {
+				const context = this.contexts.get(key.address);
+				if (
+					this.checkAndLogIfDeleted(
+						key.address,
+						context,
+						"Changed",
+						"reSubmitContainerMessages",
+					)
+				) {
+					throw new DataCorruptionError("Context is deleted!", {
+						callSite: "reSubmitContainerMessages",
+						...tagCodeArtifacts({ id: key.address }),
+					});
+				}
+				assert(!!context, "There should be a store context for the op");
+				context.reSubmitMessages(key.type, { squash, messages });
+			},
+			(a, b) => a.address === b.address && a.type === b.type,
+		);
+	};
+
 	public readonly rollbackDataStoreOp = (
 		envelope: IEnvelope<FluidDataStoreMessage>,
 		localOpMetadata: unknown,
@@ -955,31 +998,17 @@ export class ChannelCollection
 	 */
 	private processChannelMessages(messageCollection: IRuntimeMessageCollection): void {
 		const { envelope, messagesContent, local } = messageCollection;
-		let currentMessageState: { address: string; type: string } | undefined;
-		let currentMessagesContent: IRuntimeMessagesContent[] = [];

-		// Helper that sends the current bunch of messages to the data store. It validates that the data stores exists.
-		const sendBunchedMessages = (): void => {
-			// Current message state will be undefined for the first message in the list.
-			if (currentMessageState === undefined) {
-				return;
-			}
-			const currentContext = this.contexts.get(currentMessageState.address);
-			assert(!!currentContext, 0xa66 /* Context not found */);
-
-			currentContext.processMessages({
-				envelope: { ...envelope, type: currentMessageState.type },
-				messagesContent: currentMessagesContent,
-				local,
-			});
-			currentMessagesContent = [];
-		};
+		// First pass: per-message validation, GC bookkeeping, and shape transform. Deleted-context
+		// messages are dropped here so they never reach a bunch. Non-deleted messages are collected
+		// with their (address, ddsType) bunch key for the second pass.
+		interface ChannelMessageItem {
+			address: string;
+			type: string;
+			content: IRuntimeMessagesContent;
+		}
+		const items: ChannelMessageItem[] = [];

-		/**
-		 * Bunch contiguous messages for the same data store and send them together.
-		 * This is an optimization mainly for DDSes, where it can process a bunch of ops together. DDSes
-		 * like merge tree or shared tree can process ops more efficiently when they are bunched together.
-		 */
 		for (const { contents, ...restOfMessagesContent } of messagesContent) {
 			const contentsEnvelope = contents as IEnvelope<FluidDataStoreMessage>;
 			const address = contentsEnvelope.address;
@@ -1009,18 +1038,6 @@ export class ChannelCollection
 			}

 			const { type: contextType, content: contextContents } = contentsEnvelope.contents;
-			// If the address or type of the message changes while processing the message, send the current bunch.
-			if (
-				currentMessageState?.address !== address ||
-				currentMessageState?.type !== contextType
-			) {
-				sendBunchedMessages();
-			}
-			currentMessagesContent.push({
-				contents: contextContents,
-				...restOfMessagesContent,
-			});
-			currentMessageState = { address, type: contextType };

 			// Notify that a GC node for the data store changed. This is used to detect if a deleted data store is
 			// being used.
@@ -1034,11 +1051,32 @@ export class ChannelCollection
 			detectOutboundReferences(address, contextContents, (fromPath: string, toPath: string) =>
 				this.parentContext.addedGCOutboundRoute(fromPath, toPath, envelope.timestamp),
 			);
+
+			items.push({
+				address,
+				type: contextType,
+				content: { contents: contextContents, ...restOfMessagesContent },
+			});
 		}

-		// Process the last bunch of messages, if any. Note that there may not be any messages in case all of them are
-		// ignored because the data store is deleted.
-		sendBunchedMessages();
+		// Bunch contiguous messages for the same (address, ddsType) and send them together. This is an
+		// optimization mainly for DDSes, where merge-tree / shared-tree can process a bunch of ops together
+		// more efficiently than one at a time.
+		forEachContiguousBunch(
+			items,
+			(item) => ({ address: item.address, type: item.type }),
+			(item) => item.content,
+			(key, bunch) => {
+				const context = this.contexts.get(key.address);
+				assert(!!context, 0xa66 /* Context not found */);
+				context.processMessages({
+					envelope: { ...envelope, type: key.type },
+					messagesContent: bunch,
+					local,
+				});
+			},
+			(a, b) => a.address === b.address && a.type === b.type,
+		);
 	}

 	private async getDataStore(
33 changes: 31 additions & 2 deletions packages/runtime/container-runtime/src/containerRuntime.ts
@@ -118,9 +118,11 @@ import type {
 	IGarbageCollectionData,
 	CreateChildSummarizerNodeParam,
 	IDataStore,
+	IEnvelope,
 	IFluidDataStoreContextDetached,
 	IFluidDataStoreRegistry,
 	IFluidParentContext,
+	FluidDataStoreMessage,
 	ISummarizeInternalResult,
 	InboundAttachMessage,
 	NamedFluidDataStoreRegistryEntries,
@@ -5030,9 +5032,36 @@ export class ContainerRuntime
 			: this.reSubmit.bind(this);

 		this.batchRunner.run(() => {
-			for (const message of batch) {
-				resubmitFn(message);
+			// Collect contiguous runs of FluidDataStoreOp entries and dispatch each run through
+			// the bunched path so the channel collection can group them by (address, ddsType) for
+			// efficient single-call resubmit at the DDS layer. Non-FluidDataStoreOp entries flush
+			// the current run and use the existing single-op path (squash-aware via resubmitFn).
+			let currentBunch: {
+				envelope: IEnvelope<FluidDataStoreMessage>;
+				localOpMetadata: unknown;
+			}[] = [];
+
+			const flushBunch = (): void => {

Review comment (Contributor, Author): Can't we use the bunching helper here?

Review comment (Contributor, Author): Deep Review: Good question. `forEachContiguousBunch` would parallel the usage at channelCollection.ts:847-873 and complete the inbound/outbound symmetry that the rest of the design rests on. Recommend landing the conversion here rather than as a follow-up, since `reSubmitBatch`'s ad hoc accumulation is the one place in the outbound path that doesn't yet route through the shared helper.

+				if (currentBunch.length === 0) {
+					return;
+				}
+				this.channelCollection.reSubmitContainerMessages(currentBunch, squash);
+				currentBunch = [];
+			};
+
+			for (const data of batch) {
+				const message = data.runtimeOp;
+				if (message.type === ContainerMessageType.FluidDataStoreOp) {
+					currentBunch.push({
+						envelope: message.contents,
+						localOpMetadata: data.localOpMetadata,
+					});
+				} else {
+					flushBunch();
+					resubmitFn(data);
+				}
 			}
+			flushBunch();
 		}, resubmitInfo);

 		this.flush(resubmitInfo);
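One possible shape for the conversion suggested in the review thread on this hunk is sketched below. It is not the PR's code: `PendingMessageSketch`, `DATA_STORE_OP`, `reSubmitBatchSketch`, and the callback parameters are hypothetical stand-ins for the runtime's types and methods, and the helper is a local re-implementation that only mirrors the call shape of `forEachContiguousBunch` seen in `channelCollection.ts`. The idea is to key each pending entry on whether it is a data store op, so that contiguous data store ops form one run and every other entry keeps the single-op path.

```typescript
// Placeholder for ContainerMessageType.FluidDataStoreOp (the real value is not assumed here).
const DATA_STORE_OP = "dataStoreOp";

// Hypothetical stand-in for a pending batch entry.
interface PendingMessageSketch {
	runtimeOp: { type: string; contents: unknown };
	localOpMetadata: unknown;
}

// Local re-implementation with the helper's call shape; the real body is not shown in the diff.
function forEachContiguousBunchSketch<TItem, TKey, TValue>(
	items: readonly TItem[],
	getKey: (item: TItem) => TKey,
	getValue: (item: TItem) => TValue,
	onBunch: (key: TKey, bunch: TValue[]) => void,
	keysEqual: (a: TKey, b: TKey) => boolean,
): void {
	let current: { key: TKey; values: TValue[] } | undefined;
	for (const item of items) {
		const key = getKey(item);
		if (current === undefined || !keysEqual(current.key, key)) {
			if (current !== undefined) {
				onBunch(current.key, current.values);
			}
			current = { key, values: [] };
		}
		current.values.push(getValue(item));
	}
	if (current !== undefined) {
		onBunch(current.key, current.values);
	}
}

// Sketch of the batch loop routed through the helper instead of a hand-rolled accumulator.
function reSubmitBatchSketch(
	batch: readonly PendingMessageSketch[],
	reSubmitBunch: (entries: { envelope: unknown; localOpMetadata: unknown }[]) => void,
	reSubmitSingle: (message: PendingMessageSketch) => void,
): void {
	forEachContiguousBunchSketch(
		batch,
		(data) => data.runtimeOp.type === DATA_STORE_OP,
		(data) => data,
		(isDataStoreRun, run) => {
			if (isDataStoreRun) {
				// One call for the whole contiguous run of data store ops.
				reSubmitBunch(
					run.map((data) => ({
						envelope: data.runtimeOp.contents,
						localOpMetadata: data.localOpMetadata,
					})),
				);
			} else {
				// Other message types keep the existing one-at-a-time path.
				for (const data of run) {
					reSubmitSingle(data);
				}
			}
		},
		(a, b) => a === b,
	);
}

const trace: string[] = [];
reSubmitBatchSketch(
	[
		{ runtimeOp: { type: DATA_STORE_OP, contents: "e1" }, localOpMetadata: undefined },
		{ runtimeOp: { type: DATA_STORE_OP, contents: "e2" }, localOpMetadata: undefined },
		{ runtimeOp: { type: "attach", contents: "a1" }, localOpMetadata: undefined },
		{ runtimeOp: { type: DATA_STORE_OP, contents: "e3" }, localOpMetadata: undefined },
	],
	(entries) => {
		trace.push(`bunch:${entries.length}`);
	},
	(message) => {
		trace.push(`single:${message.runtimeOp.type}`);
	},
);
// trace is ["bunch:2", "single:attach", "bunch:1"]
```

Under these assumptions the dispatch order matches the hand-rolled `flushBunch` loop: a non-data-store entry ends the current run, is resubmitted on its own, and a new run starts afterwards.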
14 changes: 14 additions & 0 deletions packages/runtime/container-runtime/src/dataStoreContext.ts
@@ -64,6 +64,7 @@ import type {
 	IInboundSignalMessage,
 	IPendingMessagesState,
 	IRuntimeMessageCollection,
+	IRuntimeResubmitMessageCollection,
 	IFluidDataStoreFactory,
 	PackagePath,
 	IRuntimeStorageService,
@@ -1101,6 +1102,19 @@ export abstract class FluidDataStoreContext
 		this.channel.reSubmit(message.type, message.content, localOpMetadata, squash);
 	}

+	public reSubmitMessages(type: string, collection: IRuntimeResubmitMessageCollection): void {
+		assert(!!this.channel, "Channel must exist when resubmitting ops");
+		if (this.channel.reSubmitMessages !== undefined) {
+			this.channel.reSubmitMessages(type, collection);
+			return;
+		}
+
+		// Fallback for channels that haven't opted in to the bunched form.
+		for (const { contents, localOpMetadata } of collection.messages) {
+			this.channel.reSubmit(type, contents, localOpMetadata, collection.squash);
+		}
+	}
+
 	public rollback(message: FluidDataStoreMessage, localOpMetadata: unknown): void {
 		if (!this.channel) {
 			throw new Error("Channel must exist when rolling back ops");