Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.seqera.tower.plugin

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
Expand All @@ -26,7 +29,27 @@ import nextflow.util.Duration
import nextflow.util.Threads
/**
* Implements a nextflow observer that periodically checkpoints
* log, report and timeline files.
*
* <h2>Concurrency design</h2>
*
* A single daemon worker thread loops forever, sleeping for {@code interval}
* between checkpoints. The sleep is implemented as {@code stopLatch.await(interval)}
* rather than {@code Thread.sleep}: when shutdown is requested we {@code countDown()}
* the latch, which wakes the worker <em>immediately</em> without ever setting the
* thread's interrupt flag. Avoiding the interrupt flag matters because cloud SDKs
* (e.g. the AWS S3 client) observe it and abort in-flight uploads — that side effect
* was the source of repeated bugs in earlier versions of this class.
*
* <h2>Why shutdown can never hang the head job</h2>
*
* {@link #onFlowComplete()} / {@link #onFlowError} run on the main shutdown thread.
* They signal the worker and then {@code join} it for at most {@code terminateTimeout}.
* If the worker is stuck in a hung network upload inside {@code saveFiles()} the join
* times out and we <em>abandon</em> the worker. Because it is a daemon thread it cannot
* keep the JVM alive, so the head job exits cleanly. The final, authoritative log upload
* is performed elsewhere (see {@code CacheCommand}), so abandoning this best-effort
* periodic uploader loses nothing.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
Expand All @@ -38,64 +61,89 @@ class LogsCheckpoint implements TraceObserverV2 {
// configuration map taken from the session in onFlowCreate
private Map config
// worker thread started in onFlowCreate; it executes run()
private Thread thread
// wait time between two consecutive checkpoints ('tower.logs.checkpoint.interval')
private Duration interval
// maximum time stop() waits for the worker to finish ('tower.logs.checkpoint.terminateTimeout')
private Duration terminateTimeout
// handler whose saveFiles() is invoked at every checkpoint
private LogsHandler handler
// monitor for the synchronized(lock) sections of the interrupt-based code paths
// NOTE(review): appears unneeded once all signalling goes through stopLatch - confirm before removing
private final Object lock = new Object()
// counted down once by stop() to ask the worker to terminate
private final CountDownLatch stopLatch = new CountDownLatch(1)

/**
 * Captures the session, resolves the checkpoint settings and starts the
 * worker thread.
 *
 * @param session the session being created; its config is read for
 *      {@code tower.logs.checkpoint.interval} and
 *      {@code tower.logs.checkpoint.terminateTimeout}
 */
@Override
void onFlowCreate(Session session) {
    this.session = session
    this.config = session.config
    // build the handler exactly once, and only through the overridable factory,
    // so that subclasses and test spies can substitute it
    this.handler = createHandler()
    this.interval = config.navigate('tower.logs.checkpoint.interval', defaultInterval()) as Duration
    this.terminateTimeout = config.navigate('tower.logs.checkpoint.terminateTimeout', defaultTerminateTimeout()) as Duration
    thread = Threads.start('tower-logs-checkpoint', this.&run)
}

/**
 * Factory for the handler used by the worker loop. Kept as a separate
 * protected method so that it can be overridden.
 *
 * @return a new {@link LogsHandler} created from the current session and
 *      the value of {@code SysEnv.get()}
 */
protected LogsHandler createHandler() {
    final env = SysEnv.get()
    return new LogsHandler(session, env)
}

/**
 * @return the checkpoint interval used when the config does not provide one:
 *      looked up as {@code TOWER_LOGS_CHECKPOINT_INTERVAL} via {@code SysEnv},
 *      with {@code 90s} passed as the fallback value
 */
private String defaultInterval() {
    final String fallback = '90s'
    return SysEnv.get('TOWER_LOGS_CHECKPOINT_INTERVAL', fallback)
}

/**
 * @return the shutdown wait limit used when the config does not provide one:
 *      looked up as {@code TOWER_LOGS_CHECKPOINT_TERMINATE_TIMEOUT} via
 *      {@code SysEnv}, with {@code 120s} passed as the fallback value
 */
private String defaultTerminateTimeout() {
    final String fallback = '120s'
    return SysEnv.get('TOWER_LOGS_CHECKPOINT_TERMINATE_TIMEOUT', fallback)
}

/**
 * Invoked when the run completes: asks the worker to stop and waits for it
 * through {@link #stop()}.
 */
