Skip to content

Commit fbf70b1

Browse files
committed
rate-limit: Replace readers-writer lock with atomic Map
Signed-off-by: Christian Pardillo Laursen <[email protected]>
1 parent 69f33b5 commit fbf70b1

File tree

2 files changed: +57 additions, −143 deletions

ocaml/libs/rate-limit/bucket_table.ml

Lines changed: 57 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -23,37 +23,13 @@ type rate_limit_data = {
2323
}
2424
[@@warning "-69"]
2525

26-
type t = {
27-
table: (string, rate_limit_data) Hashtbl.t
28-
; mutable readers: int
29-
; readers_lock: Mutex.t (* protects readers count *)
30-
; table_lock: Mutex.t
31-
(* held collectively by readers, exclusively by writers *)
32-
}
33-
34-
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
26+
module StringMap = Map.Make (String)
3527

36-
let with_read_lock t f =
37-
with_lock t.readers_lock (fun () ->
38-
t.readers <- t.readers + 1 ;
39-
if t.readers = 1 then Mutex.lock t.table_lock
40-
) ;
41-
Fun.protect f ~finally:(fun () ->
42-
with_lock t.readers_lock (fun () ->
43-
t.readers <- t.readers - 1 ;
44-
if t.readers = 0 then Mutex.unlock t.table_lock
45-
)
46-
)
28+
type t = rate_limit_data StringMap.t Atomic.t
4729

48-
let with_write_lock t f = with_lock t.table_lock f
30+
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
4931

50-
let create () =
51-
{
52-
table= Hashtbl.create 10
53-
; readers= 0
54-
; readers_lock= Mutex.create ()
55-
; table_lock= Mutex.create ()
56-
}
32+
let create () = Atomic.make StringMap.empty
5733

5834
(* The worker thread is responsible for calling the callback when the token
5935
amount becomes available *)
@@ -81,73 +57,70 @@ let rec worker_loop ~bucket ~process_queue ~process_queue_lock
8157
(* TODO: Indicate failure reason - did we get invalid config or try to add an
8258
already present user_agent? *)
8359
let add_bucket t ~user_agent ~burst_size ~fill_rate =
84-
with_write_lock t (fun () ->
85-
if Hashtbl.mem t.table user_agent then
60+
let map = Atomic.get t in
61+
if StringMap.mem user_agent map then
62+
false
63+
else
64+
match Token_bucket.create ~burst_size ~fill_rate with
65+
| Some bucket ->
66+
let process_queue = Queue.create () in
67+
let process_queue_lock = Mutex.create () in
68+
let worker_thread_cond = Condition.create () in
69+
let should_terminate = ref false in
70+
let worker_thread =
71+
Thread.create
72+
(fun () ->
73+
worker_loop ~bucket ~process_queue ~process_queue_lock
74+
~worker_thread_cond ~should_terminate
75+
)
76+
()
77+
in
78+
let data =
79+
{
80+
bucket
81+
; process_queue
82+
; process_queue_lock
83+
; worker_thread_cond
84+
; should_terminate
85+
; worker_thread
86+
}
87+
in
88+
let updated_map = StringMap.add user_agent data map in
89+
Atomic.set t updated_map ; true
90+
| None ->
8691
false
87-
else
88-
match Token_bucket.create ~burst_size ~fill_rate with
89-
| Some bucket ->
90-
let process_queue = Queue.create () in
91-
let process_queue_lock = Mutex.create () in
92-
let worker_thread_cond = Condition.create () in
93-
let should_terminate = ref false in
94-
let worker_thread =
95-
Thread.create
96-
(fun () ->
97-
worker_loop ~bucket ~process_queue ~process_queue_lock
98-
~worker_thread_cond ~should_terminate
99-
)
100-
()
101-
in
102-
let data =
103-
{
104-
bucket
105-
; process_queue
106-
; process_queue_lock
107-
; worker_thread_cond
108-
; should_terminate
109-
; worker_thread
110-
}
111-
in
112-
Hashtbl.add t.table user_agent data ;
113-
true
114-
| None ->
115-
false
116-
)
11792

11893
let delete_bucket t ~user_agent =
119-
with_write_lock t (fun () ->
120-
match Hashtbl.find_opt t.table user_agent with
121-
| None ->
122-
()
123-
| Some data ->
124-
Mutex.lock data.process_queue_lock ;
125-
data.should_terminate := true ;
126-
Condition.signal data.worker_thread_cond ;
127-
Mutex.unlock data.process_queue_lock ;
128-
Hashtbl.remove t.table user_agent
129-
)
94+
let map = Atomic.get t in
95+
match StringMap.find_opt user_agent map with
96+
| None ->
97+
()
98+
| Some data ->
99+
Mutex.lock data.process_queue_lock ;
100+
data.should_terminate := true ;
101+
Condition.signal data.worker_thread_cond ;
102+
Mutex.unlock data.process_queue_lock ;
103+
Atomic.set t (StringMap.remove user_agent map)
130104

