-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathcatalyst_test.ml
81 lines (66 loc) · 2.51 KB
/
catalyst_test.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
(* Test scaffolding: a scheduler instance, and the Reagents library
   instantiated over that scheduler. *)
(* [Scheduler] is the first-class module returned by
   [Reagents.Toy_scheduler.make 1 ()], unpacked here.
   NOTE(review): the argument [1] is presumably the number of worker
   threads/domains -- confirm against [Toy_scheduler]. *)
module Scheduler = (val Reagents.Toy_scheduler.make 1 ())
(* Shadows the library-level [Reagents] with the functor applied to
   [Scheduler]; every later [Reagents.x] refers to this instance. *)
module Reagents = Reagents.Make (Scheduler)
(* Short alias for the counter data structure.
   NOTE(review): not referenced by the tests visible in this file. *)
module Counter = Reagents.Data.Counter
open Reagents
(** [message_counter ()] registers a receiver reagent on one end of a
    channel through [Reagents.Catalyst.catalyse], then performs two swaps on
    the other end and asserts that the shared counter reads 0, 1 and 2 at
    the respective points, i.e. that every swap triggers exactly one
    increment.

    Runs inside [Scheduler.run].
    @raise Assert_failure if the counter does not hold the expected value. *)
let message_counter () =
  Scheduler.run (fun () ->
      let receiver_counter = Atomic.make 0 in
      (* Compare by value with [Int.equal]; physical equality ([==]) is not
         the right operator for checking integer values. *)
      let assert_counter expected =
        assert (Int.equal (Atomic.get receiver_counter) expected)
      in
      let (c1 : (unit, unit) Channel.endpoint), c2 = Channel.mk_chan () in
      (* Receive on [c2], then bump the counter. *)
      let receiver =
        let open Reagents in
        Channel.swap c2 >>> lift (fun () -> Atomic.incr receiver_counter)
      in
      Reagents.Catalyst.catalyse receiver () |> ignore;
      (* Nothing has been sent yet. *)
      assert_counter 0;
      Reagents.run (Channel.swap c1) ();
      assert_counter 1;
      Reagents.run (Channel.swap c1) ();
      assert_counter 2)
(** [three_channels_joined ()] links three integer channels [a], [b] and [c]
    with two catalysed forwarding reagents ([a2] to [b1], and [b2] to [c1]).
    It sends [1] on [a1] and asserts that a forked task swapping on [c2]
    obtains [1], which it publishes through the [transferred] atomic.

    Runs inside [Scheduler.run].
    @raise Assert_failure if the value observed on [c2] is not [1]. *)
let three_channels_joined () =
  Scheduler.run (fun () ->
      (* [0] doubles as the "nothing received yet" marker. *)
      let transferred = Atomic.make 0 in
      let (a1 : (int, unit) Channel.endpoint), a2 = Channel.mk_chan () in
      let (b1 : (int, unit) Channel.endpoint), b2 = Channel.mk_chan () in
      let (c1 : (int, unit) Channel.endpoint), c2 = Channel.mk_chan () in
      (* Reagent that takes a value from [receive] and offers it on [send]. *)
      let forward receive send =
        let open Reagents in
        Channel.swap receive >>> Channel.swap send
      in
      Reagents.Catalyst.catalyse (forward a2 b1) () |> ignore;
      Reagents.Catalyst.catalyse (forward b2 c1) () |> ignore;
      (* Consumer at the far end of the chain. *)
      Scheduler.fork (fun () ->
          let v = Reagents.run (Channel.swap c2) () in
          Atomic.set transferred v);
      Reagents.run (Channel.swap a1) 1;
      (* Compare by value with [Int.equal] rather than physical equality. *)
      while Int.equal (Atomic.get transferred) 0 do
        () (* not necessary with 1 thr *)
      done;
      assert (Int.equal (Atomic.get transferred) 1))
(** [message_counter_stress ()] is the high-volume variant of the message
    counter test: with a catalysed receiver installed on [c2], it performs
    1_000_000 swaps on [c1] and asserts that the counter ends up equal to
    the number of swaps.

    Runs inside [Scheduler.run].
    @raise Assert_failure if the final counter differs from the swap count. *)
let message_counter_stress () =
  Scheduler.run (fun () ->
      let receiver_counter = Atomic.make 0 in
      (* Compare by value with [Int.equal]; physical equality ([==]) is not
         the right operator for checking integer values. *)
      let assert_counter expected =
        assert (Int.equal (Atomic.get receiver_counter) expected)
      in
      let (c1 : (unit, unit) Channel.endpoint), c2 = Channel.mk_chan () in
      (* Receive on [c2], then bump the counter. *)
      let receiver =
        let open Reagents in
        Channel.swap c2 >>> lift (fun () -> Atomic.incr receiver_counter)
      in
      Reagents.Catalyst.catalyse receiver () |> ignore;
      let count = 1_000_000 in
      for _ = 1 to count do
        (* [Sys.opaque_identity] hides the argument from the optimiser. *)
        Reagents.run (Channel.swap c1) (Sys.opaque_identity ())
      done;
      assert_counter count)
(* Entry point: registers the three catalyst tests under the "simple" group
   and hands them to the Alcotest runner. *)
let () =
  let simple_cases =
    [
      Alcotest.test_case "message counter" `Quick message_counter;
      Alcotest.test_case "three channels joined" `Quick three_channels_joined;
      Alcotest.test_case "message counter, 10^6 items" `Quick
        message_counter_stress;
    ]
  in
  Alcotest.run "catalyst test" [ ("simple", simple_cases) ]