Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/manual.mld
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ val p : char Lwt.t = <abstr>
functions:

{ul
{- [Lwt.wakeup : 'a Lwt.u -> 'a -> unit]
{- [Lwt.awaken : order:ordering -> 'a Lwt.u -> 'a -> unit]
fulfills the promise with a value.}
{- [Lwt.wakeup_exn : 'a Lwt.u -> exn -> unit]
{- [Lwt.awaken_exn : order:ordering -> 'a Lwt.u -> exn -> unit]
rejects the promise with an exception.}}

Note that it is an error to try to resolve the same promise twice. [Lwt]
Expand All @@ -170,11 +170,11 @@ val p : char Lwt.t = <abstr>
# Lwt.state (Lwt.fail Exit);;
# let p, r = Lwt.wait ();;
# Lwt.state p;;
# Lwt.wakeup r 42;;
# Lwt.awaken ~order:Nested r 42;;
# Lwt.state p;;
# let p, r = Lwt.wait ();;
# Lwt.state p;;
# Lwt.wakeup_exn r Exit;;
# Lwt.awaken_exn ~order:Nested r Exit;;
# Lwt.state p;;
]}

Expand Down Expand Up @@ -323,7 +323,7 @@ val r2 : '_a Lwt.u = <abstr>
val p3 : '_a Lwt.t = <abstr>
# Lwt.state p3;;
- : '_a Lwt.state = Lwt.Sleep
# Lwt.wakeup r2 42;;
# Lwt.awaken ~order:Nested r2 42;;
- : unit = ()
# Lwt.state p3;;
- : int Lwt.state = Lwt.Return 42
Expand Down
63 changes: 34 additions & 29 deletions src/core/lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,16 @@ include Resolution_loop

module Resolving :
sig

type ordering =
| Deferred
| Dont_care
| Nested

val awaken_result : order:ordering -> 'a u -> ('a, exn) result -> unit
val awaken : order:ordering -> 'a u -> 'a -> unit
val awaken_exn : order:ordering -> 'a u -> exn -> unit

val wakeup_later_result : 'a u -> ('a, exn) result -> unit
val wakeup_later : 'a u -> 'a -> unit
val wakeup_later_exn : _ u -> exn -> unit
Expand All @@ -1351,31 +1361,13 @@ sig
val cancel : 'a t -> unit
end =
struct
(* Note that this function deviates from the "ideal" callback deferral
behavior: it runs callbacks directly on the current stack. It should
therefore be possible to cause a stack overflow using this function. *)
let wakeup_general api_function_name r result =
let Internal p = to_internal_resolver r in
let p = underlying p in

match p.state with
| Rejected Canceled ->
()
| Fulfilled _ ->
Printf.ksprintf invalid_arg "Lwt.%s" api_function_name
| Rejected _ ->
Printf.ksprintf invalid_arg "Lwt.%s" api_function_name

| Pending _ ->
let result = state_of_result result in
let State_may_have_changed p = resolve ~allow_deferring:false p result in
ignore p
type ordering =
| Deferred
| Dont_care
| Nested

let wakeup_result r result = wakeup_general "wakeup_result" r result
let wakeup r v = wakeup_general "wakeup" r (Ok v)
let wakeup_exn r exn = wakeup_general "wakeup_exn" r (Error exn)

let wakeup_later_general api_function_name r result =
let awaken_general api_function_name order r result =
let Internal p = to_internal_resolver r in
let p = underlying p in

Expand All @@ -1390,17 +1382,30 @@ struct
| Pending _ ->
let result = state_of_result result in
let State_may_have_changed p =
resolve ~maximum_callback_nesting_depth:1 p result in
match order with
| Deferred -> resolve ~maximum_callback_nesting_depth:min_int p result
| Dont_care -> resolve ~maximum_callback_nesting_depth:1 p result
| Nested -> resolve ~allow_deferring:false p result
in
ignore p

let awaken_result ~order r result =
awaken_general "awaken_result" order r result
let awaken ~order r v =
awaken_general "awaken" order r (Ok v)
let awaken_exn ~order r exn =
awaken_general "awaken_exn" order r (Error exn)

let wakeup_result r result = awaken_general "wakeup_result" Nested r result
let wakeup r v = awaken_general "wakeup" Nested r (Ok v)
let wakeup_exn r exn = awaken_general "wakeup_exn" Nested r (Error exn)

let wakeup_later_result r result =
wakeup_later_general "wakeup_later_result" r result
awaken_general "wakeup_later_result" Dont_care r result
let wakeup_later r v =
wakeup_later_general "wakeup_later" r (Ok v)
awaken_general "wakeup_later" Dont_care r (Ok v)
let wakeup_later_exn r exn =
wakeup_later_general "wakeup_later_exn" r (Error exn)


awaken_general "wakeup_later_exn" Dont_care r (Error exn)

