diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy
index 39d3c2e173..122deded2b 100644
--- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy
+++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/LogsCheckpoint.groovy
@@ -16,6 +16,9 @@
package io.seqera.tower.plugin
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
@@ -26,7 +29,27 @@ import nextflow.util.Duration
import nextflow.util.Threads
/**
* Implements a nextflow observer that periodically checkpoint
- * log, report and timeline files
+ * log, report and timeline files.
+ *
+ * <h3>Concurrency design</h3>
+ *
+ * A single daemon worker thread loops forever, sleeping for {@code interval}
+ * between checkpoints. The sleep is implemented as {@code stopLatch.await(interval)}
+ * rather than {@code Thread.sleep}: when shutdown is requested we {@code countDown()}
+ * the latch, which wakes the worker immediately without ever setting the
+ * thread's interrupt flag. Avoiding the interrupt flag matters because cloud SDKs
+ * (e.g. the AWS S3 client) observe it and abort in-flight uploads — that side effect
+ * was the source of repeated bugs in earlier versions of this class.
+ *
+ * <h3>Why shutdown can never hang the head job</h3>
+ *
+ * {@link #onFlowComplete()} / {@link #onFlowError} run on the main shutdown thread.
+ * They signal the worker and then {@code join} it for at most {@code terminateTimeout}.
+ * If the worker is stuck in a hung network upload inside {@code saveFiles()} the join
+ * times out and we abandon the worker. Because it is a daemon thread it cannot
+ * keep the JVM alive, so the head job exits cleanly. The final, authoritative log upload
+ * is performed elsewhere (see {@code CacheCommand}), so abandoning this best-effort
+ * periodic uploader loses nothing.
*
* @author Paolo Di Tommaso
*/
@@ -38,50 +61,68 @@ class LogsCheckpoint implements TraceObserverV2 {
private Map config
private Thread thread
private Duration interval
+ private Duration terminateTimeout
private LogsHandler handler
- private final Object lock = new Object()
+ private final CountDownLatch stopLatch = new CountDownLatch(1)
@Override
void onFlowCreate(Session session) {
this.session = session
this.config = session.config
- this.handler = new LogsHandler(session, SysEnv.get())
+ this.handler = createHandler()
this.interval = config.navigate('tower.logs.checkpoint.interval', defaultInterval()) as Duration
+ this.terminateTimeout = config.navigate('tower.logs.checkpoint.terminateTimeout', defaultTerminateTimeout()) as Duration
thread = Threads.start('tower-logs-checkpoint', this.&run)
}
+ protected LogsHandler createHandler() {
+ new LogsHandler(session, SysEnv.get())
+ }
+
private String defaultInterval() {
SysEnv.get('TOWER_LOGS_CHECKPOINT_INTERVAL','90s')
}
+ private String defaultTerminateTimeout() {
+ SysEnv.get('TOWER_LOGS_CHECKPOINT_TERMINATE_TIMEOUT','120s')
+ }
+
@Override
void onFlowComplete() {
- synchronized(lock) {
- thread.interrupt()
- }
- thread.join()
+ stop()
}
@Override
void onFlowError(TaskEvent event) {
- synchronized(lock) {
- thread.interrupt()
+ stop()
+ }
+
+ /**
+ * Signal the worker thread to stop and wait at most {@code terminateTimeout} for it to terminate;
+ * a worker stuck in a hung upload is abandoned (with a warning) once that time elapses. The wait is
+ * clamped to at least 1 ms because {@code Thread.join(0)} waits forever and a negative value throws.
+ */
+ protected void stop() {
+ if( thread==null )
+ return
+ // wake the worker from its interval wait without touching the interrupt flag
+ stopLatch.countDown()
+ try {
+ thread.join(Math.max(1L, terminateTimeout.toMillis()))
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt()
}
- thread.join()
+ if( thread.isAlive() )
+ log.warn "Logs checkpoint thread did not terminate within ${terminateTimeout} - abandoning it to allow the run to shut down"
}
protected void run() {
log.debug "Starting logs checkpoint thread - interval: ${interval}"
try {
- while( true ) {
- await(interval)
- if( Thread.currentThread().isInterrupted() )
- break
- synchronized(lock) {
- if( Thread.currentThread().isInterrupted() )
- break
- handler.saveFiles()
- }
+ // await() returns true when a stop has been requested, false on timeout
+ while( !await(interval) ) {
+ handler.saveFiles()
}
}
finally {
@@ -89,13 +130,20 @@ class LogsCheckpoint implements TraceObserverV2 {
}
}
- protected void await(Duration interval) {
+ /**
+ * Wait up to {@code interval} for a stop request.
+ *
+ * @return {@code true} if a stop was requested (the loop must terminate),
+ * {@code false} if the interval elapsed (time to checkpoint the logs).
+ */
+ protected boolean await(Duration interval) {
try {
- Thread.sleep(interval.toMillis())
+ return stopLatch.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
catch (InterruptedException e) {
log.debug "Interrupted logs checkpoint thread"
Thread.currentThread().interrupt()
+ return true
}
}
}
diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy
index 4115a32637..c1d1a81e0b 100644
--- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy
+++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/LogsCheckpointTest.groovy
@@ -16,10 +16,14 @@
package io.seqera.tower.plugin
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
import nextflow.Session
import nextflow.SysEnv
import nextflow.util.Duration
import spock.lang.Specification
+import spock.lang.Timeout
import test.TestHelper
/**
@@ -35,12 +39,16 @@ class LogsCheckpointTest extends Specification {
getConfig() >> [:]
}
and:
- def checkpoint = new LogsCheckpoint()
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> Mock(LogsHandler)
when:
checkpoint.onFlowCreate(session)
then:
checkpoint.@interval == Duration.of('90s')
+
+ cleanup:
+ checkpoint.onFlowComplete()
}
def 'should configure delay via env var' () {
@@ -51,7 +59,8 @@ class LogsCheckpointTest extends Specification {
getConfig() >> [:]
}
and:
- def checkpoint = new LogsCheckpoint()
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> Mock(LogsHandler)
when:
checkpoint.onFlowCreate(session)
@@ -59,6 +68,7 @@ class LogsCheckpointTest extends Specification {
checkpoint.@interval == Duration.of('200s')
cleanup:
+ checkpoint.onFlowComplete()
SysEnv.pop()
}
@@ -70,7 +80,8 @@ class LogsCheckpointTest extends Specification {
getWorkDir() >> TestHelper.createInMemTempDir()
}
and:
- def checkpoint = new LogsCheckpoint()
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> Mock(LogsHandler)
when:
checkpoint.onFlowCreate(session)
@@ -78,6 +89,118 @@ class LogsCheckpointTest extends Specification {
checkpoint.@interval == Duration.of('500s')
cleanup:
+ checkpoint.onFlowComplete()
SysEnv.pop()
}
-}
+
+ @Timeout(30)
+ def 'checkpoint worker should be a daemon thread so it cannot keep the JVM alive' () {
+ given:
+ def session = Mock(Session) {
+ getWorkDir() >> TestHelper.createInMemTempDir()
+ getConfig() >> [:]
+ }
+ and:
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> Mock(LogsHandler)
+
+ when:
+ checkpoint.onFlowCreate(session)
+ then:
+ checkpoint.@thread.isDaemon()
+
+ cleanup:
+ checkpoint.onFlowComplete()
+ }
+
+ @Timeout(30)
+ def 'should checkpoint logs periodically' () {
+ given:
+ // count down on each save; await blocks until at least 3 saves happened
+ def saves = new CountDownLatch(3)
+ def handler = Mock(LogsHandler) {
+ saveFiles() >> { saves.countDown() }
+ }
+ def session = Mock(Session) {
+ getWorkDir() >> TestHelper.createInMemTempDir()
+ getConfig() >> [tower:[logs:[checkpoint:[interval: '100ms']]]]
+ }
+ and:
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> handler
+
+ when:
+ checkpoint.onFlowCreate(session)
+ then:
+ saves.await(10, TimeUnit.SECONDS)
+
+ cleanup:
+ checkpoint.onFlowComplete()
+ }
+
+ @Timeout(30)
+ def 'should stop promptly when idle without waiting for the full interval' () {
+ given:
+ def handler = Mock(LogsHandler)
+ def session = Mock(Session) {
+ getWorkDir() >> TestHelper.createInMemTempDir()
+ // a very long interval: stop must NOT wait for it
+ getConfig() >> [tower:[logs:[checkpoint:[interval: '1h']]]]
+ }
+ and:
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> handler
+ and:
+ checkpoint.onFlowCreate(session)
+ // let the worker reach its wait state
+ sleep(300)
+
+ when:
+ long t0 = System.currentTimeMillis()
+ checkpoint.onFlowComplete()
+ long elapsed = System.currentTimeMillis() - t0
+
+ then:
+ elapsed < 5_000
+ !checkpoint.@thread.isAlive()
+ // never reached a checkpoint within the 1h interval
+ 0 * handler.saveFiles()
+ }
+
+ @Timeout(30)
+ def 'should not block shutdown when saveFiles is hung on a network call' () {
+ given:
+ def entered = new CountDownLatch(1)
+ def release = new CountDownLatch(1) // never released -> saveFiles blocks forever
+ def handler = Mock(LogsHandler) {
+ saveFiles() >> { entered.countDown(); release.await() }
+ }
+ def session = Mock(Session) {
+ getWorkDir() >> TestHelper.createInMemTempDir()
+ getConfig() >> [tower:[logs:[checkpoint:[interval: '50ms', terminateTimeout: '500ms']]]]
+ }
+ and:
+ def checkpoint = Spy(LogsCheckpoint)
+ checkpoint.createHandler() >> handler
+ and:
+ checkpoint.onFlowCreate(session)
+ // wait until the worker is actually stuck inside saveFiles
+ assert entered.await(10, TimeUnit.SECONDS)
+
+ when:
+ long t0 = System.currentTimeMillis()
+ checkpoint.onFlowComplete()
+ long elapsed = System.currentTimeMillis() - t0
+
+ then:
+ // returned within ~terminateTimeout, NOT waiting for the never-ending saveFiles
+ elapsed < 5_000
+ // the stuck worker was abandoned rather than joined; it is a daemon so it
+ // cannot keep the JVM alive even though it is still technically running
+ checkpoint.@thread.isAlive()
+ checkpoint.@thread.isDaemon()
+
+ cleanup:
+ release.countDown()
+ }
+}
\ No newline at end of file