Skip to content

Commit 28cfa02

Browse files
committed
rate-limit: Add readers-writer lock to bucket table
Signed-off-by: Christian Pardillo Laursen <[email protected]>
1 parent ebcbe84 commit 28cfa02

File tree

2 files changed

+235
-40
lines changed

2 files changed

+235
-40
lines changed

ocaml/libs/rate-limit/bucket_table.ml

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,73 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15-
type t = (string, Token_bucket.t) Hashtbl.t
15+
type t = {
16+
table: (string, Token_bucket.t) Hashtbl.t
17+
; mutable readers: int
18+
; reader_count: Mutex.t (* protects readers count *)
19+
; resource: Mutex.t (* held collectively by readers, exclusively by writers *)
20+
}
1621

17-
let create () = Hashtbl.create 16
22+
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
1823

19-
let add_bucket table ~user_agent ~burst_size ~fill_rate =
20-
let bucket_option = Token_bucket.create ~burst_size ~fill_rate in
21-
match bucket_option with
22-
| Some bucket ->
23-
Hashtbl.replace table user_agent bucket ;
24-
true
25-
| None ->
26-
false
24+
let with_read_lock t f =
25+
with_lock t.reader_count (fun () ->
26+
t.readers <- t.readers + 1 ;
27+
if t.readers = 1 then Mutex.lock t.resource
28+
) ;
29+
Fun.protect f ~finally:(fun () ->
30+
with_lock t.reader_count (fun () ->
31+
t.readers <- t.readers - 1 ;
32+
if t.readers = 0 then Mutex.unlock t.resource
33+
)
34+
)
2735

28-
let delete_bucket table ~user_agent = Hashtbl.remove table user_agent
36+
let with_write_lock t f = with_lock t.resource f
2937

30-
let try_consume table ~user_agent amount =
31-
match Hashtbl.find_opt table user_agent with
32-
| None ->
33-
false
34-
| Some bucket ->
35-
Token_bucket.consume bucket amount
38+
let create () =
39+
{
40+
table= Hashtbl.create 10
41+
; readers= 0
42+
; reader_count= Mutex.create ()
43+
; resource= Mutex.create ()
44+
}
45+
46+
(* TODO: Indicate failure reason - did we get invalid config or try to add an
47+
already present user_agent? *)
48+
let add_bucket t ~user_agent ~burst_size ~fill_rate =
49+
with_write_lock t (fun () ->
50+
if Hashtbl.mem t.table user_agent then
51+
false
52+
else
53+
match Token_bucket.create ~burst_size ~fill_rate with
54+
| Some bucket ->
55+
Hashtbl.add t.table user_agent bucket ;
56+
true
57+
| None ->
58+
false
59+
)
60+
61+
let delete_bucket t ~user_agent =
62+
with_write_lock t (fun () -> Hashtbl.remove t.table user_agent)
63+
64+
let try_consume t ~user_agent amount =
65+
with_read_lock t (fun () ->
66+
match Hashtbl.find_opt t.table user_agent with
67+
| None ->
68+
false
69+
| Some bucket ->
70+
Token_bucket.consume bucket amount
71+
)
3672

37-
let peek table ~user_agent =
38-
Option.map Token_bucket.peek (Hashtbl.find_opt table user_agent)
73+
let peek t ~user_agent =
74+
with_read_lock t (fun () ->
75+
Option.map Token_bucket.peek (Hashtbl.find_opt t.table user_agent)
76+
)
3977

40-
let consume_and_block table ~user_agent amount =
41-
match Hashtbl.find_opt table user_agent with
78+
(* TODO this has fairness issues - fix with queue or similar *)
79+
let consume_and_block t ~user_agent amount =
80+
let bucket_opt = with_read_lock t (fun () -> Hashtbl.find_opt t.table user_agent) in
81+
match bucket_opt with
4282
| None ->
4383
()
4484
| Some bucket ->

ocaml/libs/rate-limit/test/test_bucket_table.ml