@Override
void onFlowComplete() {
    // deliberately no thread.interrupt() and no bare thread.join() here:
    // interrupting can abort an in-flight upload and an unbounded join can
    // hang shutdown; stop() also tolerates a worker that was never started
    stop()
}

/**
 * Invoked when the run fails: asks the worker to stop and waits for it
 * through {@link #stop()}, exactly as on normal completion.
 *
 * @param event the task event associated with the failure (not used here)
 */
@Override
void onFlowError(TaskEvent event) {
    // deliberately no thread.interrupt(): the worker is woken through the
    // stop latch so that its interrupt flag is never set
    stop()
}

/**
 * Signal the worker thread to stop and wait a bounded amount of time for it to
 * terminate. Never blocks shutdown indefinitely: if the worker is still running
 * once {@code terminateTimeout} has elapsed it is abandoned and a warning is logged.
 * Does nothing when the worker was never started.
 */
protected void stop() {
    if( thread==null )
        return
    // wake the worker from its interval wait without touching the interrupt flag
    stopLatch.countDown()
    // Thread.join(0) means "wait forever": a zero or missing timeout must skip
    // the join instead of turning the bounded wait into an unbounded one
    final long timeoutMillis = terminateTimeout!=null ? terminateTimeout.toMillis() : 0L
    if( timeoutMillis>0 ) {
        try {
            thread.join(timeoutMillis)
        }
        catch (InterruptedException e) {
            // keep the caller's interrupt status and give up waiting
            Thread.currentThread().interrupt()
        }
    }
    // no second, unbounded join: a worker that is still alive is left behind
    if( thread.isAlive() )
        log.warn "Logs checkpoint thread did not terminate within ${terminateTimeout} - abandoning it to allow the run to shut down"
}

/**
 * Worker loop. Waits {@code interval} between checkpoints and calls
 * {@code handler.saveFiles()} each time the wait elapses; returns as soon as
 * {@code await} reports that a stop was requested. Start and termination are
 * logged at debug level.
 */
protected void run() {
    log.debug "Starting logs checkpoint thread - interval: ${interval}"
    try {
        // await() returns true when a stop has been requested, false on timeout;
        // this single loop replaces interrupt-flag polling, so saveFiles() is
        // invoked exactly once per elapsed interval
        while( !await(interval) ) {
            handler.saveFiles()
        }
    }
    finally {
        log.debug "Terminating logs checkpoint thread"
    }
}

/**
 * Wait up to {@code interval} for a stop request.
 *
 * @param interval the maximum time to wait
 * @return {@code true} if a stop was requested (the loop must terminate),
 *      {@code false} if the interval elapsed (time to checkpoint the logs).
 *      An interruption while waiting is reported as a stop request.
 */
