20 changes: 20 additions & 0 deletions docs/executor.md
@@ -227,6 +227,7 @@ The `local` executor is useful for developing and testing a pipeline script on y

Resource requests and other job characteristics can be controlled via the following process directives:

- {ref}`process-accelerator`
- {ref}`process-cpus`
- {ref}`process-memory`
- {ref}`process-time`
@@ -241,6 +242,25 @@ The local executor supports two types of tasks:
- Script tasks (processes with a `script` or `shell` block) - executed via a Bash wrapper
- Native tasks (processes with an `exec` block) - executed directly in the JVM.

(local-accelerators)=

### Accelerators

:::{versionadded} 25.10.0
:::

The local executor can use the `accelerator` directive to allocate accelerators, such as GPUs, to tasks. To use accelerators, set the environment variable that corresponds to your accelerator runtime:

- `CUDA_VISIBLE_DEVICES` for [NVIDIA CUDA](https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#cuda-environment-variables) applications

- `HIP_VISIBLE_DEVICES` for [HIP](https://rocm.docs.amd.com/projects/HIP/en/docs-develop/reference/env_variables.html) applications

- `ROCR_VISIBLE_DEVICES` for [AMD ROCm](https://rocm.docs.amd.com/en/latest/conceptual/gpu-isolation.html) applications

Set the environment variable to a comma-separated list of the device IDs that Nextflow is allowed to use. Nextflow allocates devices from this list to tasks that request accelerators.

For example, to use all GPUs on a node with four NVIDIA GPUs, set `CUDA_VISIBLE_DEVICES` to `0,1,2,3`. If four tasks each request one GPU, they will be executed with `CUDA_VISIBLE_DEVICES` set to `0`, `1`, `2`, and `3`, respectively.
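As a minimal sketch of how this looks in a pipeline (the process name and script are illustrative only, not part of the Nextflow API):

```nextflow
process gpuTask {
    // request one GPU from the local executor
    accelerator 1

    input:
    val x

    script:
    """
    echo "task $x was given CUDA_VISIBLE_DEVICES=\$CUDA_VISIBLE_DEVICES"
    """
}

workflow {
    Channel.of(1, 2, 3, 4) | gpuTask
}
```

With `CUDA_VISIBLE_DEVICES=0,1,2,3` exported before launching the run, each of the four tasks is assigned one of the listed device IDs.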

(lsf-executor)=

## LSF
6 changes: 6 additions & 0 deletions docs/migrations/25-10.md
@@ -94,6 +94,12 @@ export NXF_PLUGINS_REGISTRY_URL="https://raw.githubusercontent.com/nextflow-io/p
Plugin developers will not be able to submit PRs to the legacy plugin index once the plugin registry is generally available. Plugins should be updated to publish to the Nextflow plugin registry using the {ref}`Nextflow Gradle plugin <gradle-plugin-page>` instead. See {ref}`migrate-plugin-registry-page` for details.
:::

<h3>GPU scheduling for local executor</h3>

The local executor can now schedule GPUs using the `accelerator` directive. This feature is useful when running Nextflow on a single machine with multiple GPUs.

See {ref}`local-accelerators` for details.
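As a quick sketch (hypothetical configuration, not taken from this change), the directive can also be set for all processes in `nextflow.config`:

```groovy
// nextflow.config (hypothetical example)
process {
    executor = 'local'
    // request one GPU per task, drawn from the devices listed in CUDA_VISIBLE_DEVICES
    accelerator = 1
}
```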

<h3>New syntax for workflow handlers</h3>

The workflow `onComplete` and `onError` handlers were previously defined by calling `workflow.onComplete` and `workflow.onError` in the pipeline script. You can now define handlers as `onComplete` and `onError` sections in an entry workflow:
@@ -80,6 +80,10 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {

private volatile TaskResult result

String acceleratorEnv

List<String> acceleratorIds

LocalTaskHandler(TaskRun task, LocalExecutor executor) {
super(task)
// create the task handler
@@ -142,11 +146,13 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {
final workDir = task.workDir.toFile()
final logFile = new File(workDir, TaskRun.CMD_LOG)

return new ProcessBuilder()
final pb = new ProcessBuilder()
.redirectErrorStream(true)
.redirectOutput(logFile)
.directory(workDir)
.command(cmd)
prepareAccelerators(pb)
return pb
}

protected ProcessBuilder fusionProcessBuilder() {
@@ -162,10 +168,18 @@

final logPath = Files.createTempFile('nf-task','.log')

return new ProcessBuilder()
final pb = new ProcessBuilder()
.redirectErrorStream(true)
.redirectOutput(logPath.toFile())
.command(List.of('sh','-c', cmd))
prepareAccelerators(pb)
return pb
}

protected void prepareAccelerators(ProcessBuilder pb) {
if( !acceleratorEnv )
return
pb.environment().put(acceleratorEnv, acceleratorIds.join(','))
}

protected ProcessBuilder createLaunchProcessBuilder() {
@@ -0,0 +1,84 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import groovy.transform.CompileStatic
import nextflow.SysEnv
import nextflow.util.TrackingSemaphore

/**
* Specialized semaphore that keeps track of accelerators by
* id. The id can be an integer or a UUID.
*
* @author Ben Sherman <[email protected]>
*/
@CompileStatic
class AcceleratorTracker {

private static final List<String> DEVICE_ENV_NAMES = [
'CUDA_VISIBLE_DEVICES',
'HIP_VISIBLE_DEVICES',
'ROCR_VISIBLE_DEVICES'
]

static AcceleratorTracker create() {
return create(SysEnv.get())
}

static AcceleratorTracker create(Map<String,String> env) {
return DEVICE_ENV_NAMES.stream()
.filter(name -> env.containsKey(name))
.map((name) -> {
final ids = env.get(name).tokenize(',')
return new AcceleratorTracker(name, ids)
})
.findFirst().orElse(new AcceleratorTracker())
}

private final String name
private final TrackingSemaphore semaphore

private AcceleratorTracker(String name, List<String> ids) {
this.name = name
this.semaphore = new TrackingSemaphore(ids)
}

private AcceleratorTracker() {
this(null, [])
}

String name() {
return name
}

int total() {
return semaphore.totalPermits()
}

int available() {
return semaphore.availablePermits()
}

List<String> acquire(int permits) {
return semaphore.acquire(permits)
}

void release(List<String> ids) {
semaphore.release(ids)
}

}
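A rough usage sketch of the class above (illustrative only, driving it with a hand-built environment map instead of `SysEnv`):

```groovy
// Illustrative sketch, not part of the change set
def tracker = AcceleratorTracker.create([CUDA_VISIBLE_DEVICES: '0,1,2,3'])

assert tracker.name() == 'CUDA_VISIBLE_DEVICES'
assert tracker.total() == 4

def ids = tracker.acquire(2)    // e.g. ['0', '1']; exact order depends on the backing map
assert ids.size() == 2
assert tracker.available() == 2

tracker.release(ids)            // the same device ids become available again
assert tracker.available() == 4
```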
@@ -24,6 +24,7 @@ import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.executor.ExecutorConfig
import nextflow.exception.ProcessUnrecoverableException
import nextflow.executor.local.LocalTaskHandler
import nextflow.util.Duration
import nextflow.util.MemoryUnit

@@ -59,6 +60,11 @@ class LocalPollingMonitor extends TaskPollingMonitor {
*/
private final long maxMemory

/**
* Tracks the total and available accelerators in the system
*/
private AcceleratorTracker acceleratorTracker

/**
* Create the task polling monitor with the provided named parameters object.
* <p>
@@ -76,6 +82,7 @@
super(params)
this.availCpus = maxCpus = params.cpus as int
this.availMemory = maxMemory = params.memory as long
this.acceleratorTracker = AcceleratorTracker.create()
assert availCpus>0, "Local avail `cpus` attribute cannot be zero"
assert availMemory>0, "Local avail `memory` attribute cannot zero"
}
@@ -154,6 +161,16 @@ class LocalPollingMonitor extends TaskPollingMonitor {
handler.task.getConfig()?.getMemory()?.toBytes() ?: 1L
}

/**
* @param handler
* A {@link TaskHandler} instance
* @return
* The number of accelerators requested to execute the specified task
*/
private static int accelerators(TaskHandler handler) {
handler.task.getConfig()?.getAccelerator()?.getRequest() ?: 0
}

/**
* Determines if a task can be submitted for execution checking if the resources required
* (cpus and memory) match the amount of avail resource
@@ -179,9 +196,13 @@
if( taskMemory>maxMemory)
throw new ProcessUnrecoverableException("Process requirement exceeds available memory -- req: ${new MemoryUnit(taskMemory)}; avail: ${new MemoryUnit(maxMemory)}")

final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory
final taskAccelerators = accelerators(handler)
if( taskAccelerators > acceleratorTracker.total() )
throw new ProcessUnrecoverableException("Process requirement exceeds available accelerators -- req: $taskAccelerators; avail: ${acceleratorTracker.total()}")

final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory && taskAccelerators <= acceleratorTracker.available()
if( !result && log.isTraceEnabled( ) ) {
log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)}"
log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)} && taskAccelerators: $taskAccelerators <= availAccelerators: ${acceleratorTracker.available()}"
}
return result
}
@@ -194,9 +215,16 @@
*/
@Override
protected void submit(TaskHandler handler) {
super.submit(handler)
availCpus -= cpus(handler)
availMemory -= mem(handler)

final taskAccelerators = accelerators(handler)
if( handler instanceof LocalTaskHandler && taskAccelerators > 0 ) {
handler.acceleratorEnv = acceleratorTracker.name()
handler.acceleratorIds = acceleratorTracker.acquire(taskAccelerators)
}

super.submit(handler)
}

/**
@@ -209,11 +237,14 @@
* {@code true} when the task is successfully removed from polling queue,
* {@code false} otherwise
*/
@Override
protected boolean remove(TaskHandler handler) {
final result = super.remove(handler)
if( result ) {
availCpus += cpus(handler)
availMemory += mem(handler)
if( handler instanceof LocalTaskHandler )
acceleratorTracker.release(handler.acceleratorIds ?: Collections.<String>emptyList())
}
return result
}
@@ -0,0 +1,69 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.util

import java.util.concurrent.Semaphore

import groovy.transform.CompileStatic

/**
* Specialized semaphore that keeps track of which permits
* are being used.
*
* @author Ben Sherman <[email protected]>
*/
@CompileStatic
class TrackingSemaphore {
private final Semaphore semaphore
private final Map<String,Boolean> availIds

TrackingSemaphore(List<String> ids) {
semaphore = new Semaphore(ids.size())
availIds = new HashMap<>(ids.size())
for( final id : ids )
availIds.put(id, true)
}

int totalPermits() {
return availIds.size()
}

int availablePermits() {
return semaphore.availablePermits()
}

List<String> acquire(int permits) {
semaphore.acquire(permits)
final result = new ArrayList<String>(permits)
for( final entry : availIds.entrySet() ) {
if( entry.getValue() ) {
entry.setValue(false)
result.add(entry.getKey())
}
if( result.size() == permits )
break
}
return result
}

void release(List<String> ids) {
semaphore.release(ids.size())
for( final id : ids )
availIds.put(id, true)
}

}
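A matching sketch for the semaphore itself (illustrative only):

```groovy
// Illustrative sketch, not part of the change set
def sem = new TrackingSemaphore(['0', '1', '2'])

assert sem.totalPermits() == 3

def held = sem.acquire(2)       // blocks until two permits are free and returns their ids
assert sem.availablePermits() == 1

sem.release(held)               // returns the listed ids to the pool
assert sem.availablePermits() == 3
```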