Lines changed: 174 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -126,33 +126,186 @@ let test_consume_and_block_nonexistent () =
126126
Alcotest.(check pass)
127127
"consume_and_block on nonexistent bucket should not block" () ()
128128

129-
let test_concurrent_access () =
129+
let test_add_same_key_race () =
130+
(* Test the check-then-act race in add_bucket.
131+
add_bucket does: if not mem then add. Without locking, multiple threads
132+
could all pass the mem check and try to add, but only one should succeed.
133+
Note: OCaml 4's GIL makes races hard to trigger, but this test verifies
134+
the invariant holds under concurrent access and would catch races if the
135+
GIL is released at allocation points within the critical section. *)
136+
let iterations = 500 in
137+
let threads_per_iter = 10 in
138+
let failures = ref 0 in
139+
let failures_mutex = Mutex.create () in
140+
for _ = 1 to iterations do
141+
let table = Bucket_table.create () in
142+
let success_count = ref 0 in
143+
let count_mutex = Mutex.create () in
144+
let barrier = ref 0 in
145+
let barrier_mutex = Mutex.create () in
146+
let threads =
147+
Array.init threads_per_iter (fun _ ->
148+
Thread.create
149+
(fun () ->
150+
(* Increment barrier and wait for all threads *)
151+
Mutex.lock barrier_mutex ;
152+
incr barrier ;
153+
Mutex.unlock barrier_mutex ;
154+
while
155+
Mutex.lock barrier_mutex ;
156+
let b = !barrier in
157+
Mutex.unlock barrier_mutex ;
158+
b < threads_per_iter
159+
do
160+
Thread.yield ()
161+
done ;
162+
(* All threads try to add the same key simultaneously *)
163+
let success =
164+
Bucket_table.add_bucket table ~user_agent:"contested_key"
165+
~burst_size:10.0 ~fill_rate:1.0
166+
in
167+
if success then (
168+
Mutex.lock count_mutex ;
169+
incr success_count ;
170+
Mutex.unlock count_mutex
171+
)
172+
)
173+
()
174+
)
175+
in
176+
Array.iter Thread.join threads ;
177+
(* Exactly one thread should succeed in adding the key *)
178+
if !success_count <> 1 then (
179+
Mutex.lock failures_mutex ;
180+
incr failures ;
181+
Mutex.unlock failures_mutex
182+
)
183+
done ;
184+
Alcotest.(check int)
185+
"Exactly one add should succeed for same key (across all iterations)" 0
186+
!failures
187+
188+
let test_concurrent_add_delete_stress () =
189+
(* Stress test: rapidly add and delete entries.
190+
Without proper locking, hashtable can get corrupted. *)
130191
let table = Bucket_table.create () in
131-
let _ =
132-
Bucket_table.add_bucket table ~user_agent:"agent1" ~burst_size:100.0
133-
~fill_rate:0.01
192+
let iterations = 1000 in
193+
let num_keys = 10 in
194+
let errors = ref 0 in
195+
let errors_mutex = Mutex.create () in
196+
let add_threads =
197+
Array.init 5 (fun t ->
198+
Thread.create
199+
(fun () ->
200+
for i = 0 to iterations - 1 do
201+
let key = Printf.sprintf "key%d" ((t * iterations + i) mod num_keys) in
202+
let _ =
203+
Bucket_table.add_bucket table ~user_agent:key ~burst_size:10.0
204+
~fill_rate:1.0
205+
in
206+
()
207+
done
208+
)
209+
()
210+
)
134211
in
135-
let successful_consumes = ref 0 in
136-
let counter_mutex = Mutex.create () in
137-
let threads =
138-
Array.init 20 (fun _ ->
212+
let delete_threads =
213+
Array.init 5 (fun t ->
139214
Thread.create
140215
(fun () ->
141-
let success =
142-
Bucket_table.try_consume table ~user_agent:"agent1" 5.0
143-
in
144-
if success then (
145-
Mutex.lock counter_mutex ;
146-
incr successful_consumes ;
147-
Mutex.unlock counter_mutex
148-
)
216+
for i = 0 to iterations - 1 do
217+
let key = Printf.sprintf "key%d" ((t * iterations + i) mod num_keys) in
218+
Bucket_table.delete_bucket table ~user_agent:key
219+
done
149220
)
150221
()
151222
)
152223
in
153-
Array.iter Thread.join threads ;
224+
let read_threads =
225+
Array.init 5 (fun t ->
226+
Thread.create
227+
(fun () ->
228+
for i = 0 to iterations - 1 do
229+
let key = Printf.sprintf "key%d" ((t * iterations + i) mod num_keys) in
230+
(* This should never crash, even if key doesn't exist *)
231+
try
232+
let _ = Bucket_table.peek table ~user_agent:key in
233+
()
234+
with _ ->
235+
Mutex.lock errors_mutex ;
236+
incr errors ;
237+
Mutex.unlock errors_mutex
238+
done
239+
)
240+
()
241+
)
242+
in
243+
Array.iter Thread.join add_threads ;
244+
Array.iter Thread.join delete_threads ;
245+
Array.iter Thread.join read_threads ;
246+
Alcotest.(check int) "No errors during concurrent operations" 0 !errors
247+
248+
let test_consume_during_delete_race () =
249+
(* Test that try_consume doesn't crash when bucket is being deleted.
250+
Without proper locking, we could try to access a deleted bucket. *)
251+
let iterations = 500 in
252+
let errors = ref 0 in
253+
let errors_mutex = Mutex.create () in
254+
for _ = 1 to iterations do
255+
let table = Bucket_table.create () in
256+
let _ =
257+
Bucket_table.add_bucket table ~user_agent:"target" ~burst_size:100.0
258+
~fill_rate:1.0
259+
in
260+
let barrier = ref 0 in
261+
let barrier_mutex = Mutex.create () in
262+
let consumer =
263+
Thread.create
264+
(fun () ->
265+
Mutex.lock barrier_mutex ;
266+
incr barrier ;
267+
Mutex.unlock barrier_mutex ;
268+
while
269+
Mutex.lock barrier_mutex ;
270+
let b = !barrier in
271+
Mutex.unlock barrier_mutex ;
272+
b < 2
273+
do
274+
Thread.yield ()
275+
done ;
276+
try
277+
let _ = Bucket_table.try_consume table ~user_agent:"target" 1.0 in
278+
()
279+
with _ ->
280+
Mutex.lock errors_mutex ;
281+
incr errors ;
282+
Mutex.unlock errors_mutex
283+
)
284+
()
285+
in
286+
let deleter =
287+
Thread.create
288+
(fun () ->
289+
Mutex.lock barrier_mutex ;
290+
incr barrier ;
291+
Mutex.unlock barrier_mutex ;
292+
while
293+
Mutex.lock barrier_mutex ;
294+
let b = !barrier in
295+
Mutex.unlock barrier_mutex ;
296+
b < 2
297+
do
298+
Thread.yield ()
299+
done ;
300+
Bucket_table.delete_bucket table ~user_agent:"target"
301+
)
302+
()
303+
in
304+
Thread.join consumer ;
305+
Thread.join deleter
306+
done ;
154307
Alcotest.(check int)
155-
"Exactly 20 consumes should succeed" 20 !successful_consumes
308+
"No crashes during consume/delete race" 0 !errors
156309

157310
let test =
158311
[
@@ -168,7 +321,9 @@ let test =
168321
; ("Multiple agents", `Quick, test_multiple_agents)
169322
; ("Consume and block", `Slow, test_consume_and_block)
170323
; ("Consume and block nonexistent", `Quick, test_consume_and_block_nonexistent)
171-
; ("Concurrent access", `Quick, test_concurrent_access)
324+
; ("Add same key race", `Quick, test_add_same_key_race)
325+
; ("Concurrent add/delete stress", `Quick, test_concurrent_add_delete_stress)
326+
; ("Consume during delete race", `Quick, test_consume_during_delete_race)
172327
]
173328

174329
let () = Alcotest.run "Bucket table library" [("Bucket table tests", test)]

0 commit comments

Comments
 (0)