Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
08d9bab
Add TaskReadinessGate plugin extension point interface
robsyme May 19, 2026
faec578
Pin InterruptedException in TaskReadinessGate contract test
robsyme May 19, 2026
6326896
Add executor.gateMaxWait config option
robsyme May 19, 2026
f2a4000
Wire TaskReadinessGate discovery and executor into TaskPollingMonitor
robsyme May 19, 2026
d61d9f0
Document GateState role in gateMaxWait accounting
robsyme May 19, 2026
53e64b8
Submit TaskReadinessGate work on schedule()
robsyme May 19, 2026
e0e32be
Document gateExecutor non-blocking-submission contract
robsyme May 19, 2026
a2b7256
Check TaskReadinessGate futures in canSubmit()
robsyme May 19, 2026
1066c40
Preserve cause exception type in TaskReadinessGate failure path
robsyme May 19, 2026
3968af2
Test TaskReadinessGate exception propagation through canSubmit
robsyme May 19, 2026
0199381
Preserve ProcessException identity through TaskReadinessGate failure …
robsyme May 19, 2026
56f7ec4
Wrap non-ProcessException causes to preserve ProcessRetryableExceptio…
robsyme May 19, 2026
5fdcfab
Enforce executor.gateMaxWait timeout in TaskReadinessGate flow
robsyme May 19, 2026
7532936
Cancel in-flight TaskReadinessGate futures on task eviction
robsyme May 19, 2026
af4bc73
Test all-must-complete semantics for multiple TaskReadinessGates
robsyme May 19, 2026
e7f4913
Stub forks/ready in ParallelPollingMonitorTest array-size case after …
robsyme May 19, 2026
d77e0ab
Document TaskReadinessGate extension point and executor.gateMaxWait
robsyme May 19, 2026
d699108
Remove implementation plan from upstream docs
robsyme May 19, 2026
898913d
Document retry-marker placement requirement in TaskReadinessGate
robsyme May 19, 2026
48b2762
Drop executor.gateMaxWait; let plugins own timeout policy via hints
robsyme May 20, 2026
d8c05ec
Extract TaskGateManager from TaskPollingMonitor
robsyme May 20, 2026
21c7d60
Revert TaskPollingMonitor constructor lock-init refactor
robsyme May 20, 2026
34f8fed
Address re-review nits: stale javadoc, field-init comment, style, intent
robsyme May 21, 2026
c7445c2
Rename TaskGateManager.submit() to prepare() per PR review
robsyme May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions docs/developer/task-readiness-gate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
(task-readiness-gate)=

# `TaskReadinessGate`

`TaskReadinessGate` is a plugin extension point that defers task submission until an external precondition is met — for example, restoring an S3 object from Glacier before an AWS Batch worker tries to stage it. It works uniformly with every executor that uses Nextflow's `TaskPollingMonitor` and removes the need for plugins to subclass an executor and its task handler purely to override `TaskHandler.isReady()`.

## Interface

A gate implements one method:

```groovy
package nextflow.processor

import org.pf4j.ExtensionPoint

interface TaskReadinessGate extends ExtensionPoint {
void prepare(TaskHandler handler) throws InterruptedException
}
```

The plugin registers an implementation via the standard PF4J `@Extension` mechanism, the same way `TraceObserverFactory`, `CacheFactory`, and other extension points are discovered.

## Contract

- **Blocking is allowed.** `prepare` runs on a managed virtual-thread executor inside `TaskPollingMonitor`. Calling `Thread.sleep`, blocking I/O, or long-polling APIs is fine. The scheduler thread is never blocked.
- **Throwing fails the task.** Any exception marks the task as permanently failed and routes the cause through the task's `errorStrategy` directive. `ProcessException` (and subclasses) propagate identity-preserved. Other throwables are wrapped in a `ProcessException` with the original attached as `cause`, so retry markers like `ProcessRetryableException` reach `TaskProcessor.resumeOrDie` intact.
- **Retry markers** (`ProcessRetryableException`, `CloudSpotTerminationException`) are recognised by `resumeOrDie` on the *cause* of the thrown exception, not on the exception itself. If you want `errorStrategy 'retry'` to fire for a transient failure, throw the marker as-is (it will be wrapped) — do not pre-wrap it in a `ProcessException`, since the `ProcessException` would propagate identity-preserved and the marker would be lost.
- **Interrupts must be honored.** Task eviction and workflow abort cancel the in-flight `prepare` by interrupting its thread. Use interruptible primitives (`Thread.sleep`, blocking I/O on NIO channels, `Future.get`). Core does not enforce a wall-clock deadline — the plugin owns timeout policy.
- **Multiple gates compose.** When several plugins register gates, all must complete successfully before the task is admitted. Gates run in parallel; order is unspecified.

## Timeouts and per-process overrides via `hints`

Core deliberately does not ship an executor-level timeout. The right deadline depends on which plugin is registered and on the workload (e.g. Glacier Standard is hours; Deep Archive Bulk is days). Plugins enforce their own deadlines via the existing `hints` directive — namespaced under the plugin's name to avoid collisions.