type packed_callbacks =
| Packed : _ callbacks -> packed_callbacks
Expand Down
109 changes: 72 additions & 37 deletions src/core/lwt.mli
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,64 @@ val wait : unit -> ('a t * 'a u)

(** {3 Resolving} *)

val wakeup_later : 'a u -> 'a -> unit
(** [Lwt.wakeup_later r v] {e fulfills}, with value [v], the {e pending}
{{!t} promise} associated with {{!u} resolver} [r]. This triggers callbacks
attached to the promise.
(** [ordering] indicates a scheduling strategy for fullfilling or rejection of
promnises associated to resolvers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
promnises associated to resolvers.
promises associated to resolvers.


Specifically, when a promise is resolved via a call to {!awaken} (or
{!awaken_exn} or {!awaken_result}), the execution can be ordered in two
distinct ways:

- [Deferred]: Resolves the promise later, after the current code reaches a
pause or some I/O. This is often the behaviour you want. It makes the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sentence got cut off here.

- [Nested]: Resolves the promise immediately, come back to the current
code afterwards

If you have no preference between those two behaviours, [Dont_care] lets the
scheduler pick depending on internal state.

To understand the difference, consider the following setup
{[
let has_happened = ref false ;;
let blocker, resolver = wait () ;;
let task1 =
let* () = blocker in
has_happened := true; (* side-effect happens here *)
Lwt.return ()
;;
]}

Then, if a separate task resolves [blocker] by way of [resolver], two
different ordering can happen as examplified by:

{[
let task2 =
awaken ~order resolver;
(if !has_happened then
(* Using [Later] for [ordering] causes this branch to be taken *)
print_endline "Current code was prioritised over resolved promise"
else
(* Using [Nested] for [ordering] causes this branch to be taken *)
print_endline "Resolved promise code was prioritised over current");
Comment on lines +442 to +447
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it not the reverse way? !has_happened is only true once task1 is resolved.

Lwt.return ()
;;
]}

If the promise is not pending, [Lwt.wakeup_later] raises
*)
type ordering =
| Deferred (** Prioritise code at current location; queue awakening to come back to it later. *)
| Dont_care
| Nested (** Prioritise resolving the promise; come back to current code location later. *)

val awaken : order:ordering -> 'a u -> 'a -> unit
(** [awaken ~order r v] fullfills the pending promise associated to the
resolver [r] with value [v].

The scheduling of the fullfillment happens according to the [order]
parameter. See {!type-ordering}.

If the promise is not pending, [Lwt.awaken] raises
{!Stdlib.Invalid_argument}, unless the promise is {{!Lwt.cancel} canceled}.
If the promise is canceled, [Lwt.wakeup_later] has no effect.
If the promise is canceled, [Lwt.awaken] has no effect.

If your program has multiple threads, it is important to make sure that
[Lwt.wakeup_later] (and any similar function) is only called from the main
Expand All @@ -423,9 +473,19 @@ val wakeup_later : 'a u -> 'a -> unit
need to communicate from a worker thread to the main thread running Lwt, see
{!Lwt_preemptive} or {!Lwt_unix.send_notification}. *)

val awaken_exn : order:ordering -> 'a u -> exn -> unit
(** [awaken_exn] is similar to [awaken] but the promise associated to the
resolver is rejected (rather than fullfilled). *)

val awaken_result : order:ordering -> 'a u -> ('a, exn) result -> unit
(** [awaken_result] is similar to either [awaken] or [awaken_exn] depending on
if the value is [Ok _] or [Error _]. *)

val wakeup_later : 'a u -> 'a -> unit
(** [@@ocaml.deprecated "Use awaken ~order:Dont_care instead"] *)

val wakeup_later_exn : _ u -> exn -> unit
(** [Lwt.wakeup_later_exn r exn] is like {!Lwt.wakeup_later}, except, if the
associated {{!t} promise} is {e pending}, it is {e rejected} with [exn]. *)
(** [@@ocaml.deprecated "Use awaken_exn ~order:Dont_care instead"] *)

val return : 'a -> 'a t
(** [Lwt.return v] creates a new {{!t} promise} that is {e already fulfilled}
Expand Down Expand Up @@ -1620,15 +1680,7 @@ val of_result : ('a, exn) result -> 'a t
rejected with [exn]. *)

val wakeup_later_result : 'a u -> ('a, exn) result -> unit
(** [Lwt.wakeup_later_result r result] resolves the pending promise [p]
associated to resolver [r], according to [result]:

- If [result] is [Ok v], [p] is fulfilled with [v].
- If [result] is [Error exn], [p] is rejected with [exn].

If [p] is not pending, [Lwt.wakeup_later_result] raises
[Stdlib.Invalid_argument _], except if [p] is {{!Lwt.cancel} canceled}. If
[p] is canceled, [Lwt.wakeup_later_result] has no effect. *)
(** [@@ocaml.deprecated "Use awaken_result ~order:Dont_care instead"] *)



Expand Down Expand Up @@ -1762,30 +1814,13 @@ let () =
(** {3 Immediate resolving} *)

