Skip to content

Commit 634d6cb

Browse files
foguspuredanger
authored andcommitted
ASYNC-262: Added Java system property clojure.core.async.vthreads to control how core.async uses JDK 21+ virtual threads. When the user wishes to target vthreads then core.async will use a vthreads ExecutorService for :io workloads, including the io-thread macro. Also, the go macro will use io-thread as its basis instead of the IOC machinery. Also added a runtime check for vthreads in the case where AOT compiles to vthreads. Finally, added requiring-resolve use in the go macro to load the IOC machinery in the go macro when needed and also when supported by the CLJ version. As a backup to this, added a check at the c.a top level to load the ioc support if the CLJ version doesn't allow nested compilation patterns.
1 parent 8cc877f commit 634d6cb

File tree

3 files changed

+139
-34
lines changed

3 files changed

+139
-34
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,34 @@ core.async. If not supplied the ExecutorService for :io will be
4040
used instead.
4141
4242
The set of contexts may grow in the future so the function should
43-
return nil for unexpected contexts."
43+
return nil for unexpected contexts.
44+
45+
Use the Java system property `clojure.core.async.vthreads` to control
46+
how core.async uses JDK 21+ virtual threads. The property can be one of
47+
the following values:
48+
49+
unset - core.async will opportunistically use vthreads when available
50+
(≥ Java 21) and will otherwise use the old IOC impl. io-thread and :io
51+
thread pool will run on platform threads if vthreads are not available.
52+
If AOT compiling, go blocks will always use IOC so that the resulting
53+
bytecode works on all JVMs (so no change in compiled output)
54+
55+
\"target\" - means that you are targeting virtual threads. At runtime
56+
from source, go blocks will throw if vthreads are not available.
57+
If AOT compiling, go blocks are always compiled as normal Clojure
58+
code to be run on vthreads and will throw at runtime if vthreads are
59+
not available (Java <21)
60+
61+
\"avoid\" - means that vthreads will not be used by core.async - you can
62+
use this to minimize impacts if you are not yet ready to utilize vthreads
63+
in your app. If AOT compiling, go blocks will use IOC. At runtime, io-thread
64+
and the :io thread pool use platform threads
65+
66+
Note: existing IOC compiled go blocks from older core.async versions continue
67+
to work (we retain and load the IOC state machine runtime - this does not
68+
require the analyzer), and you can interact with the same channels from both
69+
IOC and vthread code.
70+
"
4471
(:refer-clojure :exclude [reduce transduce into merge map take partition
4572
partition-by bounded-count])
4673
(:require [clojure.core.async.impl.protocols :as impl]
@@ -49,14 +76,18 @@ return nil for unexpected contexts."
4976
[clojure.core.async.impl.timers :as timers]
5077
[clojure.core.async.impl.dispatch :as dispatch]
5178
[clojure.core.async.impl.ioc-macros :as ioc]
52-
clojure.core.async.impl.go ;; TODO: make conditional
5379
[clojure.core.async.impl.mutex :as mutex]
5480
)
5581
(:import [java.util.concurrent.atomic AtomicLong]
5682
[java.util.concurrent.locks Lock]
5783
[java.util.concurrent ThreadLocalRandom]
5884
[java.util Arrays ArrayList]))
5985

