Skip to content

Commit 9e675c6

Browse files
jorgeepditommaso
andauthored
Add Failsafe retry mechanism in K8s (#6083) [ci fast]
Signed-off-by: jorgee <[email protected]> Signed-off-by: Jorge Ejarque <[email protected]> Signed-off-by: Paolo Di Tommaso <[email protected]> Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent b9dace9 commit 9e675c6

File tree

8 files changed

+200
-32
lines changed

8 files changed

+200
-32
lines changed

docs/reference/config.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,18 @@ The following settings are available:
10961096
`k8s.pullPolicy`
10971097
: Defines the strategy to be used to pull the container image e.g. `pullPolicy: 'Always'`.
10981098

1099+
`k8s.retryPolicy.delay`
1100+
: Delay when retrying failed API requests (default: `500ms`).
1101+
1102+
`k8s.retryPolicy.jitter`
1103+
: Jitter value when retrying failed API requests (default: `0.25`).
1104+
1105+
`k8s.retryPolicy.maxAttempts`
1106+
: Max attempts when retrying failed API requests (default: `4`).
1107+
1108+
`k8s.retryPolicy.maxDelay`
1109+
: Max delay when retrying failed API requests (default: `90s`).
1110+
10991111
`k8s.runAsUser`
11001112
: Defines the user ID to be used to run the containers. Shortcut for the `securityContext` option.
11011113

modules/nf-lang/src/main/java/nextflow/config/scopes/K8sConfig.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
public class K8sConfig implements ConfigScope {
2727

28+
public K8sRetryConfig retryPolicy;
29+
2830
@ConfigOption
2931
@Description("""
3032
When `true`, host paths are automatically mounted into the task pods (default: `false`). Only intended for development purposes when using a single node.
@@ -84,12 +86,6 @@ The path where the workflow is launched and the user data is stored (default: `<
8486
""")
8587
public String launchDir;
8688

87-
@ConfigOption
88-
@Description("""
89-
The maximum number of retries for failed requests by the Kubernetes HTTP client (default: 4).
90-
""")
91-
public int maxErrorRetry;
92-
9389
@ConfigOption
9490
@Description("""
9591
The Kubernetes namespace to use (default: `default`).
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2024-2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package nextflow.config.scopes;
17+
18+
import nextflow.config.schema.ConfigOption;
19+
import nextflow.config.schema.ConfigScope;
20+
import nextflow.script.dsl.Description;
21+
import nextflow.script.types.Duration;
22+
23+
public class K8sRetryConfig implements ConfigScope {
24+
25+
@ConfigOption
26+
@Description("""
27+
Delay when retrying failed API requests (default: `500ms`).
28+
""")
29+
public Duration delay;
30+
31+
@ConfigOption
32+
@Description("""
33+
Jitter value when retrying failed API requests (default: `0.25`).
34+
""")
35+
public double jitter;
36+
37+
@ConfigOption
38+
@Description("""
39+
Max attempts when retrying failed API requests (default: `10`).
40+
""")
41+
public int maxAttempts;
42+
43+
@ConfigOption
44+
@Description("""
45+
Max delay when retrying failed API requests (default: `90s`).
46+
""")
47+
public Duration maxDelay;
48+
49+
}

plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package nextflow.k8s
1818

19+
import nextflow.k8s.client.K8sRetryConfig
20+
1921
import javax.annotation.Nullable
2022

2123
import groovy.transform.CompileStatic
@@ -233,8 +235,11 @@ class K8sConfig implements Map<String,Object> {
233235
if( target.httpReadTimeout )
234236
result.httpReadTimeout = target.httpReadTimeout as Duration
235237

238+
if( target.retryPolicy )
239+
result.retryConfig = new K8sRetryConfig(target.retryPolicy as Map)
240+
236241
if( target.maxErrorRetry )
237-
result.maxErrorRetry = target.maxErrorRetry as Integer
242+
log.warn("Config setting 'k8s.maxErrorRetry' is deprecated. Change it to 'k8s.retryPolicy.maxAttempts'")
238243

239244
return result
240245
}

plugins/nf-k8s/src/main/nextflow/k8s/client/ClientConfig.groovy

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package nextflow.k8s.client
1818

19+
import groovy.util.logging.Slf4j
1920
import nextflow.util.Duration
2021

2122
import javax.net.ssl.KeyManager
@@ -31,6 +32,7 @@ import groovy.transform.EqualsAndHashCode
3132
*/
3233
@EqualsAndHashCode
3334
@CompileStatic
35+
@Slf4j
3436
class ClientConfig {
3537

3638
boolean verifySsl
@@ -55,7 +57,7 @@ class ClientConfig {
5557

5658
KeyManager[] keyManagers
5759

58-
Integer maxErrorRetry = 4
60+
K8sRetryConfig retryConfig
5961

6062
/**
6163
* Timeout when reading from Input stream when a connection is established to a resource.
@@ -77,11 +79,11 @@ class ClientConfig {
7779
String getNamespace() { namespace ?: 'default' }
7880

7981
ClientConfig() {
80-
82+
retryConfig = new K8sRetryConfig()
8183
}
8284

8385
String toString() {
84-
"${this.class.getSimpleName()}[ server=$server, namespace=$namespace, serviceAccount=$serviceAccount, token=${cut(token)}, sslCert=${cut(sslCert)}, clientCert=${cut(clientCert)}, clientKey=${cut(clientKey)}, verifySsl=$verifySsl, fromFile=$isFromCluster, httpReadTimeout=$httpReadTimeout, httpConnectTimeout=$httpConnectTimeout, maxErrorRetry=$maxErrorRetry ]"
86+
"${this.class.getSimpleName()}[ server=$server, namespace=$namespace, serviceAccount=$serviceAccount, token=${cut(token)}, sslCert=${cut(sslCert)}, clientCert=${cut(clientCert)}, clientKey=${cut(clientKey)}, verifySsl=$verifySsl, fromFile=$isFromCluster, httpReadTimeout=$httpReadTimeout, httpConnectTimeout=$httpConnectTimeout, retryConfig=$retryConfig ]"
8587
}
8688

8789
private String cut(String str) {
@@ -130,9 +132,10 @@ class ClientConfig {
130132
result.clientKey = opts.clientKey.toString().decodeBase64()
131133
else if( opts.clientKeyFile )
132134
result.clientKey = Paths.get(opts.clientKeyFile.toString()).bytes
133-
135+
if( opts.retryPolicy )
136+
result.retryConfig = new K8sRetryConfig(opts.retryPolicy as Map)
134137
if( opts.maxErrorRetry )
135-
result.maxErrorRetry = opts.maxErrorRetry as Integer
138+
log.warn("Config setting 'k8s.maxErrorRetry' is deprecated - change it to 'k8s.retryPolicy.maxAttempts'")
136139

137140
return result
138141
}

plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
package nextflow.k8s.client
1818

19+
import dev.failsafe.Failsafe
20+
import dev.failsafe.FailsafeException
21+
import dev.failsafe.RetryPolicy
22+
import dev.failsafe.event.EventListener
23+
import dev.failsafe.event.ExecutionAttemptedEvent
24+
import dev.failsafe.function.CheckedSupplier
1925
import nextflow.exception.K8sOutOfCpuException
2026
import nextflow.exception.K8sOutOfMemoryException
2127

@@ -38,6 +44,11 @@ import groovy.util.logging.Slf4j
3844
import nextflow.exception.NodeTerminationException
3945
import nextflow.exception.ProcessFailedException
4046
import org.yaml.snakeyaml.Yaml
47+
48+
import java.time.temporal.ChronoUnit
49+
import java.util.concurrent.TimeoutException
50+
import java.util.function.Predicate
51+
4152
/**
4253
* Kubernetes API client
4354
*
@@ -621,23 +632,7 @@ class K8sClient {
621632
* the second element is the text (json) response
622633
*/
623634
protected K8sResponseApi makeRequest(String method, String path, String body=null) throws K8sResponseException {
624-
625-
final int maxRetries = config.maxErrorRetry
626-
int attempt = 0
627-
628-
while ( true ) {
629-
try {
630-
return makeRequestCall( method, path, body )
631-
} catch ( K8sResponseException | SocketException | SocketTimeoutException e ) {
632-
if ( e instanceof K8sResponseException && e.response.code != 500 )
633-
throw e
634-
if ( ++attempt > maxRetries )
635-
throw e
636-
log.debug "[K8s] API request threw socket exception: $e.message for $method $path - Retrying request (attempt=$attempt)"
637-
final long delay = (Math.pow(3, attempt - 1) as long) * 250
638-
sleep( delay )
639-
}
640-
}
635+
return apply(() -> makeRequestCall( method, path, body ) )
641636
}
642637

643638

@@ -735,6 +730,61 @@ class K8sClient {
735730
return new K8sResponseJson(resp.text)
736731
}
737732

733+
/**
734+
* Creates a retry policy using the configuration specified by {@link nextflow.k8s.client.K8sRetryConfig}
735+
*
736+
* @param cond A predicate that determines when a retry should be triggered
737+
* @return The {@link dev.failsafe.RetryPolicy} instance
738+
*/
739+
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
740+
final cfg = config.retryConfig
741+
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
742+
@Override
743+
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
744+
log.debug("K8s response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
745+
}
746+
}
747+
return RetryPolicy.<T>builder()
748+
.handleIf(cond)
749+
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
750+
.withMaxAttempts(cfg.maxAttempts)
751+
.withJitter(cfg.jitter)
752+
.onRetry(listener)
753+
.build()
754+
}
755+
756+
final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)
757+
758+
/**
759+
* Carry out the invocation of the specified action using a retry policy.
760+
*
761+
* @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner
762+
* @return The result of the supplied action
763+
*/
764+
protected <T> T apply(CheckedSupplier<T> action) {
765+
// define the retry condition
766+
final cond = new Predicate<? extends Throwable>() {
767+
@Override
768+
boolean test(Throwable t) {
769+
if ( t instanceof K8sResponseException && t.response.code in RETRY_CODES )
770+
return true
771+
if( t instanceof SocketException || t.cause instanceof SocketException )
772+
return true
773+
if( t instanceof SocketTimeoutException || t.cause instanceof SocketTimeoutException )
774+
return true
775+
return false
776+
}
777+
}
778+
// create the retry policy object
779+
final policy = retryPolicy(cond)
780+
// apply the action with and throw the original cause
781+
try {
782+
return Failsafe.with(policy).get(action)
783+
}catch(FailsafeException e){
784+
throw e.getCause()
785+
}
786+
}
787+
738788

739789
}
740790

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package nextflow.k8s.client
19+
20+
import groovy.transform.CompileStatic
21+
import groovy.transform.EqualsAndHashCode
22+
import groovy.transform.ToString
23+
import nextflow.util.Duration
24+
25+
/**
26+
* Model retry policy configuration
27+
*
28+
* @author Paolo Di Tommaso <[email protected]>
29+
*/
30+
@ToString(includePackage = false, includeNames = true)
31+
@EqualsAndHashCode
32+
@CompileStatic
33+
class K8sRetryConfig {
34+
Duration delay = Duration.of('250ms')
35+
Duration maxDelay = Duration.of('90s')
36+
int maxAttempts = 4
37+
double jitter = 0.25
38+
39+
K8sRetryConfig() {
40+
this(Collections.emptyMap())
41+
}
42+
43+
K8sRetryConfig(Map config) {
44+
if( config.delay )
45+
delay = config.delay as Duration
46+
if( config.maxDelay )
47+
maxDelay = config.maxDelay as Duration
48+
if( config.maxAttempts )
49+
maxAttempts = config.maxAttempts as int
50+
if( config.jitter )
51+
jitter = config.jitter as double
52+
}
53+
}

plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,19 +155,19 @@ class K8sConfigTest extends Specification {
155155
client.serviceAccount == 'that'
156156
client.httpConnectTimeout == null // testing default null
157157
client.httpReadTimeout == null // testing default null
158-
client.maxErrorRetry == 4
158+
client.retryConfig.maxAttempts == 4
159159

160160
}
161161

162162
def 'should set maxErrorRetry' () {
163163
given:
164-
def CONFIG = [maxErrorRetry: 10, namespace: 'this', serviceAccount: 'that', client: [server: 'http://foo']]
164+
def CONFIG = [retryPolicy: [ maxAttempts: 10], namespace: 'this', serviceAccount: 'that', client: [server: 'http://foo']]
165165

166166
when:
167167
def config = new K8sConfig(CONFIG)
168168
def client = config.getClient()
169169
then:
170-
client.maxErrorRetry == 10
170+
client.retryConfig.maxAttempts == 10
171171
}
172172

173173
def 'should create client config with http request timeouts' () {

0 commit comments

Comments
 (0)