Skip to content

Commit 62ea9f6

Browse files
authored
Merge pull request #252 from java-operator-sdk/retry-experiment
Retry Support
2 parents f2660be + f30f35e commit 62ea9f6

25 files changed

+611
-99
lines changed

operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.javaoperatorsdk.operator.processing.EventDispatcher;
1919
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
2020
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
21+
import io.javaoperatorsdk.operator.processing.retry.Retry;
2122
import java.util.Arrays;
2223
import java.util.HashMap;
2324
import java.util.Map;
@@ -36,19 +37,33 @@ public Operator(KubernetesClient k8sClient) {
3637
this.k8sClient = k8sClient;
3738
}
3839

40+
public <R extends CustomResource> void registerControllerForAllNamespaces(
41+
ResourceController<R> controller, Retry retry) throws OperatorException {
42+
registerController(controller, true, retry);
43+
}
44+
3945
public <R extends CustomResource> void registerControllerForAllNamespaces(
4046
ResourceController<R> controller) throws OperatorException {
41-
registerController(controller, true);
47+
registerController(controller, true, null);
48+
}
49+
50+
public <R extends CustomResource> void registerController(
51+
ResourceController<R> controller, Retry retry, String... targetNamespaces)
52+
throws OperatorException {
53+
registerController(controller, false, retry, targetNamespaces);
4254
}
4355

4456
public <R extends CustomResource> void registerController(
4557
ResourceController<R> controller, String... targetNamespaces) throws OperatorException {
46-
registerController(controller, false, targetNamespaces);
58+
registerController(controller, false, null, targetNamespaces);
4759
}
4860

4961
@SuppressWarnings("rawtypes")
5062
private <R extends CustomResource> void registerController(
51-
ResourceController<R> controller, boolean watchAllNamespaces, String... targetNamespaces)
63+
ResourceController<R> controller,
64+
boolean watchAllNamespaces,
65+
Retry retry,
66+
String... targetNamespaces)
5267
throws OperatorException {
5368
Class<R> resClass = getCustomResourceClass(controller);
5469
CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller);
@@ -67,10 +82,10 @@ private <R extends CustomResource> void registerController(
6782
CustomResourceCache customResourceCache = new CustomResourceCache();
6883
DefaultEventHandler defaultEventHandler =
6984
new DefaultEventHandler(
70-
customResourceCache, eventDispatcher, controller.getClass().getName());
85+
customResourceCache, eventDispatcher, controller.getClass().getName(), retry);
7186
DefaultEventSourceManager eventSourceManager =
72-
new DefaultEventSourceManager(defaultEventHandler);
73-
defaultEventHandler.setDefaultEventSourceManager(eventSourceManager);
87+
new DefaultEventSourceManager(defaultEventHandler, retry != null);
88+
defaultEventHandler.setEventSourceManager(eventSourceManager);
7489
eventDispatcher.setEventSourceManager(eventSourceManager);
7590

7691
customResourceClients.put(resClass, (CustomResourceOperationsImpl) client);

operator-framework/src/main/java/io/javaoperatorsdk/operator/api/Context.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import io.fabric8.kubernetes.client.CustomResource;
44
import io.javaoperatorsdk.operator.processing.event.EventList;
55
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
6+
import java.util.Optional;
67