val wakeup : 'a u -> 'a -> unit
(** [Lwt.wakeup r v] is like {!Lwt.wakeup_later}[ r v], except it guarantees
that callbacks associated with [r] will be called immediately, deeper on the
current stack.

In contrast, {!Lwt.wakeup_later} {e may} call callbacks immediately, or may
queue them for execution on a shallower stack – though still before the next
time Lwt blocks the process on I/O.

Using this function is discouraged, because calling it in a loop can exhaust
the stack. The loop might be difficult to detect or predict, due to combined
mutually-recursive calls between multiple modules and libraries.

Also, trying to use this function to guarantee the timing of callback calls
for synchronization purposes is discouraged. This synchronization effect is
obscure to readers. It is better to use explicit promises, or {!Lwt_mutex},
{!Lwt_condition}, and/or {!Lwt_mvar}. *)
(** [@@ocaml.deprecated "Use awaken ~order:Nested instead"] *)

val wakeup_exn : _ u -> exn -> unit
(** [Lwt.wakeup_exn r exn] is like {!Lwt.wakeup_later_exn}[ r exn], but has
the same problems as {!Lwt.wakeup}. *)
(** [@@ocaml.deprecated "Use awaken_exn ~order:Nested instead"] *)

val wakeup_result : 'a u -> ('a, exn) result -> unit
(** [Lwt.wakeup_result r result] is like {!Lwt.wakeup_later_result}[ r result],
but has the same problems as {!Lwt.wakeup}. *)
(** [@@ocaml.deprecated "Use awaken_result ~order:Nested instead"] *)



Expand Down
6 changes: 3 additions & 3 deletions src/core/lwt_condition.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ let wait ?mutex cvar =

let signal cvar arg =
try
Lwt.wakeup_later (Lwt_sequence.take_l cvar) arg
Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l cvar) arg
with Lwt_sequence.Empty ->
()

let broadcast cvar arg =
let wakeners = Lwt_sequence.fold_r (fun x l -> x :: l) cvar [] in
Lwt_sequence.iter_node_l Lwt_sequence.remove cvar;
List.iter (fun wakener -> Lwt.wakeup_later wakener arg) wakeners
List.iter (fun wakener -> Lwt.awaken ~order:Dont_care wakener arg) wakeners

let broadcast_exn cvar exn =
let wakeners = Lwt_sequence.fold_r (fun x l -> x :: l) cvar [] in
Lwt_sequence.iter_node_l Lwt_sequence.remove cvar;
List.iter (fun wakener -> Lwt.wakeup_later_exn wakener exn) wakeners
List.iter (fun wakener -> Lwt.awaken_exn ~order:Dont_care wakener exn) wakeners
4 changes: 2 additions & 2 deletions src/core/lwt_mutex.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ let unlock m =
if Lwt_sequence.is_empty m.waiters then
m.locked <- false
else
(* We do not use [Lwt.wakeup] here to avoid a stack overflow
(* We do not use [~order:Dont_care] here to avoid a stack overflow
when unlocking a lot of threads. *)
Lwt.wakeup_later (Lwt_sequence.take_l m.waiters) ()
Lwt.awaken ~order:Dont_care (Lwt_sequence.take_l m.waiters) ()
Comment on lines +32 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment disagrees with the code

end

let with_lock m f =
Expand Down
4 changes: 2 additions & 2 deletions src/core/lwt_mvar.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ let put mvar v =
| None ->
mvar.mvar_contents <- Some v
| Some w ->
Lwt.wakeup_later w v
Lwt.awaken ~order:Dont_care w v
end;
Lwt.return_unit
| Some _ ->
Expand All @@ -77,7 +77,7 @@ let next_writer mvar =
match Lwt_sequence.take_opt_l mvar.writers with
| Some(v', w) ->
mvar.mvar_contents <- Some v';
Lwt.wakeup_later w ()
Lwt.awaken ~order:Dont_care w ()
| None ->
mvar.mvar_contents <- None

Expand Down
6 changes: 3 additions & 3 deletions src/core/lwt_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let release p c =
match Lwt_sequence.take_opt_l p.waiters with
| Some wakener ->
(* A promise resolver is waiting, give it the pool member. *)
Lwt.wakeup_later wakener c
Lwt.awaken ~order:Dont_care wakener c
| None ->
(* No one is waiting, queue it. *)
Queue.push c p.list
Expand All @@ -84,10 +84,10 @@ let replace_disposed p =
Lwt.on_any
(Lwt.apply p.create ())
(fun c ->
Lwt.wakeup_later wakener c)
Lwt.awaken ~order:Dont_care wakener c)
(fun exn ->
(* Creation failed, notify the waiter of the failure. *)
Lwt.wakeup_later_exn wakener exn)
Lwt.awaken_exn ~order:Dont_care wakener exn)

(* Verify a member is still valid before using it. *)
let validate_and_return p c =
Expand Down
Loading