Skip to content

ADR: TaskReadinessGate plugin extension point#7151

Open
robsyme wants to merge 2 commits into
masterfrom
adr/task-readiness-gate
Open

ADR: TaskReadinessGate plugin extension point#7151
robsyme wants to merge 2 commits into
masterfrom
adr/task-readiness-gate

Conversation

@robsyme

@robsyme robsyme commented May 16, 2026

Copy link
Copy Markdown
Collaborator

Summary

Adds an architecture decision record proposing a new TaskReadinessGate plugin extension point that defers task submission until external preconditions are met (e.g. restoring S3 objects from Glacier before an AWS Batch job tries to stage them).

The motivation is that plugins needing this capability today must subclass an executor and its task handler purely to override TaskHandler.isReady(), which:

  • forces users to opt in via process.executor = '<plugin-specific-name>' instead of a drop-in plugins { id '...' };
  • couples plugins to executor internals (e.g. @CompileStatic is incompatible with AwsBatchTaskHandler subclassing due to its proxy dispatch);
  • requires a new executor subclass per (executor × cold-storage backend) combination.

The proposed SPI is consulted by TaskPollingMonitor before submitting any task, works uniformly across every executor, and is a small additive change to core (one interface + a handful of lines at the call site). Behavior is bit-identical when no plugin registers a gate.

The ADR documents the contract (polling-based, must return promptly, exceptions signal permanent failure), the call-site integration, considered alternatives (channel operator, S3 NIO interception — both rejected with reasoning), explicit non-goals, and follow-ups left for later (AbstractAsyncReadinessGate helper, per-process scoping, gate ordering).

Status in the ADR is draft — opening for review and discussion before implementation.

Test plan

  • Reviewers (Paolo, Ben) read the ADR and confirm the SPI shape, contract, and call-site integration.
  • Confirm the non-goals (no helper class, no scoping, no ordering) are acceptable for v1.
  • Confirm the placement under nextflow.processor rather than nf-commons.
  • Once accepted, follow-up PR implements the interface, the three edits to TaskPollingMonitor, Spock specs, and developer-docs section.

Adds an architecture decision record proposing a new TaskReadinessGate
plugin extension point that defers task submission until external
preconditions are met. Replaces the current pattern of subclassing
an executor and its task handler purely to override TaskHandler.isReady(),
enabling drop-in plugin behavior and removing executor coupling.

Status: draft.
Signed-off-by: Rob Syme <rob.syme@gmail.com>
@netlify

netlify Bot commented May 16, 2026

Copy link
Copy Markdown

Deploy Preview for nextflow-docs-staging canceled.

Name Link
🔨 Latest commit 8730536
🔍 Latest deploy log https://app.netlify.com/projects/nextflow-docs-staging/deploys/6a0f3e28fab9e90008a853e3

@pditommaso

Copy link
Copy Markdown
Member

Proposal: collapse the SPI to a single blocking prepare(), run on a core-managed executor

Re-read the ADR a few times. The cleanest factoring I can see eliminates the polling contract entirely and lets the plugin author write the obvious blocking code. Core absorbs the async management, plugin authors don't see CompletableFuture, the scheduler thread is never blocked, and the SPI shrinks to one method.

The interface

package nextflow.processor

interface TaskReadinessGate extends ExtensionPoint {

    /**
     * Blocking preparation. Invoked once per task on a managed background thread
     * (virtual threads by default). The task is admitted for submission when this
     * method returns; throw to mark it as permanently failed.
     *
     * Implementations may block freely — Thread.sleep, network calls, long polling.
     * The scheduler thread is never blocked by this call.
     *
     * Implementations must honor Thread.interrupt() so that task eviction or
     * workflow abort can unblock the prepare call promptly.
     */
    void prepare(TaskHandler handler) throws InterruptedException
}

That's the whole SPI. No isReady(). No fire-once-on-enqueue/poll-on-canSubmit split. No CompletableFuture in the plugin's vocabulary. "Done" = method returned. "Failed" = method threw.

Core integration in TaskPollingMonitor

A shared executor — one pool governs total prepare-time concurrency across all gates. Easier to split into per-gate pools later than to merge.