131105
let try_consume t ~user_agent amount =
132-
with_read_lock t (fun () ->
133-
match Hashtbl.find_opt t.table user_agent with
134-
| None ->
135-
false
136-
| Some data ->
137-
Token_bucket.consume data.bucket amount
138-
)
106+
let map = Atomic.get t in
107+
match StringMap.find_opt user_agent map with
108+
| None ->
109+
false
110+
| Some data ->
111+
Token_bucket.consume data.bucket amount
139112

140113
let peek t ~user_agent =
141-
with_read_lock t (fun () ->
142-
Option.map
143-
(fun contents -> Token_bucket.peek contents.bucket)
144-
(Hashtbl.find_opt t.table user_agent)
145-
)
114+
let map = Atomic.get t in
115+
Option.map
116+
(fun contents -> Token_bucket.peek contents.bucket)
117+
(StringMap.find_opt user_agent map)
146118

147119
(* The callback should return quickly - if it is a longer task it is
148120
responsible for creating a thread to do the task *)
149121
let submit t ~user_agent ~callback amount =
150-
match with_read_lock t (fun () -> Hashtbl.find_opt t.table user_agent) with
122+
let map = Atomic.get t in
123+
match StringMap.find_opt user_agent map with
151124
| None ->
152125
callback ()
153126
| Some {bucket; process_queue; process_queue_lock; worker_thread_cond; _} ->

ocaml/libs/rate-limit/test/test_bucket_table.ml

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -198,64 +198,6 @@ let test_submit_sync () =
198198
Alcotest.(check bool)
199199
"blocked waiting for tokens" true (elapsed_seconds >= 0.4)
200200

201-
let test_add_same_key_race () =
202-
(* Test the check-then-act race in add_bucket.
203-
add_bucket does: if not mem then add. Without locking, multiple threads
204-
could all pass the mem check and try to add, but only one should succeed.
205-
Note: OCaml 4's GIL makes races hard to trigger, but this test verifies
206-
the invariant holds under concurrent access and would catch races if the
207-
GIL is released at allocation points within the critical section. *)
208-
let iterations = 500 in
209-
let threads_per_iter = 10 in
210-
let failures = ref 0 in
211-
let failures_mutex = Mutex.create () in
212-
for _ = 1 to iterations do
213-
let table = Bucket_table.create () in
214-
let success_count = ref 0 in
215-
let count_mutex = Mutex.create () in
216-
let barrier = ref 0 in
217-
let barrier_mutex = Mutex.create () in
218-
let threads =
219-
Array.init threads_per_iter (fun _ ->
220-
Thread.create
221-
(fun () ->
222-
(* Increment barrier and wait for all threads *)
223-
Mutex.lock barrier_mutex ;
224-
incr barrier ;
225-
Mutex.unlock barrier_mutex ;
226-
while
227-
Mutex.lock barrier_mutex ;
228-
let b = !barrier in
229-
Mutex.unlock barrier_mutex ; b < threads_per_iter
230-
do
231-
Thread.yield ()
232-
done ;
233-
(* All threads try to add the same key simultaneously *)
234-
let success =
235-
Bucket_table.add_bucket table ~user_agent:"contested_key"
236-
~burst_size:10.0 ~fill_rate:1.0
237-
in
238-
if success then (
239-
Mutex.lock count_mutex ;
240-
incr success_count ;
241-
Mutex.unlock count_mutex
242-
)
243-
)
244-
()
245-
)
246-
in
247-
Array.iter Thread.join threads ;
248-
(* Exactly one thread should succeed in adding the key *)
249-
if !success_count <> 1 then (
250-
Mutex.lock failures_mutex ;
251-
incr failures ;
252-
Mutex.unlock failures_mutex
253-
)
254-
done ;
255-
Alcotest.(check int)
256-
"Exactly one add should succeed for same key (across all iterations)" 0
257-
!failures
258-
259201
let test_concurrent_add_delete_stress () =
260202
(* Stress test: rapidly add and delete entries.
261203
Without proper locking, hashtable can get corrupted. *)
@@ -394,7 +336,6 @@ let test =
394336
; ("Submit nonexistent", `Quick, test_submit_nonexistent)
395337
; ("Submit fairness", `Slow, test_submit_fairness)
396338
; ("Submit sync", `Slow, test_submit_sync)
397-
; ("Add same key race", `Quick, test_add_same_key_race)
398339
; ("Concurrent add/delete stress", `Quick, test_concurrent_add_delete_stress)
399340
; ("Consume during delete race", `Quick, test_consume_during_delete_race)
400341
]

0 commit comments

Comments (0)