86+
(def ^:private lazy-loading-supported? (dispatch/at-least-clojure-version? [1 12 3]))
87+
88+
(when-not lazy-loading-supported?
89+
(require 'clojure.core.async.impl.go))
90+
6091
(alias 'core 'clojure.core)
6192

6293
(set! *warn-on-reflection* false)
@@ -138,6 +169,21 @@ return nil for unexpected contexts."
138169
[^long msecs]
139170
(timers/timeout msecs))
140171

172+
(defn- defparkingop* [op doc arglist]
173+
(let [as (mapv #(list 'quote %) arglist)
174+
blockingop (-> op name (str "!") symbol)]
175+
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
176+
(fn [~'& ~'args]
177+
(if (dispatch/in-vthread?)
178+
~(list* apply blockingop '[args])
179+
(assert nil ~(str op " used not in (go ...) block")))))))
180+
181+
(defmacro defparkingop
182+
"Emits a Var with a function that checks if it's running in a virtual thread. If so then
183+
the related blocking op will be called, otherwise the function throws."
184+
[op doc arglist]
185+
(defparkingop* op doc arglist))
186+
141187
(defmacro defblockingop
142188
[op doc arglist & body]
143189
(let [as (mapv #(list 'quote %) arglist)]
@@ -162,11 +208,11 @@ return nil for unexpected contexts."
162208
@ret
163209
(deref p))))
164210

165-
(defn <!
166-
"takes a val from port. Must be called inside a (go ...) block. Will
167-
return nil if closed. Will park if nothing is available."
168-
[port]
169-
(assert nil "<! used not in (go ...) block"))
211+
(defparkingop <!
212+
"takes a val from port. Must be called inside a (go ...) block, or on
213+
a virtual thread (no matter how it was started). Will return nil if
214+
closed. Will park if nothing is available."
215+
[port])
170216

171217
(defn take!
172218
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -201,12 +247,12 @@ return nil for unexpected contexts."
201247
@ret
202248
(deref p))))
203249

204-
(defn >!
250+
(defparkingop >!
205251
"puts a val into port. nil values are not allowed. Must be called
206-
inside a (go ...) block. Will park if no buffer space is available.
252+
inside a (go ...) block, or on a virtual thread (no matter how it
253+
was started). Will park if no buffer space is available.
207254
Returns true unless port is already closed."
208-
[port val]
209-
(assert nil ">! used not in (go ...) block"))
255+
[port val])
210256

211257
(defn- nop [_])
212258
(def ^:private fhnop (fn-handler nop))
@@ -344,16 +390,16 @@ return nil for unexpected contexts."
344390
@ret
345391
(deref p))))
346392

347-
(defn alts!
393+
(defparkingop alts!
348394
"Completes at most one of several channel operations. Must be called
349-
inside a (go ...) block. ports is a vector of channel endpoints,
350-
which can be either a channel to take from or a vector of
351-
[channel-to-put-to val-to-put], in any combination. Takes will be
352-
made as if by <!, and puts will be made as if by >!. Unless
353-
the :priority option is true, if more than one port operation is
354-
ready a non-deterministic choice will be made. If no operation is
355-
ready and a :default value is supplied, [default-val :default] will
356-
be returned, otherwise alts! will park until the first operation to
395+
inside a (go ...) block, or on a virtual thread (no matter how it was
396+
started). ports is a vector of channel endpoints, which can be either
397+
a channel to take from or a vector of [channel-to-put-to val-to-put],
398+
in any combination. Takes will be made as if by <!, and puts will be
399+
made as if by >!. Unless the :priority option is true, if more than one
400+
port operation is ready a non-deterministic choice will be made. If no
401+
operation is ready and a :default value is supplied, [default-val :default]
402+
will be returned, otherwise alts! will park until the first operation to
357403
become ready completes. Returns [val port] of the completed
358404
operation, where val is the value taken for takes, and a
359405
boolean (true unless already closed, as per put!) for puts.
@@ -367,8 +413,7 @@ return nil for unexpected contexts."
367413
used, nor in what order should they be, so they should not be
368414
depended upon for side effects."
369415

370-
[ports & {:as opts}]
371-
(assert nil "alts! used not in (go ...) block"))
416+
[ports & {:as opts}])
372417

373418
(defn do-alt [alts clauses]
374419
(assert (even? (count clauses)) "unbalanced clauses")
@@ -471,6 +516,22 @@ return nil for unexpected contexts."
471516
(let [ret (impl/take! port (fn-handler nop false))]
472517
(when ret @ret)))
473518

