Skip to content

Commit dd87b2f

Browse files
exthnetpditommasobentshermanchristopher-hakkaart
authored
New executor for Fujitsu Technical Computing Suite (TCS) (#5928)
Signed-off-by: Paolo Di Tommaso <[email protected]> Signed-off-by: Satoshi OHSHIMA <[email protected]> Signed-off-by: Ben Sherman <[email protected]> Co-authored-by: Paolo Di Tommaso <[email protected]> Co-authored-by: Ben Sherman <[email protected]> Co-authored-by: Chris Hakkaart <[email protected]>
1 parent 118fce0 commit dd87b2f

File tree

4 files changed

+442
-1
lines changed

4 files changed

+442
-1
lines changed

docs/executor.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,3 +445,35 @@ Nextflow does not provide direct support for SLURM multi-clusters. If you need t
445445
:::{versionadded} 23.07.0-edge
446446
Some SLURM clusters require memory allocations to be specified with `--mem-per-cpu` instead of `--mem`. You can specify `executor.perCpuMemAllocation = true` in the Nextflow configuration to enable this behavior. Nextflow will automatically compute the memory per CPU for each task (by default 1 CPU is used).
447447
:::
448+
449+
(tcs-executor)=
450+
451+
## TCS
452+
453+
The `tcs` executor allows you to run your pipeline script using a cluster managed by the [Fujitsu Technical Computing Suite (TCS)](https://software.fujitsu.com/jp/manual/manualindex/p21000155e.html).
454+
455+
Nextflow manages each process as a separate job that is submitted to the cluster using the `pjsub` command.
456+
457+
The pipeline must be launched from a node where the `pjsub` command is available, which is typically the login node.
458+
459+
To enable the TCS executor, set `process.executor = 'tcs'` in the `nextflow.config` file.
460+
461+
Resource requests and other job characteristics can be controlled via the following process directives:
462+
463+
- {ref}`process-clusterOptions`
464+
- {ref}`process-time`
465+
466+
:::{note}
467+
Use `clusterOptions` to specify system-dependent options such as queue (resource group), CPU, and node. These options vary across target systems and are not standardized. They correspond to `-L` options in the arguments of the `pjsub` command and should be configured according to the requirements of the specific cluster environment.
468+
469+
For example:
470+
471+
```groovy
472+
process {
473+
executor = 'tcs'
474+
time = '00:30:00'
475+
clusterOptions = '-L rscgrp=a-batch -L vnode-core=4'
476+
}
477+
```
478+
:::
479+

modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ class ExecutorFactory {
5959
'nqsii': NqsiiExecutor,
6060
'moab': MoabExecutor,
6161
'oar': OarExecutor,
62-
'hq': HyperQueueExecutor
62+
'hq': HyperQueueExecutor,
63+
'tcs': TcsExecutor
6364
]
6465

6566
@PackageScope Map<String, Class<? extends Executor>> executorsMap
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package nextflow.executor
16+
17+
import java.nio.file.Path
18+
import java.util.regex.Pattern
19+
20+
import groovy.transform.CompileStatic
21+
import groovy.util.logging.Slf4j
22+
import nextflow.processor.TaskArrayRun
23+
import nextflow.processor.TaskRun
24+
25+
/*
 * Implements an executor for the Fujitsu Technical Computing Suite (TCS)
 *
 * https://software.fujitsu.com/jp/manual/manualindex/p21000155e.html
 *
 * @author Satoshi Ohshima <[email protected]>
 */
@Slf4j
@CompileStatic
class TcsExecutor extends AbstractGridExecutor implements TaskArrayExecutor {

    // `pjsub` acknowledges a submission with e.g.
    // `[INFO] PJM 0000 pjsub Job 12345 submitted.` -- capture the numeric job id
    static private final Pattern SUBMIT_REGEX = ~/\[INFO\] PJM 0000 pjsub Job (\d+) submitted./

    /**
     * Sanitise a job name for TCS (e.g. on Fugaku) by stripping parentheses,
     * which are not accepted by `pjsub -N`.
     *
     * @param name The raw job name
     * @return The name with all `(` and `)` characters removed
     */
    static String modName(String name) {
        return name.replaceAll(/[()]/, '')
    }

    /**
     * Gets the directives to submit the specified task to the cluster for execution
     *
     * @param task A {@link TaskRun} to be submitted
     * @param result The {@link List} instance to which add the job directives
     * @return A {@link List} containing all directive tokens and values.
     */
    protected List<String> getDirectives(TaskRun task, List<String> result) {
        assert result != null, "Argument 'result' cannot be null"

        result << '-N' << modName(getJobNameFor(task))

        // max task duration
        if( task.config.getTime() ) {
            final duration = task.config.getTime()
            result << '-L' << "elapse=${duration.format('HH:mm:ss')}".toString()
        }

        // output file -- for an array (bulk) job keep the system default
        // location, because TCS does not accept /dev/null as an output path
        // (possibly system dependent)
        if( !task.isArray() ) {
            result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
        }

        // merge stderr into stdout
        result << '-j' << ''
        // emit job statistics information
        result << '-S' << ''

        // user-provided `-L` options (resource group/queue, node, cores, etc.)
        addClusterOptionsDirective(task.config, result)

        return result
    }

    /**
     * The command line to submit this job
     *
     * Each option must be its own argv token -- the launcher invokes the
     * submitter without a shell, so a combined token such as
     * `'--bulk --sparam '` would reach `pjsub` as a single invalid argument.
     *
     * @param task The {@link TaskRun} instance to submit for execution to the cluster
     * @param scriptFile The file containing the job launcher script
     * @return A list representing the submit command line
     */
    List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
        if( !pipeLauncherScript() )
            return List.of('pjsub', scriptFile.getName())
        return task instanceof TaskArrayRun
            ? List.of('pjsub', '--bulk', '--sparam', "0-${task.getArraySize() - 1}".toString())
            : List.of('pjsub')
    }

    protected String getHeaderToken() { '#PJM' }

    /**
     * Parse the string returned by the {@code pjsub} command and extract the job ID string
     *
     * @param text The string returned when submitting the job
     * @return The actual job ID string
     */
    @Override
    def parseJobId( String text ) {
        for( String line : text.readLines() ) {
            log.trace "[TCS] submit response line: $line"
            def m = SUBMIT_REGEX.matcher(line)
            if( m.find() ) {
                return m.group(1).toString()
            }
        }
        throw new IllegalArgumentException("Invalid TCS submit response:\n$text\n\n")
    }

    @Override
    protected List<String> getKillCommand() { ['pjdel'] }

    @Override
    protected List<String> queueStatusCommand(Object queue) {
        return ['pjstat', '-E']
    }

    // Maps the TCS job state (4th column of `pjstat -E`) to the generic queue status
    static private final Map<String,QueueStatus> STATUS_MAP = [
            'ACC': QueueStatus.PENDING, // accepted
            'QUE': QueueStatus.PENDING, // wait for running
            'RNA': QueueStatus.RUNNING, // preparing
            'RUN': QueueStatus.RUNNING, // running
            'RNO': QueueStatus.RUNNING, // cleanup
            'EXT': QueueStatus.DONE,    // finished
            'CCL': QueueStatus.DONE,    // canceled
            'HLD': QueueStatus.HOLD,    // holding
            'ERR': QueueStatus.ERROR,   // error
    ]

    @Override
    protected Map<String, QueueStatus> parseQueueStatus(String text) {
        final result = new LinkedHashMap<String, QueueStatus>()
        text.eachLine { String line ->
            def cols = line.split(/\s+/)
            // the status code is read from column index 3, so require at least
            // 4 columns -- the previous `size() > 1` guard raised an
            // ArrayIndexOutOfBoundsException on header/short lines
            if( cols.size() > 3 ) {
                result.put( cols[0], STATUS_MAP.get(cols[3]) )
            }
            else {
                log.debug "[TCS] invalid status line: `$line`"
            }
        }

        return result
    }

    @Override
    String getArrayIndexName() {
        // TCS bulk jobs expose the array index via this environment variable
        return 'PJM_BULKNUM'
    }

    @Override
    int getArrayIndexStart() {
        return 0
    }

    @Override
    String getArrayTaskId(String jobId, int index) {
        assert jobId, "Missing 'jobId' argument"
        return "${jobId}[${index}]"
    }

}

0 commit comments

Comments
 (0)