```nextflow
process FAST_PATH {
hints 'glacier/skip': true // bypass the gate entirely
// ...
}

process LONG_RESTORE {
hints 'glacier/maxWait': '48h' // per-process deadline override
// ...
}
```

Inside the gate, read `handler.task.config.hints` and enforce the deadline yourself:

```groovy
@Override
void prepare(TaskHandler handler) throws InterruptedException {
final hints = handler.task.config.hints
if( hints['glacier/skip'] == true ) return

final maxWait = (hints['glacier/maxWait'] as Duration) ?: defaultMaxWait
final deadline = System.currentTimeMillis() + maxWait.toMillis()

for( S3Path path : extractS3Inputs(handler.task) ) {
manager.issueRestoreIfNeeded(path)
while( !manager.isRestored(path) ) {
if( System.currentTimeMillis() > deadline )
throw new ProcessException("Glacier restore exceeded ${maxWait} for ${path}")
if( manager.isPermanentlyFailed(path) )
throw new ProcessException("Glacier restore failed for ${path}")
Thread.sleep(60_000)
}
}
}
```

The `hints` directive uses prefix-separated keys (`<plugin>/<name>`); plugins should namespace their hint keys (e.g. `glacier/skip`, `mycorp.cold-storage/skip`) to avoid collisions with other plugins.

## When `prepare` runs

`TaskPollingMonitor.schedule()` submits `prepare` to the managed executor the moment the task is enqueued — before any executor slot has freed up, before `canForkProcess()` is consulted. This means restore work for tasks queued behind a full executor begins immediately, not when slots free up.

`canSubmit()` then polls the resulting `Future` on every monitor tick. The task is admitted as soon as every gate's future has completed successfully and the standard `canForkProcess` / `isReady` / capacity checks pass.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.exception.ProcessException
import nextflow.plugin.Plugins
import nextflow.util.CustomThreadFactory
import nextflow.util.Threads

