From 2c1278636a603fb5e06a5f5bf6f179b0fbf41d7c Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 11 Jun 2026 13:27:09 +0000 Subject: [PATCH 1/2] Drop failed work in BoundedQueueExecutor::pollWork --- .../worker/util/BoundedQueueExecutor.java | 16 +++-- .../worker/util/BoundedQueueExecutorTest.java | 67 +++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 8964246c1160..d9f4ae96476b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -391,12 +391,18 @@ BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes) if (keyGroupWorkQueue == null) { return null; } - @Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup); - if (queuedWork == null) { - return null; + while (true) { + @Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup); + if (queuedWork == null) { + return null; + } + if (queuedWork.getWork().work().isFailed()) { + queuedWork.getHandle().close(); + } else { + internalHandle.merge(queuedWork.getHandle()); + return queuedWork.getWork(); + } } - internalHandle.merge(queuedWork.getHandle()); - return queuedWork.getWork(); } private void decrementCounters(int elements, long bytes) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index a98102751fb2..9106133cec24 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -553,4 +553,71 @@ public void testPollWorkWithLinkedBlockingQueue() throws Exception { blockerStop.countDown(); testExecutor.shutdown(); } + + @Test + public void testPollWorkDropsFailedWork() throws Exception { + BoundedQueueExecutor testExecutor = + new BoundedQueueExecutor( + /* initialMaximumPoolSize= */ 1, + /* keepAliveTime= */ 60, + /* unit= */ TimeUnit.SECONDS, + /* maximumElementsOutstanding= */ 100, + /* maximumBytesOutstanding= */ 10000000, + new ThreadFactoryBuilder().setNameFormat("testStealing-%d").setDaemon(true).build(), + useFairMonitor, + /*useKeyGroupWorkQueue=*/ true); + + // Create blocker task to occupy the worker thread + CountDownLatch blockerStart = new CountDownLatch(1); + CountDownLatch blockerStop = new CountDownLatch(1); + ExecutableWork blockerWork = + createWorkWithCompIdAndKeyGroup( + "blockerComp", + DEFAULT_KEY_GROUP, + ignored -> { + blockerStart.countDown(); + try { + blockerStop.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + testExecutor.execute(blockerWork, 0); + blockerStart.await(); + + Work.KeyGroup keyGroup1 = Work.KeyGroup.create(1, 1); + + // Create executable tasks + ExecutableWork work1 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {}); + ExecutableWork work2 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {}); + + // Mark work1 as failed + work1.work().setFailed(); + + // Enqueue tasks + testExecutor.execute(work1, 100); + testExecutor.execute(work2, 150); + + // Total outstanding elements must be 3 (blocker + work1 + work2) + assertEquals(3, testExecutor.elementsOutstanding()); + + // Steal work from keyGroup1. + // The first work in queue is work1, which is failed. + // It should be dropped, its handle closed, and work2 should be returned. + try (BoundedQueueExecutorWorkHandleImpl stealHandle = testExecutor.createBudgetHandle(0, 0L)) { + ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle); + assertNotNull(stolen); + assertEquals(work2, stolen); + // blocker (1) + work2 (1) = 2. work1 (1) should have been released. + assertEquals(2, testExecutor.elementsOutstanding()); + } + // work2 should also be released now because stealHandle is closed. + // blocker (1) = 1. + assertEquals(1, testExecutor.elementsOutstanding()); + + // Unblock the blocker and shut down + blockerStop.countDown(); + testExecutor.shutdown(); + } } From 3670a036aa83a3c18e18f3b9435214f3cbb0ad13 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 11 Jun 2026 13:42:25 +0000 Subject: [PATCH 2/2] address comment --- .../worker/util/BoundedQueueExecutorTest.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index 9106133cec24..c39b7f3a1d4d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -594,30 +594,33 @@ public void testPollWorkDropsFailedWork() throws Exception { // Mark work1 as failed work1.work().setFailed(); - - // Enqueue tasks - testExecutor.execute(work1, 100); - testExecutor.execute(work2, 150); - - // Total outstanding elements must be 3 (blocker + work1 + work2) - assertEquals(3, testExecutor.elementsOutstanding()); - - // Steal work from keyGroup1. - // The first work in queue is work1, which is failed. - // It should be dropped, its handle closed, and work2 should be returned. - try (BoundedQueueExecutorWorkHandleImpl stealHandle = testExecutor.createBudgetHandle(0, 0L)) { - ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle); - assertNotNull(stolen); - assertEquals(work2, stolen); - // blocker (1) + work2 (1) = 2. work1 (1) should have been released. - assertEquals(2, testExecutor.elementsOutstanding()); + try { + + // Enqueue tasks + testExecutor.execute(work1, 100); + testExecutor.execute(work2, 150); + + // Total outstanding elements must be 3 (blocker + work1 + work2) + assertEquals(3, testExecutor.elementsOutstanding()); + + // Steal work from keyGroup1. + // The first work in queue is work1, which is failed. + // It should be dropped, its handle closed, and work2 should be returned. + try (BoundedQueueExecutorWorkHandleImpl stealHandle = + testExecutor.createBudgetHandle(0, 0L)) { + ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle); + assertNotNull(stolen); + assertEquals(work2, stolen); + // blocker (1) + work2 (1) = 2. work1 (1) should have been released. + assertEquals(2, testExecutor.elementsOutstanding()); + } + // work2 should also be released now because stealHandle is closed. + // blocker (1) = 1. + assertEquals(1, testExecutor.elementsOutstanding()); + } finally { + // Unblock the blocker and shut down + blockerStop.countDown(); + testExecutor.shutdown(); } - // work2 should also be released now because stealHandle is closed. - // blocker (1) = 1. - assertEquals(1, testExecutor.elementsOutstanding()); - - // Unblock the blocker and shut down - blockerStop.countDown(); - testExecutor.shutdown(); } }