private List<TaskReadinessGate> readinessGates = Collections.emptyList()
private ExecutorService gateExecutor

private final ConcurrentMap<TaskHandler, List<Future<?>>> gateFutures = new ConcurrentHashMap<>()

@Override
TaskMonitor start() {
    readinessGates = Plugins.getExtensions(TaskReadinessGate)
    gateExecutor = Threads.useVirtual()
        ? Executors.newVirtualThreadPerTaskExecutor()
        : Executors.newCachedThreadPool(new CustomThreadFactory('ReadinessGate'))
    session.onShutdown { gateExecutor.shutdownNow() }
    // ... existing start logic
    return this
}

@Override
void schedule(TaskHandler handler) {
    pendingLock.lock()
    try {
        pendingQueue << handler
        if( readinessGates ) {
            final futures = new ArrayList<Future<?>>(readinessGates.size())
            for( gate in readinessGates ) {
                futures << gateExecutor.submit { gate.prepare(handler) }
            }
            gateFutures.put(handler, futures)
        }
        taskAvail.signal()
        notifyTaskPending(handler)
    }
    finally {
        pendingLock.unlock()
    }
}

protected boolean canSubmit(TaskHandler handler) {
    // Gates evaluated before maxForks/capacity short-circuit, so a task waiting
    // behind a full forks limit still has its prepare() running in the background.
    allGatesReady(handler) \
        && handler.isReady() \
        && handler.canForkProcess() \
        && (capacity > 0 ? checkQueueCapacity(handler) : true)
}

private boolean allGatesReady(TaskHandler handler) {
    final futures = gateFutures.get(handler)
    if( !futures ) return true
    for( f in futures ) {
        if( !f.isDone() ) return false
        if( f.isCancelled() ) { handleGateFailure(handler, new CancellationException()); return false }
        try { f.get() }
        catch( ExecutionException e ) { handleGateFailure(handler, e.cause); return false }
        catch( InterruptedException e ) { Thread.currentThread().interrupt(); return false }
    }
    gateFutures.remove(handler)
    return true
}

@Override
boolean evict(TaskHandler handler) {
    gateFutures.remove(handler)?.each { it.cancel(true) }
    // ... existing remove + slotAvail.signal() logic
}

The scheduler thread's only gate-related work per tick is f.isDone() per gate per pending task — a volatile read on the CompletableFuture state machine, nanoseconds. All blocking work lives on the gate executor. The future cache is bounded by pendingQueue.size() + runningQueue.size() and is cleaned up on admission or eviction.

Two notable differences from the ADR's call-site design:

  1. No fire-once-on-schedule + poll-on-canSubmit split. schedule() submits once to the executor; canSubmit() polls the future. The two-call-site contortion in the current ADR was working around a constraint (the polling contract) that the new design doesn't have.
  2. Reorder canSubmit so readiness evaluates before canForkProcess()/checkQueueCapacity(). Doesn't affect the new design's correctness — the preparation is already running asynchronously — but keeps the AND chain semantically ordered (cheapest fixed-shape checks first, then capacity gates).

What a Glacier gate looks like

class GlacierReadinessGate implements TaskReadinessGate {

    @Override
    void prepare(TaskHandler handler) throws InterruptedException {
        for( S3Path p : extractS3Inputs(handler.task) ) {
            issueRestoreIfNeeded(p)              // RestoreObject — idempotent, ~10ms
            while( !isRestored(p) ) {            // HeadObject poll
                if( isPermanentlyFailed(p) ) {
                    throw new ProcessException("Glacier restore failed for $p")
                }
                Thread.sleep(60_000)             // virtual thread parks, ~zero cost
            }
        }
    }
}

That's the entire plugin. No executor wiring, no future cache, no idempotency dance, no "must return promptly" reasoning. Cross-task dedup, if wanted, stays a plugin-internal optimization (ConcurrentMap<S3Key, CompletableFuture<Void>> inside issueRestoreIfNeeded).

Why blocking, not CompletableFuture<Void> prepare()