/**
* Manages the lifecycle of {@link TaskReadinessGate} extensions on behalf of a
* {@link TaskPollingMonitor}: discovers registered gates via PF4J, runs each gate's
* {@code prepare} call on a managed virtual-thread executor, tracks the resulting
* futures per task, and surfaces completion to the monitor via {@link #isReady}.
*
* <p>The monitor delegates to this class from {@code schedule}, {@code canSubmit},
* and {@code evict}. When no plugin registers a gate, all operations are no-ops and
* no executor is created.
*
* @author Rob Syme <rob.syme@seqera.io>
*/
@Slf4j
@CompileStatic
class TaskGateManager {
Comment thread
pditommaso marked this conversation as resolved.

private final List<TaskReadinessGate> gates

private ExecutorService executor

private final ConcurrentMap<TaskHandler, List<Future<?>>> futuresByHandler = new ConcurrentHashMap<>()

TaskGateManager(Session session) {
this(session, Plugins.getExtensions(TaskReadinessGate))
}

@PackageScope
TaskGateManager(Session session, List<TaskReadinessGate> gates) {
this.gates = gates
if( gates ) {
this.executor = Threads.useVirtual()
? Executors.newVirtualThreadPerTaskExecutor()
: Executors.newCachedThreadPool(new CustomThreadFactory('TaskReadinessGate'))
session?.onShutdown { executor.shutdownNow() }
log.debug "Registered ${gates.size()} task readiness gate(s): ${gates*.class*.simpleName}"
}
}

/**
* Submit each registered gate's {@code prepare} call for the given handler. The
* caller is responsible for serializing concurrent calls for the same handler
* (in {@code TaskPollingMonitor} this happens under {@code pendingLock}).
*/
void submit(TaskHandler handler) {
Comment thread
pditommaso marked this conversation as resolved.
Outdated
if( !gates ) return

final futures = new ArrayList<Future<?>>(gates.size())
for( TaskReadinessGate g : gates ) {
final gate = g // capture in a fresh local for the async closure
futures << executor.submit({ gate.prepare(handler) } as Callable)
}
futuresByHandler.put(handler, futures)
}

/**
* Return {@code true} when every gate's future for this handler has completed
* successfully. Returns {@code false} while at least one future is still running.
* Throws {@link ProcessException} when any gate has failed; identity-preserves
* {@code ProcessException} causes and wraps everything else so retry markers
* carried on the cause reach {@link TaskProcessor#resumeOrDie} intact.
*/
boolean isReady(TaskHandler handler) {
final futures = futuresByHandler.get(handler)
if( !futures ) return true

for( f in futures ) {
if( !f.isDone() ) return false
if( f.isCancelled() ) {
futures*.cancel(true)
futuresByHandler.remove(handler)
throw new ProcessException("Task readiness gate was cancelled for task '${handler.task.name}'")
}
try { f.get() }
catch( ExecutionException e ) {
// cancel peer gates so their work doesn't outlive the failing task
futures*.cancel(true)
futuresByHandler.remove(handler)
final cause = e.cause
if( cause instanceof ProcessException )
throw (ProcessException) cause
throw new ProcessException("Task readiness gate failed for task '${handler.task.name}'", cause ?: e)
}
catch( InterruptedException e ) {
Thread.currentThread().interrupt()
return false
}
}
futuresByHandler.remove(handler)
return true
}

/**
* Cancel any in-flight gate futures for the given handler. Idempotent — safe to
* call whether or not gate state exists for the handler.
*/
void evict(TaskHandler handler) {
futuresByHandler.remove(handler)?.each { it.cancel(true) }
}

/** Visible for testing: snapshot of currently tracked handlers. */
@PackageScope
Set<TaskHandler> trackedHandlers() {
Collections.unmodifiableSet(futuresByHandler.keySet())
}

/** Visible for testing: futures for a given handler, or null. */
@PackageScope
List<Future<?>> futuresFor(TaskHandler handler) {
futuresByHandler.get(handler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock

import com.google.common.util.concurrent.RateLimiter
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.SysEnv
Expand Down Expand Up @@ -132,6 +133,9 @@ class TaskPollingMonitor implements TaskMonitor {

private boolean enableAsyncFinalizer = SysEnv.getBool('NXF_ENABLE_ASYNC_FINALIZER',true)

@PackageScope
TaskGateManager gateManager = new TaskGateManager(null, [])

/**
* Create the task polling monitor with the provided named parameters object.
* <p>
Expand Down Expand Up @@ -248,7 +252,10 @@ class TaskPollingMonitor implements TaskMonitor {
* by the polling monitor
*/
protected boolean canSubmit(TaskHandler handler) {
(capacity > 0 ? checkQueueCapacity(handler) : true) && handler.canForkProcess() && handler.isReady()
gateManager.isReady(handler) \
&& handler.canForkProcess() \
&& handler.isReady() \
&& (capacity > 0 ? checkQueueCapacity(handler) : true)
Comment on lines +260 to +263
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add this extension point, I think we should move the logic in handler.isReady() into a task gate. It can be a follow-up effort

That method is currently only used by Wave to make sure that the container is resolved before submitting the task, which seems like a clear use case for this extension point

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I'd keep into a follow-up effort to keep this PR self-contained

}

/**
Expand Down Expand Up @@ -305,6 +312,7 @@ class TaskPollingMonitor implements TaskMonitor {
pendingLock.lock()
try{
pendingQueue << handler
gateManager.submit(handler)
taskAvail.signal() // signal that a new task is available for execution
notifyTaskPending(handler)
log.trace "Scheduled task > $handler"
Expand All @@ -329,6 +337,8 @@ class TaskPollingMonitor implements TaskMonitor {
return false
}

gateManager.evict(handler)

if( remove(handler) ) {
pendingLock.lock()
try {
Expand All @@ -351,6 +361,8 @@ class TaskPollingMonitor implements TaskMonitor {
*/
@Override
TaskMonitor start() {
this.gateManager = new TaskGateManager(session)

log.debug ">>> barrier register (monitor: ${this.name})"
session.barrier.register(this)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import groovy.transform.CompileStatic
import org.pf4j.ExtensionPoint

/**
* Plugin extension point that defers task submission until an external precondition is met.
*
* <p>Implementations are invoked once per task by the scheduler, on a managed background
* thread (virtual thread when available). The task is admitted for submission when this
* method returns; throw to mark the task as permanently failed and route the cause through
* the task's {@code errorStrategy}.
*
* <p>Implementations may block freely — {@code Thread.sleep}, network calls, long polling.
* The scheduler thread is never blocked by this call.
*
* <p>Implementations <b>must</b> honor {@code Thread.interrupt()} so that task eviction,
* workflow abort, and the {@code executor.gateMaxWait} backstop can unblock {@code prepare}
* promptly. Use interruptible primitives ({@code Thread.sleep}, blocking I/O on NIO
* channels, {@code Future.get}) and propagate {@code InterruptedException}.
*
* <p>When multiple gates are registered, a task is admitted only when every gate's
* {@code prepare} method has returned successfully. Evaluation order across gates is
* unspecified; all gates start in parallel on the managed executor.
*
* <p>Per-process opt-out is available via the {@code hints} directive — gates that wish
* to support it should check a namespaced hint key (e.g. {@code 'glacier/skip': true}) and
* return immediately when set. No core mechanism is required.
*/
@CompileStatic
interface TaskReadinessGate extends ExtensionPoint {
void prepare(TaskHandler handler) throws InterruptedException
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class ParallelPollingMonitorTest extends Specification {
and:
def processor = Mock(TaskProcessor)
def arrayHandler = Mock(TaskHandler) {
canForkProcess() >> true
isReady() >> true
getTask() >> Mock(TaskArrayRun) {
getName() >> 'oversized_array'
getArraySize() >> 10
Expand Down
Loading
Loading