Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,18 @@ BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes)
if (keyGroupWorkQueue == null) {
return null;
}
@Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
if (queuedWork == null) {
return null;
while (true) {
@Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
if (queuedWork == null) {
return null;
}
if (queuedWork.getWork().work().isFailed()) {
queuedWork.getHandle().close();
} else {
internalHandle.merge(queuedWork.getHandle());
return queuedWork.getWork();
}
}
internalHandle.merge(queuedWork.getHandle());
return queuedWork.getWork();
}

private void decrementCounters(int elements, long bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,4 +553,74 @@ public void testPollWorkWithLinkedBlockingQueue() throws Exception {
blockerStop.countDown();
testExecutor.shutdown();
}

@Test
public void testPollWorkDropsFailedWork() throws Exception {
BoundedQueueExecutor testExecutor =
new BoundedQueueExecutor(
/* initialMaximumPoolSize= */ 1,
/* keepAliveTime= */ 60,
/* unit= */ TimeUnit.SECONDS,
/* maximumElementsOutstanding= */ 100,
/* maximumBytesOutstanding= */ 10000000,
new ThreadFactoryBuilder().setNameFormat("testStealing-%d").setDaemon(true).build(),
useFairMonitor,
/*useKeyGroupWorkQueue=*/ true);

// Create blocker task to occupy the worker thread
CountDownLatch blockerStart = new CountDownLatch(1);
CountDownLatch blockerStop = new CountDownLatch(1);
ExecutableWork blockerWork =
createWorkWithCompIdAndKeyGroup(
"blockerComp",
DEFAULT_KEY_GROUP,
ignored -> {
blockerStart.countDown();
try {
blockerStop.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

testExecutor.execute(blockerWork, 0);
blockerStart.await();

Work.KeyGroup keyGroup1 = Work.KeyGroup.create(1, 1);

// Create executable tasks
ExecutableWork work1 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
ExecutableWork work2 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});

// Mark work1 as failed
work1.work().setFailed();
try {

// Enqueue tasks
testExecutor.execute(work1, 100);
testExecutor.execute(work2, 150);

// Total outstanding elements must be 3 (blocker + work1 + work2)
assertEquals(3, testExecutor.elementsOutstanding());

// Steal work from keyGroup1.
// The first work in queue is work1, which is failed.
// It should be dropped, its handle closed, and work2 should be returned.
try (BoundedQueueExecutorWorkHandleImpl stealHandle =
testExecutor.createBudgetHandle(0, 0L)) {
ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle);
assertNotNull(stolen);
assertEquals(work2, stolen);
// blocker (1) + work2 (1) = 2. work1 (1) should have been released.
assertEquals(2, testExecutor.elementsOutstanding());
}
// work2 should also be released now because stealHandle is closed.
// blocker (1) = 1.
assertEquals(1, testExecutor.elementsOutstanding());
} finally {
// Unblock the blocker and shut down
blockerStop.countDown();
testExecutor.shutdown();
}
}
}
Loading