From c3e78528e8d089f0e43f103baa8030361e66a615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Thu, 24 Apr 2025 16:42:49 +0200 Subject: [PATCH 1/4] Lwt.awaken: replace wakeup and wakeup_later TODO: - documentation - tests --- src/core/lwt.ml | 63 ++++++++++++++++++++++++++---------------------- src/core/lwt.mli | 58 +++++++++++--------------------------------- 2 files changed, 48 insertions(+), 73 deletions(-) diff --git a/src/core/lwt.ml b/src/core/lwt.ml index 89aed2170..bf92e7133 100644 --- a/src/core/lwt.ml +++ b/src/core/lwt.ml @@ -1340,6 +1340,16 @@ include Resolution_loop module Resolving : sig + + type deferring = + | Immediatelly + | Dont_care + | Later + + val awaken_result : deferring:deferring -> 'a u -> ('a, exn) result -> unit + val awaken : deferring:deferring -> 'a u -> 'a -> unit + val awaken_exn : deferring:deferring -> 'a u -> exn -> unit + val wakeup_later_result : 'a u -> ('a, exn) result -> unit val wakeup_later : 'a u -> 'a -> unit val wakeup_later_exn : _ u -> exn -> unit @@ -1351,31 +1361,13 @@ sig val cancel : 'a t -> unit end = struct - (* Note that this function deviates from the "ideal" callback deferral - behavior: it runs callbacks directly on the current stack. It should - therefore be possible to cause a stack overflow using this function. *) - let wakeup_general api_function_name r result = - let Internal p = to_internal_resolver r in - let p = underlying p in - match p.state with - | Rejected Canceled -> - () - | Fulfilled _ -> - Printf.ksprintf invalid_arg "Lwt.%s" api_function_name - | Rejected _ -> - Printf.ksprintf invalid_arg "Lwt.%s" api_function_name - - | Pending _ -> - let result = state_of_result result in - let State_may_have_changed p = resolve ~allow_deferring:false p result in - ignore p + type deferring = + | Immediatelly + | Dont_care + | Later - let wakeup_result r result = wakeup_general "wakeup_result" r result - let wakeup r v = wakeup_general "wakeup" r (Ok v) - let wakeup_exn r exn = wakeup_general "wakeup_exn" r (Error exn) - - let wakeup_later_general api_function_name r result = + let awaken_general api_function_name deferring r result = let Internal p = to_internal_resolver r in let p = underlying p in @@ -1390,17 +1382,30 @@ struct | Pending _ -> let result = state_of_result result in let State_may_have_changed p = - resolve ~maximum_callback_nesting_depth:1 p result in + match deferring with + | Immediatelly -> resolve ~allow_deferring:false p result + | Dont_care -> resolve ~maximum_callback_nesting_depth:1 p result + | Later -> resolve ~maximum_callback_nesting_depth:min_int p result + in ignore p + let awaken_result ~deferring r result = + awaken_general "awaken_result" deferring r result + let awaken ~deferring r v = + awaken_general "awaken" deferring r (Ok v) + let awaken_exn ~deferring r exn = + awaken_general "awaken_exn" deferring r (Error exn) + + let wakeup_result r result = awaken_general "wakeup_result" Immediatelly r result + let wakeup r v = awaken_general "wakeup" Immediatelly r (Ok v) + let wakeup_exn r exn = awaken_general "wakeup_exn" Immediatelly r (Error exn) + let wakeup_later_result r result = - wakeup_later_general "wakeup_later_result" r result + awaken_general "wakeup_later_result" Dont_care r result let wakeup_later r v = - wakeup_later_general "wakeup_later" r (Ok v) + awaken_general "wakeup_later" Dont_care r (Ok v) let wakeup_later_exn r exn = - wakeup_later_general "wakeup_later_exn" r (Error exn) - - + awaken_general "wakeup_later_exn" Dont_care r (Error exn) type packed_callbacks = | Packed : _ callbacks -> packed_callbacks diff --git a/src/core/lwt.mli b/src/core/lwt.mli index 7598343d8..01d28d22b 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -407,25 +407,20 @@ val wait : unit -> ('a t * 'a u) (** {3 Resolving} *) -val wakeup_later : 'a u -> 'a -> unit -(** [Lwt.wakeup_later r v] {e fulfills}, with value [v], the {e pending} - {{!t} promise} associated with {{!u} resolver} [r]. This triggers callbacks - attached to the promise. +type deferring = + | Immediatelly + | Dont_care + | Later - If the promise is not pending, [Lwt.wakeup_later] raises - {!Stdlib.Invalid_argument}, unless the promise is {{!Lwt.cancel} canceled}. - If the promise is canceled, [Lwt.wakeup_later] has no effect. +val awaken_result : deferring:deferring -> 'a u -> ('a, exn) result -> unit +val awaken : deferring:deferring -> 'a u -> 'a -> unit +val awaken_exn : deferring:deferring -> 'a u -> exn -> unit - If your program has multiple threads, it is important to make sure that - [Lwt.wakeup_later] (and any similar function) is only called from the main - thread. [Lwt.wakeup_later] can trigger callbacks attached to promises - by the program, and these assume they are running in the main thread. If you - need to communicate from a worker thread to the main thread running Lwt, see - {!Lwt_preemptive} or {!Lwt_unix.send_notification}. *) +val wakeup_later : 'a u -> 'a -> unit +(** [@@ocaml.deprecated "Use awaken ~deferring:Dont_care instead"] *) val wakeup_later_exn : _ u -> exn -> unit -(** [Lwt.wakeup_later_exn r exn] is like {!Lwt.wakeup_later}, except, if the - associated {{!t} promise} is {e pending}, it is {e rejected} with [exn]. *) +(** [@@ocaml.deprecated "Use awaken_exn ~deferring:Dont_care instead"] *) val return : 'a -> 'a t (** [Lwt.return v] creates a new {{!t} promise} that is {e already fulfilled} @@ -1620,15 +1615,7 @@ val of_result : ('a, exn) result -> 'a t rejected with [exn]. *) val wakeup_later_result : 'a u -> ('a, exn) result -> unit -(** [Lwt.wakeup_later_result r result] resolves the pending promise [p] - associated to resolver [r], according to [result]: - - - If [result] is [Ok v], [p] is fulfilled with [v]. - - If [result] is [Error exn], [p] is rejected with [exn]. - - If [p] is not pending, [Lwt.wakeup_later_result] raises - [Stdlib.Invalid_argument _], except if [p] is {{!Lwt.cancel} canceled}. If - [p] is canceled, [Lwt.wakeup_later_result] has no effect. *) +(** [@@ocaml.deprecated "Use awaken_result ~deferring:Dont_care instead"] *) @@ -1762,30 +1749,13 @@ let () = (** {3 Immediate resolving} *) val wakeup : 'a u -> 'a -> unit -(** [Lwt.wakeup r v] is like {!Lwt.wakeup_later}[ r v], except it guarantees - that callbacks associated with [r] will be called immediately, deeper on the - current stack. - - In contrast, {!Lwt.wakeup_later} {e may} call callbacks immediately, or may - queue them for execution on a shallower stack – though still before the next - time Lwt blocks the process on I/O. - - Using this function is discouraged, because calling it in a loop can exhaust - the stack. The loop might be difficult to detect or predict, due to combined - mutually-recursive calls between multiple modules and libraries. - - Also, trying to use this function to guarantee the timing of callback calls - for synchronization purposes is discouraged. This synchronization effect is - obscure to readers. It is better to use explicit promises, or {!Lwt_mutex}, - {!Lwt_condition}, and/or {!Lwt_mvar}. *) +(** [@@ocaml.deprecated "Use awaken ~deferring:Immediatelly instead"] *) val wakeup_exn : _ u -> exn -> unit -(** [Lwt.wakeup_exn r exn] is like {!Lwt.wakeup_later_exn}[ r exn], but has - the same problems as {!Lwt.wakeup}. *) +(** [@@ocaml.deprecated "Use awaken_exn ~deferring:Immediatelly instead"] *) val wakeup_result : 'a u -> ('a, exn) result -> unit -(** [Lwt.wakeup_result r result] is like {!Lwt.wakeup_later_result}[ r result], - but has the same problems as {!Lwt.wakeup}. *) +(** [@@ocaml.deprecated "Use awaken_result ~deferring:Immediatelly instead"] *) From b9007a30962e78a6013ff82b971c5d76adc1b34a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Mon, 28 Apr 2025 14:55:00 +0200 Subject: [PATCH 2/4] doc and tweaks for awaken --- src/core/lwt.ml | 44 ++++++++++++------------ src/core/lwt.mli | 89 +++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 99 insertions(+), 34 deletions(-) diff --git a/src/core/lwt.ml b/src/core/lwt.ml index bf92e7133..8a11d707c 100644 --- a/src/core/lwt.ml +++ b/src/core/lwt.ml @@ -1341,14 +1341,14 @@ include Resolution_loop module Resolving : sig - type deferring = - | Immediatelly - | Dont_care + type ordering = | Later + | Dont_care + | Immediately - val awaken_result : deferring:deferring -> 'a u -> ('a, exn) result -> unit - val awaken : deferring:deferring -> 'a u -> 'a -> unit - val awaken_exn : deferring:deferring -> 'a u -> exn -> unit + val awaken_result : order:ordering -> 'a u -> ('a, exn) result -> unit + val awaken : order:ordering -> 'a u -> 'a -> unit + val awaken_exn : order:ordering -> 'a u -> exn -> unit val wakeup_later_result : 'a u -> ('a, exn) result -> unit val wakeup_later : 'a u -> 'a -> unit @@ -1362,12 +1362,12 @@ sig end = struct - type deferring = - | Immediatelly - | Dont_care + type ordering = | Later + | Dont_care + | Immediately - let awaken_general api_function_name deferring r result = + let awaken_general api_function_name order r result = let Internal p = to_internal_resolver r in let p = underlying p in @@ -1382,23 +1382,23 @@ struct | Pending _ -> let result = state_of_result result in let State_may_have_changed p = - match deferring with - | Immediatelly -> resolve ~allow_deferring:false p result - | Dont_care -> resolve ~maximum_callback_nesting_depth:1 p result + match order with | Later -> resolve ~maximum_callback_nesting_depth:min_int p result + | Dont_care -> resolve ~maximum_callback_nesting_depth:1 p result + | Immediately -> resolve ~allow_deferring:false p result in ignore p - let awaken_result ~deferring r result = - awaken_general "awaken_result" deferring r result - let awaken ~deferring r v = - awaken_general "awaken" deferring r (Ok v) - let awaken_exn ~deferring r exn = - awaken_general "awaken_exn" deferring r (Error exn) + let awaken_result ~order r result = + awaken_general "awaken_result" order r result + let awaken ~order r v = + awaken_general "awaken" order r (Ok v) + let awaken_exn ~order r exn = + awaken_general "awaken_exn" order r (Error exn) - let wakeup_result r result = awaken_general "wakeup_result" Immediatelly r result - let wakeup r v = awaken_general "wakeup" Immediatelly r (Ok v) - let wakeup_exn r exn = awaken_general "wakeup_exn" Immediatelly r (Error exn) + let wakeup_result r result = awaken_general "wakeup_result" Immediately r result + let wakeup r v = awaken_general "wakeup" Immediately r (Ok v) + let wakeup_exn r exn = awaken_general "wakeup_exn" Immediately r (Error exn) let wakeup_later_result r result = awaken_general "wakeup_later_result" Dont_care r result diff --git a/src/core/lwt.mli b/src/core/lwt.mli index 01d28d22b..d28582a3f 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -407,20 +407,85 @@ val wait : unit -> ('a t * 'a u) (** {3 Resolving} *) -type deferring = - | Immediatelly +(** [ordering] indicates a scheduling strategy for fullfilling or rejection of + promnises associated to resolvers. + + Specifically, when a promise is resolved via a call to {!awaken} (or + {!awaken_exn} or {!awaken_result}), the execution can be ordered in two + distinct ways: + + - [Later]: Resolves the promise later, after the current code reaches a + pause or some I/O + - [Immediatelly]: Resolves the promise immediately, come back to the current + code afterwards + + If you have no preference between those two behaviours, [Dont_care] lets the + scheduler pick depending on internal state. + + To understand the difference, consider the following setup +{[ + let has_happened = ref false ;; + let blocker, resolver = wait () ;; + let task1 = + let* () = blocker in + has_happened := true; (* side-effect happens here *) + Lwt.return () + ;; +]} + + Then, if a separate task resolves [blocker] by way of [resolver], two + different ordering can happen as examplified by: + +{[ + let task2 = + awaken ~order resolver; + (if !has_happened then + (* Using [Later] for [ordering] causes this branch to be taken *) + print_endline "Current code was prioritised over resolved promise" + else + (* Using [Immediatelly] for [ordering] causes this branch to be taken *) + print_endline "Resolved promise code was prioritised over current"); + Lwt.return () + ;; +]} + + *) +type ordering = + | Later (** Prioritise code at current location; queue awakening to come back to it later. *) | Dont_care - | Later + | Immediately (** Prioritise resolving the promise; come back to current code location later. *) + +val awaken : order:ordering -> 'a u -> 'a -> unit +(** [awaken ~order r v] fullfills the pending promise associated to the + resolver [r] with value [v]. + + The scheduling of the fullfillment happens according to the [order] + parameter. See {!type-ordering}. + + If the promise is not pending, [Lwt.awaken] raises + {!Stdlib.Invalid_argument}, unless the promise is {{!Lwt.cancel} canceled}. + If the promise is canceled, [Lwt.awaken] has no effect. + + If your program has multiple threads, it is important to make sure that + [Lwt.wakeup_later] (and any similar function) is only called from the main + thread. [Lwt.wakeup_later] can trigger callbacks attached to promises + by the program, and these assume they are running in the main thread. If you + need to communicate from a worker thread to the main thread running Lwt, see + {!Lwt_preemptive} or {!Lwt_unix.send_notification}. *) + +val awaken_exn : order:ordering -> 'a u -> exn -> unit +(** [awaken_exn] is similar to [awaken] but the promise associated to the + resolver is rejected (rather than fullfilled). *) -val awaken_result : deferring:deferring -> 'a u -> ('a, exn) result -> unit -val awaken : deferring:deferring -> 'a u -> 'a -> unit -val awaken_exn : deferring:deferring -> 'a u -> exn -> unit +val awaken_result : order:ordering -> 'a u -> ('a, exn) result -> unit +(** [awaken_result] is similar to either [awaken] or [awaken_exn] depending on + if the value is [Ok _] or [Error _]. *) val wakeup_later : 'a u -> 'a -> unit -(** [@@ocaml.deprecated "Use awaken ~deferring:Dont_care instead"] *) +(** [@@ocaml.deprecated "Use awaken ~awakening:Dont_care instead"] *) val wakeup_later_exn : _ u -> exn -> unit -(** [@@ocaml.deprecated "Use awaken_exn ~deferring:Dont_care instead"] *) +(** [@@ocaml.deprecated "Use awaken_exn ~awakening:Dont_care instead"] *) val return : 'a -> 'a t (** [Lwt.return v] creates a new {{!t} promise} that is {e already fulfilled} @@ -1615,7 +1680,7 @@ val of_result : ('a, exn) result -> 'a t rejected with [exn]. *) val wakeup_later_result : 'a u -> ('a, exn) result -> unit -(** [@@ocaml.deprecated "Use awaken_result ~deferring:Dont_care instead"] *) +(** [@@ocaml.deprecated "Use awaken_result ~order:Dont_care instead"] *) @@ -1749,13 +1814,13 @@ let () = (** {3 Immediate resolving} *) val wakeup : 'a u -> 'a -> unit -(** [@@ocaml.deprecated "Use awaken ~deferring:Immediatelly instead"] *) +(** [@@ocaml.deprecated "Use awaken ~order:Immediatelly instead"] *) val wakeup_exn : _ u -> exn -> unit -(** [@@ocaml.deprecated "Use awaken_exn ~deferring:Immediatelly instead"] *) +(** [@@ocaml.deprecated "Use awaken_exn ~order:Immediatelly instead"] *) val wakeup_result : 'a u -> ('a, exn) result -> unit -(** [@@ocaml.deprecated "Use awaken_result ~deferring:Immediatelly instead"] *) +(** [@@ocaml.deprecated "Use awaken_result ~order:Immediatelly instead"] *) From 15566e1ab6f62c54824ecb79c9eaea847325ddc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Mon, 28 Apr 2025 20:55:01 +0200 Subject: [PATCH 3/4] doc fixes for awaken --- src/core/lwt.mli | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/lwt.mli b/src/core/lwt.mli index d28582a3f..f48a00d52 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -416,7 +416,7 @@ val wait : unit -> ('a t * 'a u) - [Later]: Resolves the promise later, after the current code reaches a pause or some I/O - - [Immediatelly]: Resolves the promise immediately, come back to the current + - [Immediately]: Resolves the promise immediately, come back to the current code afterwards If you have no preference between those two behaviours, [Dont_care] lets the @@ -443,7 +443,7 @@ val wait : unit -> ('a t * 'a u) (* Using [Later] for [ordering] causes this branch to be taken *) print_endline "Current code was prioritised over resolved promise" else - (* Using [Immediatelly] for [ordering] causes this branch to be taken *) + (* Using [Immediately] for [ordering] causes this branch to be taken *) print_endline "Resolved promise code was prioritised over current"); Lwt.return () ;; @@ -1814,13 +1814,13 @@ let () = (** {3 Immediate resolving} *) val wakeup : 'a u -> 'a -> unit -(** [@@ocaml.deprecated "Use awaken ~order:Immediatelly instead"] *) +(** [@@ocaml.deprecated "Use awaken ~order:Immediately instead"] *) val wakeup_exn : _ u -> exn -> unit -(** [@@ocaml.deprecated "Use awaken_exn ~order:Immediatelly instead"] *) +(** [@@ocaml.deprecated "Use awaken_exn ~order:Immediately instead"] *) val wakeup_result : 'a u -> ('a, exn) result -> unit -(** [@@ocaml.deprecated "Use awaken_result ~order:Immediatelly instead"] *) +(** [@@ocaml.deprecated "Use awaken_result ~order:Immediately instead"] *) From b0c5a93898cab81b38e30d99cecf08194b940289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Fri, 2 May 2025 14:46:14 +0200 Subject: [PATCH 4/4] tweaks, docs, propagate changes --- docs/manual.mld | 10 ++++----- src/core/lwt.ml | 18 ++++++++--------- src/core/lwt.mli | 22 ++++++++++---------- src/core/lwt_condition.ml | 6 +++--- src/core/lwt_mutex.ml | 4 ++-- src/core/lwt_mvar.ml | 4 ++-- src/core/lwt_pool.ml | 6 +++--- src/core/lwt_stream.ml | 18 ++++++++--------- src/react/lwt_react.ml | 2 +- src/unix/lwt_engine.ml | 4 ++-- src/unix/lwt_io.ml | 14 ++++++------- src/unix/lwt_preemptive.ml | 4 ++-- src/unix/lwt_throttle.ml | 2 +- src/unix/lwt_unix.cppo.ml | 18 ++++++++--------- src/unix/lwt_unix_stubs.c | 2 +- test/core/test_lwt_list.ml | 6 +++--- test/core/test_lwt_mutex.ml | 14 ++++++------- test/core/test_lwt_result.ml | 16 +++++++-------- test/core/test_lwt_stream.ml | 2 +- test/test.ml | 2 +- test/unix/test_lwt_bytes.ml | 4 ++-- test/unix/test_lwt_io.ml | 8 ++++---- test/unix/test_lwt_timeout.ml | 38 +++++++++++++++++------------------ test/unix/test_lwt_unix.ml | 6 +++--- 24 files changed, 115 insertions(+), 115 deletions(-) diff --git a/docs/manual.mld b/docs/manual.mld index df72da241..71a9d8cea 100644 --- a/docs/manual.mld +++ b/docs/manual.mld @@ -154,9 +154,9 @@ val p : char Lwt.t = functions: {ul - {- [Lwt.wakeup : 'a Lwt.u -> 'a -> unit] + {- [Lwt.awaken : order:ordering -> 'a Lwt.u -> 'a -> unit] fulfills the promise with a value.} - {- [Lwt.wakeup_exn : 'a Lwt.u -> exn -> unit] + {- [Lwt.awaken_exn : order:ordering -> 'a Lwt.u -> exn -> unit] rejects the promise with an exception.}} Note that it is an error to try to resolve the same promise twice. [Lwt] @@ -170,11 +170,11 @@ val p : char Lwt.t = # Lwt.state (Lwt.fail Exit);; # let p, r = Lwt.wait ();; # Lwt.state p;; -# Lwt.wakeup r 42;; +# Lwt.awaken ~order:Nested r 42;; # Lwt.state p;; # let p, r = Lwt.wait ();; # Lwt.state p;; -# Lwt.wakeup_exn r Exit;; +# Lwt.awaken_exn ~order:Nested r Exit;; # Lwt.state p;; ]} @@ -323,7 +323,7 @@ val r2 : '_a Lwt.u = val p3 : '_a Lwt.t = # Lwt.state p3;; - : '_a Lwt.state = Lwt.Sleep -# Lwt.wakeup r2 42;; +# Lwt.awaken ~order:Nested r2 42;; - : unit = () # Lwt.state p3;; - : int Lwt.state = Lwt.Return 42 diff --git a/src/core/lwt.ml b/src/core/lwt.ml index 8a11d707c..20778779c 100644 --- a/src/core/lwt.ml +++ b/src/core/lwt.ml @@ -1342,9 +1342,9 @@ module Resolving : sig type ordering = - | Later + | Deferred | Dont_care - | Immediately + | Nested val awaken_result : order:ordering -> 'a u -> ('a, exn) result -> unit val awaken : order:ordering -> 'a u -> 'a -> unit @@ -1363,9 +1363,9 @@ end = struct type ordering = - | Later + | Deferred | Dont_care - | Immediately + | Nested let awaken_general api_function_name order r result = let Internal p = to_internal_resolver r in @@ -1383,9 +1383,9 @@ struct let result = state_of_result result in let State_may_have_changed p = match order with - | Later -> resolve ~maximum_callback_nesting_depth:min_int p result + | Deferred -> resolve ~maximum_callback_nesting_depth:min_int p result | Dont_care -> resolve ~maximum_callback_nesting_depth:1 p result - | Immediately -> resolve ~allow_deferring:false p result + | Nested -> resolve ~allow_deferring:false p result in ignore p @@ -1396,9 +1396,9 @@ struct let awaken_exn ~order r exn = awaken_general "awaken_exn" order r (Error exn) - let wakeup_result r result = awaken_general "wakeup_result" Immediately r result - let wakeup r v = awaken_general "wakeup" Immediately r (Ok v) - let wakeup_exn r exn = awaken_general "wakeup_exn" Immediately r (Error exn) + let wakeup_result r result = awaken_general "wakeup_result" Nested r result + let wakeup r v = awaken_general "wakeup" Nested r (Ok v) + let wakeup_exn r exn = awaken_general "wakeup_exn" Nested r (Error exn) let wakeup_later_result r result = awaken_general "wakeup_later_result" Dont_care r result diff --git a/src/core/lwt.mli b/src/core/lwt.mli index f48a00d52..f960a2f31 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -414,9 +414,9 @@ val wait : unit -> ('a t * 'a u) {!awaken_exn} or {!awaken_result}), the execution can be ordered in two distinct ways: - - [Later]: Resolves the promise later, after the current code reaches a - pause or some I/O - - [Immediately]: Resolves the promise immediately, come back to the current + - [Deferred]: Resolves the promise later, after the current code reaches a + pause or some I/O. This is often the behaviour you want. It makes the + - [Nested]: Resolves the promise immediately, come back to the current code afterwards If you have no preference between those two behaviours, [Dont_care] lets the @@ -443,7 +443,7 @@ val wait : unit -> ('a t * 'a u) (* Using [Later] for [ordering] causes this branch to be taken *) print_endline "Current code was prioritised over resolved promise" else - (* Using [Immediately] for [ordering] causes this branch to be taken *) + (* Using [Nested] for [ordering] causes this branch to be taken *) print_endline "Resolved promise code was prioritised over current"); Lwt.return () ;; @@ -451,9 +451,9 @@ val wait : unit -> ('a t * 'a u) *) type ordering = - | Later (** Prioritise code at current location; queue awakening to come back to it later. *) + | Deferred (** Prioritise code at current location; queue awakening to come back to it later. *) | Dont_care - | Immediately (** Prioritise resolving the promise; come back to current code location later. *) + | Nested (** Prioritise resolving the promise; come back to current code location later. *) val awaken : order:ordering -> 'a u -> 'a -> unit (** [awaken ~order r v] fullfills the pending promise associated to the @@ -482,10 +482,10 @@ val awaken_result : order:ordering -> 'a u -> ('a, exn) result -> unit if the value is [Ok _] or [Error _]. *) val wakeup_later : 'a u -> 'a -> unit -(** [@@ocaml.deprecated "Use awaken ~awakening:Dont_care instead"] *) +(** [@@ocaml.deprecated "Use awaken ~order:Dont_care instead"] *) val wakeup_later_exn : _ u -> exn -> unit -(** [@@ocaml.deprecated "Use awaken_exn ~awakening:Dont_care instead"] *) +(** [@@ocaml.deprecated "Use awaken_exn ~order:Dont_care instead"] *) val return : 'a -> 'a t (** [Lwt.return v] creates a new {{!t} promise} that is {e already fulfilled} @@ -1814,13 +1814,13 @@ let () = (** {3 Immediate resolving} *) val wakeup : 'a u -> 'a -> unit -(** [@@ocaml.deprecated "Use awaken ~order:Immediately instead"] *) +(** [@@ocaml.deprecated "Use awaken ~order:Nested instead"] *) val wakeup_exn : _ u -> exn -> unit -(** [@@ocaml.deprecated "Use awaken_exn ~order:Immediately instead"] *) +(** [@@ocaml.deprecated "Use awaken_exn ~order:Nested instead"] *) val wakeup_result : 'a u -> ('a, exn) result -> unit -(** [@@ocaml.deprecated "Use awaken_result ~order:Immediately instead"] *) +(** [@@ocaml.deprecated "Use awaken_result ~order:Nested instead"] *) diff --git a/src/core/lwt_condition.ml b/src/core/lwt_condition.ml index 0366dabd5..74b5a4da5 100644 --- a/src/core/lwt_condition.ml +++ b/src/core/lwt_condition.ml @@ -53,16 +53,16 @@ let wait ?mutex cvar = let signal cvar arg = try - Lwt.wakeup_later (Lwt_sequence.take_l cvar) arg + Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l cvar) arg with Lwt_sequence.Empty -> () let broadcast cvar arg = let wakeners = Lwt_sequence.fold_r (fun x l -> x :: l) cvar [] in Lwt_sequence.iter_node_l Lwt_sequence.remove cvar; - List.iter (fun wakener -> Lwt.wakeup_later wakener arg) wakeners + List.iter (fun wakener -> Lwt.awaken ~order:Dont_care wakener arg) wakeners let broadcast_exn cvar exn = let wakeners = Lwt_sequence.fold_r (fun x l -> x :: l) cvar [] in Lwt_sequence.iter_node_l Lwt_sequence.remove cvar; - List.iter (fun wakener -> Lwt.wakeup_later_exn wakener exn) wakeners + List.iter (fun wakener -> Lwt.awaken_exn ~order:Dont_care wakener exn) wakeners diff --git a/src/core/lwt_mutex.ml b/src/core/lwt_mutex.ml index c49694163..a5198c6bd 100644 --- a/src/core/lwt_mutex.ml +++ b/src/core/lwt_mutex.ml @@ -29,9 +29,9 @@ let unlock m = if Lwt_sequence.is_empty m.waiters then m.locked <- false else - (* We do not use [Lwt.wakeup] here to avoid a stack overflow + (* We do not use [~order:Dont_care] here to avoid a stack overflow when unlocking a lot of threads. *) - Lwt.wakeup_later (Lwt_sequence.take_l m.waiters) () + Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l m.waiters) () end let with_lock m f = diff --git a/src/core/lwt_mvar.ml b/src/core/lwt_mvar.ml index f759339ef..c4982393f 100644 --- a/src/core/lwt_mvar.ml +++ b/src/core/lwt_mvar.ml @@ -64,7 +64,7 @@ let put mvar v = | None -> mvar.mvar_contents <- Some v | Some w -> - Lwt.wakeup_later w v + Lwt.awaken ~order:Dont_care w v end; Lwt.return_unit | Some _ -> @@ -77,7 +77,7 @@ let next_writer mvar = match Lwt_sequence.take_opt_l mvar.writers with | Some(v', w) -> mvar.mvar_contents <- Some v'; - Lwt.wakeup_later w () + Lwt.awaken ~order:Dont_care w () | None -> mvar.mvar_contents <- None diff --git a/src/core/lwt_pool.ml b/src/core/lwt_pool.ml index f63e30100..00c9be3d4 100644 --- a/src/core/lwt_pool.ml +++ b/src/core/lwt_pool.ml @@ -62,7 +62,7 @@ let release p c = match Lwt_sequence.take_opt_l p.waiters with | Some wakener -> (* A promise resolver is waiting, give it the pool member. *) - Lwt.wakeup_later wakener c + Lwt.awaken ~order:Dont_care wakener c | None -> (* No one is waiting, queue it. *) Queue.push c p.list @@ -84,10 +84,10 @@ let replace_disposed p = Lwt.on_any (Lwt.apply p.create ()) (fun c -> - Lwt.wakeup_later wakener c) + Lwt.awaken ~order:Dont_care wakener c) (fun exn -> (* Creation failed, notify the waiter of the failure. *) - Lwt.wakeup_later_exn wakener exn) + Lwt.awaken_exn ~order:Dont_care wakener exn) (* Verify a member is still valid before using it. *) let validate_and_return p c = diff --git a/src/core/lwt_stream.ml b/src/core/lwt_stream.ml index d2450f2b7..b8ae4db4b 100644 --- a/src/core/lwt_stream.ml +++ b/src/core/lwt_stream.ml @@ -169,11 +169,11 @@ let create_with_reference () = source.push_signal <- new_waiter; push_signal_resolver := new_push_signal_resolver; (* Signal that a new value has been received. *) - Lwt.wakeup_later old_push_signal_resolver () + Lwt.awaken ~order:Dont_care old_push_signal_resolver () end; (* Do this at the end in case one of the function raise an exception. *) - if x = None then Lwt.wakeup close () + if x = None then Lwt.awaken ~order:Dont_care close () in (t, push, fun x -> source.push_external <- Obj.repr x) @@ -247,7 +247,7 @@ let notify_pusher info last = let waiter, wakener = Lwt.task () in info.pushb_push_waiter <- waiter; info.pushb_push_wakener <- wakener; - Lwt.wakeup_later old_wakener () + Lwt.awaken ~order:Dont_care old_wakener () class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = object val mutable closed = false @@ -296,7 +296,7 @@ class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = info.pushb_signal <- new_waiter; wakener_cell := new_wakener; (* Signal that a new value has been received. *) - Lwt.wakeup_later old_wakener () + Lwt.awaken ~order:Dont_care old_wakener () end; Lwt.return_unit end @@ -310,7 +310,7 @@ class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = last := new_last; if info.pushb_pending <> None then begin info.pushb_pending <- None; - Lwt.wakeup_later_exn info.pushb_push_wakener Closed + Lwt.awaken_exn ~order:Dont_care info.pushb_push_wakener Closed end; (* Send a signal if at least one thread is waiting for a new element. *) @@ -318,9 +318,9 @@ class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = info.pushb_waiting <- false; let old_wakener = !wakener_cell in (* Signal that a new value has been received. *) - Lwt.wakeup_later old_wakener () + Lwt.awaken ~order:Dont_care old_wakener () end; - Lwt.wakeup close (); + Lwt.awaken ~order:Nested close (); end method count = @@ -376,7 +376,7 @@ let feed s = from.from_create () >>= fun x -> (* Push the element to the end of the queue. *) enqueue x s; - if x = None then Lwt.wakeup s.close (); + if x = None then Lwt.awaken ~order:Nested s.close (); Lwt.return_unit) Lwt.reraise in @@ -388,7 +388,7 @@ let feed s = let x = f () in (* Push the element to the end of the queue. *) enqueue x s; - if x = None then Lwt.wakeup s.close (); + if x = None then Lwt.awaken ~order:Nested s.close (); Lwt.return_unit | Push push -> push.push_waiting <- true; diff --git a/src/react/lwt_react.ml b/src/react/lwt_react.ml index 8d141065c..1491efffb 100644 --- a/src/react/lwt_react.ml +++ b/src/react/lwt_react.ml @@ -24,7 +24,7 @@ module E = struct let next ev = let waiter, wakener = Lwt.task () in - let ev = map (fun x -> Lwt.wakeup wakener x) (once ev) in + let ev = map (fun x -> Lwt.awaken ~order:Nested wakener x) (once ev) in Lwt.on_cancel waiter (fun () -> stop ev); waiter diff --git a/src/unix/lwt_engine.ml b/src/unix/lwt_engine.ml index 20a8eafc7..e3f847367 100644 --- a/src/unix/lwt_engine.ml +++ b/src/unix/lwt_engine.ml @@ -211,7 +211,7 @@ class libev_deprecated = libev () (* Type of a sleeper for the select engine. *) type sleeper = { mutable time : float; - (* The time at which the sleeper should be wakeup. *) + (* The time at which the sleeper should be awoken. *) mutable stopped : bool; (* [true] iff the event has been stopped. *) @@ -275,7 +275,7 @@ class virtual select_or_poll_based = object (* Sleepers added since the last iteration of the main loop: They are not added immediately to the main sleep queue in order - to prevent them from being wakeup immediately. *) + to prevent them from being awoken immediately. *) val mutable wait_readable = Fd_map.empty (* Sequences of actions waiting for file descriptors to become diff --git a/src/unix/lwt_io.ml b/src/unix/lwt_io.ml index 9a24aa02a..f7a8eb1b6 100644 --- a/src/unix/lwt_io.ml +++ b/src/unix/lwt_io.ml @@ -89,7 +89,7 @@ and 'mode _channel = { [length] for output channels. *) abort_waiter : int Lwt.t; - (* Thread which is wakeup with an exception when the channel is + (* Thread which is awoken with an exception when the channel is closed. *) abort_wakener : int Lwt.u; @@ -312,7 +312,7 @@ let auto_flush oc = if wrapper.state = Busy_primitive then wrapper.state <- Idle; if not (Lwt_sequence.is_empty wrapper.queued) then - Lwt.wakeup_later (Lwt_sequence.take_l wrapper.queued) (); + Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l wrapper.queued) (); Lwt.return_unit | Closed | Invalid -> @@ -327,7 +327,7 @@ let unlock : type m. m channel -> unit = fun wrapper -> match wrapper.state with wrapper.state <- Idle else begin wrapper.state <- Waiting_for_busy; - Lwt.wakeup_later (Lwt_sequence.take_l wrapper.queued) () + Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l wrapper.queued) () end; (* Launches the auto-flusher: *) let ch = wrapper.channel in @@ -346,7 +346,7 @@ let unlock : type m. m channel -> unit = fun wrapper -> match wrapper.state with | Closed | Invalid -> (* Do not change channel state if the channel has been closed *) if not (Lwt_sequence.is_empty wrapper.queued) then - Lwt.wakeup_later (Lwt_sequence.take_l wrapper.queued) () + Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l wrapper.queued) () | Idle | Waiting_for_busy -> (* We must never unlock an unlocked channel *) @@ -452,7 +452,7 @@ let rec abort wrapper = match wrapper.state with wrapper.state <- Closed; (* Abort any current real reading/writing operation on the channel: *) - Lwt.wakeup_exn + Lwt.awaken_exn ~order:Nested wrapper.channel.abort_wakener (closed_channel wrapper.channel); Lazy.force wrapper.channel.close @@ -1661,7 +1661,7 @@ let establish_server_generic () end; - Lwt.wakeup_later notify_listening_socket_closed (); + Lwt.awaken ~order:Dont_care notify_listening_socket_closed (); Lwt.return_unit | `Try_again -> accept_loop () @@ -1670,7 +1670,7 @@ let establish_server_generic let server = {shutdown = lazy begin - Lwt.wakeup_later notify_should_stop `Should_stop; + Lwt.awaken ~order:Dont_care notify_should_stop `Should_stop; wait_until_listening_socket_closed end} in diff --git a/src/unix/lwt_preemptive.ml b/src/unix/lwt_preemptive.ml index eacf32f28..bd88f13b7 100644 --- a/src/unix/lwt_preemptive.ml +++ b/src/unix/lwt_preemptive.ml @@ -124,7 +124,7 @@ let add_worker worker = | None -> Queue.add worker workers | Some w -> - Lwt.wakeup w worker + Lwt.awaken ~order:Nested w worker (* Wait for worker to be available, then return it: *) let get_worker () = @@ -187,7 +187,7 @@ let detach f args = let waiter, wakener = Lwt.wait () in let id = Lwt_unix.make_notification ~once:true - (fun () -> Lwt.wakeup_result wakener !result) + (fun () -> Lwt.awaken_result ~order:Nested wakener !result) in Lwt.finalize (fun () -> diff --git a/src/unix/lwt_throttle.ml b/src/unix/lwt_throttle.ml index 33a2db74d..b871906a6 100644 --- a/src/unix/lwt_throttle.ml +++ b/src/unix/lwt_throttle.ml @@ -68,7 +68,7 @@ module Make (H : Hashtbl.HashedType) : (S with type key = H.t) = struct (* the table is empty: we do not need to clean in 1 second *) t.cleaning <- None else launch_cleaning t; - List.iter (fun u -> Lwt.wakeup u true) to_run + List.iter (fun u -> Lwt.awaken ~order:Nested u true) to_run and launch_cleaning t = t.cleaning <- diff --git a/src/unix/lwt_unix.cppo.ml b/src/unix/lwt_unix.cppo.ml index 6fb9f8044..3a5981cb0 100644 --- a/src/unix/lwt_unix.cppo.ml +++ b/src/unix/lwt_unix.cppo.ml @@ -117,7 +117,7 @@ let call_notification id = let sleep delay = let waiter, wakener = Lwt.task () in - let ev = Lwt_engine.on_timer delay false (fun ev -> Lwt_engine.stop_event ev; Lwt.wakeup wakener ()) in + let ev = Lwt_engine.on_timer delay false (fun ev -> Lwt_engine.stop_event ev; Lwt.awaken ~order:Nested wakener ()) in Lwt.on_cancel waiter (fun () -> Lwt_engine.stop_event ev); waiter @@ -196,16 +196,16 @@ let run_job_aux async_method job result = (* Add the job to the sequence of all jobs. *) let node = Lwt_sequence.add_l ( (waiter >>= fun _ -> Lwt.return_unit), - (fun exn -> if Lwt.state waiter = Lwt.Sleep then Lwt.wakeup_exn wakener exn)) + (fun exn -> if Lwt.state waiter = Lwt.Sleep then Lwt.awaken_exn ~order:Nested wakener exn)) jobs in ignore begin - (* Create the notification for asynchronous wakeup. *) + (* Create the notification for asynchronous awaken. *) let id = make_notification ~once:true (fun () -> Lwt_sequence.remove node; let result = result job in - if Lwt.state waiter = Lwt.Sleep then Lwt.wakeup_result wakener result) + if Lwt.state waiter = Lwt.Sleep then Lwt.awaken_result ~order:Nested wakener result) in (* Give the job some time before we fallback to asynchronous notification. *) @@ -495,7 +495,7 @@ let register_writable ch = if ch.event_writable = None then ch.event_writable <- Some(Lwt_engine.on_writable ch.fd (fun _ -> Lwt_sequence.iter_l (fun f -> f ()) ch.hooks_writable)) -(* Retry a queued syscall, [wakener] is the thread to wakeup if the +(* Retry a queued syscall, [wakener] is the thread to awaken if the action succeeds: *) let rec retry_syscall node event ch wakener action = let res = @@ -521,11 +521,11 @@ let rec retry_syscall node event ch wakener action = | Success v -> Lwt_sequence.remove !node; stop_events ch; - Lwt.wakeup wakener v + Lwt.awaken ~order:Nested wakener v | Exn e -> Lwt_sequence.remove !node; stop_events ch; - Lwt.wakeup_exn wakener e + Lwt.awaken_exn ~order:Nested wakener e | Requeued event' -> if event <> event' then begin Lwt_sequence.remove !node; @@ -2369,11 +2369,11 @@ let install_sigchld_handler () = let (pid', _, _) as v = do_wait4 flags pid in if pid' <> 0 then begin Lwt_sequence.remove node; - Lwt.wakeup wakener v + Lwt.awaken ~order:Nested wakener v end with e when Lwt.Exception_filter.run e -> Lwt_sequence.remove node; - Lwt.wakeup_exn wakener e + Lwt.awaken_exn ~order:Nested wakener e end wait_children) end end diff --git a/src/unix/lwt_unix_stubs.c b/src/unix/lwt_unix_stubs.c index 443773bac..96d362e3e 100644 --- a/src/unix/lwt_unix_stubs.c +++ b/src/unix/lwt_unix_stubs.c @@ -1152,7 +1152,7 @@ CAMLprim value lwt_unix_check_job(value val_job, value val_notification_id) { lwt_unix_mutex_lock(&job->mutex); /* We are not waiting anymore. */ job->fast = 0; - /* Set the notification id for asynchronous wakeup. */ + /* Set the notification id for asynchronous awaken. */ job->notification_id = Long_val(val_notification_id); result = Val_bool(job->state == LWT_UNIX_JOB_STATE_DONE); lwt_unix_mutex_unlock(&job->mutex); diff --git a/test/core/test_lwt_list.ml b/test/core/test_lwt_list.ml index 7ea87391d..766398dea 100644 --- a/test/core/test_lwt_list.ml +++ b/test/core/test_lwt_list.ml @@ -29,7 +29,7 @@ let test_iter f test_list = in t' <=> Sleep; List.iter2 (fun v r -> assert (v = !r)) test_list l; - Lwt.wakeup w (); + Lwt.awaken ~order:Nested w (); List.iter2 (fun v r -> assert (v = !r)) [1; 1; 1] l; t' <=> Lwt.Return () in @@ -90,7 +90,7 @@ let test_map f test_list = t3 <=> Lwt.Sleep; Lwt.cancel t'; t3 <=> Lwt.Fail Lwt.Canceled; - Lwt.wakeup w (); + Lwt.awaken ~order:Nested w (); t2 <=> Lwt.Return test_list; in () @@ -98,7 +98,7 @@ let test_map f test_list = let test_parallelism map = let t, w = Lwt.wait () in let g _ = - Lwt.wakeup_later w (); + Lwt.awaken ~order:Dont_care w (); Lwt.return_unit in let f x = if x = 0 then t >>= (fun _ -> Lwt.return_unit) diff --git a/test/core/test_lwt_mutex.ml b/test/core/test_lwt_mutex.ml index ca80386ec..e5938f1b4 100644 --- a/test/core/test_lwt_mutex.ml +++ b/test/core/test_lwt_mutex.ml @@ -33,7 +33,7 @@ let suite = suite "lwt_mutex" [ >>= fun thread_2_canceled -> (* Thread 1: release the mutex. *) - Lwt.wakeup resume_thread_1 (); + Lwt.awaken ~order:Nested resume_thread_1 (); thread_1 >>= fun () -> (* Thread 3: try to take the mutex. Thread 2 should not have it locked, @@ -59,15 +59,15 @@ let suite = suite "lwt_mutex" [ Lwt_mutex.unlock mutex in - (* Thread 3: wrap the wakeup of thread 2 in a wakeup of thread 3. *) + (* Thread 3: wrap the awaken of thread 2 in a awaken of thread 3. *) let top_level_waiter, wake_top_level_waiter = Lwt.wait () in let while_waking = top_level_waiter >>= fun () -> - (* Inside thread 3 wakeup. *) + (* Inside thread 3 awaken. *) (* Thread 1: release the mutex. This queues thread 2 using - wakeup_later inside Lwt_mutex.unlock. *) - Lwt.wakeup resume_thread_1 (); + Dont_care inside Lwt_mutex.unlock. *) + Lwt.awaken ~order:Nested resume_thread_1 (); thread_1 >>= fun () -> (* Confirm the mutex is now considered locked by thread 2. *) @@ -96,11 +96,11 @@ let suite = suite "lwt_mutex" [ in (* Run thread 3. - * Keep this as wakeup_later to test the issue on 2.3.2 reported in + * Keep this as Dont_care to test the issue on 2.3.2 reported in * https://github.com/ocsigen/lwt/pull/202 * See also: * https://github.com/ocsigen/lwt/pull/261 *) - Lwt.wakeup_later wake_top_level_waiter (); + Lwt.awaken ~order:Dont_care wake_top_level_waiter (); while_waking); ] diff --git a/test/core/test_lwt_result.ml b/test/core/test_lwt_result.ml index 2f91ef5e6..b9be0cf1c 100644 --- a/test/core/test_lwt_result.ml +++ b/test/core/test_lwt_result.ml @@ -201,7 +201,7 @@ let suite = (Lwt_result.fail 0) p2 in - Lwt.wakeup_later r2 (Result.Error 1); + Lwt.awaken ~order:Dont_care r2 (Result.Error 1); Lwt.bind p (fun x -> Lwt.return (x = Result.Error 0)) ); @@ -213,7 +213,7 @@ let suite = p1 (Lwt_result.fail 1) in - Lwt.wakeup_later r1 (Result.Error 0); + Lwt.awaken ~order:Dont_care r1 (Result.Error 0); Lwt.bind p (fun x -> Lwt.return (x = Result.Error 1)) ); @@ -263,8 +263,8 @@ let suite = let* s2 = p2 in Lwt.return (Result.Ok (s1 ^ s2)) in - Lwt.wakeup r1 (Result.Ok "foo"); - Lwt.wakeup r2 (Result.Ok "bar"); + Lwt.awaken ~order:Nested r1 (Result.Ok "foo"); + Lwt.awaken ~order:Nested r2 (Result.Ok "bar"); state_is (Lwt.Return (Result.Ok "foobar")) p' ); @@ -278,8 +278,8 @@ let suite = and* s2 = p2 in Lwt.return (Result.Ok (s1 ^ s2)) in - Lwt.wakeup r1 (Result.Ok "foo"); - Lwt.wakeup r2 (Result.Ok "bar"); + Lwt.awaken ~order:Nested r1 (Result.Ok "foo"); + Lwt.awaken ~order:Nested r2 (Result.Ok "bar"); state_is (Lwt.Return (Result.Ok "foobar")) p' ); @@ -293,8 +293,8 @@ let suite = and+ s2 = p2 in s1 ^ s2 in - Lwt.wakeup r1 (Result.Ok "foo"); - Lwt.wakeup r2 (Result.Ok "bar"); + Lwt.awaken ~order:Nested r1 (Result.Ok "foo"); + Lwt.awaken ~order:Nested r2 (Result.Ok "bar"); state_is (Lwt.Return (Result.Ok "foobar")) p' ); ] diff --git a/test/core/test_lwt_stream.ml b/test/core/test_lwt_stream.ml index fb2133730..dc118ed8e 100644 --- a/test/core/test_lwt_stream.ml +++ b/test/core/test_lwt_stream.ml @@ -157,7 +157,7 @@ let suite = suite "lwt_stream" [ let acc = acc && state (push#push 5) = Fail Lwt_stream.Full in let acc = acc && state (push#push 6) = Fail Lwt_stream.Full in let acc = acc && state (Lwt_stream.get stream) = Return (Some 1) in - (* Lwt_stream uses wakeup_later so we have to wait a bit. *) + (* Lwt_stream uses Dont_care so we have to wait a bit. *) Lwt.pause () >>= fun () -> let acc = acc && state t = Return () in let acc = acc && state (Lwt_stream.get stream) = Return (Some 2) in diff --git a/test/test.ml b/test/test.ml index bc18a36bb..ad01922c2 100644 --- a/test/test.ml +++ b/test/test.ml @@ -75,7 +75,7 @@ let run_test : test -> outcome Lwt.t = fun test -> let async_exception_promise, async_exception_occurred = Lwt.task () in let old_async_exception_hook = !Lwt.async_exception_hook in Lwt.async_exception_hook := (fun exn -> - Lwt.wakeup_later async_exception_occurred (Exception exn)); + Lwt.awaken ~order:Dont_care async_exception_occurred (Exception exn)); Lwt.finalize (fun () -> diff --git a/test/unix/test_lwt_bytes.ml b/test/unix/test_lwt_bytes.ml index 6de438b8d..94b9894c0 100644 --- a/test/unix/test_lwt_bytes.ml +++ b/test/unix/test_lwt_bytes.ml @@ -15,7 +15,7 @@ let tcp_server_client_exchange server_logic client_logic = >>= fun () -> let server_address = Lwt_unix.getsockname sock in let () = Lwt_unix.listen sock 5 in - Lwt.wakeup_later notify_server_is_ready server_address; + Lwt.awaken ~order:Dont_care notify_server_is_ready server_address; Lwt_unix.accept sock >>= fun (fd_client, _) -> server_logic fd_client @@ -41,7 +41,7 @@ let udp_server_client_exchange server_logic client_logic = Lwt_unix.bind sock sockaddr >>= fun () -> let server_address = Lwt_unix.getsockname sock in - Lwt.wakeup_later notify_server_is_ready server_address; + Lwt.awaken ~order:Dont_care notify_server_is_ready server_address; server_logic sock >>= fun (_n, _sockaddr) -> Lwt_unix.close sock in diff --git a/test/unix/test_lwt_io.ml b/test/unix/test_lwt_io.ml index 157444768..850f260d6 100644 --- a/test/unix/test_lwt_io.ml +++ b/test/unix/test_lwt_io.ml @@ -35,7 +35,7 @@ struct Lwt.finalize (fun () -> f channels) (fun () -> - Lwt.wakeup notify_handler_finished (); + Lwt.awaken ~order:Nested notify_handler_finished (); Lwt.return_unit)) >>= fun server -> @@ -154,11 +154,11 @@ let suite = suite "lwt_io" [ let server = (Lwt_io.Versioned.establish_server_1 [@ocaml.warning "-3"]) - local (fun channels -> Lwt.wakeup run_handler channels) + local (fun channels -> Lwt.awaken ~order:Nested run_handler channels) in Lwt_io.with_connection local (fun _ -> Lwt.return_unit) >>= fun () -> - Lwt.wakeup client_finished (); + Lwt.awaken ~order:Nested client_finished (); Lwt_io.shutdown_server server >>= fun () -> handler); @@ -176,7 +176,7 @@ let suite = suite "lwt_io" [ Lwt.async (fun () -> Lwt_io.close in_channel >>= fun () -> Lwt_io.close out_channel >|= fun () -> - Lwt.wakeup server_finished ())) + Lwt.awaken ~order:Nested server_finished ())) in Lwt_io.with_connection local (fun _ -> diff --git a/test/unix/test_lwt_timeout.ml b/test/unix/test_lwt_timeout.ml index e4dcc1556..f15953749 100644 --- a/test/unix/test_lwt_timeout.ml +++ b/test/unix/test_lwt_timeout.ml @@ -18,7 +18,7 @@ let suite = suite "Lwt_timeout" [ let timeout = Lwt_timeout.create 1 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup_later r delta) + Lwt.awaken ~order:Dont_care r delta) in Lwt_timeout.start timeout; @@ -33,12 +33,12 @@ let suite = suite "Lwt_timeout" [ let p, r = Lwt.wait () in Lwt_timeout.create 1 (fun () -> - Lwt.wakeup_later r false) + Lwt.awaken ~order:Dont_care r false) |> ignore; Lwt.async (fun () -> Lwt_unix.sleep 3. >|= fun () -> - Lwt.wakeup_later r true); + Lwt.awaken ~order:Dont_care r true); p end; @@ -71,7 +71,7 @@ let suite = suite "Lwt_timeout" [ if !completions < 2 then Lwt_timeout.start !timeout else - Lwt.wakeup_later r true); + Lwt.awaken ~order:Dont_care r true); Lwt_timeout.start !timeout; p @@ -82,14 +82,14 @@ let suite = suite "Lwt_timeout" [ let timeout = Lwt_timeout.create 1 (fun () -> - Lwt.wakeup_later r false) + Lwt.awaken ~order:Dont_care r false) in Lwt_timeout.start timeout; Lwt_timeout.stop timeout; Lwt.async (fun () -> Lwt_unix.sleep 3. >|= fun () -> - Lwt.wakeup_later r true); + Lwt.awaken ~order:Dont_care r true); p end; @@ -117,7 +117,7 @@ let suite = suite "Lwt_timeout" [ let timeout = Lwt_timeout.create 5 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup_later r delta) + Lwt.awaken ~order:Dont_care r delta) in Lwt_timeout.change timeout 1; Lwt_timeout.start timeout; @@ -132,13 +132,13 @@ let suite = suite "Lwt_timeout" [ let timeout = Lwt_timeout.create 1 (fun () -> - Lwt.wakeup_later r false) + Lwt.awaken ~order:Dont_care r false) in Lwt_timeout.change timeout 1; Lwt.async (fun () -> Lwt_unix.sleep 3. >|= fun () -> - Lwt.wakeup_later r true); + Lwt.awaken ~order:Dont_care r true); p end; @@ -151,7 +151,7 @@ let suite = suite "Lwt_timeout" [ let timeout = Lwt_timeout.create 5 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup_later r delta) + Lwt.awaken ~order:Dont_care r delta) in Lwt_timeout.start timeout; Lwt_timeout.change timeout 1; @@ -176,7 +176,7 @@ let suite = suite "Lwt_timeout" [ Test.with_async_exception_hook (fun exn -> match exn with - | Exit -> Lwt.wakeup_later r true + | Exit -> Lwt.awaken ~order:Dont_care r true | _ -> raise exn) (fun () -> Lwt_timeout.create 1 (fun () -> raise Exit) @@ -190,7 +190,7 @@ let suite = suite "Lwt_timeout" [ Lwt_timeout.set_exn_handler (fun exn -> match exn with - | Exit -> Lwt.wakeup_later r true + | Exit -> Lwt.awaken ~order:Dont_care r true | _ -> raise exn); Lwt_timeout.create 1 (fun () -> raise Exit) @@ -210,12 +210,12 @@ let suite = suite "Lwt_timeout" [ Lwt_timeout.create 1 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup r1 delta) + Lwt.awaken ~order:Nested r1 delta) |> Lwt_timeout.start; Lwt_timeout.create 2 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup r2 delta) + Lwt.awaken ~order:Nested r2 delta) |> Lwt_timeout.start; p1 >>= fun delta1 -> @@ -232,12 +232,12 @@ let suite = suite "Lwt_timeout" [ Lwt_timeout.create 1 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup r1 delta) + Lwt.awaken ~order:Nested r1 delta) |> Lwt_timeout.start; Lwt_timeout.create 1 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup r2 delta) + Lwt.awaken ~order:Nested r2 delta) |> Lwt_timeout.start; p1 >>= fun delta1 -> @@ -254,19 +254,19 @@ let suite = suite "Lwt_timeout" [ let timeout1 = Lwt_timeout.create 1 (fun () -> - Lwt.wakeup r1 false) + Lwt.awaken ~order:Nested r1 false) in Lwt_timeout.start timeout1; Lwt_timeout.create 2 (fun () -> let delta = Unix.gettimeofday () -. start_time in - Lwt.wakeup r2 delta) + Lwt.awaken ~order:Nested r2 delta) |> Lwt_timeout.start; Lwt_timeout.stop timeout1; Lwt.async (fun () -> Lwt_unix.sleep 3. >|= fun () -> - Lwt.wakeup r1 true); + Lwt.awaken ~order:Nested r1 true); p1 >>= fun timeout1_not_fired -> p2 >|= fun delta2 -> diff --git a/test/unix/test_lwt_unix.ml b/test/unix/test_lwt_unix.ml index a4e747aa3..78060a819 100644 --- a/test/unix/test_lwt_unix.ml +++ b/test/unix/test_lwt_unix.ml @@ -1070,7 +1070,7 @@ let lwt_preemptive_tests = [ (fun () -> Lwt.pause () >>= fun () -> Lwt.pause () >>= fun () -> - Lwt.wakeup r 42; + Lwt.awaken ~order:Nested r 42; Lwt.return ()) (fun _ -> assert false) in @@ -1086,7 +1086,7 @@ let lwt_preemptive_tests = [ Lwt.pause () >>= fun () -> Lwt.pause () >>= fun () -> raise Exit) - (function Exit -> Lwt.wakeup r 45 | _ -> assert false) + (function Exit -> Lwt.awaken ~order:Nested r 45 | _ -> assert false) in Lwt_preemptive.detach f () >>= fun () -> p >>= fun x -> @@ -1100,7 +1100,7 @@ let lwt_preemptive_tests = [ (fun () -> Lwt.pause () >>= fun () -> Lwt.pause () >>= fun () -> - Lwt.wakeup r 42; + Lwt.awaken ~order:Nested r 42; Lwt.return ()) (function _ -> Stdlib.exit 2); Lwt.return ())