From c21a7a6d39c5a5d05927118c35e8dc98fa7285c4 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 11 Jun 2026 13:36:26 +0000 Subject: [PATCH 1/3] Activate SourceState Finalizers before submitting workitem to harness threads --- .../dataflow/worker/StreamingDataflowWorker.java | 2 +- .../streaming/harness/SingleSourceWorkerHarness.java | 8 ++------ .../work/processing/StreamingWorkScheduler.java | 12 ++++-------- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 9e82343474c6..ec35e0f28f8d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -418,7 +418,6 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar .ifPresent( computationState -> { memoryMonitor.waitForResources("GetWork"); - streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); streamingWorkScheduler.scheduleWork( computationState, workItem, @@ -426,6 +425,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies); }), ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index f41223310385..1c9431ed8784 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -165,9 +165,6 @@ private void streamingEngineDispatchLoop( .ifPresent( computationState -> { waitForResources.run(); - if (!appliedFinalizeIds.isEmpty()) { - streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); - } streamingWorkScheduler.scheduleWork( computationState, workItem, @@ -184,6 +181,7 @@ private void streamingEngineDispatchLoop( workCommitter::commit, heartbeatSender), drainMode, + appliedFinalizeIds, getWorkStreamLatencies); })); try { @@ -221,9 +219,6 @@ private void applianceDispatchLoop(Supplier getWorkFn) ImmutableList appliedFinalizeIds = ImmutableList.copyOf( Preconditions.checkNotNull(workResponse).getAppliedFinalizeIdsList()); - if (!appliedFinalizeIds.isEmpty()) { - streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); - } for (Windmill.ComputationWorkItems computationWork : Preconditions.checkNotNull(workResponse).getWorkList()) { String computationId = computationWork.getComputationId(); @@ -252,6 +247,7 @@ private void applianceDispatchLoop(Supplier getWorkFn) Work.createProcessingContext( computationId, getDataClient, workCommitter::commit, heartbeatSender), computationWork.getDrainMode(), + appliedFinalizeIds, /* getWorkStreamLatencies= */ ImmutableList.of()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 364608be82ca..cfa987af400d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -215,7 +215,11 @@ public void scheduleWork( Watermarks watermarks, Work.ProcessingContext processingContext, boolean drainMode, + ImmutableList appliedFinalizeIds, ImmutableList getWorkStreamLatencies) { + // Before any processing starts, call any pending OnCommit callbacks. + commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); + commitFinalizer.finalizeCommits(appliedFinalizeIds); computationState.activateWork( ExecutableWork.create( Work.create( @@ -223,11 +227,6 @@ public void scheduleWork( (work, handle) -> processWork(computationState, work, getWorkStreamLatencies, handle))); } - /** Adds any applied finalize ids to the commit finalizer to have their callbacks executed. */ - public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { - commitFinalizer.finalizeCommits(appliedFinalizeIds); - } - /** * Executes the user DoFns processing {@link Work} then queues the {@link Commit}(s) to be sent to * backing persistent store to mark that the {@link Work} has finished processing. May retry @@ -255,9 +254,6 @@ private void processWork( setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId); LOG.debug("Starting processing for {}:\n{}", computationId, work); - // Before any processing starts, call any pending OnCommit callbacks. Nothing that requires - // cleanup should be done before this, since we might exit early here. - commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); if (workItem.getSourceState().getOnlyFinalize()) { Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem); outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); From 4238c187cae359d9792c37e8b4d3972346e89dd6 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 11 Jun 2026 13:59:28 +0000 Subject: [PATCH 2/3] fix top level finalizer calls --- .../runners/dataflow/worker/StreamingDataflowWorker.java | 2 +- .../streaming/harness/SingleSourceWorkerHarness.java | 8 ++++++-- .../windmill/work/processing/StreamingWorkScheduler.java | 8 +++++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index ec35e0f28f8d..9e82343474c6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -418,6 +418,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar .ifPresent( computationState -> { memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); streamingWorkScheduler.scheduleWork( computationState, workItem, @@ -425,7 +426,6 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar watermarks, processingContext, drainMode, - appliedFinalizeIds, getWorkStreamLatencies); }), ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index 1c9431ed8784..f41223310385 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -165,6 +165,9 @@ private void streamingEngineDispatchLoop( .ifPresent( computationState -> { waitForResources.run(); + if (!appliedFinalizeIds.isEmpty()) { + streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); + } streamingWorkScheduler.scheduleWork( computationState, workItem, @@ -181,7 +184,6 @@ private void streamingEngineDispatchLoop( workCommitter::commit, heartbeatSender), drainMode, - appliedFinalizeIds, getWorkStreamLatencies); })); try { @@ -219,6 +221,9 @@ private void applianceDispatchLoop(Supplier getWorkFn) ImmutableList appliedFinalizeIds = ImmutableList.copyOf( Preconditions.checkNotNull(workResponse).getAppliedFinalizeIdsList()); + if (!appliedFinalizeIds.isEmpty()) { + streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); + } for (Windmill.ComputationWorkItems computationWork : Preconditions.checkNotNull(workResponse).getWorkList()) { String computationId = computationWork.getComputationId(); @@ -247,7 +252,6 @@ private void applianceDispatchLoop(Supplier getWorkFn) Work.createProcessingContext( computationId, getDataClient, workCommitter::commit, heartbeatSender), computationWork.getDrainMode(), - appliedFinalizeIds, /* getWorkStreamLatencies= */ ImmutableList.of()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index cfa987af400d..2e43bb62e023 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -215,11 +215,8 @@ public void scheduleWork( Watermarks watermarks, Work.ProcessingContext processingContext, boolean drainMode, - ImmutableList appliedFinalizeIds, ImmutableList getWorkStreamLatencies) { - // Before any processing starts, call any pending OnCommit callbacks. commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); - commitFinalizer.finalizeCommits(appliedFinalizeIds); computationState.activateWork( ExecutableWork.create( Work.create( @@ -227,6 +224,11 @@ public void scheduleWork( (work, handle) -> processWork(computationState, work, getWorkStreamLatencies, handle))); } + /** Adds any applied finalize ids to the commit finalizer to have their callbacks executed. */ + public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { + commitFinalizer.finalizeCommits(appliedFinalizeIds); + } + /** * Executes the user DoFns processing {@link Work} then queues the {@link Commit}(s) to be sent to * backing persistent store to mark that the {@link Work} has finished processing. May retry From 6852ecb74ec0068a02668efd70b35b09789a4e0e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 11 Jun 2026 14:03:01 +0000 Subject: [PATCH 3/3] add comments --- .../worker/windmill/work/processing/StreamingWorkScheduler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 2e43bb62e023..a3f23aebdf8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -216,6 +216,7 @@ public void scheduleWork( Work.ProcessingContext processingContext, boolean drainMode, ImmutableList getWorkStreamLatencies) { + // Before any processing starts, call any pending OnCommit callbacks commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); computationState.activateWork( ExecutableWork.create(