|
12 | 12 | * GNU Lesser General Public License for more details. |
13 | 13 | *) |
14 | 14 |
|
15 | | -let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute |
| 15 | +type state = {tokens: float; last_refill: Mtime.span} |
16 | 16 |
|
17 | | -type t = { |
18 | | - burst_size: float |
19 | | - ; fill_rate: float |
20 | | - ; mutable tokens: float |
21 | | - ; mutable last_refill: Mtime.span |
22 | | - ; mutex: Mutex.t |
23 | | -} |
| 17 | +type t = {burst_size: float; fill_rate: float; state: state Atomic.t} |
24 | 18 |
|
25 | 19 | let create_with_timestamp timestamp ~burst_size ~fill_rate = |
26 | 20 | if fill_rate <= 0. then |
27 | 21 | None |
28 | 22 | else |
29 | | - Some |
30 | | - { |
31 | | - burst_size |
32 | | - ; fill_rate |
33 | | - ; tokens= burst_size |
34 | | - ; last_refill= timestamp |
35 | | - ; mutex= Mutex.create () |
36 | | - } |
| 23 | + let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in |
| 24 | + Some {burst_size; fill_rate; state} |
37 | 25 |
|
38 | 26 | let create = create_with_timestamp (Mtime_clock.elapsed ()) |
39 | 27 |
|
40 | | -let peek_with_timestamp timestamp tb = |
41 | | - let time_delta = Mtime.Span.abs_diff tb.last_refill timestamp in |
| 28 | +let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate = |
| 29 | + let time_delta = Mtime.Span.abs_diff last_refill timestamp in |
42 | 30 | let time_delta_seconds = Mtime.Span.to_float_ns time_delta *. 1e-9 in |
43 | | - min tb.burst_size (tb.tokens +. (time_delta_seconds *. tb.fill_rate)) |
| 31 | + min burst_size (tokens +. (time_delta_seconds *. fill_rate)) |
| 32 | + |
| 33 | +let peek_with_timestamp timestamp tb = |
| 34 | + let tb_state = Atomic.get tb.state in |
| 35 | + compute_tokens timestamp tb_state ~burst_size:tb.burst_size |
| 36 | + ~fill_rate:tb.fill_rate |
44 | 37 |
|
45 | 38 | let peek tb = peek_with_timestamp (Mtime_clock.elapsed ()) tb |
46 | 39 |
|
47 | 40 | let consume_with_timestamp get_time tb amount = |
48 | | - let do_consume () = |
| 41 | + let rec try_consume () = |
49 | 42 | let timestamp = get_time () in |
50 | | - let new_tokens = peek_with_timestamp timestamp tb in |
51 | | - tb.last_refill <- timestamp ; |
52 | | - if new_tokens >= amount then ( |
53 | | - tb.tokens <- new_tokens -. amount ; |
54 | | - true |
55 | | - ) else ( |
56 | | - tb.tokens <- new_tokens ; |
57 | | - false |
58 | | - ) |
| 43 | + let old_state = Atomic.get tb.state in |
| 44 | + let new_tokens = |
| 45 | + compute_tokens timestamp old_state ~burst_size:tb.burst_size |
| 46 | + ~fill_rate:tb.fill_rate |
| 47 | + in |
| 48 | + let success, final_tokens = |
| 49 | + if new_tokens >= amount then |
| 50 | + (true, new_tokens -. amount) |
| 51 | + else |
| 52 | + (false, new_tokens) |
| 53 | + in |
| 54 | + let new_state = {tokens= final_tokens; last_refill= timestamp} in |
| 55 | + if Atomic.compare_and_set tb.state old_state new_state then |
| 56 | + success |
| 57 | + else |
| 58 | + try_consume () |
59 | 59 | in |
60 | | - with_lock tb.mutex do_consume |
| 60 | + try_consume () |
61 | 61 |
|
62 | 62 | let consume = consume_with_timestamp Mtime_clock.elapsed |
63 | 63 |
|
64 | 64 | let get_delay_until_available_timestamp timestamp tb amount = |
65 | | - let current_tokens = peek_with_timestamp timestamp tb in |
| 65 | + let {tokens; last_refill} = Atomic.get tb.state in |
| 66 | + let current_tokens = |
| 67 | + compute_tokens timestamp {tokens; last_refill} ~burst_size:tb.burst_size |
| 68 | + ~fill_rate:tb.fill_rate |
| 69 | + in |
66 | 70 | let required_tokens = max 0. (amount -. current_tokens) in |
67 | 71 | required_tokens /. tb.fill_rate |
68 | 72 |
|
|
0 commit comments