Add TaskReadinessGate plugin extension point #7158
Signed-off-by: Rob Syme <rob.syme@gmail.com>
…path Signed-off-by: Rob Syme <rob.syme@gmail.com>
…n routing TaskProcessor.resumeOrDie inspects error.cause for ProcessRetryableException and CloudSpotTerminationException markers. Rethrowing a RuntimeException implementing ProcessRetryableException as-is would lose retry routing because the marker would end up as `error`, not `error.cause`. Restore the original plan's wrap-all-non-ProcessException semantics and add tests covering retry marker routing and peer-future cancellation. Also fix a Groovy for-loop closure-capture issue in schedule() that caused multi-gate submissions to invoke the wrong gate, surfaced by the new peer-cancellation test. Signed-off-by: Rob Syme <rob.syme@gmail.com>
…canSubmit reorder Signed-off-by: Rob Syme <rob.syme@gmail.com>
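As an illustration of the retry-routing commit message above (the one mentioning TaskProcessor.resumeOrDie), here is a minimal sketch of the wrap-all-non-ProcessException rule. It is plain Java rather than the project's Groovy, and every type and method in it (GateErrorRoutingSketch, the nested ProcessException and ProcessRetryableException stand-ins, RestoreThrottled, unwrap, isRetryable) is hypothetical. Only the rule itself, that the marker must be reachable through error.cause, comes from the commit message.

```java
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-ins only; the real Nextflow classes are not reproduced here. */
class GateErrorRoutingSketch {
    /** Stand-in for the marker interface named in the commit message. */
    interface ProcessRetryableException {}

    /** Stand-in for Nextflow's ProcessException. */
    static class ProcessException extends RuntimeException {
        ProcessException(String message, Throwable cause) { super(message, cause); }
    }

    /** A made-up plugin failure that carries the retry marker. */
    static class RestoreThrottled extends RuntimeException implements ProcessRetryableException {
        RestoreThrottled(String message) { super(message); }
    }

    /** Unwrap a failed gate future: keep ProcessException identity, wrap everything else. */
    static ProcessException unwrap(ExecutionException e) {
        Throwable cause = e.getCause() != null ? e.getCause() : e;
        if (cause instanceof ProcessException) {
            return (ProcessException) cause;          // rethrown as-is
        }
        // Wrapping puts the original failure (and any marker it carries) on error.cause.
        return new ProcessException("Task readiness gate failed", cause);
    }

    /** Mirrors the described check: the marker is looked up on error.cause, not on error. */
    static boolean isRetryable(Throwable error) {
        return error.getCause() instanceof ProcessRetryableException;
    }
}
```

Under these assumptions, a RestoreThrottled thrown by a gate comes back as a ProcessException whose cause carries the marker, so a cause-based check finds it. Rethrowing the RestoreThrottled unchanged would leave the marker on the error itself, which is the routing loss the commit describes.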
pditommaso
left a comment
Thanks for picking this up. The shape is close, but I'd like three changes before we merge.
1. Move gate orchestration into its own manager class
TaskPollingMonitor is already ~930 lines of subtle concurrency, and this PR adds ~80 lines spread across new fields (readinessGates, gateExecutor, gateStates, gateMaxWait), the GateState inner class, the allGatesReady method, plus touch points in start, schedule, canSubmit, evict, and a constructor reshuffle.
I'd like the gate concern extracted into a TaskGateManager (or similar) that owns the gates list, the virtual-thread executor, the future map, and the timeout policy. The monitor diff then collapses to ~4 one-liners: construct the manager in start(), manager.submit(handler) in schedule, manager.isReady(handler) in canSubmit, manager.evict(handler) in evict. The constructor lock-init change should then stand or fall on its own merit, separate from the gate work.
2. Replace executor.gateMaxWait with a hints entry — let the plugin own the timeout
Drop gateMaxWait from core entirely. The plugin (e.g. Glacier) reads its own per-process hint — hints 'glacier/maxWait': '5h' — and enforces its deadline itself, throwing through prepare. This keeps the core SPI minimal (the original prepare(handler) interface only), puts timeout policy where it's actually understood, and gives natural per-process scoping. A plugin that ignores both interrupts and its own deadline can hang slots — but a plugin that ignores interrupts already breaks eviction, so the core safety net was always partial.
3. If any gateMaxWait-style knob survives, default it to ~5 min, not 24h
24h is functionally "no limit" — a stuck gate burns a full day of slot before surfacing. A 5-min default forces realistic waits (Glacier Standard ~5h, Deep Archive Bulk ~48h) to be explicit and opt in, which is more discoverable and fails fast in tests/dev. Moot if #2 lands and the executor-level knob is removed.
Per code review on PR #7158: a one-size-fits-all executor-level timeout doesn't compose with workload variability (Glacier Standard hours vs Deep Archive Bulk days), and a "safety net" that requires plugins to honor interrupts is moot when uncooperative plugins already break eviction. Plugins enforce their own deadlines, typically by reading per-process hints (e.g. 'glacier/maxWait') from handler.task.config.hints. Documents the recommended pattern in the developer page. Signed-off-by: Rob Syme <rob.syme@gmail.com>
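A plugin-owned deadline of the kind this commit describes might look roughly like the sketch below. It is plain Java rather than the project's Groovy, and it makes several assumptions: the hints are passed in as a simple map, the restore check is a BooleanSupplier, the tiny duration parser is a stand-in for whatever the plugin really uses, the 5m fallback is invented for the example, and IllegalStateException stands in for whichever exception a real gate would throw. Only the 'glacier/maxWait' hint key and the idea of throwing from prepare come from the discussion.

```java
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a gate that enforces its own per-process deadline. */
class HintDeadlineGateSketch {
    /** Parse a tiny subset of duration strings: "5h", "30m", "10s" or "250ms". */
    static long parseMillis(String text) {
        String s = text.trim().toLowerCase();
        if (s.isEmpty()) throw new IllegalArgumentException("Empty duration");
        if (s.endsWith("ms")) return Long.parseLong(s.substring(0, s.length() - 2).trim());
        long n = Long.parseLong(s.substring(0, s.length() - 1).trim());
        switch (s.charAt(s.length() - 1)) {
            case 'h': return n * 3_600_000L;
            case 'm': return n * 60_000L;
            case 's': return n * 1_000L;
            default: throw new IllegalArgumentException("Unsupported duration: " + text);
        }
    }

    /** Block until restored reports true, or fail once the hinted deadline has passed. */
    static void prepare(Map<String, ?> hints, BooleanSupplier restored, long pollMillis)
            throws InterruptedException {
        Object hint = hints.get("glacier/maxWait");
        // The "5m" fallback is an assumption made for this example only.
        long maxWait = parseMillis(hint != null ? hint.toString() : "5m");
        long deadline = System.nanoTime() + maxWait * 1_000_000L;
        while (!restored.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Restore not complete within " + maxWait + " ms");
            }
            // Thread.sleep throws InterruptedException if the gate's thread is cancelled.
            Thread.sleep(pollMillis);
        }
    }
}
```

Because the deadline is read per call from the hints, two processes in the same run can use different limits, which is the per-process scoping the review asked for. The loop also stays interruptible, so cancelling the gate's thread ends the wait early.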
Per code review on PR #7158: move the readiness-gate orchestration into a dedicated TaskGateManager. The monitor diff against upstream collapses to field + ~3 one-liners (submit/isReady/evict), making the gate concern reviewable in isolation from the monitor's existing concurrency. The new TaskGateManagerTest carries the behavioural coverage that previously lived in TaskPollingMonitorReadinessGateTest; the latter shrinks to a few integration smoke tests pinning the monitor's three delegation points. Signed-off-by: Rob Syme <rob.syme@gmail.com>
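The three delegation points named in this commit (prepare, isReady, evict) can be sketched as follows. This is not the actual TaskGateManager: it is plain Java rather than Groovy, it is generic over a handler type H instead of using TaskHandler, a daemon cached thread pool stands in for the virtual-thread executor, and IllegalStateException stands in for the real exception wrapping. All names (GateManagerSketch, Gate) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of a session-scoped gate orchestrator. */
class GateManagerSketch<H> {
    /** Stand-in for the blocking SPI: prepare may block and may be interrupted. */
    interface Gate<T> {
        void prepare(T handler) throws InterruptedException;
    }

    private final List<Gate<H>> gates;
    // A daemon cached pool stands in for the virtual-thread executor described in the PR.
    private final ExecutorService executor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "gate-sketch");
        t.setDaemon(true);
        return t;
    });
    private final Map<H, List<Future<?>>> pending = new ConcurrentHashMap<>();

    GateManagerSketch(List<Gate<H>> gates) { this.gates = gates; }

    /** Start every gate for this handler in the background (called on enqueue). */
    void prepare(H handler) {
        List<Future<?>> futures = new ArrayList<>();
        for (Gate<H> gate : gates) {
            // "return null" makes this a Callable, so the checked exception is allowed.
            futures.add(executor.submit(() -> { gate.prepare(handler); return null; }));
        }
        pending.put(handler, futures);
    }

    /** Non-blocking poll: true once all gates finished; throws on the first gate failure seen. */
    boolean isReady(H handler) {
        List<Future<?>> futures = pending.get(handler);
        if (futures == null) return true;              // nothing tracked for this handler
        boolean allDone = true;
        for (Future<?> f : futures) {
            if (!f.isDone()) { allDone = false; continue; }
            try {
                f.get();                               // does not block: the future is done
            } catch (ExecutionException e) {
                evict(handler);                        // fail fast: cancel peer gates
                throw new IllegalStateException("Gate failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();    // restore the flag, keep state for eviction
                return false;
            }
        }
        return allDone;
    }

    /** Cancel in-flight gate work so it does not outlive the task. */
    void evict(H handler) {
        List<Future<?>> futures = pending.remove(handler);
        if (futures != null) {
            for (Future<?> f : futures) f.cancel(true);
        }
    }
}
```

In this sketch the monitor side would only ever call the three methods, which is what keeps the monitor diff down to a few one-liners. The caller is still assumed to serialize calls for the same handler, as the javadoc fragment further down this thread notes.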
With TaskGateManager extracted, the property-style monitor construction used in tests no longer dereferences locks before start() runs. Move lock/condition initialization back to start() to keep TaskPollingMonitor unchanged from upstream apart from the gate delegation lines. Drops the redundant TaskPollingMonitorReadinessGateTest — the gate behaviour is fully covered by TaskGateManagerTest, and the monitor's three delegation points are simple one-liners that don't merit a separate integration test layer. Signed-off-by: Rob Syme <rob.syme@gmail.com>
Thanks @pditommaso — pushed an updated branch addressing all three points. 1. 2. 3. Constructor lock-init refactor reverted. With |
 * caller is responsible for serializing concurrent calls for the same handler
 * (in {@code TaskPollingMonitor} this happens under {@code pendingLock}).
 */
void submit(TaskHandler handler) {
I'd call this prepare, to avoid confusion with the task scheduling/submission logic and to stay close to the gate's "prepare" semantics.
Yeah, great suggestion. Fixing...
Renamed in c7445c2. Reads cleanly at the call site (gateManager.prepare(handler)) and mirrors the SPI method name.
 */
@Slf4j
@CompileStatic
class TaskGateManager {
Nit, maybe TaskGateHandler is a bit more idiomatic
Hesitant on this one. Looking at the existing naming conventions:
- *Handler in this repo is overwhelmingly per-task lifecycle: TaskHandler, CachedTaskHandler, StoredTaskHandler, NativeTaskHandler, LocalTaskHandler, GridTaskHandler. (ContainerHandler and ProcessEntryHandler exist but are outliers.)
- *Manager is for session-scoped orchestrators: ThreadPoolManager, LockManager, AssetManager.
TaskGateManager is session-scoped (one per monitor, orchestrates across all tasks), so it fits the Manager pattern. Naming it TaskGateHandler and placing it next to TaskHandler in nextflow.processor would, I think, suggest a per-task object — the opposite of what it is.
Happy to rename if you'd still prefer TaskGateHandler after that, but flagging the per-task naming collision in case it changes your read.
Thanks @robsyme, the three follow-ups land cleanly.
Must fix
Stale …
Plugin authors reading the interface doc will look for an option that no longer exists. Suggested rewording: …
Nice to have
Everything else looks good: test coverage is thorough, exception identity preservation is correctly pinned (incl. …
- TaskReadinessGate javadoc: remove stale reference to dropped executor.gateMaxWait config option; point at the developer guide for the recommended hints-based timeout pattern.
- TaskPollingMonitor.gateManager: document why the field is pre-initialised to an empty manager (supports Spock property-style construction).
- canSubmit AND-chain: switch to leading-&& continuation style matching FilePorter, ConfigValidator, and other Groovy 4 code in this repo.
- TaskGateManager.isReady InterruptedException branch: comment the intent (polling thread interrupted, restore flag, leave state for eviction).
Signed-off-by: Rob Syme <rob.syme@gmail.com>
Re-review nits addressed in 34f8fed:
Thanks for the comments Paolo. Much appreciated. Before we merge this, I want to see what it's actually like to use in a plugin. I'll do a private glacier restore plugin to catch any DX sharp edges with the new interface. Will report back here what I find.
bentsherman
left a comment
Overall, I am skeptical about adding an extension point like this. The task polling code is pretty complicated and hard to debug. I would like to have some confidence that this approach is truly the best way to implement something like a pre-task Glacier restore
It's not just about a change being small and "purely additive" -- the overall system needs to remain coherent
For example, I am seeing a mix of push/pull semantics -- a task gate is triggered by calling prepare() (push), but the TaskHandler already has an isReady() method (pull). This makes the overall system more difficult to reason about (e.g. to know all of the preconditions for a task). This might be resolved by moving this core logic into a task gate (see my comment below)
I'm also not sure about the name -- "task readiness gate". Not necessarily opposed to it, but I think it bears more investigation. There are a million extension systems out there, so it would be good to use something that has some historical precedent ("topic" channels -> Kafka topics, "hints" directive -> compiler hints, etc)
gateManager.isReady(handler)
    && handler.canForkProcess()
    && handler.isReady()
    && (capacity > 0 ? checkQueueCapacity(handler) : true)
If we add this extension point, I think we should move the logic in handler.isReady() into a task gate. It can be a follow-up effort
That method is currently only used by Wave to make sure that the container is resolved before submitting the task, which seems like a clear use case for this extension point
The ADR originally described a polling boolean isReady(TaskRun) contract. Review on the implementation PR (#7158) led to substantive changes that need to be reflected here:
- SPI is blocking void prepare(TaskHandler) throws InterruptedException, not polling.
- Orchestration lives in a new TaskGateManager class; TaskPollingMonitor delegates via three one-liners.
- No executor.gateMaxWait config option; plugins own timeout policy via the existing hints directive (e.g. glacier/maxWait).
- Per-process opt-out via hints (no new directive).
- Exception unwrap preserves ProcessException identity and wraps everything else, so ProcessRetryableException markers reach resumeOrDie via cause.
- AbstractAsyncReadinessGate helper is permanently a non-goal (not just deferred), since the blocking design needs no async wrapper.
Status moved from draft to accepted. Signed-off-by: Rob Syme <rob.syme@gmail.com>
Mirrors the gate's own prepare() method name and avoids the overloaded 'submit' vocabulary in TaskPollingMonitor, which already uses submit for task-scheduling semantics. Signed-off-by: Rob Syme <rob.syme@gmail.com>
Summary
Implements the TaskReadinessGate plugin extension point described in ADR adr/20260516-task-readiness-gate.md (#7151), using the blocking-prepare design that emerged from review of that ADR.
Plugins implement void prepare(TaskHandler) throws InterruptedException. The method may block freely; core runs each registered gate on a managed virtual-thread executor and polls the resulting Future from canSubmit. Throwing routes through the existing errorStrategy machinery in submitPendingTasks. An executor.gateMaxWait config option (default 24h) bounds wait time as a safety net for stuck gates.
Behavior is bit-identical when no plugin registers a gate.
Why this shape
The ADR originally proposed a polling boolean isReady(TaskRun) contract that required plugin authors to manage their own async state and "return promptly." Paolo's review of the ADR (#7151) suggested a blocking prepare with core owning the async runtime: smaller SPI, fewer footguns for plugin authors, no AbstractAsyncReadinessGate helper needed. This PR implements that variant.
What's in the PR
- nextflow.processor.TaskReadinessGate: new SPI interface (org.pf4j extension).
- TaskPollingMonitor: three integration points:
  - start(): resolves gates via Plugins.getExtensions(TaskReadinessGate) and lazily creates a virtual-thread executor (no overhead when no gates registered).
  - schedule(): submits each gate's prepare(handler) to the managed executor immediately on enqueue, so async work starts before any executor slot frees up.
  - canSubmit(): AND-chain reordered to check allGatesReady first; futures polled via isDone, exceptions unwrapped, peer futures cancelled on failure, gateMaxWait enforced.
  - evict(): cancels in-flight gate futures so background work doesn't outlive the task.
- ExecutorConfig.gateMaxWait: new Duration config option, default 24h (covers all Glacier Standard/Bulk restore tiers; Deep Archive Bulk users need to bump).
- … ProcessRetryableException marker routing through error.cause), checked-exception wrapping, external cancellation, multi-gate AND semantics, fail-fast peer cancellation, gateMaxWait timeout, eviction cancellation.
- … docs/developer/task-readiness-gate.md) and config reference entry.
Notable side-effect change
TaskPollingMonitor's lock/condition fields (pendingLock, taskCompleteLock, taskAvail, slotAvail, taskComplete) are now initialised in the constructor rather than start(). The previous shape left these null until start() was called, which made the new gate-submission code in schedule() unsafe to exercise from property-style construction (used by Spock tests). The change is safe because Executor.init → createTaskMonitor → start is the only production path and lock initialisation has no dependency on start's side effects.
What's out of scope
- AbstractAsyncReadinessGate helper class: not needed under this design (gates write blocking code directly).
- … hints directive; plugins inspect handler.task.config.hints and short-circuit. No new directive required.
Test plan
- :nextflow:test green
- TaskPollingMonitorTest, ParallelPollingMonitorTest, ExecutorConfigTest, TaskPollingMonitorReadinessGateTest (new) all pass locally
- ./gradlew :nextflow:test --tests 'nextflow.processor.*' --tests 'nextflow.executor.ExecutorConfigTest'
Links
- adr/20260516-task-readiness-gate.md (merged as ADR: TaskReadinessGate plugin extension point #7151)
🤖 Generated with Claude Code