diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 8964246c1160..d9f4ae96476b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -391,12 +391,21 @@ BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes)
     if (keyGroupWorkQueue == null) {
       return null;
     }
-    @Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
-    if (queuedWork == null) {
-      return null;
+    // Skip over failed work: keep polling until a non-failed item is found or pollWork
+    // returns null.
+    while (true) {
+      @Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
+      if (queuedWork == null) {
+        return null;
+      }
+      if (queuedWork.getWork().work().isFailed()) {
+        // Drop failed work: close its handle rather than merging it into internalHandle.
+        queuedWork.getHandle().close();
+      } else {
+        internalHandle.merge(queuedWork.getHandle());
+        return queuedWork.getWork();
+      }
     }
-    internalHandle.merge(queuedWork.getHandle());
-    return queuedWork.getWork();
   }
 
   private void decrementCounters(int elements, long bytes) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index a98102751fb2..c39b7f3a1d4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -553,4 +553,83 @@ public void testPollWorkWithLinkedBlockingQueue() throws Exception {
     blockerStop.countDown();
     testExecutor.shutdown();
   }
+
+  /**
+   * Checks that {@code pollWork} skips queued work that is marked failed and returns the next
+   * non-failed work for the same key group. The skipped work's handle is expected to be closed,
+   * which the test observes as a drop in {@code elementsOutstanding}.
+   */
+  @Test
+  public void testPollWorkDropsFailedWork() throws Exception {
+    BoundedQueueExecutor testExecutor =
+        new BoundedQueueExecutor(
+            /* initialMaximumPoolSize= */ 1,
+            /* keepAliveTime= */ 60,
+            /* unit= */ TimeUnit.SECONDS,
+            /* maximumElementsOutstanding= */ 100,
+            /* maximumBytesOutstanding= */ 10000000,
+            new ThreadFactoryBuilder().setNameFormat("testStealing-%d").setDaemon(true).build(),
+            useFairMonitor,
+            /* useKeyGroupWorkQueue= */ true);
+
+    CountDownLatch blockerStart = new CountDownLatch(1);
+    CountDownLatch blockerStop = new CountDownLatch(1);
+    // Run setup and assertions under try/finally so the blocker is always released and the
+    // executor is always shut down, even when a step fails.
+    try {
+      // Create blocker task to occupy the worker thread
+      ExecutableWork blockerWork =
+          createWorkWithCompIdAndKeyGroup(
+              "blockerComp",
+              DEFAULT_KEY_GROUP,
+              ignored -> {
+                blockerStart.countDown();
+                try {
+                  blockerStop.await();
+                } catch (InterruptedException e) {
+                  // Restore the interrupt flag before propagating.
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(e);
+                }
+              });
+
+      testExecutor.execute(blockerWork, 0);
+      blockerStart.await();
+
+      Work.KeyGroup keyGroup1 = Work.KeyGroup.create(1, 1);
+
+      // Create executable tasks
+      ExecutableWork work1 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
+      ExecutableWork work2 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
+
+      // Mark work1 as failed
+      work1.work().setFailed();
+
+      // Enqueue tasks
+      testExecutor.execute(work1, 100);
+      testExecutor.execute(work2, 150);
+
+      // Total outstanding elements must be 3 (blocker + work1 + work2)
+      assertEquals(3, testExecutor.elementsOutstanding());
+
+      // Steal work from keyGroup1.
+      // The first work in queue is work1, which is failed.
+      // It should be dropped, its handle closed, and work2 should be returned.
+      try (BoundedQueueExecutorWorkHandleImpl stealHandle =
+          testExecutor.createBudgetHandle(0, 0L)) {
+        ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle);
+        assertNotNull(stolen);
+        assertEquals(work2, stolen);
+        // blocker (1) + work2 (1) = 2. work1 (1) should have been released.
+        assertEquals(2, testExecutor.elementsOutstanding());
+      }
+      // work2 should also be released now because stealHandle is closed.
+      // blocker (1) = 1.
+      assertEquals(1, testExecutor.elementsOutstanding());
+    } finally {
+      // Unblock the blocker and shut down
+      blockerStop.countDown();
+      testExecutor.shutdown();
+    }
+  }
 }