Skip to content

Add Failsafe retry mechanism in K8s #6083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
@@ -1096,6 +1096,18 @@ The following settings are available:
`k8s.pullPolicy`
: Defines the strategy to be used to pull the container image e.g. `pullPolicy: 'Always'`.

`k8s.retryPolicy.delay`
: Delay when retrying failed API requests (default: `500ms`).

`k8s.retryPolicy.jitter`
: Jitter value when retrying failed API requests (default: `0.25`).

`k8s.retryPolicy.maxAttempts`
: Max attempts when retrying failed API requests (default: `4`).

`k8s.retryPolicy.maxDelay`
: Max delay when retrying failed API requests (default: `90s`).

`k8s.runAsUser`
: Defines the user ID to be used to run the containers. Shortcut for the `securityContext` option.

Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@

public class K8sConfig implements ConfigScope {

public K8sRetryConfig retryPolicy;

@ConfigOption
@Description("""
When `true`, host paths are automatically mounted into the task pods (default: `false`). Only intended for development purposes when using a single node.
@@ -84,12 +86,6 @@ The path where the workflow is launched and the user data is stored (default: `<
""")
public String launchDir;

@ConfigOption
@Description("""
The maximum number of retries for failed requests by the Kubernetes HTTP client (default: 4).
""")
public int maxErrorRetry;

@ConfigOption
@Description("""
The Kubernetes namespace to use (default: `default`).
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nextflow.config.scopes;

import nextflow.config.schema.ConfigOption;
import nextflow.config.schema.ConfigScope;
import nextflow.script.dsl.Description;
import nextflow.script.types.Duration;

public class K8sRetryConfig implements ConfigScope {

@ConfigOption
@Description("""
Delay when retrying failed API requests (default: `500ms`).
""")
public Duration delay;

@ConfigOption
@Description("""
Jitter value when retrying failed API requests (default: `0.25`).
""")
public double jitter;

@ConfigOption
@Description("""
Max attempts when retrying failed API requests (default: `10`).
""")
public int maxAttempts;

@ConfigOption
@Description("""
Max delay when retrying failed API requests (default: `90s`).
""")
public Duration maxDelay;

}
7 changes: 6 additions & 1 deletion plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@

package nextflow.k8s

import nextflow.k8s.client.K8sRetryConfig

import javax.annotation.Nullable

import groovy.transform.CompileStatic
@@ -233,8 +235,11 @@ class K8sConfig implements Map<String,Object> {
if( target.httpReadTimeout )
result.httpReadTimeout = target.httpReadTimeout as Duration

if( target.retryPolicy )
result.retryConfig = new K8sRetryConfig(target.retryPolicy as Map)

if( target.maxErrorRetry )
result.maxErrorRetry = target.maxErrorRetry as Integer
log.warn("Config setting 'k8s.maxErrorRetry' is deprecated. Change it to 'k8s.retryPolicy.maxAttempts'")

return result
}
13 changes: 8 additions & 5 deletions plugins/nf-k8s/src/main/nextflow/k8s/client/ClientConfig.groovy
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

package nextflow.k8s.client

import groovy.util.logging.Slf4j
import nextflow.util.Duration

import javax.net.ssl.KeyManager
@@ -31,6 +32,7 @@ import groovy.transform.EqualsAndHashCode
*/
@EqualsAndHashCode
@CompileStatic
@Slf4j
class ClientConfig {

boolean verifySsl
@@ -55,7 +57,7 @@ class ClientConfig {

KeyManager[] keyManagers

Integer maxErrorRetry = 4
K8sRetryConfig retryConfig

/**
* Timeout when reading from Input stream when a connection is established to a resource.
@@ -77,11 +79,11 @@ class ClientConfig {
String getNamespace() { namespace ?: 'default' }

ClientConfig() {

retryConfig = new K8sRetryConfig()
}

String toString() {
"${this.class.getSimpleName()}[ server=$server, namespace=$namespace, serviceAccount=$serviceAccount, token=${cut(token)}, sslCert=${cut(sslCert)}, clientCert=${cut(clientCert)}, clientKey=${cut(clientKey)}, verifySsl=$verifySsl, fromFile=$isFromCluster, httpReadTimeout=$httpReadTimeout, httpConnectTimeout=$httpConnectTimeout, maxErrorRetry=$maxErrorRetry ]"
"${this.class.getSimpleName()}[ server=$server, namespace=$namespace, serviceAccount=$serviceAccount, token=${cut(token)}, sslCert=${cut(sslCert)}, clientCert=${cut(clientCert)}, clientKey=${cut(clientKey)}, verifySsl=$verifySsl, fromFile=$isFromCluster, httpReadTimeout=$httpReadTimeout, httpConnectTimeout=$httpConnectTimeout, retryConfig=$retryConfig ]"
}

private String cut(String str) {
@@ -130,9 +132,10 @@ class ClientConfig {
result.clientKey = opts.clientKey.toString().decodeBase64()
else if( opts.clientKeyFile )
result.clientKey = Paths.get(opts.clientKeyFile.toString()).bytes

if( opts.retryPolicy )
result.retryConfig = new K8sRetryConfig(opts.retryPolicy as Map)
if( opts.maxErrorRetry )
result.maxErrorRetry = opts.maxErrorRetry as Integer
log.warn("Config setting 'k8s.maxErrorRetry' is deprecated - change it to 'k8s.retryPolicy.maxAttempts'")

return result
}
84 changes: 67 additions & 17 deletions plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy
Original file line number Diff line number Diff line change
@@ -16,6 +16,12 @@

package nextflow.k8s.client

import dev.failsafe.Failsafe
import dev.failsafe.FailsafeException
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import nextflow.exception.K8sOutOfCpuException
import nextflow.exception.K8sOutOfMemoryException

@@ -38,6 +44,11 @@ import groovy.util.logging.Slf4j
import nextflow.exception.NodeTerminationException
import nextflow.exception.ProcessFailedException
import org.yaml.snakeyaml.Yaml

import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import java.util.function.Predicate

/**
* Kubernetes API client
*
@@ -621,23 +632,7 @@ class K8sClient {
* the second element is the text (json) response
*/
protected K8sResponseApi makeRequest(String method, String path, String body=null) throws K8sResponseException {

final int maxRetries = config.maxErrorRetry
int attempt = 0

while ( true ) {
try {
return makeRequestCall( method, path, body )
} catch ( K8sResponseException | SocketException | SocketTimeoutException e ) {
if ( e instanceof K8sResponseException && e.response.code != 500 )
throw e
if ( ++attempt > maxRetries )
throw e
log.debug "[K8s] API request threw socket exception: $e.message for $method $path - Retrying request (attempt=$attempt)"
final long delay = (Math.pow(3, attempt - 1) as long) * 250
sleep( delay )
}
}
return apply(() -> makeRequestCall( method, path, body ) )
}


@@ -735,6 +730,61 @@ class K8sClient {
return new K8sResponseJson(resp.text)
}

/**
* Creates a retry policy using the configuration specified by {@link nextflow.k8s.client.K8sRetryConfig}
*
* @param cond A predicate that determines when a retry should be triggered
* @return The {@link dev.failsafe.RetryPolicy} instance
*/
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
final cfg = config.retryConfig
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
@Override
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
log.debug("K8s response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
}
}
return RetryPolicy.<T>builder()
.handleIf(cond)
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(cfg.maxAttempts)
.withJitter(cfg.jitter)
.onRetry(listener)
.build()
}

final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)

/**
* Carry out the invocation of the specified action using a retry policy.
*
* @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner
* @return The result of the supplied action
*/
protected <T> T apply(CheckedSupplier<T> action) {
// define the retry condition
final cond = new Predicate<? extends Throwable>() {
@Override
boolean test(Throwable t) {
if ( t instanceof K8sResponseException && t.response.code in RETRY_CODES )
return true
if( t instanceof SocketException || t.cause instanceof SocketException )
return true
if( t instanceof SocketTimeoutException || t.cause instanceof SocketTimeoutException )
return true
return false
}
}
// create the retry policy object
final policy = retryPolicy(cond)
// apply the action with and throw the original cause
try {
return Failsafe.with(policy).get(action)
}catch(FailsafeException e){
throw e.getCause()
}
}


}

53 changes: 53 additions & 0 deletions plugins/nf-k8s/src/main/nextflow/k8s/client/K8sRetryConfig.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.k8s.client

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import nextflow.util.Duration

/**
* Model retry policy configuration
*
* @author Paolo Di Tommaso <[email protected]>
*/
@ToString(includePackage = false, includeNames = true)
@EqualsAndHashCode
@CompileStatic
class K8sRetryConfig {
Duration delay = Duration.of('250ms')
Duration maxDelay = Duration.of('90s')
int maxAttempts = 4
double jitter = 0.25

K8sRetryConfig() {
this(Collections.emptyMap())
}

K8sRetryConfig(Map config) {
if( config.delay )
delay = config.delay as Duration
if( config.maxDelay )
maxDelay = config.maxDelay as Duration
if( config.maxAttempts )
maxAttempts = config.maxAttempts as int
if( config.jitter )
jitter = config.jitter as double
}
}
6 changes: 3 additions & 3 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy
Original file line number Diff line number Diff line change
@@ -155,19 +155,19 @@ class K8sConfigTest extends Specification {
client.serviceAccount == 'that'
client.httpConnectTimeout == null // testing default null
client.httpReadTimeout == null // testing default null
client.maxErrorRetry == 4
client.retryConfig.maxAttempts == 4

}

def 'should set maxErrorRetry' () {
given:
def CONFIG = [maxErrorRetry: 10, namespace: 'this', serviceAccount: 'that', client: [server: 'http://foo']]
def CONFIG = [retryPolicy: [ maxAttempts: 10], namespace: 'this', serviceAccount: 'that', client: [server: 'http://foo']]

when:
def config = new K8sConfig(CONFIG)
def client = config.getClient()
then:
client.maxErrorRetry == 10
client.retryConfig.maxAttempts == 10
}

def 'should create client config with http request timeouts' () {