protected boolean await(Duration interval) {
    try {
        // wait on the latch rather than Thread.sleep so that countDown()
        // wakes the worker immediately
        return stopLatch.await(interval.toMillis(), TimeUnit.MILLISECONDS)
    }
    catch (InterruptedException e) {
        log.debug "Interrupted logs checkpoint thread"
        // restore the interrupt status for whoever inspects it next
        Thread.currentThread().interrupt()
        return true
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package io.seqera.tower.plugin

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import nextflow.Session
import nextflow.SysEnv
import nextflow.util.Duration
import spock.lang.Specification
import spock.lang.Timeout
import test.TestHelper

/**
Expand All @@ -35,12 +39,16 @@ class LogsCheckpointTest extends Specification {
getConfig() >> [:]
}
and:
def checkpoint = new LogsCheckpoint()
def checkpoint = Spy(LogsCheckpoint)
checkpoint.createHandler() >> Mock(LogsHandler)

when:
checkpoint.onFlowCreate(session)
then:
checkpoint.@interval == Duration.of('90s')

cleanup:
checkpoint.onFlowComplete()
}

def 'should configure delay via env var' () {
Expand All @@ -51,14 +59,16 @@ class LogsCheckpointTest extends Specification {
getConfig() >> [:]
}
and:
def checkpoint = new LogsCheckpoint()
def checkpoint = Spy(LogsCheckpoint)
checkpoint.createHandler() >> Mock(LogsHandler)

when:
checkpoint.onFlowCreate(session)
then:
checkpoint.@interval == Duration.of('200s')

cleanup:
checkpoint.onFlowComplete()
SysEnv.pop()
}

Expand All @@ -70,14 +80,127 @@ class LogsCheckpointTest extends Specification {
getWorkDir() >> TestHelper.createInMemTempDir()
}
and:
def checkpoint = new LogsCheckpoint()
def checkpoint = Spy(LogsCheckpoint)
checkpoint.createHandler() >> Mock(LogsHandler)

when:
checkpoint.onFlowCreate(session)
then:
checkpoint.@interval == Duration.of('500s')

cleanup:
checkpoint.onFlowComplete()
SysEnv.pop()
}
}

@Timeout(30)
def 'checkpoint worker should be a daemon thread so it cannot keep the JVM alive' () {
    given: 'a session without any checkpoint setting'
    Session flow = Mock()
    flow.getWorkDir() >> TestHelper.createInMemTempDir()
    flow.getConfig() >> [:]

    and: 'an observer whose handler is replaced by a mock'
    def observer = Spy(LogsCheckpoint)
    observer.createHandler() >> Mock(LogsHandler)

    when: 'the observer is started'
    observer.onFlowCreate(flow)

    then: 'its worker thread is flagged as daemon'
    observer.@thread.isDaemon()

    cleanup: 'stop the worker started by this feature'
    observer.onFlowComplete()
}

@Timeout(30)
def 'should checkpoint logs periodically' () {
    given: 'a handler that counts down a latch on every save'
    // the latch opens only after three saves have been observed
    def pending = new CountDownLatch(3)
    LogsHandler uploader = Mock()
    uploader.saveFiles() >> { pending.countDown() }

    and: 'a session configured with a short checkpoint interval'
    Session flow = Mock()
    flow.getWorkDir() >> TestHelper.createInMemTempDir()
    flow.getConfig() >> [tower:[logs:[checkpoint:[interval: '100ms']]]]

    and: 'an observer wired to that handler'
    def observer = Spy(LogsCheckpoint)
    observer.createHandler() >> uploader

    when: 'the observer is started'
    observer.onFlowCreate(flow)

    then: 'three saves happen within the allotted time'
    pending.await(10, TimeUnit.SECONDS)

    cleanup: 'stop the worker started by this feature'
    observer.onFlowComplete()
}

@Timeout(30)
def 'should stop promptly when idle without waiting for the full interval' () {
    given: 'a mocked handler and a session with a very long interval'
    LogsHandler uploader = Mock()
    Session flow = Mock()
    flow.getWorkDir() >> TestHelper.createInMemTempDir()
    // one hour between checkpoints: stopping must NOT wait for it
    flow.getConfig() >> [tower:[logs:[checkpoint:[interval: '1h']]]]

    and: 'an observer wired to that handler'
    def observer = Spy(LogsCheckpoint)
    observer.createHandler() >> uploader

    and: 'a started worker that had time to enter its wait'
    observer.onFlowCreate(flow)
    sleep(300)

    when: 'the run completes'
    long startedAt = System.currentTimeMillis()
    observer.onFlowComplete()
    long tookMillis = System.currentTimeMillis() - startedAt

    then: 'shutdown is quick, the worker is gone and nothing was saved'
    tookMillis < 5_000
    !observer.@thread.isAlive()
    // the 1h interval never elapsed, so no checkpoint was taken
    0 * uploader.saveFiles()
}

@Timeout(30)
def 'should not block shutdown when saveFiles is hung on a network call' () {
    given: 'a handler whose save never returns until explicitly released'
    def insideSave = new CountDownLatch(1)
    // only opened in cleanup, so saveFiles stays blocked for the whole feature
    def unblock = new CountDownLatch(1)
    LogsHandler uploader = Mock()
    uploader.saveFiles() >> { insideSave.countDown(); unblock.await() }

    and: 'a session with a short interval and a short terminate timeout'
    Session flow = Mock()
    flow.getWorkDir() >> TestHelper.createInMemTempDir()
    flow.getConfig() >> [tower:[logs:[checkpoint:[interval: '50ms', terminateTimeout: '500ms']]]]

    and: 'an observer wired to that handler'
    def observer = Spy(LogsCheckpoint)
    observer.createHandler() >> uploader

    and: 'a worker that is already blocked inside saveFiles'
    observer.onFlowCreate(flow)
    assert insideSave.await(10, TimeUnit.SECONDS)

    when: 'the run completes'
    long startedAt = System.currentTimeMillis()
    observer.onFlowComplete()
    long tookMillis = System.currentTimeMillis() - startedAt

    then: 'shutdown returns around terminateTimeout instead of waiting for saveFiles'
    tookMillis < 5_000
    // the blocked worker is left running; being a daemon thread it cannot
    // keep the JVM alive
    observer.@thread.isAlive()
    observer.@thread.isDaemon()

    cleanup: 'let the blocked worker finish'
    unblock.countDown()
}
}
Loading