Skip to content

Commit f426c39

Browse files
Add CadenceChangeVersion support (#650)
1 parent dca853a commit f426c39

File tree

9 files changed

+166
-17
lines changed

9 files changed

+166
-17
lines changed

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.google.common.base.Strings;
21+
import com.google.common.collect.ImmutableMap;
2122
import com.uber.cadence.ActivityType;
2223
import com.uber.cadence.HistoryEvent;
2324
import com.uber.cadence.MarkerRecordedEventAttributes;
@@ -33,7 +34,9 @@
3334
import com.uber.cadence.workflow.Functions.Func;
3435
import com.uber.cadence.workflow.Functions.Func1;
3536
import java.time.Duration;
37+
import java.util.ArrayList;
3638
import java.util.HashMap;
39+
import java.util.List;
3740
import java.util.Map;
3841
import java.util.Optional;
3942
import java.util.concurrent.CancellationException;
@@ -90,6 +93,33 @@ public void accept(Exception reason) {
9093
private boolean taskCompleted = false;
9194
private final Map<String, Integer> versionMap = new HashMap<>();
9295

96+
static final class GetVersionResult {
97+
private final int version;
98+
private final boolean shouldUpdateCadenceChangeVersion;
99+
private final Map<String, Object> searchAttributesForChangeVersion;
100+
101+
GetVersionResult(
102+
final int version,
103+
final boolean isNewlyAdded,
104+
final Map<String, Object> searchAttributesForChangeVersion) {
105+
this.version = version;
106+
this.shouldUpdateCadenceChangeVersion = isNewlyAdded;
107+
this.searchAttributesForChangeVersion = searchAttributesForChangeVersion;
108+
}
109+
110+
public int getVersion() {
111+
return version;
112+
}
113+
114+
public boolean shouldUpdateCadenceChangeVersion() {
115+
return shouldUpdateCadenceChangeVersion;
116+
}
117+
118+
public Map<String, Object> getSearchAttributesForChangeVersion() {
119+
return searchAttributesForChangeVersion;
120+
}
121+
}
122+
93123
ClockDecisionContext(
94124
DecisionsHelper decisions,
95125
BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller,
@@ -214,7 +244,8 @@ byte[] sideEffect(Func<byte[]> func) {
214244
Optional<byte[]> mutableSideEffect(
215245
String id, DataConverter converter, Func1<Optional<byte[]>, Optional<byte[]>> func) {
216246
decisions.addAllMissingVersionMarker(false, Optional.empty());
217-
return mutableSideEffectHandler.handle(id, converter, func);
247+
final MarkerHandler.HandleResult results = mutableSideEffectHandler.handle(id, converter, func);
248+
return results.getStoredData();
218249
}
219250

220251
void upsertSearchAttributes(SearchAttributes searchAttributes) {
@@ -287,7 +318,8 @@ private void handleVersionMarker(MarkerRecordedEventAttributes attributes) {
287318
versionMap.put(versionID, version);
288319
}
289320

290-
int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
321+
GetVersionResult getVersion(
322+
String changeId, DataConverter converter, int minSupported, int maxSupported) {
291323
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
292324
(attributes) -> {
293325
MarkerHandler.MarkerInterface markerData =
@@ -296,7 +328,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
296328
};
297329
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));
298330

299-
Optional<byte[]> result =
331+
final MarkerHandler.HandleResult result =
300332
versionHandler.handle(
301333
changeId,
302334
converter,
@@ -307,18 +339,26 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
307339
return Optional.of(converter.toData(maxSupported));
308340
});
309341

342+
final boolean isNewlyAdded = result.isNewlyStored();
343+
Map<String, Object> searchAttributesForChangeVersion = null;
344+
if (isNewlyAdded) {
345+
searchAttributesForChangeVersion =
346+
createSearchAttributesForChangeVersion(changeId, maxSupported, versionMap);
347+
}
348+
310349
Integer version = versionMap.get(changeId);
311350
if (version != null) {
312351
validateVersion(changeId, version, minSupported, maxSupported);
313-
return version;
352+
return new GetVersionResult(version, isNewlyAdded, searchAttributesForChangeVersion);
314353
}
315354

316-
if (!result.isPresent()) {
317-
return WorkflowInternal.DEFAULT_VERSION;
355+
if (!result.getStoredData().isPresent()) {
356+
return new GetVersionResult(
357+
WorkflowInternal.DEFAULT_VERSION, false, searchAttributesForChangeVersion);
318358
}
319-
version = converter.fromData(result.get(), Integer.class, Integer.class);
359+
version = converter.fromData(result.getStoredData().get(), Integer.class, Integer.class);
320360
validateVersion(changeId, version, minSupported, maxSupported);
321-
return version;
361+
return new GetVersionResult(version, isNewlyAdded, searchAttributesForChangeVersion);
322362
}
323363

324364
private void validateVersion(String changeID, int version, int minSupported, int maxSupported) {
@@ -331,6 +371,29 @@ private void validateVersion(String changeID, int version, int minSupported, int
331371
}
332372
}
333373

374+
private Map<String, Object> createSearchAttributesForChangeVersion(
375+
String changeID, Integer version, Map<String, Integer> existingChangeVersions) {
376+
return ImmutableMap.of(
377+
WorkflowInternal.CADENCE_CHANGE_VERSION,
378+
getChangeVersions(changeID, version, existingChangeVersions));
379+
}
380+
381+
private List<String> getChangeVersions(
382+
String changeID, Integer version, Map<String, Integer> existingChangeVersions) {
383+
ArrayList<String> res = new ArrayList<>();
384+
// as the convention, the first element is always the latest version
385+
res.add(getChangeVersion(changeID, version));
386+
387+
for (Map.Entry<String, Integer> entry : existingChangeVersions.entrySet()) {
388+
res.add(getChangeVersion(entry.getKey(), entry.getValue()));
389+
}
390+
return res;
391+
}
392+
393+
private String getChangeVersion(String changeID, Integer version) {
394+
return changeID + "-" + version;
395+
}
396+
334397
Consumer<Exception> scheduleLocalActivityTask(
335398
ExecuteLocalActivityParameters params, BiConsumer<byte[], Exception> callback) {
336399
final OpenRequestInfo<byte[], ActivityType> context =

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.uber.cadence.WorkflowType;
3030
import com.uber.cadence.context.ContextPropagator;
3131
import com.uber.cadence.converter.DataConverter;
32+
import com.uber.cadence.internal.common.InternalUtils;
3233
import com.uber.cadence.internal.metrics.ReplayAwareScope;
3334
import com.uber.cadence.internal.worker.LocalActivityWorker;
3435
import com.uber.cadence.internal.worker.SingleWorkerOptions;
@@ -277,7 +278,14 @@ public Optional<byte[]> mutableSideEffect(
277278
@Override
278279
public int getVersion(
279280
String changeID, DataConverter converter, int minSupported, int maxSupported) {
280-
return workflowClock.getVersion(changeID, converter, minSupported, maxSupported);
281+
final ClockDecisionContext.GetVersionResult results =
282+
workflowClock.getVersion(changeID, converter, minSupported, maxSupported);
283+
if (results.shouldUpdateCadenceChangeVersion()) {
284+
upsertSearchAttributes(
285+
InternalUtils.convertMapToSearchAttributes(
286+
results.getSearchAttributesForChangeVersion()));
287+
}
288+
return results.getVersion();
281289
}
282290

283291
@Override

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import static com.uber.cadence.internal.sync.WorkflowInternal.CADENCE_CHANGE_VERSION;
21+
2022
import com.uber.cadence.ActivityTaskCancelRequestedEventAttributes;
2123
import com.uber.cadence.ActivityTaskCanceledEventAttributes;
2224
import com.uber.cadence.ActivityTaskScheduledEventAttributes;
@@ -669,6 +671,40 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) {
669671
nextDecisionEventId++;
670672
}
671673

674+
// GetVersion API may use CadenceChangeVersion, or may not(legacy execution)
675+
// This method only adds the decision if the upsert CadenceChangeVersion search attribute event
676+
// exists in the history
677+
public void addPossibleMissingDecisionForChangeVersionSearchAttribute() {
678+
// add next possible event for CadenceChangeVersion search attribute
679+
final Optional<HistoryEvent> optionalEvent = getOptionalDecisionEvent(nextDecisionEventId);
680+
if (!optionalEvent.isPresent()) {
681+
return;
682+
}
683+
if (optionalEvent.get().getEventType() != EventType.UpsertWorkflowSearchAttributes) {
684+
return;
685+
}
686+
final SearchAttributes searchAttributes =
687+
optionalEvent
688+
.get()
689+
.getUpsertWorkflowSearchAttributesEventAttributes()
690+
.getSearchAttributes();
691+
if (searchAttributes.getIndexedFields().containsKey(CADENCE_CHANGE_VERSION)) {
692+
Decision upsertSearchAttrDecision =
693+
new Decision()
694+
.setDecisionType(DecisionType.UpsertWorkflowSearchAttributes)
695+
.setUpsertWorkflowSearchAttributesDecisionAttributes(
696+
new UpsertWorkflowSearchAttributesDecisionAttributes()
697+
.setSearchAttributes(searchAttributes));
698+
DecisionId decisionId =
699+
new DecisionId(DecisionTarget.UPSERT_SEARCH_ATTRIBUTES, nextDecisionEventId);
700+
addDecision(
701+
decisionId,
702+
new UpsertSearchAttributesDecisionStateMachine(decisionId, upsertSearchAttrDecision));
703+
return;
704+
}
705+
return;
706+
}
707+
672708
// This is to support the case where a getVersion call presents during workflow execution but
673709
// is removed in replay.
674710
void addAllMissingVersionMarker(
@@ -718,9 +754,10 @@ private boolean addMissingVersionMarker(
718754
.setDecisionType(DecisionType.RecordMarker)
719755
.setRecordMarkerDecisionAttributes(marker);
720756
DecisionId markerDecisionId = new DecisionId(DecisionTarget.MARKER, nextDecisionEventId);
721-
decisions.put(
722-
markerDecisionId, new MarkerDecisionStateMachine(markerDecisionId, markerDecision));
723-
nextDecisionEventId++;
757+
758+
addDecision(markerDecisionId, new MarkerDecisionStateMachine(markerDecisionId, markerDecision));
759+
// also may need to increase for search attribute if using CadenceChangeVersion
760+
addPossibleMissingDecisionForChangeVersionSearchAttribute();
724761
return true;
725762
}
726763

src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,24 @@ public int getAccessCount() {
175175
}
176176
}
177177

178+
static final class HandleResult {
179+
private Optional<byte[]> storedData;
180+
private boolean isNewlyStored;
181+
182+
HandleResult(final Optional<byte[]> storedData, final boolean isNewlyStored) {
183+
this.storedData = storedData;
184+
this.isNewlyStored = isNewlyStored;
185+
}
186+
187+
public boolean isNewlyStored() {
188+
return isNewlyStored;
189+
}
190+
191+
public Optional<byte[]> getStoredData() {
192+
return storedData;
193+
}
194+
}
195+
178196
private final DecisionsHelper decisions;
179197
private final String markerName;
180198
private final ReplayAware replayContext;
@@ -194,7 +212,7 @@ public int getAccessCount() {
194212
* nothing is recorded into the history.
195213
* @return the latest value returned by func
196214
*/
197-
Optional<byte[]> handle(
215+
HandleResult handle(
198216
String id, DataConverter converter, Func1<Optional<byte[]>, Optional<byte[]>> func) {
199217
MarkerResult result = mutableMarkerResults.get(id);
200218
Optional<byte[]> stored;
@@ -211,23 +229,25 @@ Optional<byte[]> handle(
211229
if (data.isPresent()) {
212230
// Need to insert marker to ensure that eventId is incremented
213231
recordMutableMarker(id, eventId, data.get(), accessCount, converter);
214-
return data;
232+
// also may need to increase for search attribute if using CadenceChangeVersion
233+
decisions.addPossibleMissingDecisionForChangeVersionSearchAttribute();
234+
return new HandleResult(data, false);
215235
}
216236

217237
if (!stored.isPresent()) {
218238
mutableMarkerResults.put(
219239
id, new MarkerResult(converter.toData(WorkflowInternal.DEFAULT_VERSION)));
220240
}
221241

222-
return stored;
242+
return new HandleResult(stored, false);
223243
}
224244
Optional<byte[]> toStore = func.apply(stored);
225245
if (toStore.isPresent()) {
226246
byte[] data = toStore.get();
227247
recordMutableMarker(id, eventId, data, accessCount, converter);
228-
return toStore;
248+
return new HandleResult(toStore, true);
229249
}
230-
return stored;
250+
return new HandleResult(stored, false);
231251
}
232252

233253
private Optional<byte[]> getMarkerDataFromHistory(

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
21+
import static com.uber.cadence.internal.sync.WorkflowInternal.CADENCE_CHANGE_VERSION;
2122

2223
import com.uber.cadence.ActivityType;
2324
import com.uber.cadence.SearchAttributes;
@@ -748,6 +749,11 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
748749
throw new IllegalArgumentException("Empty search attributes");
749750
}
750751

752+
if (searchAttributes.containsKey(CADENCE_CHANGE_VERSION)) {
753+
throw new IllegalArgumentException(
754+
"CadenceChangeVersion is a reserved key that cannot be set, please use other key");
755+
}
756+
751757
SearchAttributes attr = InternalUtils.convertMapToSearchAttributes(searchAttributes);
752758
context.upsertSearchAttributes(attr);
753759
}

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
*/
6565
public final class WorkflowInternal {
6666
public static final int DEFAULT_VERSION = -1;
67+
public static final String CADENCE_CHANGE_VERSION = "CadenceChangeVersion";
6768

6869
public static WorkflowThread newThread(boolean ignoreParentCancellation, Runnable runnable) {
6970
return WorkflowThread.newThread(runnable, ignoreParentCancellation);

src/test/java/com/uber/cadence/converter/JsonDataConverterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ public NonSerializableException(Throwable cause) {
244244
}
245245
}
246246

247+
// TODO flaky test in local env: expected:<class java.lang.IllegalArgumentException> but
248+
// was:<class java.lang.StackOverflowError>
247249
@Test
248250
public void testException() {
249251
RuntimeException rootException = new RuntimeException("root exception");

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4778,6 +4778,17 @@ public void testGetVersionAdded() {
47784778
}
47794779
}
47804780

4781+
@Test
4782+
public void testGetVersionAddedWithCadenceChangeVersion() {
4783+
try {
4784+
WorkflowReplayer.replayWorkflowExecutionFromResource(
4785+
"testGetVersionHistoryWithCadenceChangeVersion.json", TestGetVersionAddedImpl.class);
4786+
} catch (Exception e) {
4787+
e.printStackTrace();
4788+
fail();
4789+
}
4790+
}
4791+
47814792
public static class TestGetVersionRemovedImpl implements TestWorkflow1 {
47824793

47834794
@Override
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[{"eventId":1,"timestamp":1637197053636160000,"eventType":"WorkflowExecutionStarted","version":0,"taskId":5242880,"workflowExecutionStartedEventAttributes":{"workflowType":{"name":"TestWorkflow1::execute"},"taskList":{"name":"HelloTest"},"input":"IkhlbGxvVGVzdCI=","executionStartToCloseTimeoutSeconds":10,"taskStartToCloseTimeoutSeconds":10,"continuedExecutionRunId":"","originalExecutionRunId":"a315790c-d98a-41bb-8ce6-221fafe80f6e","identity":"","firstExecutionRunId":"a315790c-d98a-41bb-8ce6-221fafe80f6e","attempt":0,"cronSchedule":"","firstDecisionTaskBackoffSeconds":0}},{"eventId":2,"timestamp":1637197053636307800,"eventType":"DecisionTaskScheduled","version":0,"taskId":5242881,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"HelloTest"},"startToCloseTimeoutSeconds":10,"attempt":0}},{"eventId":3,"timestamp":1637197053668023400,"eventType":"DecisionTaskStarted","version":0,"taskId":5242886,"decisionTaskStartedEventAttributes":{"scheduledEventId":2,"identity":"23953@IT-USA-25920","requestId":"feb07c3e-be7d-4daa-a384-a4743829b5df"}},{"eventId":4,"timestamp":1637197053834026300,"eventType":"DecisionTaskCompleted","version":0,"taskId":5242889,"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3,"identity":"23953@IT-USA-25920","binaryChecksum":""}},{"eventId":5,"timestamp":1637197053834069100,"eventType":"MarkerRecorded","version":0,"taskId":5242890,"markerRecordedEventAttributes":{"markerName":"Version","details":"MQ==","decisionTaskCompletedEventId":4,"header":{"fields":{"MutableMarkerHeader":"eyJpZCI6ImNpZDEiLCJldmVudElkIjo1LCJhY2Nlc3NDb3VudCI6MH0="}}}},{"eventId":6,"timestamp":1637197053834139100,"eventType":"UpsertWorkflowSearchAttributes","version":0,"taskId":5242891,"upsertWorkflowSearchAttributesEventAttributes":{"decisionTaskCompletedEventId":4,"searchAttributes":{"indexedFields":{"CadenceChangeVersion":"WyJjaWQxLTEiXQ=="}}}},{"eventId":7,"timestamp":1637197053834178300,"eventType":"ActivityTaskScheduled","version":0,"taskId":5242892,"activityTaskScheduledEventAttributes":{"activityId":"0","activityType":{"name":"customActivity1"},"taskList":{"name":"HelloTest"},"input":"MQ==","scheduleToCloseTimeoutSeconds":5,"scheduleToStartTimeoutSeconds":5,"startToCloseTimeoutSeconds":10,"heartbeatTimeoutSeconds":5,"decisionTaskCompletedEventId":4}},{"eventId":8,"timestamp":1637197053854447700,"eventType":"ActivityTaskStarted","version":0,"taskId":5242897,"activityTaskStartedEventAttributes":{"scheduledEventId":7,"identity":"23953@IT-USA-25920","requestId":"0593a6a2-a3f7-4b08-bf1e-eeff1b955fb6","attempt":0,"lastFailureReason":""}},{"eventId":9,"timestamp":1637197053882546000,"eventType":"ActivityTaskCompleted","version":0,"taskId":5242900,"activityTaskCompletedEventAttributes":{"result":"MQ==","scheduledEventId":7,"startedEventId":8,"identity":"23953@IT-USA-25920"}},{"eventId":10,"timestamp":1637197053882619200,"eventType":"DecisionTaskScheduled","version":0,"taskId":5242902,"decisionTaskScheduledEventAttributes":{"taskList":{"name":"sticky:IT-USA-25920:523a6c6c-f5e9-4ba9-8760-ffb9b6352089"},"startToCloseTimeoutSeconds":10,"attempt":0}},{"eventId":11,"timestamp":1637197053898561900,"eventType":"DecisionTaskStarted","version":0,"taskId":5242906,"decisionTaskStartedEventAttributes":{"scheduledEventId":10,"identity":"523a6c6c-f5e9-4ba9-8760-ffb9b6352089","requestId":"762d468d-36a5-48d8-b0cb-ba1f8de49175"}},{"eventId":12,"timestamp":1637197053932341600,"eventType":"DecisionTaskCompleted","version":0,"taskId":5242909,"decisionTaskCompletedEventAttributes":{"scheduledEventId":10,"startedEventId":11,"identity":"23953@IT-USA-25920","binaryChecksum":""}},{"eventId":13,"timestamp":1637197053932522000,"eventType":"WorkflowExecutionCompleted","version":0,"taskId":5242910,"workflowExecutionCompletedEventAttributes":{"result":"ImhlbGxvMSI=","decisionTaskCompletedEventId":12}}]

0 commit comments

Comments
 (0)