The future-returning variant is more flexible — plugins with native async SDKs could return their SDK's future directly, skipping a parked thread. We considered it and chose against:

  • Plugin authors are domain experts (bioinformatics, cloud infra), not async-Java experts. void + throws is unambiguous; CompletableFuture chains, cancellation propagation, and CompletionException unwrapping are not.
  • Virtual threads make the "parked blocking thread" cost negligible (~1 KB/task). The async-SDK win is real but small.
  • FilePorter is already the in-tree precedent: FileTransfer is a blocking Runnable submitted to a managed pool. New extension points should follow.
  • Uniform threading model = uniform observability. One named pool, predictable thread dumps. Per-plugin executors fragment that.

The future variant is a non-breaking future extension if a real need emerges (e.g. an overload default CompletableFuture<Void> prepareAsync(TaskHandler) { ... wrap prepare in core executor ... }).

Why TaskHandler, not TaskRun

Every other method on the scheduler boundary takes TaskHandler (schedule, canSubmit, submit, evict). Making this one method take TaskRun is a small but unnecessary asymmetry. The base class is portable (lives in nextflow.processor, not in any executor module), and handler.status / submitTimeMillis are genuinely useful for gates that want to self-impose a "preparing too long, give up" timeout.

The cost is wider surface area — submit(), kill(), setStatus() are reachable from the handle. The javadoc needs an explicit "read-only; do not invoke lifecycle methods" line. Worth it for the uniformity.

What we consciously trade away

These are real and worth naming:

  • Plugin authors with native async SDKs pay a small wrap cost (~1 KB per parked virtual thread).
  • Pure synchronous predicates pay one executor submission (microseconds).
  • Cross-task dedup is plugin-internal — same as it would be in the future-returning variant, just at a slightly different layer.

None of these are blocking issues; they're the price of a smaller SPI.

Safety net worth shipping with v1

To avoid silent hangs from gates that never complete:

// in allGatesReady, before the futures loop:
final maxWait = config.getGateMaxWait()   // default: e.g. 24h, configurable
if( maxWait && System.currentTimeMillis() - handler.scheduleTimeMillis > maxWait.millis ) {
    futures.each { it.cancel(true) }
    handleGateFailure(handler, new ProcessException("Task readiness gate timed out after $maxWait"))
    return false
}

Surfaces stuck preparations as task failures, where errorStrategy 'retry' can pick them up naturally.

Backward compatibility

Identical to the ADR's claim: with no gates registered, canSubmit evaluates one empty-list check per pending task per tick; schedule adds one untaken branch. TaskHandler.isReady() is unchanged. The SPI is purely additive.

Testing

Same matrix as the ADR proposes, simpler to express:

  • No gates registered → canSubmit matches current behavior.
  • One gate that returns from prepare() immediately → task admitted as soon as capacity/forks allow.
  • One gate that sleeps then returns → task waits in pending across ticks; admitted on the tick after prepare returns.
  • Gate throws → task fails with the thrown cause; routed through errorStrategy.
  • Multiple gates → all-must-complete semantics; one failure aborts the task.
  • Eviction during prepare → future cancelled, gate's InterruptedException propagates as cancellation.
  • Gate exceeds gateMaxWait → task fails with timeout cause.

Gates injected through a test double instead of a real PF4J context, same as the ADR proposes.

Open question: per-process enable/disable?

The ADR's non-goals already list "per-process gate scoping" as out of scope, but with the runtime cost moved into core (executor submission + future tracking per task per gate), it's worth revisiting whether pipelines should be able to opt out at the process level. Something like:

process FAST_PATH {
    readinessGate false   // skip all registered gates for this process
    // ...
}

process GLACIER_DEPENDENT {
    readinessGate 'glacier'   // enable only the named gate
    // ...
}

The case for adding it:

  • A pipeline with 50 processes, only 3 of which read Glacier-stored inputs, pays one executor submission per task for the other 47 even when no gate would do meaningful work. Cheap (microseconds) but non-zero.
  • It gives users an escape hatch without uninstalling the plugin — useful when a gate has a bug or when one specific process needs to bypass it for a hotfix.
  • Composes with the existing withName / withLabel selector machinery, so the config-layer cost is small.

