Skip to content

Commit 5abf005

Browse files
authored
Merge pull request #696 from fjtirado/Fix_#695
[Fix #695] Suspending/cancelling/resume
2 parents f423335 + 396029d commit 5abf005

File tree

11 files changed

+230
-62
lines changed

11 files changed

+230
-62
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public WorkflowPosition position() {
122122
return position;
123123
}
124124

125-
@Override
126125
public Map<String, Object> variables() {
127126
return contextVariables;
128127
}
@@ -132,7 +131,6 @@ public Instant startedAt() {
132131
return startedAt;
133132
}
134133

135-
@Override
136134
public Optional<TaskContext> parent() {
137135
return parentContext;
138136
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import java.time.Instant;
20-
import java.util.Map;
21-
import java.util.Optional;
2220

2321
public interface TaskContextData {
2422

@@ -34,12 +32,8 @@ public interface TaskContextData {
3432

3533
WorkflowPosition position();
3634

37-
Map<String, Object> variables();
38-
3935
Instant startedAt();
4036

41-
Optional<TaskContext> parent();
42-
4337
String taskName();
4438

4539
Instant completedAt();

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@
1919

2020
public interface WorkflowInstance extends WorkflowInstanceData {
2121
CompletableFuture<WorkflowModel> start();
22+
23+
boolean suspend();
24+
25+
boolean cancel();
26+
27+
boolean resume();
2228
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
1919

2020
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
21+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2122
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
2223
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2324
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
2425
import java.time.Instant;
2526
import java.util.Optional;
27+
import java.util.concurrent.CancellationException;
2628
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionException;
2730
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
2833

2934
public class WorkflowMutableInstance implements WorkflowInstance {
3035

@@ -36,7 +41,11 @@ public class WorkflowMutableInstance implements WorkflowInstance {
3641
private Instant startedAt;
3742
private Instant completedAt;
3843
private volatile WorkflowModel output;
44+
private Lock statusLock = new ReentrantLock();
3945
private CompletableFuture<WorkflowModel> completableFuture;
46+
private CompletableFuture<TaskContext> suspended;
47+
private TaskContext suspendedTask;
48+
private CompletableFuture<TaskContext> cancelled;
4049

4150
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
4251
this.id = definition.application().idFactory().get();
@@ -70,7 +79,17 @@ public CompletableFuture<WorkflowModel> start() {
7079
private void whenFailed(WorkflowModel result, Throwable ex) {
7180
completedAt = Instant.now();
7281
if (ex != null) {
73-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED);
82+
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
83+
}
84+
}
85+
86+
private void handleException(Throwable ex) {
87+
if (ex instanceof CancellationException) {
88+
status.set(WorkflowStatus.CANCELLED);
89+
publishEvent(
90+
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
91+
} else {
92+
status.set(WorkflowStatus.FAULTED);
7493
publishEvent(
7594
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
7695
}
@@ -146,4 +165,76 @@ public String toString() {
146165
+ completedAt
147166
+ "]";
148167
}
168+
169+
@Override
170+
public boolean suspend() {
171+
try {
172+
statusLock.lock();
173+
if (TaskExecutorHelper.isActive(status.get())) {
174+
suspended = new CompletableFuture<TaskContext>();
175+
return true;
176+
} else {
177+
return false;
178+
}
179+
} finally {
180+
statusLock.unlock();
181+
}
182+
}
183+
184+
@Override
185+
public boolean resume() {
186+
try {
187+
statusLock.lock();
188+
if (suspended != null) {
189+
if (suspendedTask != null) {
190+
CompletableFuture<TaskContext> toBeCompleted = suspended;
191+
suspended = null;
192+
toBeCompleted.complete(suspendedTask);
193+
} else {
194+
suspended = null;
195+
}
196+
return true;
197+
} else {
198+
return false;
199+
}
200+
} finally {
201+
statusLock.unlock();
202+
}
203+
}
204+
205+
public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
206+
try {
207+
statusLock.lock();
208+
if (suspended != null) {
209+
suspendedTask = t;
210+
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
211+
return suspended;
212+
}
213+
if (cancelled != null) {
214+
cancelled = new CompletableFuture<TaskContext>();
215+
workflowContext.instance().status(WorkflowStatus.CANCELLED);
216+
cancelled.completeExceptionally(
217+
new CancellationException("Task " + t.taskName() + " has been cancelled"));
218+
return cancelled;
219+
}
220+
} finally {
221+
statusLock.unlock();
222+
}
223+
return CompletableFuture.completedFuture(t);
224+
}
225+
226+
@Override
227+
public boolean cancel() {
228+
try {
229+
statusLock.lock();
230+
if (TaskExecutorHelper.isActive(status.get())) {
231+
cancelled = new CompletableFuture<TaskContext>();
232+
return true;
233+
} else {
234+
return false;
235+
}
236+
} finally {
237+
statusLock.unlock();
238+
}
239+
}
149240
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ public enum WorkflowStatus {
2121
WAITING,
2222
COMPLETED,
2323
FAULTED,
24-
CANCELLED
24+
CANCELLED,
25+
SUSPENDED
2526
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.serverlessworkflow.impl.WorkflowPosition;
3535
import io.serverlessworkflow.impl.WorkflowPredicate;
3636
import io.serverlessworkflow.impl.WorkflowStatus;
37+
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
3738
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
3839
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
3940
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
@@ -43,7 +44,9 @@
4344
import java.util.Iterator;
4445
import java.util.Map;
4546
import java.util.Optional;
47+
import java.util.concurrent.CancellationException;
4648
import java.util.concurrent.CompletableFuture;
49+
import java.util.concurrent.CompletionException;
4750

4851
public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {
4952

@@ -201,13 +204,16 @@ public CompletableFuture<TaskContext> apply(
201204
return t;
202205
})
203206
.thenCompose(t -> execute(workflowContext, t))
207+
.thenCompose(t -> workflowContext.instance().completedChecks(t))
204208
.whenComplete(
205209
(t, e) -> {
206210
if (e != null) {
207-
publishEvent(
211+
handleException(
208212
workflowContext,
209-
l ->
210-
l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
213+
taskContext,
214+
e instanceof CompletionException ? e.getCause() : e);
215+
} else {
216+
workflowContext.instance().status(WorkflowStatus.RUNNING);
211217
}
212218
})
213219
.thenApply(
@@ -235,6 +241,19 @@ public CompletableFuture<TaskContext> apply(
235241
}
236242
}
237243

244+
private void handleException(
245+
WorkflowContext workflowContext, TaskContext taskContext, Throwable e) {
246+
if (e instanceof CancellationException) {
247+
publishEvent(
248+
workflowContext,
249+
l -> l.onTaskCancelled(new TaskCancelledEvent(workflowContext, taskContext)));
250+
} else {
251+
publishEvent(
252+
workflowContext,
253+
l -> l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e)));
254+
}
255+
}
256+
238257
protected abstract TransitionInfo getSkipTransition();
239258

240259
protected abstract CompletableFuture<TaskContext> execute(

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ protected CompletableFuture<WorkflowModel> internalExecute(
238238
processCe(converter.apply(ce), output, workflow, taskContext, future)))
239239
.thenApply(
240240
v -> {
241-
workflow.instance().status(WorkflowStatus.RUNNING);
242241
registrations.forEach(reg -> eventConsumer.unregister(reg));
243242
return output;
244243
});

impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ protected CompletableFuture<WorkflowModel> internalExecute(
7676
((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING);
7777
return new CompletableFuture<WorkflowModel>()
7878
.completeOnTimeout(taskContext.output(), millisToWait.toMillis(), TimeUnit.MILLISECONDS)
79-
.thenApply(
80-
node -> {
81-
workflow.instance().status(WorkflowStatus.RUNNING);
82-
return node;
83-
});
79+
.thenApply(this::complete);
80+
}
81+
82+
private WorkflowModel complete(WorkflowModel model) {
83+
return model;
8484
}
8585
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2222
import java.io.PrintWriter;
2323
import java.io.StringWriter;
24-
import java.util.concurrent.CompletionException;
2524

2625
public record WorkflowErrorCEData(
2726
String type, Integer status, String instance, String title, String detail) {
@@ -35,10 +34,6 @@ public static WorkflowErrorCEData error(WorkflowFailedEvent ev) {
3534
}
3635

3736
private static WorkflowErrorCEData error(Throwable cause) {
38-
39-
if (cause instanceof CompletionException) {
40-
cause = cause.getCause();
41-
}
4237
return cause instanceof WorkflowException ex ? error(ex) : commonError(cause);
4338
}
4439

0 commit comments

Comments
 (0)