Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,64 +38,87 @@ class LogsCheckpoint implements TraceObserverV2 {
private Map config
private Thread thread
private Duration interval
private Duration terminateTimeout
private LogsHandler handler
private volatile boolean stopped
private final Object lock = new Object()

@Override
void onFlowCreate(Session session) {
this.session = session
this.config = session.config
this.handler = new LogsHandler(session, SysEnv.get())
this.handler = createHandler()
this.interval = config.navigate('tower.logs.checkpoint.interval', defaultInterval()) as Duration
this.terminateTimeout = config.navigate('tower.logs.checkpoint.terminateTimeout', defaultTerminateTimeout()) as Duration
thread = Threads.start('tower-logs-checkpoint', this.&run)
}

protected LogsHandler createHandler() {
new LogsHandler(session, SysEnv.get())
}

private String defaultInterval() {
SysEnv.get('TOWER_LOGS_CHECKPOINT_INTERVAL','90s')
}

private String defaultTerminateTimeout() {
SysEnv.get('TOWER_LOGS_CHECKPOINT_TERMINATE_TIMEOUT','120s')
}

@Override
void onFlowComplete() {
synchronized(lock) {
thread.interrupt()
}
thread.join()
stop()
}

@Override
void onFlowError(TaskEvent event) {
stop()
}

protected void stop() {
if( thread == null )
return
synchronized(lock) {
thread.interrupt()
if( stopped )
return
stopped = true
// wake up the checkpoint thread without relying on thread interruption
lock.notifyAll()
}
thread.join()
// wait a bounded amount of time for the thread to terminate; if a saveFiles()
// upload is genuinely hung the join times out and the daemon worker is abandoned
// so it cannot keep the JVM alive and the run can shut down
thread.join(terminateTimeout.toMillis())
if( thread.isAlive() )
log.warn "Logs checkpoint thread did not terminate within ${terminateTimeout} - abandoning it to allow the run to shut down"
}

protected void run() {
log.debug "Starting logs checkpoint thread - interval: ${interval}"
try {
while( true ) {
await(interval)
if( Thread.currentThread().isInterrupted() )
break
while( !stopped ) {
synchronized(lock) {
if( Thread.currentThread().isInterrupted() )
// the stopped check must stay co-located with wait() under the same
// lock acquisition, otherwise stop() could notify between the check
// and the wait, causing a lost wake-up
if( stopped )
break
handler.saveFiles()
// releases the lock and waits until the timeout elapses or stop() notifies
lock.wait(interval.toMillis())
}
if( stopped )
break
// saveFiles() runs outside the lock so a hung upload cannot block stop()
// from acquiring the lock to signal shutdown and reach the bounded join
handler.saveFiles()
}
}
catch( InterruptedException e ) {
log.debug "Interrupted logs checkpoint thread - cause: ${e.message}"
Thread.currentThread().interrupt()
}
finally {
log.debug "Terminating logs checkpoint thread"
}
}

protected void await(Duration interval) {
try {
Thread.sleep(interval.toMillis())
}
catch (InterruptedException e) {
log.debug "Interrupted logs checkpoint thread"
Thread.currentThread().interrupt()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package io.seqera.tower.plugin

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import nextflow.Session
import nextflow.SysEnv
import nextflow.util.Duration
import spock.lang.Specification
import spock.lang.Timeout
import test.TestHelper

/**
Expand Down Expand Up @@ -80,4 +84,76 @@ class LogsCheckpointTest extends Specification {
cleanup:
SysEnv.pop()
}

def 'should start and stop the checkpoint thread' () {
given:
// long interval so the thread parks in wait and termination is driven by stop()
SysEnv.push(TOWER_LOGS_CHECKPOINT_INTERVAL: '1h')
def session = Mock(Session) {
getWorkDir() >> TestHelper.createInMemTempDir()
getConfig() >> [:]
}
and:
def checkpoint = new LogsCheckpoint()

when:
checkpoint.onFlowCreate(session)
then:
checkpoint.@thread.isAlive()

when:
checkpoint.onFlowComplete()
then:
!checkpoint.@thread.isAlive()

cleanup:
SysEnv.pop()
}

def 'should be safe to stop when the thread was never started' () {
given:
def checkpoint = new LogsCheckpoint()

when:
checkpoint.onFlowComplete()
then:
noExceptionThrown()
}

@Timeout(30)
def 'should not block shutdown when saveFiles is hung on a network call' () {
given:
def entered = new CountDownLatch(1)
def release = new CountDownLatch(1) // never released -> saveFiles blocks forever
def handler = Mock(LogsHandler) {
saveFiles() >> { entered.countDown(); release.await() }
}
def session = Mock(Session) {
getWorkDir() >> TestHelper.createInMemTempDir()
getConfig() >> [tower:[logs:[checkpoint:[interval: '50ms', terminateTimeout: '500ms']]]]
}
and:
def checkpoint = Spy(LogsCheckpoint)
checkpoint.createHandler() >> handler
and:
checkpoint.onFlowCreate(session)
// wait until the worker is actually stuck inside saveFiles
assert entered.await(10, TimeUnit.SECONDS)

when:
long t0 = System.currentTimeMillis()
checkpoint.onFlowComplete()
long elapsed = System.currentTimeMillis() - t0

then:
// returned within ~terminateTimeout, not waiting for the never-ending saveFiles
elapsed < 5_000
// the stuck worker was abandoned rather than joined; it is a daemon so it
// cannot keep the JVM alive even though it is still technically running
checkpoint.@thread.isAlive()
checkpoint.@thread.isDaemon()

cleanup:
release.countDown()
}
}
Loading