From cd0e5026fa4dcaedcb058d6ab1b2206368e7f19b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 8 Jul 2025 10:17:48 -0400 Subject: [PATCH 01/22] add the `lwt_direct` package, for direct-style control flow --- dune-project | 9 ++++ lwt_direct.opam | 34 ++++++++++++++ src/direct/dune | 9 ++++ src/direct/lwt_direct.ml | 95 +++++++++++++++++++++++++++++++++++++++ src/direct/lwt_direct.mli | 33 ++++++++++++++ 5 files changed, 180 insertions(+) create mode 100644 lwt_direct.opam create mode 100644 src/direct/dune create mode 100644 src/direct/lwt_direct.ml create mode 100644 src/direct/lwt_direct.mli diff --git a/dune-project b/dune-project index 5f0af6dfa..5c5c9d915 100644 --- a/dune-project +++ b/dune-project @@ -44,6 +44,15 @@ (react (>= 1.0.0)) (bisect_ppx :with-test))) +(package + (name lwt_direct) + (synopsis "Direct style control flow and `await` for Lwt") + (depends + (ocaml (>= 5.0)) + base-unix + (lwt (>= 3.0.0)) + (bisect_ppx :with-test))) + (package (name lwt) (synopsis "Promises and event-driven I/O") diff --git a/lwt_direct.opam b/lwt_direct.opam new file mode 100644 index 000000000..549413838 --- /dev/null +++ b/lwt_direct.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Direct style control flow and `await` for Lwt" +maintainer: [ + "Raphaël Proust " "Anton Bachin " +] +authors: ["Jérôme Vouillon" "Jérémie Dimino"] +license: "MIT" +homepage: "https://github.com/ocsigen/lwt" +doc: "https://ocsigen.org/lwt" +bug-reports: "https://github.com/ocsigen/lwt/issues" +depends: [ + "dune" {>= "2.7"} + "ocaml" {>= "5.0"} + "base-unix" + "lwt" {>= "3.0.0"} + "bisect_ppx" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/ocsigen/lwt.git" diff --git a/src/direct/dune b/src/direct/dune new file mode 100644 index 
000000000..46cc24b8a --- /dev/null +++ b/src/direct/dune @@ -0,0 +1,9 @@ +(library + (public_name lwt_direct) + (synopsis "Direct style control flow and `await` for Lwt") + (enabled_if (>= %{ocaml_version} "5.0")) + (libraries lwt lwt.unix) + (instrumentation + (backend bisect_ppx))) + + diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml new file mode 100644 index 000000000..8bac3c9fa --- /dev/null +++ b/src/direct/lwt_direct.ml @@ -0,0 +1,95 @@ +module ED = Effect.Deep + +type _ Effect.t += + | Await : 'a Lwt.t -> 'a Effect.t + | Yield : unit Effect.t + +(** Queue of microtasks that are ready *) +let tasks : (unit -> unit) Queue.t = Queue.create () + +let[@inline] push_task f : unit = Queue.push f tasks + +let default_on_uncaught_exn exn bt = + Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) + +let run_all_tasks () : unit = + let n_processed = ref 0 in + let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in + while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do + let t = Queue.pop tasks in + incr n_processed; + try t () + with exn -> + let bt = Printexc.get_raw_backtrace () in + default_on_uncaught_exn exn bt + done; + (* make sure we don't sleep forever if there's no lwt promise + ready but [tasks] contains ready tasks *) + if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) + +let setup_hooks = + let already_done = ref false in + fun () -> + if not !already_done then ( + already_done := true; + let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in + let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in + () + ) + +let await (fut : 'a Lwt.t) : 'a = + match Lwt.state fut with + | Lwt.Return x -> x + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> Effect.perform (Await fut) + +let yield () : unit = Effect.perform Yield + +(** the main effect handler *) +let handler : _ ED.effect_handler = + let 
effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = + function + | Yield -> + Some (fun k -> push_task (fun () -> ED.continue k ())) + | Await fut -> + Some + (fun k -> + Lwt.on_any fut + (fun res -> push_task (fun () -> ED.continue k res)) + (fun exn -> push_task (fun () -> ED.discontinue k exn))) + | _ -> None + in + { effc } + +let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = + let res = ref (Error (Failure "not resolved")) in + let run_f_and_set_res () = + (try + let r = f () in + res := Ok r + with exn -> res := Error exn); + Lwt.wakeup_result promise !res + in + ED.try_with run_f_and_set_res () handler + +let run f : _ Lwt.t = + setup_hooks (); + let lwt, resolve = Lwt.wait () in + push_task (run_inside_effect_handler_and_resolve_ resolve f); + lwt + +let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = + let run_f () : unit = + try + f () + with exn -> + let bt = Printexc.get_raw_backtrace () in + on_uncaught_exn exn bt + in + ED.try_with run_f () handler + +let run_in_the_background ?(on_uncaught_exn=default_on_uncaught_exn) f : unit = + setup_hooks (); + push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli new file mode 100644 index 000000000..e1cbd4222 --- /dev/null +++ b/src/direct/lwt_direct.mli @@ -0,0 +1,33 @@ +(** Direct style control flow for Lwt. *) + +val run : (unit -> 'a) -> 'a Lwt.t +(** [run f] runs the function [f ()] in a task within + the [Lwt_unix] event loop. [f ()] can create [Lwt] + promises and use {!await} to wait for them. Like any promise + in Lwt, [f ()] can starve the event loop if it runs long computations + without yielding to the event loop. + + When [f ()] terminates (successfully or not), the promise + [run f] is resolved with [f ()]'s result, or the exception + raised by [f ()]. 
*) + +val run_in_the_background : + ?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> + (unit -> unit) -> + unit +(** [run_in_the_background f] is similar to [ignore (run f)]. + The computation [f()] runs in the background in the event loop + and returns no result. + @param on_uncaught_exn if provided, this is called when [f()] + raises an exception. *) + +val yield : unit -> unit +(** Yield to the event loop. + Can only be used inside {!run} or {!run_in_the_background}. *) + +val await : 'a Lwt.t -> 'a +(** [await prom] returns the result of [prom], or re-raises the + exception with which [prom] failed if it failed. + If [prom] is not resolved yet, [await prom] will suspend the + current task and resume it when [prom] is resolved. + Can only be used inside {!run} or {!run_in_the_background}. *) From 7145d6138a09d3e2daa2838b33b2eb071dd39ea4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 9 Jul 2025 22:05:07 -0400 Subject: [PATCH 02/22] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raphaël Proust --- src/direct/lwt_direct.mli | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index e1cbd4222..a357fafaf 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -23,11 +23,13 @@ val run_in_the_background : val yield : unit -> unit (** Yield to the event loop. - Can only be used inside {!run} or {!run_in_the_background}. *) + calling [yield] outside of {!run} or {!run_in_the_background} will raise an exception, + crash your program, or otherwise cause errors. It is a programming error to do so. *) val await : 'a Lwt.t -> 'a (** [await prom] returns the result of [prom], or re-raises the exception with which [prom] failed if it failed. If [prom] is not resolved yet, [await prom] will suspend the current task and resume it when [prom] is resolved. 
- Can only be used inside {!run} or {!run_in_the_background}. *) + calling [yield] outside of {!run} or {!run_in_the_background} will raise an exception, + crash your program, or otherwise cause errors. It is a programming error to do so. *) From 8cab71ea7287bbb02ed2664f04afe552b4834985 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 09:50:23 -0400 Subject: [PATCH 03/22] some docs --- src/direct/lwt_direct.mli | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index a357fafaf..e8e5575be 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -1,4 +1,39 @@ -(** Direct style control flow for Lwt. *) +(** Direct style control flow for Lwt. + + This module relies on OCaml 5's + {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. + Instead of chaining promises using {!Lwt.bind} and {!Lwt.map} + and other combinators, it becomes possible to start + lightweight "tasks" using [Lwt_direct.run (fun () -> ...)]. + The body of such a task is written in direct-style code, + using OCaml's standard control flow structures such as loops, + higher-order functions, exception handlers, [match], etc. + + Interactions with the rest of lwt can be done using [await], + for example: + + {[ + Lwt_direct.run (fun () -> + let continue = ref true in + while !continue do + match Lwt_io.read_line in_channel |> Lwt_direct.await with + | exception End_of_file -> continue := false + | line -> + let uppercase_line = String.uppercase_ascii line in + Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await + done) + ]} + + This code snippet contains a simple "task" that repeatedly reads + a line from a [Lwt_io] channel, uppercases it, and writes the + uppercase version to another channel. + + This task is itself a [unit Lwt.t], which is resolved when the function + returns. 
It is possible to use + {!Lwt_direct.run_in_the_background} to ignore the result and + let the task run in the background instead. + + *) val run : (unit -> 'a) -> 'a Lwt.t (** [run f] runs the function [f ()] in a task within @@ -31,5 +66,5 @@ val await : 'a Lwt.t -> 'a exception with which [prom] failed if it failed. If [prom] is not resolved yet, [await prom] will suspend the current task and resume it when [prom] is resolved. - calling [yield] outside of {!run} or {!run_in_the_background} will raise an exception, + calling [await] outside of {!run} or {!run_in_the_background} will raise an exception, crash your program, or otherwise cause errors. It is a programming error to do so. *) From 0978471fdfa4623b0c100ba94ce7ebf48b2a2a44 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 09:55:28 -0400 Subject: [PATCH 04/22] doc --- src/direct/lwt_direct.mli | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index e8e5575be..52c1170d6 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -58,7 +58,8 @@ val run_in_the_background : val yield : unit -> unit (** Yield to the event loop. - calling [yield] outside of {!run} or {!run_in_the_background} will raise an exception, + + Calling [yield] outside of {!run} or {!run_in_the_background} will raise an exception, crash your program, or otherwise cause errors. It is a programming error to do so. *) val await : 'a Lwt.t -> 'a @@ -66,5 +67,6 @@ val await : 'a Lwt.t -> 'a exception with which [prom] failed if it failed. If [prom] is not resolved yet, [await prom] will suspend the current task and resume it when [prom] is resolved. - calling [await] outside of {!run} or {!run_in_the_background} will raise an exception, + + Calling [await] outside of {!run} or {!run_in_the_background} will raise an exception, crash your program, or otherwise cause errors. It is a programming error to do so. 
*) From 5b746aa628fa1ea245fc6253716ae1e692000635 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 22:04:37 -0400 Subject: [PATCH 05/22] lwt: expose some storage primitives in `Private` --- src/core/lwt.ml | 40 +++++++++++++++++++++++++--------------- src/core/lwt.mli | 11 +++++++++++ 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/core/lwt.ml b/src/core/lwt.ml index 257134c63..c5c9f1ed4 100644 --- a/src/core/lwt.ml +++ b/src/core/lwt.ml @@ -729,7 +729,10 @@ sig type 'v key val new_key : unit -> _ key val get : 'v key -> 'v option + val get_from_storage : 'v key -> storage -> 'v option + val modify_storage : 'v key -> 'v option -> storage -> storage val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b + val empty_storage : storage (* Internal interface *) val current_storage : storage ref @@ -773,28 +776,30 @@ struct next_key_id := id + 1; {id = id; value = None} - let current_storage = ref Storage_map.empty + let empty_storage = Storage_map.empty + let current_storage = ref empty_storage - let get key = - if Storage_map.mem key.id !current_storage then begin - let refresh = Storage_map.find key.id !current_storage in + let get_from_storage key storage = + match Storage_map.find key.id storage with + | refresh -> refresh (); let value = key.value in key.value <- None; value - end - else - None + | exception Not_found -> None + + let get key = get_from_storage key !current_storage + + let modify_storage key value storage = + match value with + | Some _ -> + let refresh = fun () -> key.value <- value in + Storage_map.add key.id refresh storage + | None -> + Storage_map.remove key.id storage let with_value key value f = - let new_storage = - match value with - | Some _ -> - let refresh = fun () -> key.value <- value in - Storage_map.add key.id refresh !current_storage - | None -> - Storage_map.remove key.id !current_storage - in + let new_storage = modify_storage key value !current_storage in let saved_storage = 
!current_storage in current_storage := new_storage; @@ -3228,3 +3233,8 @@ struct let (let+) x f = map f x let (and+) = both end + +module Private = struct + type nonrec storage = storage + module Sequence_associated_storage = Sequence_associated_storage +end diff --git a/src/core/lwt.mli b/src/core/lwt.mli index 7598343d8..76905bb27 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -2061,3 +2061,14 @@ val backtrace_try_bind : val abandon_wakeups : unit -> unit val debug_state_is : 'a state -> 'a t -> bool t + +module Private : sig + type storage + + module Sequence_associated_storage : sig + val get_from_storage : 'a key -> storage -> 'a option + val modify_storage : 'a key -> 'a option -> storage -> storage + val empty_storage : storage + val current_storage : storage ref + end +end From 33527db7fdbf91cfd9ec4250b429d6ae932a0a34 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 22:07:48 -0400 Subject: [PATCH 06/22] lwt_direct: expose basic storage primitives --- src/direct/lwt_direct.ml | 30 ++++++++++++++++++++++++++---- src/direct/lwt_direct.mli | 22 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml index 8bac3c9fa..469e6ba7e 100644 --- a/src/direct/lwt_direct.ml +++ b/src/direct/lwt_direct.ml @@ -1,5 +1,18 @@ module ED = Effect.Deep +module Storage = struct + module Lwt_storage= Lwt.Private.Sequence_associated_storage + type 'a key = 'a Lwt.key + let new_key = Lwt.new_key + let get = Lwt.get + let set k v = Lwt_storage.(current_storage := modify_storage k (Some v) !current_storage) + let remove k = Lwt_storage.(current_storage := modify_storage k None !current_storage) + + let reset_to_empty () = Lwt_storage.(current_storage := empty_storage) + let save_current () = !Lwt_storage.current_storage + let restore_current saved = Lwt_storage.current_storage := saved +end + type _ Effect.t += | Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t @@ -52,20 
+65,28 @@ let handler : _ ED.effect_handler = let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = function | Yield -> - Some (fun k -> push_task (fun () -> ED.continue k ())) + Some (fun k -> + let storage = Storage.save_current () in + push_task (fun () -> + Storage.restore_current storage; + ED.continue k ())) | Await fut -> Some (fun k -> + let storage = Storage.save_current () in Lwt.on_any fut - (fun res -> push_task (fun () -> ED.continue k res)) - (fun exn -> push_task (fun () -> ED.discontinue k exn))) + (fun res -> push_task (fun () -> + Storage.restore_current storage; ED.continue k res)) + (fun exn -> push_task (fun () -> + Storage.restore_current storage; ED.discontinue k exn))) | _ -> None in { effc } let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = - let res = ref (Error (Failure "not resolved")) in let run_f_and_set_res () = + let res = ref (Error (Failure "not resolved")) in + Storage.reset_to_empty(); (try let r = f () in res := Ok r @@ -82,6 +103,7 @@ let run f : _ Lwt.t = let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = let run_f () : unit = + Storage.reset_to_empty(); try f () with exn -> diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index 52c1170d6..0a48eba42 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -70,3 +70,25 @@ val await : 'a Lwt.t -> 'a Calling [await] outside of {!run} or {!run_in_the_background} will raise an exception, crash your program, or otherwise cause errors. It is a programming error to do so. *) + +(** Local storage. + + This storage is the same as the one described with {!Lwt.key}, + except that it is usable from the inside of {!run} or + {!run_in_the_background}. + + Each task has its own storage, independent from other tasks or promises. 
*) +module Storage : sig + type 'a key = 'a Lwt.key + val new_key : unit -> 'a key + (** Alias to {!Lwt.new_key} *) + + val get : 'a key -> 'a option + (** get the value associated with this key in local storage, or [None] *) + + val set : 'a key -> 'a -> unit + (** [set k v] sets the key to the value for the rest of the task. *) + + val remove : 'a key -> unit + (** Remove the value associated with this key, if any *) +end From b869daa4b57f166a0fa4d6586475a7a48671992e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 22:39:33 -0400 Subject: [PATCH 07/22] add tests for Lwt_direct --- test/direct/dune | 5 ++ test/direct/main.ml | 3 + test/direct/test_lwt_direct.ml | 136 +++++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 test/direct/dune create mode 100644 test/direct/main.ml create mode 100644 test/direct/test_lwt_direct.ml diff --git a/test/direct/dune b/test/direct/dune new file mode 100644 index 000000000..9767d4f15 --- /dev/null +++ b/test/direct/dune @@ -0,0 +1,5 @@ + +(test + (name main) + (package lwt_direct) + (libraries lwt_direct lwt.unix lwttester)) diff --git a/test/direct/main.ml b/test/direct/main.ml new file mode 100644 index 000000000..5b9b13dba --- /dev/null +++ b/test/direct/main.ml @@ -0,0 +1,3 @@ + +Test.run "lwt_direct" + Test_lwt_direct.suites diff --git a/test/direct/test_lwt_direct.ml b/test/direct/test_lwt_direct.ml new file mode 100644 index 000000000..111ccaeb6 --- /dev/null +++ b/test/direct/test_lwt_direct.ml @@ -0,0 +1,136 @@ + +open Test +open Lwt_direct +open Lwt.Syntax + +let main_tests = suite "main" [ + test "basic await" begin fun () -> + let fut = run @@ fun () -> + Lwt_unix.sleep 1e-6 |> await; + 42 + in + let+ res = fut in + res = 42 + end; + + test "await multiple values" begin fun () -> + let fut1 = let+ () = Lwt_unix.sleep 1e-6 in 1 in + let fut2 = let+ () = Lwt_unix.sleep 2e-6 in 2 in + let fut3 = let+ () = Lwt_unix.sleep 3e-6 in 3 in + + run @@ fun () -> + let x1 = 
fut1 |> await in + let x2 = fut2 |> await in + let x3 = fut3 |> await in + x1 = 1 && x2 = 2 && x3 = 3 + end; + + test "list.iter await" begin fun () -> + let items = List.init 101 (fun i -> Lwt.return i) in + run @@ fun () -> + let sum = ref 0 in + List.iter (fun fut -> sum := !sum + await fut) items; + !sum = 5050 + end; + + test "run in background" begin fun () -> + let stream, push = Lwt_stream.create_bounded 2 in + run_in_the_background (fun () -> + for i = 1 to 10 do + push#push i |> await + done; + push#close); + run @@ fun () -> + let continue = ref true in + let seen = ref [] in + + while !continue do + match Lwt_stream.get stream |> await with + | None -> continue := false + | Some x -> seen := x :: !seen + done; + List.rev !seen = [1;2;3;4;5;6;7;8;9;10] + end; + + test "list.iter await with yield" begin fun () -> + let items = List.init 101 (fun i -> Lwt.return i) in + run @@ fun () -> + let sum = ref 0 in + List.iter (fun fut -> yield(); sum := !sum + await fut) items; + !sum = 5050 + end; +] + +let storage_tests = suite "storage" [ + test "get set" begin fun () -> + let k1 = Storage.new_key () in + let k2 = Storage.new_key () in + run @@ fun () -> + assert (Storage.get k1 = None); + assert (Storage.get k2 = None); + Storage.set k1 42; + assert (Storage.get k1 = Some 42); + assert (Storage.get k2 = None); + Storage.set k2 true; + assert (Storage.get k1 = Some 42); + assert (Storage.get k2 = Some true); + Storage.remove k1; + assert (Storage.get k1 = None); + assert (Storage.get k2 = Some true); + true + end; + + test "storage across await" begin fun () -> + let k = Storage.new_key () in + + (* run another promise that touches storage *) + let run_promise_async () = + Lwt.async @@ fun () -> + Lwt.with_value k (Some "something else") @@ fun () -> + assert (Lwt.get k = Some "something else"); + Lwt.return_unit + in + + let run_promise () : unit Lwt.t = + Lwt.with_value k (Some "another one") @@ fun () -> + assert (Lwt.get k = Some "another one"); + 
Lwt.return_unit + in + + let one_task () = + run_promise_async(); + assert (Storage.get k = None); + Storage.set k "v1"; + assert (Storage.get k = Some "v1"); + run_promise () |> await; + assert (Storage.get k = Some "v1"); + Storage.remove k; + assert (Storage.get k = None); + yield(); + assert (Storage.get k = None); + run_promise () |> await; + assert (Storage.get k = None); + run_promise_async(); + yield(); + assert (Storage.get k = None); + Storage.set k "v2"; + assert (Storage.get k = Some "v2"); + run_promise_async(); + yield(); + run_promise () |> await; + assert (Storage.get k = Some "v2"); + in + + (* run multiple such tasks *) + let tasks = [ run one_task; run one_task; run one_task ] in + + run @@ fun () -> + List.iter await tasks; + true + end; +] + +let suites = [ + main_tests; + storage_tests +] From 495852e4554378610cfa0c39b86d5befcca6c215 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 22:54:00 -0400 Subject: [PATCH 08/22] more tests for Lwt_direct --- test/direct/test_lwt_direct.ml | 45 +++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/test/direct/test_lwt_direct.ml b/test/direct/test_lwt_direct.ml index 111ccaeb6..ca308dfa6 100644 --- a/test/direct/test_lwt_direct.ml +++ b/test/direct/test_lwt_direct.ml @@ -130,7 +130,50 @@ let storage_tests = suite "storage" [ end; ] +let io_tests = suite "io" [ + test "read io" begin fun () -> + let str = "some\ninteresting\ntext string here!\n" in + let ic = Lwt_io.of_bytes ~mode:Input (Lwt_bytes.of_string str) in + run @@ fun () -> + let lines = ref [] in + while + try + yield (); + let line = Lwt_io.read_line ic |> await in + lines := line :: !lines; + true + with End_of_file -> false + do () + done; + List.rev !lines = ["some"; "interesting"; "text string here!"] + end; + + test "pipe" begin fun () -> + let ic, oc = Lwt_io.pipe() in + run_in_the_background (fun () -> + for i = 1 to 100 do + Lwt_io.write_line oc (string_of_int i) |> await; + 
Lwt_io.flush oc |> await + done; + Lwt_io.close oc |> await; + ); + + run @@ fun () -> + let sum = ref 0 in + let continue = ref true in + while !continue do + match Lwt_io.read_line ic |> await |> String.trim |> int_of_string with + | exception End_of_file -> continue := false + | i -> + sum := !sum + i + done; + Lwt_io.close ic |> await; + !sum = 5050 + end +] + let suites = [ main_tests; - storage_tests + storage_tests; + io_tests; ] From d776f41fa85942b99f2978a792237e2742b0ed1e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 22:54:06 -0400 Subject: [PATCH 09/22] CI: see if --best-effort helps --- .github/workflows/workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 284919d57..3ad35d69e 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -54,7 +54,7 @@ jobs: - run: opam install conf-libev if: ${{ matrix.libev == true }} - - run: opam install . --deps-only --with-test + - run: opam install . --deps-only --with-test --best-effort - run: opam exec -- dune build From dc85027b94fca6cf3acb1b900efd941d7627da88 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 22:58:11 -0400 Subject: [PATCH 10/22] CI --- .github/workflows/workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 3ad35d69e..584bccbb8 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -54,7 +54,7 @@ jobs: - run: opam install conf-libev if: ${{ matrix.libev == true }} - - run: opam install . 
--deps-only --with-test --best-effort + - run: opam install --deps-only --with-test ./lwt.opam ./lwt_ppx.opam ./lwt_react.opam ./lwt_retry.opam - run: opam exec -- dune build From 9bee28380d7ba856b220034977d29429d7c516b6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 23:11:59 -0400 Subject: [PATCH 11/22] only test lwt_direct if OCaml >= 5.0 --- test/direct/dune | 1 + 1 file changed, 1 insertion(+) diff --git a/test/direct/dune b/test/direct/dune index 9767d4f15..2a3e92128 100644 --- a/test/direct/dune +++ b/test/direct/dune @@ -2,4 +2,5 @@ (test (name main) (package lwt_direct) + (enabled_if (>= %{ocaml_version} "5.0")) (libraries lwt_direct lwt.unix lwttester)) From c5aa6c5c7e1dfab1ed3c8e40e16c60e42529c277 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 23:16:11 -0400 Subject: [PATCH 12/22] fix test on 4.xx --- test/direct/dune | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/direct/dune b/test/direct/dune index 2a3e92128..93aac1e8b 100644 --- a/test/direct/dune +++ b/test/direct/dune @@ -2,5 +2,5 @@ (test (name main) (package lwt_direct) - (enabled_if (>= %{ocaml_version} "5.0")) + (build_if (>= %{ocaml_version} "5.0")) (libraries lwt_direct lwt.unix lwttester)) From cdf51cef466a0d8319a8ea29ddba51ed9ad198ef Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 10 Jul 2025 23:35:40 -0400 Subject: [PATCH 13/22] dune --- test/direct/dune | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/direct/dune b/test/direct/dune index 93aac1e8b..fa6ef7d37 100644 --- a/test/direct/dune +++ b/test/direct/dune @@ -1,6 +1,12 @@ -(test +(executable (name main) - (package lwt_direct) - (build_if (>= %{ocaml_version} "5.0")) + (enabled_if (>= %{ocaml_version} "5.0")) (libraries lwt_direct lwt.unix lwttester)) + +(rule + (alias runtest) + (package lwt_direct) + (enabled_if (>= %{ocaml_version} "5.0")) + (action (run ./main.exe))) + From 6e408a89d5df57c53a67dd7da88d4ae54a3f0392 Mon Sep 17 
00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Fri, 11 Jul 2025 15:38:16 +0200 Subject: [PATCH 14/22] some improvements as discussed in PR's review --- dune-project | 4 ++-- lwt_direct.opam | 6 +++--- src/core/lwt.mli | 2 +- src/direct/dune | 4 +--- src/direct/lwt_direct.ml | 29 ++++++++++++++++++----------- test/direct/test_lwt_direct.ml | 13 ++++++++++++- 6 files changed, 37 insertions(+), 21 deletions(-) diff --git a/dune-project b/dune-project index 5c5c9d915..e899dabe2 100644 --- a/dune-project +++ b/dune-project @@ -46,11 +46,11 @@ (package (name lwt_direct) - (synopsis "Direct style control flow and `await` for Lwt") + (synopsis "Direct-style control-flow and `await` for Lwt") (depends (ocaml (>= 5.0)) base-unix - (lwt (>= 3.0.0)) + (lwt (>= 6)) (bisect_ppx :with-test))) (package diff --git a/lwt_direct.opam b/lwt_direct.opam index 549413838..5f28ad64c 100644 --- a/lwt_direct.opam +++ b/lwt_direct.opam @@ -1,10 +1,9 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" -synopsis: "Direct style control flow and `await` for Lwt" +synopsis: "Direct-style control-flow and `await` for Lwt" maintainer: [ "Raphaël Proust " "Anton Bachin " ] -authors: ["Jérôme Vouillon" "Jérémie Dimino"] license: "MIT" homepage: "https://github.com/ocsigen/lwt" doc: "https://ocsigen.org/lwt" @@ -13,7 +12,7 @@ depends: [ "dune" {>= "2.7"} "ocaml" {>= "5.0"} "base-unix" - "lwt" {>= "3.0.0"} + "lwt" {>= "6"} "bisect_ppx" {with-test} "odoc" {with-doc} ] @@ -32,3 +31,4 @@ build: [ ] ] dev-repo: "git+https://github.com/ocsigen/lwt.git" +authors: ["Simon Cruanes"] diff --git a/src/core/lwt.mli b/src/core/lwt.mli index 76905bb27..7ec5efb24 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -2071,4 +2071,4 @@ module Private : sig val empty_storage : storage val current_storage : storage ref end -end +end [@@alert trespassing "for internal use only, keep away"] diff --git a/src/direct/dune b/src/direct/dune index 46cc24b8a..9ea910ebc 100644 --- 
a/src/direct/dune +++ b/src/direct/dune @@ -1,9 +1,7 @@ (library (public_name lwt_direct) - (synopsis "Direct style control flow and `await` for Lwt") + (synopsis "Direct-style control-flow and `await` for Lwt") (enabled_if (>= %{ocaml_version} "5.0")) (libraries lwt lwt.unix) (instrumentation (backend bisect_ppx))) - - diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml index 469e6ba7e..202854ea3 100644 --- a/src/direct/lwt_direct.ml +++ b/src/direct/lwt_direct.ml @@ -1,13 +1,14 @@ module ED = Effect.Deep module Storage = struct + [@@@alert "-trespassing"] module Lwt_storage= Lwt.Private.Sequence_associated_storage + [@@@alert "+trespassing"] type 'a key = 'a Lwt.key let new_key = Lwt.new_key let get = Lwt.get let set k v = Lwt_storage.(current_storage := modify_storage k (Some v) !current_storage) let remove k = Lwt_storage.(current_storage := modify_storage k None !current_storage) - let reset_to_empty () = Lwt_storage.(current_storage := empty_storage) let save_current () = !Lwt_storage.current_storage let restore_current saved = Lwt_storage.current_storage := saved @@ -27,9 +28,13 @@ let default_on_uncaught_exn exn bt = (Printexc.to_string exn) (Printexc.raw_backtrace_to_string bt) +let absolute_max_number_of_steps = + (* TODO 6.0: what's a good number here? should it be customisable? 
*) + 10_000 + let run_all_tasks () : unit = let n_processed = ref 0 in - let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in + let max_number_of_steps = min absolute_max_number_of_steps (2 * Queue.length tasks) in while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do let t = Queue.pop tasks in incr n_processed; @@ -38,15 +43,20 @@ let run_all_tasks () : unit = let bt = Printexc.get_raw_backtrace () in default_on_uncaught_exn exn bt done; - (* make sure we don't sleep forever if there's no lwt promise - ready but [tasks] contains ready tasks *) - if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) + (* In the case where there are no promises ready for wakeup, the scheduler's + engine will pause until some IO completes. There might never be completed + IO, depending on the program structure and the state of the world. If this + happens and the queue is not empty, we add a [pause] so that the engine has + something to wakeup for so that the rest of the queue can be processed. *) + if not (Queue.is_empty tasks) && Lwt.paused_count () = 0 then ignore (Lwt.pause () : unit Lwt.t) let setup_hooks = let already_done = ref false in fun () -> if not !already_done then ( already_done := true; + (* TODO 6.0: assess whether we should have both hooks or just one (which + one). Tempted to say we should only have the enter hook. 
*) let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in () @@ -85,13 +95,10 @@ let handler : _ ED.effect_handler = let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = let run_f_and_set_res () = - let res = ref (Error (Failure "not resolved")) in Storage.reset_to_empty(); - (try - let r = f () in - res := Ok r - with exn -> res := Error exn); - Lwt.wakeup_result promise !res + match f () with + | res -> Lwt.wakeup promise res + | exception exc -> Lwt.wakeup_exn promise exc in ED.try_with run_f_and_set_res () handler diff --git a/test/direct/test_lwt_direct.ml b/test/direct/test_lwt_direct.ml index ca308dfa6..e9207784b 100644 --- a/test/direct/test_lwt_direct.ml +++ b/test/direct/test_lwt_direct.ml @@ -1,4 +1,3 @@ - open Test open Lwt_direct open Lwt.Syntax @@ -33,6 +32,18 @@ let main_tests = suite "main" [ !sum = 5050 end; + test "lwt_list.iter_p run" begin fun () -> + let items = List.init 101 (fun i -> i) in + let+ items = Lwt_list.map_p + (fun i -> run (fun () -> + for _ = 0 to i mod 5 do yield () done; + i + )) + items + in + List.fold_left (+) 0 items = 5050 + end; + test "run in background" begin fun () -> let stream, push = Lwt_stream.create_bounded 2 in run_in_the_background (fun () -> From 020ae8b161990ec2d22c019fd3ef59fc8587ef7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Fri, 11 Jul 2025 15:53:52 +0200 Subject: [PATCH 15/22] purely cosmetics tweaks --- src/core/lwt.ml | 17 ++++---- src/direct/lwt_direct.ml | 85 ++++++++++++++++++++++----------------- src/direct/lwt_direct.mli | 2 +- 3 files changed, 60 insertions(+), 44 deletions(-) diff --git a/src/core/lwt.ml b/src/core/lwt.ml index c5c9f1ed4..660819adc 100644 --- a/src/core/lwt.ml +++ b/src/core/lwt.ml @@ -729,9 +729,9 @@ sig type 'v key val new_key : unit -> _ key val get : 'v key -> 'v option + val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b val 
get_from_storage : 'v key -> storage -> 'v option val modify_storage : 'v key -> 'v option -> storage -> storage - val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b val empty_storage : storage (* Internal interface *) @@ -776,19 +776,17 @@ struct next_key_id := id + 1; {id = id; value = None} + (* generic storage *) let empty_storage = Storage_map.empty - let current_storage = ref empty_storage let get_from_storage key storage = - match Storage_map.find key.id storage with - | refresh -> + match Storage_map.find_opt key.id storage with + | Some refresh -> refresh (); let value = key.value in key.value <- None; value - | exception Not_found -> None - - let get key = get_from_storage key !current_storage + | None -> None let modify_storage key value storage = match value with @@ -798,6 +796,11 @@ struct | None -> Storage_map.remove key.id storage + (* built-in storage: propagated by bind and such *) + let current_storage = ref empty_storage + + let get key = get_from_storage key !current_storage + let with_value key value f = let new_storage = modify_storage key value !current_storage in diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml index 202854ea3..954676c15 100644 --- a/src/direct/lwt_direct.ml +++ b/src/direct/lwt_direct.ml @@ -1,33 +1,15 @@ -module ED = Effect.Deep +(* Direct-style wrapper for Lwt code -module Storage = struct - [@@@alert "-trespassing"] - module Lwt_storage= Lwt.Private.Sequence_associated_storage - [@@@alert "+trespassing"] - type 'a key = 'a Lwt.key - let new_key = Lwt.new_key - let get = Lwt.get - let set k v = Lwt_storage.(current_storage := modify_storage k (Some v) !current_storage) - let remove k = Lwt_storage.(current_storage := modify_storage k None !current_storage) - let reset_to_empty () = Lwt_storage.(current_storage := empty_storage) - let save_current () = !Lwt_storage.current_storage - let restore_current saved = Lwt_storage.current_storage := saved -end + The implementation of the direct-style wrapper 
relies on ocaml5's effect + system capturing continuations and adding them as a callback to some lwt + promises. *) -type _ Effect.t += - | Await : 'a Lwt.t -> 'a Effect.t - | Yield : unit Effect.t +(* part 1: tasks, getting the scheduler to call them *) -(** Queue of microtasks that are ready *) let tasks : (unit -> unit) Queue.t = Queue.create () let[@inline] push_task f : unit = Queue.push f tasks -let default_on_uncaught_exn exn bt = - Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string bt) - let absolute_max_number_of_steps = (* TODO 6.0: what's a good number here? should it be customisable? *) 10_000 @@ -40,8 +22,9 @@ let run_all_tasks () : unit = incr n_processed; try t () with exn -> - let bt = Printexc.get_raw_backtrace () in - default_on_uncaught_exn exn bt + (* TODO 6.0: change async_exception handler to accept a backtrace, pass it + here and at the other use site. *) + !Lwt.async_exception_hook exn done; (* In the case where there are no promises ready for wakeup, the scheduler's engine will pause until some IO completes. There might never be completed @@ -62,6 +45,12 @@ let setup_hooks = () ) +(* part 2: effects, performing them *) + +type _ Effect.t += + | Await : 'a Lwt.t -> 'a Effect.t + | Yield : unit Effect.t + let await (fut : 'a Lwt.t) : 'a = match Lwt.state fut with | Lwt.Return x -> x @@ -70,29 +59,48 @@ let await (fut : 'a Lwt.t) : 'a = let yield () : unit = Effect.perform Yield -(** the main effect handler *) -let handler : _ ED.effect_handler = - let effc : type b. 
b Effect.t -> ((b, unit) ED.continuation -> 'a) option = +(* interlude: task-local storage helpers *) + +module Storage = struct + [@@@alert "-trespassing"] + module Lwt_storage= Lwt.Private.Sequence_associated_storage + [@@@alert "+trespassing"] + type 'a key = 'a Lwt.key + let new_key = Lwt.new_key + let get = Lwt.get + let set k v = Lwt_storage.(current_storage := modify_storage k (Some v) !current_storage) + let remove k = Lwt_storage.(current_storage := modify_storage k None !current_storage) + let reset_to_empty () = Lwt_storage.(current_storage := empty_storage) + let save_current () = !Lwt_storage.current_storage + let restore_current saved = Lwt_storage.current_storage := saved +end + +(* part 3: handling effects *) + +let handler : _ Effect.Deep.effect_handler = + let effc : type b. b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function | Yield -> Some (fun k -> let storage = Storage.save_current () in push_task (fun () -> Storage.restore_current storage; - ED.continue k ())) + Effect.Deep.continue k ())) | Await fut -> Some (fun k -> let storage = Storage.save_current () in Lwt.on_any fut (fun res -> push_task (fun () -> - Storage.restore_current storage; ED.continue k res)) + Storage.restore_current storage; Effect.Deep.continue k res)) (fun exn -> push_task (fun () -> - Storage.restore_current storage; ED.discontinue k exn))) + Storage.restore_current storage; Effect.Deep.discontinue k exn))) | _ -> None in { effc } +(* part 4: putting it all together: running tasks *) + let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = let run_f_and_set_res () = Storage.reset_to_empty(); @@ -100,7 +108,7 @@ let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : u | res -> Lwt.wakeup promise res | exception exc -> Lwt.wakeup_exn promise exc in - ED.try_with run_f_and_set_res () handler + Effect.Deep.try_with run_f_and_set_res () handler let run f : _ Lwt.t = setup_hooks (); @@ -108,17 
+116,22 @@ let run f : _ Lwt.t = push_task (run_inside_effect_handler_and_resolve_ resolve f); lwt +(* part 4 (encore): running a task in the background *) + let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = let run_f () : unit = Storage.reset_to_empty(); try f () with exn -> - let bt = Printexc.get_raw_backtrace () in - on_uncaught_exn exn bt + on_uncaught_exn exn in - ED.try_with run_f () handler + Effect.Deep.try_with run_f () handler -let run_in_the_background ?(on_uncaught_exn=default_on_uncaught_exn) f : unit = +let run_in_the_background ?on_uncaught_exn f : unit = + let on_uncaught_exn = match on_uncaught_exn with + | Some handler -> handler + | None -> !Lwt.async_exception_hook + in setup_hooks (); push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index 0a48eba42..1e01dbec1 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -47,7 +47,7 @@ val run : (unit -> 'a) -> 'a Lwt.t raised by [f ()]. *) val run_in_the_background : - ?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> + ?on_uncaught_exn:(exn -> unit) -> (unit -> unit) -> unit (** [run_in_the_background f] is similar to [ignore (run f)]. 
From 92c1d3e705c171f2bcfb4d94f4410ccb36b84440 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 11 Jul 2025 22:07:43 -0400 Subject: [PATCH 16/22] opam stuff --- dune-project | 1 + lwt_direct.opam | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dune-project b/dune-project index e899dabe2..3befa8530 100644 --- a/dune-project +++ b/dune-project @@ -47,6 +47,7 @@ (package (name lwt_direct) (synopsis "Direct-style control-flow and `await` for Lwt") + (authors "Simon Cruanes") (depends (ocaml (>= 5.0)) base-unix diff --git a/lwt_direct.opam b/lwt_direct.opam index 5f28ad64c..7db18c542 100644 --- a/lwt_direct.opam +++ b/lwt_direct.opam @@ -4,6 +4,7 @@ synopsis: "Direct-style control-flow and `await` for Lwt" maintainer: [ "Raphaël Proust " "Anton Bachin " ] +authors: ["Simon Cruanes"] license: "MIT" homepage: "https://github.com/ocsigen/lwt" doc: "https://ocsigen.org/lwt" @@ -31,4 +32,3 @@ build: [ ] ] dev-repo: "git+https://github.com/ocsigen/lwt.git" -authors: ["Simon Cruanes"] From 61f338d55edd8b1e719e9a1f21351937d7e36c3e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 11 Jul 2025 22:07:51 -0400 Subject: [PATCH 17/22] tighten a bit Lwt_direct, use Lwt.async_exception_hook reuse as much as possible from lwt --- src/direct/lwt_direct.ml | 16 ++++++---------- src/direct/lwt_direct.mli | 4 +--- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml index 954676c15..c7ed52e07 100644 --- a/src/direct/lwt_direct.ml +++ b/src/direct/lwt_direct.ml @@ -118,20 +118,16 @@ let run f : _ Lwt.t = (* part 4 (encore): running a task in the background *) -let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = +let run_inside_effect_handler_in_the_background_ f () : unit = let run_f () : unit = Storage.reset_to_empty(); try - f () - with exn -> - on_uncaught_exn exn + f () + with exn -> + !Lwt.async_exception_hook exn in Effect.Deep.try_with run_f () handler -let 
run_in_the_background ?on_uncaught_exn f : unit = - let on_uncaught_exn = match on_uncaught_exn with - | Some handler -> handler - | None -> !Lwt.async_exception_hook - in +let run_in_the_background f : unit = setup_hooks (); - push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) + push_task (run_inside_effect_handler_in_the_background_ f) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index 1e01dbec1..7d5922128 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -47,14 +47,12 @@ val run : (unit -> 'a) -> 'a Lwt.t raised by [f ()]. *) val run_in_the_background : - ?on_uncaught_exn:(exn -> unit) -> (unit -> unit) -> unit (** [run_in_the_background f] is similar to [ignore (run f)]. The computation [f()] runs in the background in the event loop and returns no result. - @param on_uncaught_exn if provided, this is called when [f()] - raises an exception. *) + If [f()] raises an exception, {!Lwt.async_exception_hook} is called. *) val yield : unit -> unit (** Yield to the event loop. 
From 05d32330b664567aa3f6e0e1f7d2e3a72af1ca30 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 11 Jul 2025 22:08:22 -0400 Subject: [PATCH 18/22] test: increase coverage --- test/direct/test_lwt_direct.ml | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/direct/test_lwt_direct.ml b/test/direct/test_lwt_direct.ml index e9207784b..b8eb0cd28 100644 --- a/test/direct/test_lwt_direct.ml +++ b/test/direct/test_lwt_direct.ml @@ -70,6 +70,39 @@ let main_tests = suite "main" [ List.iter (fun fut -> yield(); sum := !sum + await fut) items; !sum = 5050 end; + + test "awaiting on failing promise" begin fun () -> + let fut: unit Lwt.t = let* () = Lwt.pause () in let* () = Lwt_unix.sleep 0.0001 in Lwt.fail Exit in + run @@ fun () -> + try await fut; false + with Exit -> true + end; + + test "run can fail" begin fun () -> + run @@ fun () -> + let sub: unit Lwt.t = run @@ fun () -> + Lwt_unix.sleep 0.00001 |> await; + raise Exit + in + try await sub; false + with Exit -> true + end; + + test "concurrent fib" begin fun () -> + let rec badfib n = + if n <= 2 then Lwt.return 1 + else + run begin fun () -> + let f1 = badfib (n-1) in + let f2 = badfib (n-2) in + await f1 + await f2 + end + in + run @@ fun () -> + let fib12 = badfib 12 in + let fib12 = await fib12 in + fib12 = 144 + end ] let storage_tests = suite "storage" [ From d6966749dad4c84cacd5457c65172bea523be78c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Jul 2025 11:05:37 +0200 Subject: [PATCH 19/22] add TODO in comment --- src/direct/lwt_direct.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml index c7ed52e07..f02d0b902 100644 --- a/src/direct/lwt_direct.ml +++ b/src/direct/lwt_direct.ml @@ -24,6 +24,7 @@ let run_all_tasks () : unit = with exn -> (* TODO 6.0: change async_exception handler to accept a backtrace, pass it here and at the other use site. 
*) + (* TODO 6.0: this and other try-with: respect exception-filter *) !Lwt.async_exception_hook exn done; (* In the case where there are no promises ready for wakeup, the scheduler's From 8587e967b6a3d81774355eec4012c6596f9c2b5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 15 Jul 2025 11:07:15 +0200 Subject: [PATCH 20/22] CHANGELOG --- CHANGES | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGES b/CHANGES index 76a6cdb80..2b8e0e40c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +===== dev ===== + +====== Additions ====== + + * Lwt_direct: using Lwt in direct-style. (Simon Cruanes, #1060) + + ===== 5.9.0 ===== ====== Additions ====== From 09fba3a988bdda0fd6a472815c11409f96f3e258 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 16 Jul 2025 22:06:21 -0400 Subject: [PATCH 21/22] rename `run` to `spawn` --- src/direct/lwt_direct.ml | 4 +-- src/direct/lwt_direct.mli | 20 +++++++-------- test/direct/test_lwt_direct.ml | 46 +++++++++++++++++----------------- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml index f02d0b902..346b07fda 100644 --- a/src/direct/lwt_direct.ml +++ b/src/direct/lwt_direct.ml @@ -111,7 +111,7 @@ let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : u in Effect.Deep.try_with run_f_and_set_res () handler -let run f : _ Lwt.t = +let spawn f : _ Lwt.t = setup_hooks (); let lwt, resolve = Lwt.wait () in push_task (run_inside_effect_handler_and_resolve_ resolve f); @@ -129,6 +129,6 @@ let run_inside_effect_handler_in_the_background_ f () : unit = in Effect.Deep.try_with run_f () handler -let run_in_the_background f : unit = +let spawn_in_the_background f : unit = setup_hooks (); push_task (run_inside_effect_handler_in_the_background_ f) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index 7d5922128..0cc4284e3 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -4,7
+4,7 @@ {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. Instead of chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it becomes possible to start - lightweight "tasks" using [Lwt_direct.run (fun () -> ...)]. + lightweight "tasks" using [Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in direct-style code, using OCaml's standard control flow structures such as loops, higher-order functions, exception handlers, [match], etc. @@ -13,7 +13,7 @@ for example: {[ - Lwt_direct.run (fun () -> + Lwt_direct.spawn (fun () -> let continue = ref true in while !continue do match Lwt_io.read_line in_channel |> Lwt_direct.await with @@ -35,21 +35,21 @@ *) -val run : (unit -> 'a) -> 'a Lwt.t -(** [run f] runs the function [f ()] in a task within +val spawn : (unit -> 'a) -> 'a Lwt.t +(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event loop. [f ()] can create [Lwt] promises and use {!await} to wait for them. Like any promise in Lwt, [f ()] can starve the event loop if it runs long computations without yielding to the event loop. When [f ()] terminates (successfully or not), the promise - [run f] is resolved with [f ()]'s result, or the exception + [spawn f] is resolved with [f ()]'s result, or the exception raised by [f ()]. *) -val run_in_the_background : +val spawn_in_the_background : (unit -> unit) -> unit -(** [run_in_the_background f] is similar to [ignore (run f)]. +(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The computation [f()] runs in the background in the event loop and returns no result. If [f()] raises an exception, {!Lwt.async_exception_hook} is called. *) @@ -57,7 +57,7 @@ val run_in_the_background : val yield : unit -> unit (** Yield to the event loop. 
- Calling [yield] outside of {!run} or {!run_in_the_background} will raise an exception, + Calling [yield] outside of {!spawn} or {!spawn_in_the_background} will raise an exception, crash your program, or otherwise cause errors. It is a programming error to do so. *) val await : 'a Lwt.t -> 'a @@ -66,13 +66,13 @@ val await : 'a Lwt.t -> 'a If [prom] is not resolved yet, [await prom] will suspend the current task and resume it when [prom] is resolved. - Calling [await] outside of {!run} or {!run_in_the_background} will raise an exception, + Calling [await] outside of {!spawn} or {!spawn_in_the_background} will raise an exception, crash your program, or otherwise cause errors. It is a programming error to do so. *) (** Local storage. This storage is the same as the one described with {!Lwt.key}, - except that it is usable from the inside of {!run} or + except that it is usable from the inside of {!spawn} or {!run_in_the_background}. Each task has its own storage, independent from other tasks or promises.
*) diff --git a/test/direct/test_lwt_direct.ml b/test/direct/test_lwt_direct.ml index b8eb0cd28..74cab01ef 100644 --- a/test/direct/test_lwt_direct.ml +++ b/test/direct/test_lwt_direct.ml @@ -4,7 +4,7 @@ open Lwt.Syntax let main_tests = suite "main" [ test "basic await" begin fun () -> - let fut = run @@ fun () -> + let fut = spawn @@ fun () -> Lwt_unix.sleep 1e-6 |> await; 42 in @@ -17,7 +17,7 @@ let main_tests = suite "main" [ let fut2 = let+ () = Lwt_unix.sleep 2e-6 in 2 in let fut3 = let+ () = Lwt_unix.sleep 3e-6 in 3 in - run @@ fun () -> + spawn @@ fun () -> let x1 = fut1 |> await in let x2 = fut2 |> await in let x3 = fut3 |> await in @@ -26,16 +26,16 @@ let main_tests = suite "main" [ test "list.iter await" begin fun () -> let items = List.init 101 (fun i -> Lwt.return i) in - run @@ fun () -> + spawn @@ fun () -> let sum = ref 0 in List.iter (fun fut -> sum := !sum + await fut) items; !sum = 5050 end; - test "lwt_list.iter_p run" begin fun () -> + test "lwt_list.iter_p spawn" begin fun () -> let items = List.init 101 (fun i -> i) in let+ items = Lwt_list.map_p - (fun i -> run (fun () -> + (fun i -> spawn (fun () -> for _ = 0 to i mod 5 do yield () done; i )) @@ -44,14 +44,14 @@ let main_tests = suite "main" [ List.fold_left (+) 0 items = 5050 end; - test "run in background" begin fun () -> + test "spawn in background" begin fun () -> let stream, push = Lwt_stream.create_bounded 2 in - run_in_the_background (fun () -> + spawn_in_the_background (fun () -> for i = 1 to 10 do push#push i |> await done; push#close); - run @@ fun () -> + spawn @@ fun () -> let continue = ref true in let seen = ref [] in @@ -65,7 +65,7 @@ let main_tests = suite "main" [ test "list.iter await with yield" begin fun () -> let items = List.init 101 (fun i -> Lwt.return i) in - run @@ fun () -> + spawn @@ fun () -> let sum = ref 0 in List.iter (fun fut -> yield(); sum := !sum + await fut) items; !sum = 5050 @@ -73,14 +73,14 @@ let main_tests = suite "main" [ test "awaiting on failing 
promise" begin fun () -> let fut: unit Lwt.t = let* () = Lwt.pause () in let* () = Lwt_unix.sleep 0.0001 in Lwt.fail Exit in - run @@ fun () -> + spawn @@ fun () -> try await fut; false with Exit -> true end; - test "run can fail" begin fun () -> - run @@ fun () -> - let sub: unit Lwt.t = run @@ fun () -> + test "spawn can fail" begin fun () -> + spawn @@ fun () -> + let sub: unit Lwt.t = spawn @@ fun () -> Lwt_unix.sleep 0.00001 |> await; raise Exit in @@ -92,13 +92,13 @@ let main_tests = suite "main" [ let rec badfib n = if n <= 2 then Lwt.return 1 else - run begin fun () -> + spawn begin fun () -> let f1 = badfib (n-1) in let f2 = badfib (n-2) in await f1 + await f2 end in - run @@ fun () -> + spawn @@ fun () -> let fib12 = badfib 12 in let fib12 = await fib12 in fib12 = 144 @@ -109,7 +109,7 @@ let storage_tests = suite "storage" [ test "get set" begin fun () -> let k1 = Storage.new_key () in let k2 = Storage.new_key () in - run @@ fun () -> + spawn @@ fun () -> assert (Storage.get k1 = None); assert (Storage.get k2 = None); Storage.set k1 42; @@ -127,7 +127,7 @@ let storage_tests = suite "storage" [ test "storage across await" begin fun () -> let k = Storage.new_key () in - (* run another promise that touches storage *) + (* spawn another promise that touches storage *) let run_promise_async () = Lwt.async @@ fun () -> Lwt.with_value k (Some "something else") @@ fun () -> @@ -165,10 +165,10 @@ let storage_tests = suite "storage" [ assert (Storage.get k = Some "v2"); in - (* run multiple such tasks *) - let tasks = [ run one_task; run one_task; run one_task ] in + (* spawn multiple such tasks *) + let tasks = [ spawn one_task; spawn one_task; spawn one_task ] in - run @@ fun () -> + spawn @@ fun () -> List.iter await tasks; true end; @@ -178,7 +178,7 @@ let io_tests = suite "io" [ test "read io" begin fun () -> let str = "some\ninteresting\ntext string here!\n" in let ic = Lwt_io.of_bytes ~mode:Input (Lwt_bytes.of_string str) in - run @@ fun () -> + spawn @@ fun 
() -> let lines = ref [] in while try @@ -194,7 +194,7 @@ let io_tests = suite "io" [ test "pipe" begin fun () -> let ic, oc = Lwt_io.pipe() in - run_in_the_background (fun () -> + spawn_in_the_background (fun () -> for i = 1 to 100 do Lwt_io.write_line oc (string_of_int i) |> await; Lwt_io.flush oc |> await @@ -202,7 +202,7 @@ let io_tests = suite "io" [ Lwt_io.close oc |> await; ); - run @@ fun () -> + spawn @@ fun () -> let sum = ref 0 in let continue = ref true in while !continue do From 24e7ccef43e234fe1242509c7adeb2813d816652 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 27 Jul 2025 21:02:05 -0400 Subject: [PATCH 22/22] document cancelation (or lack thereof) --- src/direct/lwt_direct.mli | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli index 0cc4284e3..2c1c39391 100644 --- a/src/direct/lwt_direct.mli +++ b/src/direct/lwt_direct.mli @@ -44,7 +44,10 @@ val spawn : (unit -> 'a) -> 'a Lwt.t When [f ()] terminates (successfully or not), the promise [spawn f] is resolved with [f ()]'s result, or the exception - raised by [f ()]. *) + raised by [f ()]. + + The promise returned by [spawn f] is not cancellable. Canceling it + will have no effect. *) val spawn_in_the_background : (unit -> unit) ->