78
public interface Context<T extends CustomResource> {
89

910
EventSourceManager getEventSourceManager();
1011

1112
EventList getEvents();
13+
14+
Optional<RetryInfo> getRetryInfo();
1215
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
import io.fabric8.kubernetes.client.CustomResource;
44
import io.javaoperatorsdk.operator.processing.event.EventList;
55
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
6+
import java.util.Optional;
67

78
public class DefaultContext<T extends CustomResource> implements Context<T> {
89

10+
private final RetryInfo retryInfo;
911
private final EventList events;
1012
private final EventSourceManager eventSourceManager;
1113

12-
public DefaultContext(EventSourceManager eventSourceManager, EventList events) {
14+
public DefaultContext(
15+
EventSourceManager eventSourceManager, EventList events, RetryInfo retryInfo) {
16+
this.retryInfo = retryInfo;
1317
this.events = events;
1418
this.eventSourceManager = eventSourceManager;
1519
}
@@ -23,4 +27,9 @@ public EventSourceManager getEventSourceManager() {
2327
public EventList getEvents() {
2428
return events;
2529
}
30+
31+
@Override
32+
public Optional<RetryInfo> getRetryInfo() {
33+
return Optional.ofNullable(retryInfo);
34+
}
2635
}
Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,8 @@
11
package io.javaoperatorsdk.operator.api;
22

3-
public class RetryInfo {
3+
public interface RetryInfo {
44

5-
private int retryNumber;
6-
private boolean lastAttempt;
5+
int getAttemptCount();
76

8-
public RetryInfo(int retryNumber, boolean lastAttempt) {
9-
this.retryNumber = retryNumber;
10-
this.lastAttempt = lastAttempt;
11-
}
12-
13-
public int getRetryNumber() {
14-
return retryNumber;
15-
}
16-
17-
public boolean isLastAttempt() {
18-
return lastAttempt;
19-
}
7+
boolean isLastAttempt();
208
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
66

77
import io.fabric8.kubernetes.client.CustomResource;
8+
import io.javaoperatorsdk.operator.api.RetryInfo;
89
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
910
import io.javaoperatorsdk.operator.processing.event.Event;
1011
import io.javaoperatorsdk.operator.processing.event.EventHandler;
12+
import io.javaoperatorsdk.operator.processing.retry.Retry;
13+
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
14+
import java.util.*;
1115
import java.util.HashSet;
1216
import java.util.Optional;
1317
import java.util.Set;
@@ -30,16 +34,20 @@ public class DefaultEventHandler implements EventHandler {
3034
private final Set<String> underProcessing = new HashSet<>();
3135
private final ScheduledThreadPoolExecutor executor;
3236
private final EventDispatcher eventDispatcher;
33-
private DefaultEventSourceManager defaultEventSourceManager;
37+
private final Retry retry;
38+
private final Map<String, RetryExecution> retryState = new HashMap<>();
39+
private DefaultEventSourceManager eventSourceManager;
3440

3541
private final ReentrantLock lock = new ReentrantLock();
3642

3743
public DefaultEventHandler(
3844
CustomResourceCache customResourceCache,
3945
EventDispatcher eventDispatcher,
40-
String relatedControllerName) {
46+
String relatedControllerName,
47+
Retry retry) {
4148
this.customResourceCache = customResourceCache;
4249
this.eventDispatcher = eventDispatcher;
50+
this.retry = retry;
4351
eventBuffer = new EventBuffer();
4452
executor =
4553
new ScheduledThreadPoolExecutor(
@@ -52,8 +60,8 @@ public Thread newThread(Runnable runnable) {
5260
});
5361
}
5462

55-
public void setDefaultEventSourceManager(DefaultEventSourceManager defaultEventSourceManager) {
56-
this.defaultEventSourceManager = defaultEventSourceManager;
63+
public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
64+
this.eventSourceManager = eventSourceManager;
5765
}
5866

5967
@Override
@@ -79,7 +87,8 @@ private void executeBufferedEvents(String customResourceUid) {
7987
ExecutionScope executionScope =
8088
new ExecutionScope(
8189
eventBuffer.getAndRemoveEventsForExecution(customResourceUid),
82-
latestCustomResource.get());
90+
latestCustomResource.get(),
91+
retryInfo(customResourceUid));
8392
log.debug("Executing events for custom resource. Scope: {}", executionScope);
8493
executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this));
8594
} else {
@@ -93,12 +102,28 @@ private void executeBufferedEvents(String customResourceUid) {
93102
}
94103
}
95104

105+
private RetryInfo retryInfo(String customResourceUid) {
106+
return retryState.get(customResourceUid);
107+
}
108+
96109
void eventProcessingFinished(
97110
ExecutionScope executionScope, PostExecutionControl postExecutionControl) {
98111
try {
99112
lock.lock();
100-
log.debug("Event processing finished. Scope: {}", executionScope);
113+
log.debug(
114+
"Event processing finished. Scope: {}, PostExecutionControl: {}",
115+
executionScope,
116+
postExecutionControl);
101117
unsetUnderExecution(executionScope.getCustomResourceUid());
118+
119+
if (retry != null && postExecutionControl.exceptionDuringExecution()) {
120+
handleRetryOnException(executionScope);
121+
return;
122+
}
123+
124+
if (retry != null) {
125+
markSuccessfulExecutionRegardingRetry(executionScope);
126+
}
102127
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
103128
cleanupAfterDeletedEvent(executionScope.getCustomResourceUid());
104129
} else {
@@ -110,6 +135,53 @@ void eventProcessingFinished(
110135
}
111136
}
112137

138+
/**
139+
* Regarding the events there are 2 approaches we can take. Either retry always when there are new
140+
* events (received meanwhile retry is in place or already in buffer) instantly or always wait
141+
* according to the retry timing if there was an exception.
142+
*/
143+
private void handleRetryOnException(ExecutionScope executionScope) {
144+
RetryExecution execution = getOrInitRetryExecution(executionScope);
145+
boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid());
146+
eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents());
147+
148+
if (newEventsExists) {
149+
log.debug("New events exists for for resource id: {}", executionScope.getCustomResourceUid());
150+
executeBufferedEvents(executionScope.getCustomResourceUid());
151+
return;
152+
}
153+
Optional<Long> nextDelay = execution.nextDelay();
154+
155+
nextDelay.ifPresent(
156+
delay -> {
157+
log.debug(
158+
"Scheduling timer event for retry with delay:{} for resource: {}",
159+
delay,
160+
executionScope.getCustomResourceUid());
161+
eventSourceManager
162+
.getRetryTimerEventSource()
163+
.scheduleOnce(executionScope.getCustomResource(), delay);
164+
});
165+
}
166+
167+
private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) {
168+
log.debug(
169+
"Marking successful execution for resource: {}", executionScope.getCustomResourceUid());
170+
retryState.remove(executionScope.getCustomResourceUid());
171+
eventSourceManager
172+
.getRetryTimerEventSource()
173+
.cancelOnceSchedule(executionScope.getCustomResourceUid());
174+
}
175+
176+
private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) {
177+
RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid());
178+
if (retryExecution == null) {
179+
retryExecution = retry.initExecution();
180+
retryState.put(executionScope.getCustomResourceUid(), retryExecution);
181+
}
182+
return retryExecution;
183+
}
184+
113185
/**
114186
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency
115187
* issue we've seen: If an execution is finished, where we updated a custom resource, but there
@@ -146,7 +218,7 @@ private void cacheUpdatedResourceIfChanged(
146218
}
147219

148220
private void cleanupAfterDeletedEvent(String customResourceUid) {
149-
defaultEventSourceManager.cleanup(customResourceUid);
221+
eventSourceManager.cleanup(customResourceUid);
150222
eventBuffer.cleanup(customResourceUid);
151223
customResourceCache.cleanup(customResourceUid);
152224
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@ public void addEvent(Event event) {
1717
crEvents.add(event);
1818
}
1919

20+
public boolean newEventsExists(String resourceId) {
21+
return events.get(resourceId) != null && !events.get(resourceId).isEmpty();
22+
}
23+
24+
public void putBackEvents(String resourceUid, List<Event> oldEvents) {
25+
List<Event> crEvents =
26+
events.computeIfAbsent(resourceUid, (id) -> new ArrayList<>(oldEvents.size()));
27+
crEvents.addAll(0, oldEvents);
28+
}
29+
2030
public boolean containsEvents(String customResourceId) {
2131
return events.get(customResourceId) != null;
2232
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,16 @@ public void setEventSourceManager(EventSourceManager eventSourceManager) {
4343
this.eventSourceManager = eventSourceManager;
4444
}
4545

46-
public PostExecutionControl handleEvent(ExecutionScope event) {
46+
public PostExecutionControl handleExecution(ExecutionScope executionScope) {
4747
try {
48-
return handDispatch(event);
48+
return handleDispatch(executionScope);
4949
} catch (RuntimeException e) {
50-
log.error("Error during event processing {} failed.", event, e);
51-
return PostExecutionControl.defaultDispatch();
50+
log.error("Error during event processing {} failed.", executionScope, e);
51+
return PostExecutionControl.exceptionDuringExecution(e);
5252
}
5353
}
5454

55-
private PostExecutionControl handDispatch(ExecutionScope executionScope) {
55+
private PostExecutionControl handleDispatch(ExecutionScope executionScope) {
5656
CustomResource resource = executionScope.getCustomResource();
5757
log.debug(
5858
"Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata());
@@ -72,7 +72,10 @@ private PostExecutionControl handDispatch(ExecutionScope executionScope) {
7272
return PostExecutionControl.defaultDispatch();
7373
}
7474
Context context =
75-
new DefaultContext(eventSourceManager, new EventList(executionScope.getEvents()));
75+
new DefaultContext(
76+
eventSourceManager,
77+
new EventList(executionScope.getEvents()),
78+
executionScope.getRetryInfo());
7679
if (markedForDeletion(resource)) {
7780
return handleDelete(resource, context);
7881
} else {

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class ExecutionConsumer implements Runnable {
2222

2323
@Override
2424
public void run() {
25-
PostExecutionControl postExecutionControl = eventDispatcher.handleEvent(executionScope);
25+
PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope);
2626
defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl);
2727
}
2828
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.processing;
22

33
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.api.RetryInfo;
45
import io.javaoperatorsdk.operator.processing.event.Event;
56
import java.util.List;
67

@@ -10,9 +11,12 @@ public class ExecutionScope {
1011
// the latest custom resource from cache
1112
private CustomResource customResource;
1213

13-
public ExecutionScope(List<Event> list, CustomResource customResource) {
14+
private RetryInfo retryInfo;
15+
16+
public ExecutionScope(List<Event> list, CustomResource customResource, RetryInfo retryInfo) {
1417
this.events = list;
1518
this.customResource = customResource;
19+
this.retryInfo = retryInfo;
1620
}
1721

1822
public List<Event> getEvents() {
@@ -38,4 +42,8 @@ public String toString() {
3842
+ customResource.getMetadata().getResourceVersion()
3943
+ '}';
4044
}
45+
46+
public RetryInfo getRetryInfo() {
47+
return retryInfo;
48+
}
4149
}

0 commit comments

Comments
 (0)