The case against:

  • The plugin itself can short-circuit cheaply (if (extractS3Inputs(task).isEmpty()) return). For most gates this makes the per-process toggle redundant.
  • Adds a public directive whose semantics across multiple gates need design (readinessGate false vs readinessGate ['glacier'] vs readinessGate.glacier false — there are several reasonable shapes).
  • Better deferred until a real "I want this gate off for this process" report appears.

I lean toward deferring this to a follow-up, matching the ADR's existing posture, but flagging it explicitly here because moving the runtime cost into core changes the cost-benefit slightly. Worth a sentence in the ADR's non-goals section confirming the position either way.


TL;DR vs the current ADR

Current ADR This proposal
Methods on SPI boolean isReady(TaskRun) void prepare(TaskHandler)
Plugin must be non-blocking Yes (contract) No (runs on managed executor)
Call sites in TaskPollingMonitor Two (schedule fire-once + canSubmit poll) One (schedule submit + canSubmit future-poll)
Plugin-author cognitive load "Must return promptly; first call kicks off, subsequent calls report; must be idempotent" "Write blocking code; throw if hopeless; honor interrupts"
Async runtime ownership Plugin's problem Core's problem
Helper class needed Yes (deferred to "later") No
Foot-guns Multiple (idempotency, blocking, polling cost) One (gate must honor interrupts)

Net: smaller SPI, fewer call sites, no foot-guns about scheduler-thread blocking, and the resulting plugin code is ~5 lines for the Glacier case instead of ~40. Worth the bounded extra work in core (the gate executor + future cache, ~50 lines).

Happy to draft the actual diff against TaskPollingMonitor.groovy if there's interest in moving this direction.

@robsyme

robsyme commented May 19, 2026

Copy link
Copy Markdown
Collaborator Author

Yeah, I like this suggestion. simpler interface is much nicer. Complexity in core and much smaller effort for plugin authors. @pditommaso - when we come to the actual plugin, are you ok with the a plugin-specific process directive as suggested in the comment?

process FAST_PATH {
    readinessGate false   // skip all registered gates for this process
    // ...
}

@pditommaso

Copy link
Copy Markdown
Member

Think this should go through via the new hints directive. Wdyt?

@bentsherman bentsherman changed the title Add ADR for TaskReadinessGate plugin extension point ADR: TaskReadinessGate plugin extension point May 21, 2026
@bentsherman

Copy link
Copy Markdown
Member

FYI, I updated the implementation PR to merge into this branch

Please try to keep the ADR up to date with the latest decisions, as it looks like the impl has diverged a good bit

The ADR originally described a polling boolean isReady(TaskRun) contract.
Review on the implementation PR (#7158) led to substantive changes that
need to be reflected here:

- SPI is blocking void prepare(TaskHandler) throws InterruptedException,
  not polling.
- Orchestration lives in a new TaskGateManager class; TaskPollingMonitor
  delegates via three one-liners.
- No executor.gateMaxWait config option — plugins own timeout policy via
  the existing hints directive (e.g. glacier/maxWait).
- Per-process opt-out via hints (no new directive).
- Exception unwrap preserves ProcessException identity and wraps everything
  else, so ProcessRetryableException markers reach resumeOrDie via cause.
- AbstractAsyncReadinessGate helper is permanently a non-goal (not just
  deferred), since the blocking design needs no async wrapper.

Status moved from draft to accepted.

Signed-off-by: Rob Syme <rob.syme@gmail.com>
@robsyme

robsyme commented May 21, 2026

Copy link
Copy Markdown
Collaborator Author

Updated the ADR to match the design that landed in #7158:

  • SPI is blocking void prepare(TaskHandler), not polling boolean isReady(TaskRun).
  • Orchestration lives in a new TaskGateManagerTaskPollingMonitor collapses to one field + four delegation lines.
  • No executor.gateMaxWait config option — plugins own timeout policy via the existing hints directive (glacier/maxWait, glacier/skip).
  • Cause-preservation in the exception unwrap so ProcessRetryableException markers reach resumeOrDie via error.cause.
  • AbstractAsyncReadinessGate helper is now a permanent non-goal (no async runtime for it to wrap).

Status moved from draft to accepted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants