File tree Expand file tree Collapse file tree 4 files changed +63
-0
lines changed
modules/nextflow/src/main/groovy/nextflow Expand file tree Collapse file tree 4 files changed +63
-0
lines changed Original file line number Diff line number Diff line change @@ -77,6 +77,7 @@ import nextflow.trace.TraceObserverV2
7777import nextflow.trace.TraceRecord
7878import nextflow.trace.WorkflowStatsObserver
7979import nextflow.trace.event.FilePublishEvent
80+ import nextflow.trace.event.FileStagingEvent
8081import nextflow.trace.event.TaskEvent
8182import nextflow.trace.event.WorkflowOutputEvent
8283import nextflow.util.Barrier
@@ -1116,6 +1117,10 @@ class Session implements ISession {
11161117 notifyEvent(observersV2, ob -> ob. onFilePublish(event))
11171118 }
11181119
1120+ void notifyFileStaged (FileStagingEvent event ) {
1121+ notifyEvent(observersV2, ob -> ob. onFileStaged(event))
1122+ }
1123+
11191124 void notifyFlowComplete () {
11201125 notifyEvent(observersV1, ob -> ob. onFlowComplete())
11211126 notifyEvent(observersV2, ob -> ob. onFlowComplete())
Original file line number Diff line number Diff line change @@ -39,6 +39,7 @@ import groovy.transform.ToString
3939import groovy.util.logging.Slf4j
4040import nextflow.Session
4141import nextflow.exception.ProcessStageException
42+ import nextflow.trace.event.FileStagingEvent
4243import nextflow.extension.FilesEx
4344import nextflow.util.CacheHelper
4445import nextflow.util.Duration
@@ -100,6 +101,15 @@ class FilePorter {
100101 if ( batch. size() ) {
101102 log. trace " Stage foreign files: $batch "
102103 submitStagingActions(batch. foreignPaths)
104+
105+ // Notify observers about file staging completion events
106+ for ( FileCopy copy : batch. foreignPaths ) {
107+ session. notifyFileStaged(new FileStagingEvent (
108+ source : copy. source,
109+ target : copy. target
110+ ))
111+ }
112+
103113 log. trace " Stage foreign files completed: $batch "
104114 }
105115 }
Original file line number Diff line number Diff line change @@ -21,6 +21,7 @@ import groovy.transform.CompileStatic
2121import nextflow.Session
2222import nextflow.processor.TaskProcessor
2323import nextflow.trace.event.FilePublishEvent
24+ import nextflow.trace.event.FileStagingEvent
2425import nextflow.trace.event.TaskEvent
2526import nextflow.trace.event.WorkflowOutputEvent
2627
@@ -135,4 +136,11 @@ interface TraceObserverV2 {
135136 */
136137 default void onFilePublish(FilePublishEvent event) {}
137138
139+ /**
140+ * Invoked when a file staging operation completes (after the file has been copied).
141+ *
142+ * @param event
143+ */
144+ default void onFileStaged(FileStagingEvent event) {}
145+
138146}
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2013-2024, Seqera Labs
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package nextflow.trace.event
18+
19+ import java.nio.file.Path
20+
21+ import groovy.transform.Canonical
22+ import groovy.transform.CompileStatic
23+
24+ /**
25+ * Models a file staging event.
26+ *
27+ * @author Robrecht Cannoodt <[email protected] > 28+ */
29+ @Canonical
30+ @CompileStatic
31+ class FileStagingEvent {
32+ /**
33+ * The original source path (e.g., remote URL or path).
34+ */
35+ Path source
36+ /**
37+ * The target staged path in the work directory.
38+ */
39+ Path target
40+ }
You can’t perform that action at this time.
0 commit comments