diff --git a/.changeset/bunched-resubmit-dispatch.md b/.changeset/bunched-resubmit-dispatch.md new file mode 100644 index 000000000000..3b4c309add9d --- /dev/null +++ b/.changeset/bunched-resubmit-dispatch.md @@ -0,0 +1,18 @@ +--- +"@fluidframework/runtime-definitions": minor +"@fluidframework/datastore-definitions": minor +"@fluidframework/container-runtime": minor +"@fluidframework/datastore": minor +"__section": feature +--- +Extend bunched dispatch from `processMessages` to `reSubmit` + +The container runtime now bunches contiguous same-DDS resubmit entries when replaying a pending batch, mirroring the existing bunched dispatch for inbound `processMessages`. A batch of N consecutive ops targeting the same channel now makes one round trip through `ChannelCollection → FluidDataStoreContext → FluidDataStoreRuntime → IChannelContext → IDeltaHandler` rather than N. + +New API surface on `@legacy @beta`: + +- `IRuntimeResubmitMessage` and `IRuntimeResubmitMessageCollection` (`@fluidframework/runtime-definitions`) — the bunched envelope, with a shared `squash` flag. +- Optional `IFluidDataStoreChannel.reSubmitMessages(type, collection)` (`@fluidframework/runtime-definitions`) — opt-in bunched form alongside the existing `reSubmit`. +- Optional `IDeltaHandler.reSubmitMessages(collection)` (`@fluidframework/datastore-definitions`) — opt-in bunched form alongside the existing `reSubmit`. + +DDSes that do not implement `reSubmitMessages` automatically fall back to per-message `reSubmit` calls. `SharedObject`-derived DDSes get a default implementation that loops on the existing `reSubmitCore` / `reSubmitSquashedCore` paths; they may override to take advantage of seeing the full run together. Non-`FluidDataStoreOp` runtime message types (Attach, Alias, GC, etc.) continue to use the existing single-op `reSubmit` path. diff --git a/packages/dds/shared-object-base/src/sharedObject.ts b/packages/dds/shared-object-base/src/sharedObject.ts index 5fa7295ae60e..0bc2e5480217 100644 --- a/packages/dds/shared-object-base/src/sharedObject.ts +++ b/packages/dds/shared-object-base/src/sharedObject.ts @@ -35,6 +35,7 @@ import { totalBlobSizePropertyName, type IRuntimeMessageCollection, type IRuntimeMessagesContent, + type IRuntimeResubmitMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { toDeltaManagerInternal, @@ -550,6 +551,15 @@ export abstract class SharedObjectCore< reSubmit: (content: unknown, localOpMetadata: unknown, squash: boolean) => { this.reSubmit(content, localOpMetadata, squash); }, + reSubmitMessages: (collection: IRuntimeResubmitMessageCollection) => { + // Default implementation iterates the bunch and dispatches each entry through the + // existing single-op reSubmit dispatcher (reSubmitCore vs reSubmitSquashed). DDSes + // that can take advantage of seeing the contiguous run together may override the + // IDeltaHandler attached here. + for (const { contents, localOpMetadata } of collection.messages) { + this.reSubmit(contents, localOpMetadata, collection.squash); + } + }, applyStashedOp: (content: unknown): void => { this.applyStashedOp(parseHandles(content, this.serializer)); }, diff --git a/packages/runtime/container-runtime/src/channelCollection.ts b/packages/runtime/container-runtime/src/channelCollection.ts index a0154267469f..8269547d0ccd 100644 --- a/packages/runtime/container-runtime/src/channelCollection.ts +++ b/packages/runtime/container-runtime/src/channelCollection.ts @@ -66,6 +66,7 @@ import { create404Response, createResponseError, encodeCompactIdToString, + forEachContiguousBunch, isSerializedHandle, processAttachMessageGCData, responseToException, @@ -830,6 +831,48 @@ export class ChannelCollection context.reSubmit(envelope.contents, localOpMetadata, squash); }; + /** + * Resubmit a contiguous run of FluidDataStoreOp entries. Entries are bunched by + * `(address, FluidDataStoreMessage.type)` and forwarded to each data store context in a single + * {@link FluidDataStoreContext.reSubmitMessages} call per bunch — mirroring the inbound + * {@link ChannelCollection.processChannelMessages} bunching pattern. + */ + public readonly reSubmitContainerMessages = ( + entries: readonly { + envelope: IEnvelope; + localOpMetadata: unknown; + }[], + squash: boolean, + ): void => { + forEachContiguousBunch( + entries, + (e) => ({ address: e.envelope.address, type: e.envelope.contents.type }), + (e) => ({ + contents: e.envelope.contents.content, + localOpMetadata: e.localOpMetadata, + }), + (key, messages) => { + const context = this.contexts.get(key.address); + if ( + this.checkAndLogIfDeleted( + key.address, + context, + "Changed", + "reSubmitContainerMessages", + ) + ) { + throw new DataCorruptionError("Context is deleted!", { + callSite: "reSubmitContainerMessages", + ...tagCodeArtifacts({ id: key.address }), + }); + } + assert(!!context, "There should be a store context for the op"); + context.reSubmitMessages(key.type, { squash, messages }); + }, + (a, b) => a.address === b.address && a.type === b.type, + ); + }; + public readonly rollbackDataStoreOp = ( envelope: IEnvelope, localOpMetadata: unknown, @@ -955,31 +998,17 @@ export class ChannelCollection */ private processChannelMessages(messageCollection: IRuntimeMessageCollection): void { const { envelope, messagesContent, local } = messageCollection; - let currentMessageState: { address: string; type: string } | undefined; - let currentMessagesContent: IRuntimeMessagesContent[] = []; - // Helper that sends the current bunch of messages to the data store. It validates that the data stores exists. - const sendBunchedMessages = (): void => { - // Current message state will be undefined for the first message in the list. - if (currentMessageState === undefined) { - return; - } - const currentContext = this.contexts.get(currentMessageState.address); - assert(!!currentContext, 0xa66 /* Context not found */); - - currentContext.processMessages({ - envelope: { ...envelope, type: currentMessageState.type }, - messagesContent: currentMessagesContent, - local, - }); - currentMessagesContent = []; - }; + // First pass: per-message validation, GC bookkeeping, and shape transform. Deleted-context + // messages are dropped here so they never reach a bunch. Non-deleted messages are collected + // with their (address, ddsType) bunch key for the second pass. + interface ChannelMessageItem { + address: string; + type: string; + content: IRuntimeMessagesContent; + } + const items: ChannelMessageItem[] = []; - /** - * Bunch contiguous messages for the same data store and send them together. - * This is an optimization mainly for DDSes, where it can process a bunch of ops together. DDSes - * like merge tree or shared tree can process ops more efficiently when they are bunched together. - */ for (const { contents, ...restOfMessagesContent } of messagesContent) { const contentsEnvelope = contents as IEnvelope; const address = contentsEnvelope.address; @@ -1009,18 +1038,6 @@ export class ChannelCollection } const { type: contextType, content: contextContents } = contentsEnvelope.contents; - // If the address or type of the message changes while processing the message, send the current bunch. - if ( - currentMessageState?.address !== address || - currentMessageState?.type !== contextType - ) { - sendBunchedMessages(); - } - currentMessagesContent.push({ - contents: contextContents, - ...restOfMessagesContent, - }); - currentMessageState = { address, type: contextType }; // Notify that a GC node for the data store changed. This is used to detect if a deleted data store is // being used. @@ -1034,11 +1051,32 @@ export class ChannelCollection detectOutboundReferences(address, contextContents, (fromPath: string, toPath: string) => this.parentContext.addedGCOutboundRoute(fromPath, toPath, envelope.timestamp), ); + + items.push({ + address, + type: contextType, + content: { contents: contextContents, ...restOfMessagesContent }, + }); } - // Process the last bunch of messages, if any. Note that there may not be any messages in case all of them are - // ignored because the data store is deleted. - sendBunchedMessages(); + // Bunch contiguous messages for the same (address, ddsType) and send them together. This is an + // optimization mainly for DDSes, where merge-tree / shared-tree can process a bunch of ops together + // more efficiently than one at a time. + forEachContiguousBunch( + items, + (item) => ({ address: item.address, type: item.type }), + (item) => item.content, + (key, bunch) => { + const context = this.contexts.get(key.address); + assert(!!context, 0xa66 /* Context not found */); + context.processMessages({ + envelope: { ...envelope, type: key.type }, + messagesContent: bunch, + local, + }); + }, + (a, b) => a.address === b.address && a.type === b.type, + ); } private async getDataStore( diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 3c13325457e4..584ff62a0905 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -118,9 +118,11 @@ import type { IGarbageCollectionData, CreateChildSummarizerNodeParam, IDataStore, + IEnvelope, IFluidDataStoreContextDetached, IFluidDataStoreRegistry, IFluidParentContext, + FluidDataStoreMessage, ISummarizeInternalResult, InboundAttachMessage, NamedFluidDataStoreRegistryEntries, @@ -5030,9 +5032,36 @@ export class ContainerRuntime : this.reSubmit.bind(this); this.batchRunner.run(() => { - for (const message of batch) { - resubmitFn(message); + // Collect contiguous runs of FluidDataStoreOp entries and dispatch each run through + // the bunched path so the channel collection can group them by (address, ddsType) for + // efficient single-call resubmit at the DDS layer. Non-FluidDataStoreOp entries flush + // the current run and use the existing single-op path (squash-aware via resubmitFn). + let currentBunch: { + envelope: IEnvelope; + localOpMetadata: unknown; + }[] = []; + + const flushBunch = (): void => { + if (currentBunch.length === 0) { + return; + } + this.channelCollection.reSubmitContainerMessages(currentBunch, squash); + currentBunch = []; + }; + + for (const data of batch) { + const message = data.runtimeOp; + if (message.type === ContainerMessageType.FluidDataStoreOp) { + currentBunch.push({ + envelope: message.contents, + localOpMetadata: data.localOpMetadata, + }); + } else { + flushBunch(); + resubmitFn(data); + } } + flushBunch(); }, resubmitInfo); this.flush(resubmitInfo); diff --git a/packages/runtime/container-runtime/src/dataStoreContext.ts b/packages/runtime/container-runtime/src/dataStoreContext.ts index 6b46a9ac9537..af34b5b7548d 100644 --- a/packages/runtime/container-runtime/src/dataStoreContext.ts +++ b/packages/runtime/container-runtime/src/dataStoreContext.ts @@ -64,6 +64,7 @@ import type { IInboundSignalMessage, IPendingMessagesState, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IFluidDataStoreFactory, PackagePath, IRuntimeStorageService, @@ -1101,6 +1102,19 @@ export abstract class FluidDataStoreContext this.channel.reSubmit(message.type, message.content, localOpMetadata, squash); } + public reSubmitMessages(type: string, collection: IRuntimeResubmitMessageCollection): void { + assert(!!this.channel, "Channel must exist when resubmitting ops"); + if (this.channel.reSubmitMessages !== undefined) { + this.channel.reSubmitMessages(type, collection); + return; + } + + // Fallback for channels that haven't opted in to the bunched form. + for (const { contents, localOpMetadata } of collection.messages) { + this.channel.reSubmit(type, contents, localOpMetadata, collection.squash); + } + } + public rollback(message: FluidDataStoreMessage, localOpMetadata: unknown): void { if (!this.channel) { throw new Error("Channel must exist when rolling back ops"); diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 39f5e2af7aad..812a835b226d 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -190,6 +190,21 @@ function stubChannelCollection( .stub, void>() .callsFake(containerRuntime.submitMessage.bind(containerRuntime)); + // Bunched form: replay each entry as a singleton FluidDataStoreOp through submitMessage. + const reSubmitBunchedFake = sandbox + .stub, void>() + .callsFake((entries, squash) => { + for (const { envelope, localOpMetadata } of entries) { + containerRuntime.submitMessage( + { + type: ContainerMessageType.FluidDataStoreOp, + contents: envelope, + }, + localOpMetadata, + ); + } + }); + const stub = Sinon.createStubInstance( // createSubInstance does not work with property methods (which are // used for stricter typing); so, override via a subclass here. @@ -199,6 +214,10 @@ function stubChannelCollection( ..._args: Parameters ): void {} // @ts-expect-error -- redefine as instance method for stubbing + public reSubmitContainerMessages( + ..._args: Parameters + ): void {} + // @ts-expect-error -- redefine as instance method for stubbing public rollbackDataStoreOp( ..._args: Parameters ) {} @@ -206,6 +225,7 @@ function stubChannelCollection( { setConnectionState: sandbox.stub(), reSubmitContainerMessage: reSubmitFake, + reSubmitContainerMessages: reSubmitBunchedFake, rollbackDataStoreOp: sandbox.stub(), notifyStagingMode: sandbox.stub(), dispose: sandbox.stub(), @@ -1540,6 +1560,18 @@ describe("Runtime", () => { setConnectionState: (_connected: boolean, _clientId?: string) => {}, // Pass data store op right back to ContainerRuntime reSubmitContainerMessage: containerRuntime.submitMessage.bind(containerRuntime), + // Bunched form: replay each entry as a singleton through submitMessage. + reSubmitContainerMessages: (entries, _squash) => { + for (const { envelope, localOpMetadata } of entries) { + containerRuntime.submitMessage( + { + type: ContainerMessageType.FluidDataStoreOp, + contents: envelope, + }, + localOpMetadata, + ); + } + }, } satisfies Partial; return patched; @@ -4487,19 +4519,20 @@ describe("Runtime", () => { controls.commitChanges(); assert( - channelCollectionStub.reSubmitContainerMessage.calledOnce, - "Expected reSubmit to be called once. Prestaging op should not be resubmitted", + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected reSubmitContainerMessages to be called once. Prestaging op should not be resubmitted", ); assert( - channelCollectionStub.reSubmitContainerMessage.calledWithExactly( - { - type: ContainerMessageType.FluidDataStoreOp, - contents: { address: "2", contents: stagedOpContents }, - }, - "LOCAL_OP_METADATA", + channelCollectionStub.reSubmitContainerMessages.calledWithExactly( + [ + { + envelope: { address: "2", contents: stagedOpContents }, + localOpMetadata: "LOCAL_OP_METADATA", + }, + ], /* squash: */ false, // False by default on commitChanges ), - "Unexpected args for reSubmit", + "Unexpected args for reSubmitContainerMessages", ); assert( channelCollectionStub.notifyStagingMode.getCall(1)?.calledWithExactly(false), @@ -4515,6 +4548,84 @@ describe("Runtime", () => { assert.equal(submittedOps[1].contents.address, "2", "Unexpected staged op address"); }); + it("commitChanges bunches contiguous same-data-store ops into one reSubmitContainerMessages call", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + // Three contiguous ops targeting data store "ds1" with the same DDS op type. + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("a"), "md-a"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("b"), "md-b"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("c"), "md-c"); + containerRuntime.flush(); + + controls.commitChanges(); + + assert( + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected exactly one bunched reSubmitContainerMessages call for the contiguous run", + ); + const [entries, squash] = + channelCollectionStub.reSubmitContainerMessages.firstCall.args; + assert.strictEqual(entries.length, 3, "Expected all three entries in one bunch"); + assert.strictEqual(squash, false, "commitChanges default should not squash"); + assert.deepStrictEqual( + entries.map((e) => e.envelope.address), + ["ds1", "ds1", "ds1"], + "All entries should target the same data store address", + ); + assert.deepStrictEqual( + entries.map((e) => e.localOpMetadata), + ["md-a", "md-b", "md-c"], + "Each entry's localOpMetadata should be preserved", + ); + }); + + it("commitChanges splits bunches when the data store address changes", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + // Two ops to ds1, then one to ds2, then two more to ds1 — should produce 3 bunches. + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("a"), "md-a"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("b"), "md-b"); + submitDataStoreOp(containerRuntime, "ds2", genTestDataStoreMessage("c"), "md-c"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("d"), "md-d"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("e"), "md-e"); + containerRuntime.flush(); + + controls.commitChanges(); + + // All five entries arrive in a single contiguous reSubmitContainerMessages call; + // the per-address grouping happens inside ChannelCollection. + assert( + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected single reSubmitContainerMessages call carrying the contiguous run", + ); + const [entries] = channelCollectionStub.reSubmitContainerMessages.firstCall.args; + assert.deepStrictEqual( + entries.map((e) => e.envelope.address), + ["ds1", "ds1", "ds2", "ds1", "ds1"], + "Entries should preserve submission order across address changes", + ); + }); + + it("commitChanges with squash propagates the flag on the bunched call", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("a"), "md-a"); + submitDataStoreOp(containerRuntime, "ds1", genTestDataStoreMessage("b"), "md-b"); + containerRuntime.flush(); + + controls.commitChanges({ squash: true }); + + assert( + channelCollectionStub.reSubmitContainerMessages.calledOnce, + "Expected one bunched reSubmitContainerMessages call", + ); + const [, squash] = channelCollectionStub.reSubmitContainerMessages.firstCall.args; + assert.strictEqual(squash, true, "squash flag should propagate to the bunched call"); + }); + it("discardChanges drops staged ops", () => { const channelCollectionStub = stubChannelCollection(containerRuntime); diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md index fbf624137081..f1a23571a73f 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md @@ -61,6 +61,7 @@ export interface IDeltaHandler { applyStashedOp(message: any): void; processMessages: (messageCollection: IRuntimeMessageCollection) => void; reSubmit(message: any, localOpMetadata: unknown, squash?: boolean): void; + reSubmitMessages?(collection: IRuntimeResubmitMessageCollection): void; rollback?(message: any, localOpMetadata: unknown): void; setConnectionState(connected: boolean): void; } diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md index 174ec80ca5a6..cfda94593c5c 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md @@ -61,6 +61,7 @@ export interface IDeltaHandler { applyStashedOp(message: any): void; processMessages: (messageCollection: IRuntimeMessageCollection) => void; reSubmit(message: any, localOpMetadata: unknown, squash?: boolean): void; + reSubmitMessages?(collection: IRuntimeResubmitMessageCollection): void; rollback?(message: any, localOpMetadata: unknown): void; setConnectionState(connected: boolean): void; } diff --git a/packages/runtime/datastore-definitions/src/channel.ts b/packages/runtime/datastore-definitions/src/channel.ts index 527af53e1138..e9de71698221 100644 --- a/packages/runtime/datastore-definitions/src/channel.ts +++ b/packages/runtime/datastore-definitions/src/channel.ts @@ -9,6 +9,7 @@ import type { IExperimentalIncrementalSummaryContext, IGarbageCollectionData, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, ISummaryTreeWithStats, ITelemetryContext, } from "@fluidframework/runtime-definitions/internal"; @@ -167,6 +168,19 @@ export interface IDeltaHandler { // eslint-disable-next-line @typescript-eslint/no-explicit-any reSubmit(message: any, localOpMetadata: unknown, squash?: boolean): void; + /** + * Called when the runtime asks the client to resubmit a bunch of contiguous messages. + * The bunched form of {@link IDeltaHandler.reSubmit}, mirroring the inbound + * {@link IDeltaHandler.processMessages} shape so DDSes that benefit from processing a + * contiguous run together can do so on resubmit. + * + * Optional: if not implemented, the runtime falls back to iterating the collection and + * invoking {@link IDeltaHandler.reSubmit} per message. + * + * @param collection - The bunch of messages to resubmit, with a shared `squash` flag. + */ + reSubmitMessages?(collection: IRuntimeResubmitMessageCollection): void; + /** * Apply changes from an op just as if a local client has made the change, * including submitting the op. Used when rehydrating an attached container diff --git a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md index 1ee912869d4b..49cc1e22112c 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md @@ -88,6 +88,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; reSubmit(type: DataStoreMessageType, content: any, localOpMetadata: unknown, squash: boolean): void; + reSubmitMessages(type: DataStoreMessageType, collection: IRuntimeResubmitMessageCollection): void; rollback?(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; // (undocumented) get rootRoutingContext(): this; diff --git a/packages/runtime/datastore/package.json b/packages/runtime/datastore/package.json index 565c0f23dd5c..bfa7f3373363 100644 --- a/packages/runtime/datastore/package.json +++ b/packages/runtime/datastore/package.json @@ -157,7 +157,11 @@ "typescript": "~5.4.5" }, "typeValidation": { - "broken": {}, + "broken": { + "Class_FluidDataStoreRuntime": { + "forwardCompat": false + } + }, "entrypoint": "legacy" } } diff --git a/packages/runtime/datastore/src/channelContext.ts b/packages/runtime/datastore/src/channelContext.ts index 900e00f1ef25..2a1ca2c3bf12 100644 --- a/packages/runtime/datastore/src/channelContext.ts +++ b/packages/runtime/datastore/src/channelContext.ts @@ -19,6 +19,7 @@ import type { IFluidDataStoreContext, ISummarizeResult, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "@fluidframework/runtime-definitions/internal"; import { addBlobToSummary } from "@fluidframework/runtime-utils/internal"; @@ -53,6 +54,12 @@ export interface IChannelContext { reSubmit(content: unknown, localOpMetadata: unknown, squash: boolean): void; + /** + * Resubmit a bunch of contiguous messages for this channel context in one call. + * @param collection - The bunch of messages to resubmit, with a shared `squash` flag. + */ + reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void; + applyStashedOp(content: unknown): unknown; rollback(message: unknown, localOpMetadata: unknown): void; diff --git a/packages/runtime/datastore/src/channelDeltaConnection.ts b/packages/runtime/datastore/src/channelDeltaConnection.ts index 433f613af8ec..2debcb539611 100644 --- a/packages/runtime/datastore/src/channelDeltaConnection.ts +++ b/packages/runtime/datastore/src/channelDeltaConnection.ts @@ -11,6 +11,8 @@ import type { import type { IRuntimeMessageCollection, IRuntimeMessagesContent, + IRuntimeResubmitMessage, + IRuntimeResubmitMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { DataProcessingError } from "@fluidframework/telemetry-utils/internal"; @@ -123,6 +125,39 @@ export class ChannelDeltaConnection implements IDeltaConnection { ); } + public reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void { + // Fan out any stashed-op metadata pairs into individual entries before dispatching the bunch, + // mirroring how processMessages expands messagesContent. + const flattened: IRuntimeResubmitMessage[] = []; + for (const { contents, localOpMetadata } of collection.messages) { + processWithStashedOpMetadataHandling( + contents, + localOpMetadata, + (expandedContents, expandedMetadata) => { + flattened.push({ + contents: expandedContents, + localOpMetadata: expandedMetadata, + }); + }, + ); + } + + const expandedCollection: IRuntimeResubmitMessageCollection = { + squash: collection.squash, + messages: flattened, + }; + + if (this.handler.reSubmitMessages !== undefined) { + this.handler.reSubmitMessages(expandedCollection); + return; + } + + // Fallback for handlers that haven't opted in to the bunched form. + for (const { contents, localOpMetadata } of expandedCollection.messages) { + this.handler.reSubmit(contents, localOpMetadata, expandedCollection.squash); + } + } + public rollback(content: unknown, localOpMetadata: unknown): void { if (this.handler.rollback === undefined) { throw new Error("Handler doesn't support rollback"); diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index 0c34a36c6f3e..1e3e94e8af16 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -59,6 +59,8 @@ import { type IInboundSignalMessage, type IRuntimeMessageCollection, type IRuntimeMessagesContent, + type IRuntimeResubmitMessage, + type IRuntimeResubmitMessageCollection, notifiesReadOnlyState, encodeHandlesInContainerRuntime, type IFluidDataStorePolicies, @@ -76,6 +78,7 @@ import { create404Response, createResponseError, exceptionToResponse, + forEachContiguousBunch, generateHandleContextPath, processAttachMessageGCData, dataStoreLoadTelemetryProps, @@ -1412,6 +1415,62 @@ export class FluidDataStoreRuntime } } + /** + * Resubmit a bunch of messages of the same type. For ChannelOp bunches, contiguous entries + * targeting the same channel address are forwarded to that channel's context as a single + * bunched resubmit; address changes within the bunch flush the current sub-bunch. + * + * @privateRemarks + * `type` parameter's type of `DataStoreMessageType` is a covariance exception over `string` + * that `IFluidDataStoreChannel` specifies. See the existing {@link FluidDataStoreRuntime.reSubmit} + * for context. + */ + public reSubmitMessages( + type: DataStoreMessageType, + collection: IRuntimeResubmitMessageCollection, + ): void { + this.verifyNotClosed(); + + const { squash, messages } = collection; + // The ops being resubmitted will not be submitted as-is, so decrement the count. The + // downstream resubmit calls below may resubmit ops (which will re-increment) or not. + this.pendingOpCount.value -= messages.length; + + switch (type) { + case DataStoreMessageType.ChannelOp: { + // Bunch contiguous entries by channel address and dispatch each sub-bunch in one call. + forEachContiguousBunch( + messages, + (message) => (message.contents as IEnvelope).address, + (message): IRuntimeResubmitMessage => ({ + contents: (message.contents as IEnvelope).contents, + localOpMetadata: message.localOpMetadata, + }), + (address, bunch) => { + const channelContext = this.contexts.get(address); + assert(!!channelContext, "There should be a channel context for the op"); + channelContext.reSubmitMessages({ squash, messages: bunch }); + }, + ); + break; + } + case DataStoreMessageType.Attach: { + // Attach messages aren't meaningfully bunchable — resubmit each individually. + for (const message of messages) { + this.submit( + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment -- mirrors `reSubmit`'s any-typed content + { type, content: message.contents as IAttachMessage }, + message.localOpMetadata, + ); + } + break; + } + default: { + unreachableCase(type); + } + } + } + /** * Revert a local op. * @param content - The content of the original message. diff --git a/packages/runtime/datastore/src/localChannelContext.ts b/packages/runtime/datastore/src/localChannelContext.ts index 641a96b49a65..9b322afb72a7 100644 --- a/packages/runtime/datastore/src/localChannelContext.ts +++ b/packages/runtime/datastore/src/localChannelContext.ts @@ -17,6 +17,7 @@ import type { ISummarizeResult, IPendingMessagesState, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "@fluidframework/runtime-definitions/internal"; import { @@ -115,6 +116,15 @@ export abstract class LocalChannelContextBase implements IChannelContext { ); this.services.value.deltaConnection.reSubmit(content, localOpMetadata, squash); } + + public reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void { + assert(this.isLoaded, "Channel should be loaded to resubmit ops"); + assert( + this.globallyVisible, + "Local channel must be globally visible when resubmitting op", + ); + this.services.value.deltaConnection.reSubmitMessages(collection); + } public rollback(content: unknown, localOpMetadata: unknown): void { assert(this.isLoaded, 0x2ee /* "Channel should be loaded to rollback ops" */); assert( diff --git a/packages/runtime/datastore/src/remoteChannelContext.ts b/packages/runtime/datastore/src/remoteChannelContext.ts index 3b4c1931c82f..5a3948d710c1 100644 --- a/packages/runtime/datastore/src/remoteChannelContext.ts +++ b/packages/runtime/datastore/src/remoteChannelContext.ts @@ -21,6 +21,7 @@ import type { ISummarizerNodeWithGC, IPendingMessagesState, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "@fluidframework/runtime-definitions/internal"; import { @@ -203,6 +204,11 @@ export class RemoteChannelContext implements IChannelContext { this.services.deltaConnection.reSubmit(content, localOpMetadata, squash); } + public reSubmitMessages(collection: IRuntimeResubmitMessageCollection): void { + assert(this.isLoaded, "Remote channel must be loaded when resubmitting ops"); + this.services.deltaConnection.reSubmitMessages(collection); + } + public rollback(content: unknown, localOpMetadata: unknown): void { assert(this.isLoaded, 0x2f0 /* "Remote channel must be loaded when rolling back op" */); diff --git a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts index b0e5b7c27b93..a71b284584f7 100644 --- a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts +++ b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts @@ -24,6 +24,7 @@ declare type MakeUnusedImportErrorsGoAway = TypeOnly | MinimalType | Fu * typeValidation.broken: * "Class_FluidDataStoreRuntime": {"forwardCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type old_as_current_for_Class_FluidDataStoreRuntime = requireAssignableTo, TypeOnly> /* diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index 9f18e360b28c..bd9179ce73f0 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -146,6 +146,7 @@ export interface IFluidDataStoreChannel extends IDisposable { // (undocumented) request(request: IRequest): Promise; reSubmit(type: string, content: any, localOpMetadata: unknown, squash: boolean): void; + reSubmitMessages?(type: string, collection: IRuntimeResubmitMessageCollection): void; rollback?(type: string, content: any, localOpMetadata: unknown): void; // (undocumented) setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void; @@ -297,6 +298,18 @@ export interface IRuntimeMessagesContent { readonly localOpMetadata: unknown; } +// @beta @sealed @legacy +export interface IRuntimeResubmitMessage { + readonly contents: unknown; + readonly localOpMetadata: unknown; +} + +// @beta @sealed @legacy +export interface IRuntimeResubmitMessageCollection { + readonly messages: readonly IRuntimeResubmitMessage[]; + readonly squash: boolean; +} + // @beta @legacy export interface IRuntimeStorageService { readBlob(id: string): Promise; diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md index 75f077ccfa54..42d63f894569 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md @@ -139,6 +139,7 @@ export interface IFluidDataStoreChannel extends IDisposable { // (undocumented) request(request: IRequest): Promise; reSubmit(type: string, content: any, localOpMetadata: unknown, squash: boolean): void; + reSubmitMessages?(type: string, collection: IRuntimeResubmitMessageCollection): void; rollback?(type: string, content: any, localOpMetadata: unknown): void; // (undocumented) setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void; @@ -290,6 +291,18 @@ export interface IRuntimeMessagesContent { readonly localOpMetadata: unknown; } +// @beta @sealed @legacy +export interface IRuntimeResubmitMessage { + readonly contents: unknown; + readonly localOpMetadata: unknown; +} + +// @beta @sealed @legacy +export interface IRuntimeResubmitMessageCollection { + readonly messages: readonly IRuntimeResubmitMessage[]; + readonly squash: boolean; +} + // @beta @legacy export interface IRuntimeStorageService { readBlob(id: string): Promise; diff --git a/packages/runtime/runtime-definitions/src/dataStoreContext.ts b/packages/runtime/runtime-definitions/src/dataStoreContext.ts index 4e514b16b38d..87bf119f379a 100644 --- a/packages/runtime/runtime-definitions/src/dataStoreContext.ts +++ b/packages/runtime/runtime-definitions/src/dataStoreContext.ts @@ -41,6 +41,7 @@ import type { import type { IInboundSignalMessage, IRuntimeMessageCollection, + IRuntimeResubmitMessageCollection, IRuntimeStorageService, } from "./protocol.js"; import type { @@ -480,6 +481,21 @@ export interface IFluidDataStoreChannel extends IDisposable { // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change reSubmit(type: string, content: any, localOpMetadata: unknown, squash: boolean): void; + /** + * Ask the DDS to resubmit a bunch of contiguous messages of the same type. + * @remarks + * The bunched form of {@link IFluidDataStoreChannel.reSubmit}, mirroring the inbound + * {@link IFluidDataStoreChannel.processMessages} shape so DDSes that benefit from + * processing a contiguous run together can do so on resubmit. + * + * The default implementation provided by the runtime simply loops over + * `collection.messages` calling the single-op {@link IFluidDataStoreChannel.reSubmit} + * path; implementers may override to take advantage of the bunch. + * @param type - The type shared by all messages in the collection. + * @param collection - The bunch of messages to resubmit, with a shared `squash` flag. + */ + reSubmitMessages?(type: string, collection: IRuntimeResubmitMessageCollection): void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change applyStashedOp(content: any): Promise; diff --git a/packages/runtime/runtime-definitions/src/index.ts b/packages/runtime/runtime-definitions/src/index.ts index c5a50343eff5..bbc062a3cb2c 100644 --- a/packages/runtime/runtime-definitions/src/index.ts +++ b/packages/runtime/runtime-definitions/src/index.ts @@ -64,6 +64,8 @@ export type { InboundAttachMessage, IRuntimeMessageCollection, IRuntimeMessagesContent, + IRuntimeResubmitMessage, + IRuntimeResubmitMessageCollection, ISequencedMessageEnvelope, IRuntimeStorageService, } from "./protocol.js"; diff --git a/packages/runtime/runtime-definitions/src/protocol.ts b/packages/runtime/runtime-definitions/src/protocol.ts index d2159b428d2f..615044d3f5d3 100644 --- a/packages/runtime/runtime-definitions/src/protocol.ts +++ b/packages/runtime/runtime-definitions/src/protocol.ts @@ -141,6 +141,44 @@ export interface IRuntimeMessageCollection { readonly messagesContent: readonly IRuntimeMessagesContent[]; } +/** + * A single message to resubmit, as part of an {@link IRuntimeResubmitMessageCollection}. + * @legacy @beta + * @sealed + */ +export interface IRuntimeResubmitMessage { + /** + * The contents of the original message that was submitted. + */ + readonly contents: unknown; + /** + * The local metadata associated with the original message that was submitted. + */ + readonly localOpMetadata: unknown; +} + +/** + * A collection of messages to be resubmitted together as a "bunch". + * @remarks + * All messages in a resubmit collection share the same target — that is, they are + * for the same DDS — and share a single `squash` setting. This mirrors the inbound + * "bunch" shape of {@link IRuntimeMessageCollection} for the outbound resubmit path, + * allowing DDSes to handle a contiguous run of resubmits in one call. + * @legacy @beta + * @sealed + */ +export interface IRuntimeResubmitMessageCollection { + /** + * If true, the DDS should avoid resubmitting any "unnecessary intermediate state" created + * by these messages. Applies uniformly to every message in the collection. + */ + readonly squash: boolean; + /** + * The messages to resubmit, in original submission order. + */ + readonly messages: readonly IRuntimeResubmitMessage[]; +} + /** * Outgoing {@link IFluidDataStoreChannel} message structures. * @internal diff --git a/packages/runtime/runtime-utils/src/bunching.ts b/packages/runtime/runtime-utils/src/bunching.ts new file mode 100644 index 000000000000..342163ca413c --- /dev/null +++ b/packages/runtime/runtime-utils/src/bunching.ts @@ -0,0 +1,57 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Walk a sequence of items, grouping each maximal run of contiguous items with the same key + * into a "bunch" and invoking `onBunch` once per run. + * + * @remarks + * This is the core pattern used by the Fluid runtime to dispatch operations to a data store + * or DDS in bunches: inbound op processing (`processMessages`) and outbound resubmit + * (`reSubmitMessages`) both walk a contiguous sequence of operations and dispatch each + * maximal same-target run as a single call to the lower layer. + * + * The helper preserves input order. Each call to `onBunch` receives the key it was bunched + * by along with the list of transformed bunch items in original order. Side effects that need + * to happen per source item (e.g. delete checks, GC node updates) should be performed by the + * caller around the call to this helper, not inside `valueOf` — `valueOf` is purely a shape + * transformation from source item to bunch item. + * + * @param items - The source items to walk. + * @param keyOf - Extracts the bunching key from a source item. Items with equal keys + * (according to `keysEqual`) that appear contiguously are bunched together. + * @param valueOf - Transforms a source item into its bunch-element form. + * @param onBunch - Invoked once per bunch with the key and the bunch items. + * @param keysEqual - Equality predicate for keys. Defaults to `Object.is`. Provide a custom + * predicate to bunch by structured / composite keys. + * + * @internal + */ +export function forEachContiguousBunch( + items: Iterable, + keyOf: (item: TItem) => TKey, + valueOf: (item: TItem) => TBunchItem, + onBunch: (key: TKey, bunch: TBunchItem[]) => void, + keysEqual: (a: TKey, b: TKey) => boolean = Object.is, +): void { + let currentKey: TKey | undefined; + let hasCurrentKey = false; + let bunch: TBunchItem[] = []; + + for (const item of items) { + const key = keyOf(item); + if (hasCurrentKey && !keysEqual(currentKey as TKey, key)) { + onBunch(currentKey as TKey, bunch); + bunch = []; + } + currentKey = key; + hasCurrentKey = true; + bunch.push(valueOf(item)); + } + + if (hasCurrentKey && bunch.length > 0) { + onBunch(currentKey as TKey, bunch); + } +} diff --git a/packages/runtime/runtime-utils/src/index.ts b/packages/runtime/runtime-utils/src/index.ts index 289587c98aa3..2d6571fc8f37 100644 --- a/packages/runtime/runtime-utils/src/index.ts +++ b/packages/runtime/runtime-utils/src/index.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. */ +export { forEachContiguousBunch } from "./bunching.js"; export { generateHandleContextPath } from "./dataStoreHandleContextUtils.js"; export { create404Response, diff --git a/packages/runtime/runtime-utils/src/test/bunching.spec.ts b/packages/runtime/runtime-utils/src/test/bunching.spec.ts new file mode 100644 index 000000000000..b40b8887d7fc --- /dev/null +++ b/packages/runtime/runtime-utils/src/test/bunching.spec.ts @@ -0,0 +1,142 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; + +import { forEachContiguousBunch } from "../bunching.js"; + +describe("forEachContiguousBunch", () => { + it("emits nothing for an empty iterable", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch<{ k: string; v: number }, string, number>( + [], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, []); + }); + + it("emits a single bunch for a single item", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [{ k: "a", v: 1 }], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [{ key: "a", bunch: [1] }]); + }); + + it("collapses contiguous same-key items into one bunch", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "a", v: 2 }, + { k: "a", v: 3 }, + ], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [{ key: "a", bunch: [1, 2, 3] }]); + }); + + it("splits when the key changes and preserves order", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "a", v: 2 }, + { k: "b", v: 3 }, + { k: "a", v: 4 }, + { k: "a", v: 5 }, + ], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [ + { key: "a", bunch: [1, 2] }, + { key: "b", bunch: [3] }, + { key: "a", bunch: [4, 5] }, + ]); + }); + + it("alternating keys produce singleton bunches", () => { + const bunches: { key: string; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "b", v: 2 }, + { k: "a", v: 3 }, + { k: "b", v: 4 }, + ], + (i) => i.k, + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + ); + assert.deepStrictEqual(bunches, [ + { key: "a", bunch: [1] }, + { key: "b", bunch: [2] }, + { key: "a", bunch: [3] }, + { key: "b", bunch: [4] }, + ]); + }); + + it("uses a structured-key equality predicate when provided", () => { + const bunches: { key: { x: string; y: string }; bunch: number[] }[] = []; + forEachContiguousBunch( + [ + { x: "a", y: "1", v: 10 }, + { x: "a", y: "1", v: 11 }, + { x: "a", y: "2", v: 12 }, + { x: "a", y: "2", v: 13 }, + { x: "a", y: "1", v: 14 }, + ], + (i) => ({ x: i.x, y: i.y }), + (i) => i.v, + (key, bunch) => bunches.push({ key, bunch }), + (a, b) => a.x === b.x && a.y === b.y, + ); + assert.deepStrictEqual(bunches, [ + { key: { x: "a", y: "1" }, bunch: [10, 11] }, + { key: { x: "a", y: "2" }, bunch: [12, 13] }, + { key: { x: "a", y: "1" }, bunch: [14] }, + ]); + }); + + it("treats every item as a distinct key with reference-equality on object keys", () => { + // Default keysEqual is Object.is — two distinct object instances never match, + // so each item becomes its own bunch even when the keys are structurally identical. + const bunches: number[][] = []; + forEachContiguousBunch( + [ + { k: { a: 1 }, v: 1 }, + { k: { a: 1 }, v: 2 }, + ], + (i) => i.k, + (i) => i.v, + (_key, bunch) => bunches.push(bunch), + ); + assert.deepStrictEqual(bunches, [[1], [2]]); + }); + + it("supports valueOf that transforms items", () => { + const bunches: string[][] = []; + forEachContiguousBunch( + [ + { k: "a", v: 1 }, + { k: "a", v: 2 }, + { k: "b", v: 3 }, + ], + (i) => i.k, + (i) => `v${i.v}`, + (_key, bunch) => bunches.push(bunch), + ); + assert.deepStrictEqual(bunches, [["v1", "v2"], ["v3"]]); + }); +});