From 541c291ac7d59af0f2e75b5bbac3b5b2f6e4eb16 Mon Sep 17 00:00:00 2001 From: Robrecht Cannoodt Date: Sun, 12 Oct 2025 06:59:24 +0200 Subject: [PATCH 1/4] Add test Signed-off-by: Robrecht Cannoodt --- .../test/groovy/nextflow/SessionTest.groovy | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy b/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy index 18710b064a..9e0dab7ab0 100644 --- a/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy @@ -462,4 +462,42 @@ class SessionTest extends Specification { true | true } + + def 'test notifyEvent is asynchronous'() { + given: + def events = [] + def latch = new java.util.concurrent.CountDownLatch(2) + def observer1 = new nextflow.trace.TraceObserverV2() { + @Override + void onFlowBegin() { + Thread.sleep(50) // Simulate slow observer + events << 'observer1' + latch.countDown() + } + } + def observer2 = new nextflow.trace.TraceObserverV2() { + @Override + void onFlowBegin() { + events << 'observer2' + latch.countDown() + } + } + + def session = new Session() + session.@observersV2 = [observer1, observer2] + + when: + def startTime = System.currentTimeMillis() + session.notifyFlowBegin() + def notifyTime = System.currentTimeMillis() - startTime + latch.await(1, java.util.concurrent.TimeUnit.SECONDS) + def completeTime = System.currentTimeMillis() - startTime + + then: + notifyTime < 50 // Should return quickly without waiting for observers + completeTime >= 50 // But observers should complete + events.size() == 2 + events.contains('observer1') + events.contains('observer2') + } } From 8172f6bfca065d6e35ab709bb07065235b7a3111 Mon Sep 17 00:00:00 2001 From: Robrecht Cannoodt Date: Sun, 12 Oct 2025 06:59:46 +0200 Subject: [PATCH 2/4] Make events asynchronous Signed-off-by: Robrecht Cannoodt --- .../src/main/groovy/nextflow/Session.groovy | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 2a07576f09..ce8bfc8a87 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -19,6 +19,7 @@ package nextflow import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -1147,12 +1148,14 @@ class Session implements ISession { private static void notifyEvent(List observers, Consumer action) { for ( int i=0; i Date: Sun, 12 Oct 2025 08:12:09 +0200 Subject: [PATCH 3/4] use Runnable for asynchronous execution Signed-off-by: Robrecht Cannoodt --- .../src/main/groovy/nextflow/Session.groovy | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index ce8bfc8a87..18ce951e16 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -1148,12 +1148,15 @@ class Session implements ISession { private static void notifyEvent(List observers, Consumer action) { for ( int i=0; i Date: Sun, 12 Oct 2025 09:18:25 +0200 Subject: [PATCH 4/4] Wait for completion before shutdown Signed-off-by: Robrecht Cannoodt --- .../src/main/groovy/nextflow/Session.groovy | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 18ce951e16..c68dc58bd4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -1118,8 +1118,11 @@ class Session implements ISession { } void notifyFlowComplete() { - notifyEvent(observersV1, ob -> ob.onFlowComplete()) - notifyEvent(observersV2, ob -> ob.onFlowComplete()) + final futures = [] + futures.addAll(notifyEvent(observersV1, ob -> ob.onFlowComplete())) + futures.addAll(notifyEvent(observersV2, ob -> ob.onFlowComplete())) + // Wait for all async notifications to complete before proceeding with shutdown + CompletableFuture.allOf(futures as CompletableFuture[]).join() } /** @@ -1131,8 +1134,11 @@ class Session implements ISession { void notifyError( TaskHandler handler ) { final trace = handler?.safeTraceRecord() - notifyEvent(observersV1, ob -> ob.onFlowError(handler, trace)) - notifyEvent(observersV2, ob -> ob.onFlowError(new TaskEvent(handler, trace))) + final futures = [] + futures.addAll(notifyEvent(observersV1, ob -> ob.onFlowError(handler, trace))) + futures.addAll(notifyEvent(observersV2, ob -> ob.onFlowError(new TaskEvent(handler, trace)))) + // Wait for all async notifications to complete before proceeding + CompletableFuture.allOf(futures as CompletableFuture[]).join() if( !errorAction ) return @@ -1145,10 +1151,11 @@ class Session implements ISession { } } - private static void notifyEvent(List observers, Consumer action) { + private static List> notifyEvent(List observers, Consumer action) { + final futures = new ArrayList>(observers.size()) for ( int i=0; i