Skip to content

Commit bf2dba3

Browse files
committed
added example of Structured Concurrency in Java 19
1 parent 0857330 commit bf2dba3

6 files changed

+151
-6
lines changed

java-19/README.md

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ To run each example use: `java --enable-preview --source 19 <FileName.java>`
1212
* definition: guard is the boolean expression, guarded pattern is the case with guard
1313
* guarded pattern: `case Hero h when h.getCity() == Cities.NEW_YORK`
1414
* guard: `h.getCity() == Cities.NEW_YORK`
15+
* Record patterns
16+
* added suport to deconstruct record values in pattern matcher
17+
* record pattern: `Point(int x, int y)`
18+
* now we can use type pattern and record pattern together
19+
* we can check the type and extract the record components using `instanceof` operator
20+
* `o instanceOf Point(int x, int y)`
1521
* Virtual Threads:
1622
* also called user-mode threads or [fiber](https://en.wikipedia.org/wiki/Fiber_(computer_science))
1723
* more notes about Project Loom [here](../java-loom/)
@@ -67,12 +73,27 @@ To run each example use: `java --enable-preview --source 19 <FileName.java>`
6773
* `Thread.ofPlatform().start(() -> {})`;
6874
* **do not use** any cached method from `Executors`.
6975
* [here](platform-thread-vs-virtual-thread.md) is some details about the Platform Thread vs Virtual Thread examples
70-
* Record patterns
71-
* added suport to deconstruct record values in pattern matcher
72-
* record pattern: `Point(int x, int y)`
73-
* now we can use type pattern and record pattern together
74-
* we can check the type and extract the record components using `instanceof` operator
75-
* `o instanceOf Point(int x, int y)`
76+
* Structured Concurrency
77+
* goal is simplify multithreaded programming;
78+
* treats multiple tasks running in different threads as a single unit of work:
79+
* streamlining error handling and cancellation
80+
* improving reliability
81+
* enhancing observability
82+
* steps to use:
83+
* create a scope (using `StructuredTaskScope`);
84+
* fork concurrent subtasks in the scope:
85+
* any subtask or the scope owner can call `shutdown` to request cancelation of all remaining subtasks;
86+
* scope's owner joins the scope:
87+
* blocks until all subtasks either complete (sucessfully or not) or any call shutdown;
88+
* after joining, handle any errors and process theirs results;
89+
* close the scope (implicity when using try-with-resources).
90+
* characteristics:
91+
* each fork creates its own thread (by default is a virtual thread)
92+
* allow nested scope (when creating a scope in a subtask)
93+
* shutdown causes the threads of all forks that are still active in the scope to be interrupted
94+
* when calling `close`, it propagates the the state of the unit of work (either awaits all subtasks to finish or cancel each one of them)
95+
* [output from the structured concurrency example](structured-concurrency-example.md):
96+
7697

7798
## JEPs
7899

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import java.util.concurrent.ExecutionException;
2+
import java.util.concurrent.Executors;
3+
import java.util.concurrent.Future;
4+
5+
import javax.management.RuntimeErrorException;
6+
7+
import jdk.incubator.concurrent.StructuredTaskScope;
8+
9+
/**
10+
* This class shows an example of unstructured concurrency shown in the JEP.
11+
*
12+
* To run: `java --source 19 --enable-preview --add-modules jdk.incubator.concurrent StructuredConcurrencyMotivation.java`
13+
*/
14+
public class StructuredConcurrencyMotivation {
15+
public static void main(String[] args) throws Exception {
16+
var example = new StructuredConcurrencyMotivation();
17+
18+
System.out.println("\nExecutorService:");
19+
try {
20+
// at subtask exception, the other subtasks will continue to run
21+
System.out.println(example.unstructuredHandle());
22+
} catch (Exception ex) {
23+
ex.printStackTrace();
24+
}
25+
26+
System.out.println("\nStructuredTaskScope:");
27+
try {
28+
// at subtask failure, we can see other subtasks will be canceled (and return much earlier)
29+
System.out.println(example.structuredHandle());
30+
} catch (Exception ex) {
31+
ex.printStackTrace();
32+
}
33+
}
34+
35+
/**
36+
* Problems:
37+
* - If findUser throws, then handle will throw when calling user.get(). fetchOrder will continue to run in its own
38+
* thread even after handle has failed. This is a thread leak, which, at best, wastes resources;
39+
* at worst, fetchOrder will interfere with other tasks.
40+
* - If the thread executing handle is interrupted, the interruption is not propagated to subtasks. Both the findUser
41+
* and fetchOrder threads will leak, continuing to run even after handle has failed.
42+
* - If findUser takes a long time to execute, but fetchOrder fails in the meantime, then handle will wait unnecessarily
43+
* for findUser by blocking on user.get() rather than canceling it. Only after findUser completes and user.get()
44+
* returns will order.get() throw, causing handle to fail.
45+
*/
46+
Response unstructuredHandle() throws ExecutionException, InterruptedException {
47+
var es = Executors.newCachedThreadPool();
48+
try {
49+
Future<String> user = es.submit(() -> findUser());
50+
Future<Integer> order = es.submit(() -> fetchOrder());
51+
String theUser = user.get(); // Join findUser
52+
int theOrder = order.get(); // Join fetchOrder
53+
return new Response(theUser, theOrder);
54+
} finally {
55+
es.shutdown();
56+
}
57+
}
58+
59+
/**
60+
* Benefits:
61+
* - Error handling with short-circuiting: If either findUser or fetchOrder fail, the other will be cancelled if it
62+
* hasn't yet completed (this is managed by the cancellation policy implemented by ShutdownOnFailure;
63+
* other policies are possible too).
64+
* - Cancellation Propagation: If the thread running handle is interrupted before or during the call to join, both
65+
* forks will be automatically cancelled when the scope is exited.
66+
* - Clarity: The above code has a clear structure: set up the child subtasks, wait for them (either to complete
67+
* or to becanceled), and then decide whether to succeed (and process the results of the child tasks, which are
68+
* already finished) or fail (and the subtasks are already finished, so there's nothing more to clean up.)
69+
* - Observability: A thread dump, as described below, will clearly demonstrate the task hierarchy, with the threads
70+
* running findUser and fetchOrder shown as children of the scope.
71+
*/
72+
Response structuredHandle() throws ExecutionException, InterruptedException {
73+
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
74+
Future<String> user = scope.fork(() -> findUser());
75+
Future<Integer> order = scope.fork(() -> fetchOrder());
76+
77+
scope.join(); // Join both forks
78+
scope.throwIfFailed(); // ... and propagate errors
79+
80+
// Here, both forks have succeeded, so compose their results
81+
return new Response(user.resultNow(), order.resultNow());
82+
}
83+
}
84+
85+
String findUser() {
86+
System.out.println("Finding user");
87+
sleep("Finding user");
88+
System.out.println("Found user");
89+
return "Marley";
90+
}
91+
92+
Integer fetchOrder() {
93+
System.out.println("Fetching order");
94+
// sleep("Fetching order");
95+
throw new RuntimeException();
96+
// System.out.println("Fetched order");
97+
// return Integer.valueOf(42);
98+
}
99+
100+
void sleep(String task) {
101+
try {
102+
Thread.sleep(10000);
103+
} catch (Exception ex) {
104+
System.out.println(task + " canceled");
105+
}
106+
}
107+
}
108+
109+
record Response(String user, Integer order) {}

java-19/img/2022-06-12-17-50-45.png

33.3 KB
Loading

java-19/img/2022-06-12-17-51-30.png

52.7 KB
Loading

java-19/img/2022-06-12-18-08-57.png

200 KB
Loading
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Example of Structured Concurrency
2+
3+
The output that we can see that the `StructuredTaskScope` canceled other subtask (throwing a `InterruptedException`):
4+
5+
![](img/2022-06-12-18-08-57.png)
6+
7+
Thread view of concurrency using `ExecutorService`:
8+
9+
![](img/2022-06-12-17-50-45.png)
10+
11+
Thread view of concurency using `StructuredTaskScope`:
12+
13+
![](img/2022-06-12-17-51-30.png)
14+
15+
We can not the virtual thread running with ForkJoinPool.

0 commit comments

Comments
 (0)