519+
(defn- go* [body env]
520+
(cond (and (not dispatch/virtual-threads-available?)
521+
dispatch/target-vthreads?
522+
(not clojure.core/*compile-files*))
523+
(dispatch/report-vthreads-not-available-error!)
524+
525+
(or dispatch/target-vthreads?
526+
(and dispatch/unset-vthreads?
527+
dispatch/virtual-threads-available?
528+
(not clojure.core/*compile-files*)))
529+
`(do (dispatch/ensure-runtime-vthreads!)
530+
(thread-call (^:once fn* [] ~@body) :io))
531+
532+
:else
533+
((requiring-resolve 'clojure.core.async.impl.go/go-impl) env body)))
534+
474535
(defmacro go
475536
"Asynchronously executes the body, returning immediately to the
476537
calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
@@ -487,7 +548,7 @@ return nil for unexpected contexts."
487548
Returns a channel which will receive the result of the body when
488549
completed"
489550
[& body]
490-
(#'clojure.core.async.impl.go/go-impl &env body))
551+
(go* body &env))
491552

492553
(defonce ^:private thread-macro-executor nil)
493554

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,63 @@
7272
[workload]
7373
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true)))
7474

75+
(defn at-least-clojure-version?
76+
[[maj min incr]]
77+
(let [{:keys [major minor incremental]} *clojure-version*]
78+
(not (neg? (compare [major minor incremental] [maj min incr])))))
79+
80+
(def virtual-threads-available?
81+
(try
82+
(Class/forName "java.lang.Thread$Builder$OfVirtual")
83+
true
84+
(catch ClassNotFoundException _
85+
false)))
86+
87+
(defn- vthreads-directive
88+
"Retrieves the value of the sysprop clojure.core.async.vthreads."
89+
[]
90+
(System/getProperty "clojure.core.async.vthreads"))
91+
92+
(def target-vthreads?
93+
(= (vthreads-directive) "target"))
94+
95+
(def unset-vthreads?
96+
(nil? (vthreads-directive)))
97+
98+
(def vthreads-available-and-allowed?
99+
(and virtual-threads-available?
100+
(not= (vthreads-directive) "avoid")))
101+
102+
(def ^:private virtual-thread?
103+
(if virtual-threads-available?
104+
(eval `(fn [^Thread t#] (~'.isVirtual t#)))
105+
(constantly false)))
106+
107+
(defn in-vthread? []
108+
(and virtual-threads-available?
109+
(virtual-thread? (Thread/currentThread))))
110+
111+
(defn report-vthreads-not-available-error! []
112+
(throw (ex-info "Code compiled to target virtual threads, but is running without vthread support."
113+
{:runtime-jvm-version (System/getProperty "java.version")
114+
:vthreads-directive (vthreads-directive)})))
115+
116+
(defn ensure-runtime-vthreads! []
117+
(when (not vthreads-available-and-allowed?)
118+
(report-vthreads-not-available-error!)))
119+
120+
(defn- make-io-executor
121+
[]
122+
(if vthreads-available-and-allowed?
123+
(-> (.getDeclaredMethod Executors "newVirtualThreadPerTaskExecutor" (make-array Class 0))
124+
(.invoke nil (make-array Class 0)))
125+
(make-ctp-named :io)))
126+
75127
(defn ^:private create-default-executor
76128
[workload]
77129
(case workload
78130
:compute (make-ctp-named :compute)
79-
:io (make-ctp-named :io)
131+
:io (make-io-executor)
80132
:mixed (make-ctp-named :mixed)))
81133

82134
(def executor-for

src/test/clojure/clojure/core/async_test.clj

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,7 @@
193193
(io-thread (>!! c2 (clojure.string/upper-case (<!! c1))))
194194
(io-thread (>!! c3 (clojure.string/reverse (<!! c2))))
195195
(>!! c1 "loop")
196-
(is (= "POOL" (<!! c3)))))
197-
(testing "io-thread parking op should fail"
198-
(let [c1 (chan)]
199-
(io-thread
200-
(try
201-
(>! c1 :no)
202-
(catch AssertionError _
203-
(>!! c1 :yes))))
204-
(is (= :yes (<!! c1))))))
196+
(is (= "POOL" (<!! c3))))))
205197

206198
(deftest ops-tests
207199
(testing "map<"
@@ -488,4 +480,4 @@
488480
(thrown? AssertionError
489481
(let [c1 (a/chan)
490482
c2 (a/chan)]
491-
(a/alts!! [c1 [c2 nil]])))))
483+
(a/alts!! [c1 [c2 nil]])))))

0 commit comments

Comments
 (0)