Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8f93764
libs: Add token-bucket library
cplaursen Nov 11, 2025
60b637b
rate-limit: Test token bucket
cplaursen Nov 27, 2025
9123696
rate-limit: Implement bucket tables
cplaursen Nov 27, 2025
1112ae5
rate-limit: Create bucket table from xapi globs
cplaursen Nov 28, 2025
e933851
xapi: Add rate limiting to do_dispatch
cplaursen Nov 28, 2025
9fd1f49
xapi rate limiting: Add logging
cplaursen Dec 1, 2025
69585f8
rate_limit: Add rate limiter to xapi initialisation
cplaursen Dec 1, 2025
0047b24
Rate limiting: Improve token_bucket documentation
cplaursen Dec 1, 2025
68993df
Rate limiting: token buckets with zero or negative fill rate fail
cplaursen Dec 1, 2025
a540ca7
rate-limit: Write unit tests for bucket table
cplaursen Dec 2, 2025
ebcbe84
rate-limit: Minor fixes to bucket table
cplaursen Dec 2, 2025
d6b68df
rate-limit: Add readers-writer lock to bucket table
cplaursen Dec 4, 2025
2cf148a
rate-limit: Handle rate limited requests in FIFO queue
cplaursen Dec 4, 2025
786abb6
rate-limit: Replace readers-writer lock with atomic Map
cplaursen Dec 5, 2025
993e6ca
rate-limit: Clarify token bucket creation docs
cplaursen Dec 5, 2025
3cf71dc
idl: Add Rate_limit datamodel
cplaursen Dec 2, 2025
18f8a88
xapi-cli-server: Add rate limit CLI operations
cplaursen Dec 3, 2025
19f18b3
token_bucket: replace mutex with lock-free atomics
cplaursen Dec 2, 2025
3b02b8c
xapi_rate_limit: Replace xapi_globs support with datamodel
cplaursen Dec 8, 2025
ff6be8f
xapi_http: Add rate limiting to all handlers
cplaursen Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@
(name tgroup)
(depends xapi-log xapi-stdext-unix))

(package
(name rate-limit)
(synopsis "Simple token bucket-based rate-limiting")
(depends
(ocaml (>= 4.12))
xapi-log xapi-stdext-unix))

(package
(name xml-light2))

Expand Down
6 changes: 6 additions & 0 deletions ocaml/idl/datamodel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10535,6 +10535,7 @@ let all_system =
; Datamodel_vm_group.t
; Datamodel_host_driver.t
; Datamodel_driver_variant.t
; Datamodel_rate_limit.t
]

(* If the relation is one-to-many, the "many" nodes (one edge each) must come before the "one" node (many edges) *)
Expand Down Expand Up @@ -10786,6 +10787,7 @@ let expose_get_all_messages_for =
; _observer
; _host_driver
; _driver_variant
; _rate_limit
]

let no_task_id_for = [_task; (* _alert; *) _event]
Expand Down Expand Up @@ -11142,6 +11144,10 @@ let http_actions =
; ("put_bundle", (Put, Constants.put_bundle_uri, true, [], _R_POOL_OP, []))
]

(* Actions that incorporate the rate limiter from Xapi_rate_limiting within their handler
For now, just RPC calls *)
let custom_rate_limit_http_actions = ["post_root"; "post_RPC2"; "post_jsonrpc"]

(* these public http actions will NOT be checked by RBAC *)
(* they are meant to be used in exceptional cases where RBAC is already *)
(* checked inside them, such as in the XMLRPC (API) calls *)
Expand Down
2 changes: 2 additions & 0 deletions ocaml/idl/datamodel_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ let _host_driver = "Host_driver"

let _driver_variant = "Driver_variant"

let _rate_limit = "Rate_limit"

let update_guidances =
Enum
( "update_guidances"
Expand Down
10 changes: 10 additions & 0 deletions ocaml/idl/datamodel_lifecycle.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
let prototyped_of_class = function
| "Rate_limit" ->
Some "25.38.0-next"
| "Driver_variant" ->
Some "25.2.0"
| "Host_driver" ->
Expand All @@ -13,6 +15,14 @@ let prototyped_of_class = function
None

let prototyped_of_field = function
| "Rate_limit", "fill_rate" ->
Some "25.38.0-next"
| "Rate_limit", "burst_size" ->
Some "25.38.0-next"
| "Rate_limit", "client_id" ->
Some "25.38.0-next"
| "Rate_limit", "uuid" ->
Some "25.38.0-next"
| "Driver_variant", "status" ->
Some "25.2.0"
| "Driver_variant", "priority" ->
Expand Down
40 changes: 40 additions & 0 deletions ocaml/idl/datamodel_rate_limit.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
(*
 * Copyright (C) 2025 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

open Datamodel_types
open Datamodel_common
open Datamodel_roles

let lifecycle = []

let t =
create_obj ~name:_rate_limit ~descr:"Rate limiting policy for a XAPI client"
~doccomments:[] ~gen_constructor_destructor:true ~gen_events:true
~in_db:true ~lifecycle:[] ~persist:PersistEverything ~in_oss_since:None
~messages_default_allowed_roles:_R_POOL_ADMIN
~contents:
([uid _rate_limit ~lifecycle]
@ [
field ~qualifier:StaticRO ~ty:String ~lifecycle "client_id"
"An identifier for the rate limited client" ~ignore_foreign_key:true
~default_value:(Some (VString ""))
; field ~qualifier:StaticRO ~ty:Float ~lifecycle "burst_size"
"Amount of tokens that can be consumed in one burst"
Copy link
Member

@psafont psafont Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the idl should mention tokens or buckets at all, instead I would try to communicate the meaning of the parameters in a way that allows users to make a mental model of how rate limiting works:

Suggested change
"Amount of tokens that can be consumed in one burst"
"Amount of RPC calls that the client can do in burst"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we shouldn't talk about token buckets and I'll change that. The plan is to assign higher token costs to more expensive calls, e.g. VM create, so we can't simplify to the level of RPC calls, but I'll figure out how to document this for users.

~ignore_foreign_key:true ~default_value:(Some (VFloat 0.))
; field ~qualifier:StaticRO ~ty:Float ~lifecycle "fill_rate"
"Tokens added to token bucket per second" ~ignore_foreign_key:true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Tokens added to token bucket per second" ~ignore_foreign_key:true
"Calls per second afforded to the client" ~ignore_foreign_key:true

~default_value:(Some (VFloat 0.))
]
)
~messages:[] ()
2 changes: 1 addition & 1 deletion ocaml/idl/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
datamodel_values datamodel_schema datamodel_certificate
datamodel_diagnostics datamodel_repository datamodel_lifecycle
datamodel_vtpm datamodel_observer datamodel_vm_group api_version
datamodel_host_driver datamodel_driver_variant)
datamodel_host_driver datamodel_driver_variant datamodel_rate_limit)
(libraries
rpclib.core
sexplib0
Expand Down
2 changes: 1 addition & 1 deletion ocaml/idl/schematest.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ let hash x = Digest.string x |> Digest.to_hex
(* BEWARE: if this changes, check that schema has been bumped accordingly in
ocaml/idl/datamodel_common.ml, usually schema_minor_vsn *)

let last_known_schema_hash = "3b20f4304cfaaa7b6213af91ae632e64"
let last_known_schema_hash = "4708cb1f0cf7c1231c6958590ee1ed04"

let current_schema_hash : string =
let open Datamodel_types in
Expand Down
158 changes: 158 additions & 0 deletions ocaml/libs/rate-limit/bucket_table.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
(*
* Copyright (C) 2025 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

type rate_limit_data = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use a simpler name like bucket, consumer or similar.

bucket: Token_bucket.t
; process_queue:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to let the caller know how long it should delay before moving ahead?
For example:

consume t amount |> Option.iter Thread.delay ;
handle ()

The schedules of different threads can be partially sorted by the delays returned to them.

(float * (unit -> unit)) Queue.t (* contains token cost and callback *)
; process_queue_lock: Mutex.t
; worker_thread_cond: Condition.t
; should_terminate: bool ref (* signal termination to worker thread *)
; worker_thread: Thread.t
}
[@@warning "-69"]

module StringMap = Map.Make (String)

(* The table is an immutable map stored in an [Atomic.t]: readers take a
   snapshot with [Atomic.get], writers install a whole new map. *)
type t = rate_limit_data StringMap.t Atomic.t

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

(* A fresh table starts with no buckets at all. *)
let create () = Atomic.make StringMap.empty

(* Membership test against the current snapshot of the table. *)
let mem t ~user_agent = StringMap.mem user_agent (Atomic.get t)

(* The worker thread is responsible for calling the callback when the token
amount becomes available *)
let rec worker_loop ~bucket ~process_queue ~process_queue_lock
~worker_thread_cond ~should_terminate =
let process_item cost callback =
Token_bucket.delay_then_consume bucket cost ;
callback ()
in
Mutex.lock process_queue_lock ;
while Queue.is_empty process_queue && not !should_terminate do
Condition.wait worker_thread_cond process_queue_lock
done ;
let item_opt = Queue.take_opt process_queue in
Mutex.unlock process_queue_lock ;
match item_opt with
| None ->
(* Queue is empty only when termination was signalled *)
()
| Some (cost, callback) ->
process_item cost callback ;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that synchronous API calls, which do not include a Thread.create call in do_dispatch, are now executed on this worker thread rather than on the thread that handles the incoming connection. This means that no other call from the same client (using the same token bucket) will be executed concurrently, even if enough tokens become available in the bucket. Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes indeed, that's right. I'm doing this under the assumption that synchronous calls take a relatively small amount of time, so if they have already been rate limited then the rate limiting should be the bottleneck, rather than execution time. If this doesn't hold (as would be the case for a bucket with a high enough refill rate) then we can introduce a thread pool instead of a single thread for handling rate limited requests.

worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond
~should_terminate

(* TODO: Indicate failure reason - did we get invalid config or try to add an
   already present user_agent? *)
(* [add_bucket t ~user_agent ~burst_size ~fill_rate] adds a token bucket (and
   its worker thread) for [user_agent]. Returns [false] when a bucket already
   exists or the configuration is rejected by [Token_bucket.create] (e.g.
   non-positive fill rate). The map is updated with a compare-and-set retry
   loop: the previous get/set pair was not atomic, so two concurrent
   [add_bucket] calls (or a concurrent [delete_bucket]) could silently lose
   one of the updates. *)
let add_bucket t ~user_agent ~burst_size ~fill_rate =
  (* Fast path: avoid spawning a worker thread when the key is present. *)
  if StringMap.mem user_agent (Atomic.get t) then
    false
  else
    match Token_bucket.create ~burst_size ~fill_rate with
    | None ->
        (* Invalid bucket configuration. *)
        false
    | Some bucket ->
        let process_queue = Queue.create () in
        let process_queue_lock = Mutex.create () in
        let worker_thread_cond = Condition.create () in
        let should_terminate = ref false in
        let worker_thread =
          Thread.create
            (fun () ->
              worker_loop ~bucket ~process_queue ~process_queue_lock
                ~worker_thread_cond ~should_terminate
            )
            ()
        in
        let data =
          {
            bucket
          ; process_queue
          ; process_queue_lock
          ; worker_thread_cond
          ; should_terminate
          ; worker_thread
          }
        in
        let shutdown_worker () =
          (* We lost a race with a concurrent [add_bucket] for the same key:
             stop the worker thread we just started so it is not leaked. *)
          Mutex.lock process_queue_lock ;
          should_terminate := true ;
          Condition.signal worker_thread_cond ;
          Mutex.unlock process_queue_lock
        in
        let rec install () =
          let map = Atomic.get t in
          if StringMap.mem user_agent map then (
            shutdown_worker () ; false
          ) else if
            Atomic.compare_and_set t map (StringMap.add user_agent data map)
          then
            true
          else
            (* Another thread changed the table concurrently; retry. *)
            install ()
        in
        install ()

(* [delete_bucket t ~user_agent] removes the bucket for [user_agent] (no-op
   when absent) and signals its worker thread to terminate. The removal uses
   a compare-and-set retry loop: the previous unconditional [Atomic.set]
   could discard a concurrent [add_bucket] for an unrelated key that landed
   between the get and the set. *)
let delete_bucket t ~user_agent =
  let rec remove () =
    let map = Atomic.get t in
    match StringMap.find_opt user_agent map with
    | None ->
        ()
    | Some data ->
        if Atomic.compare_and_set t map (StringMap.remove user_agent map) then (
          (* Wake the worker so it observes the termination flag even if its
             queue is empty and it is blocked on the condition variable. *)
          Mutex.lock data.process_queue_lock ;
          data.should_terminate := true ;
          Condition.signal data.worker_thread_cond ;
          Mutex.unlock data.process_queue_lock
        ) else
          (* The table changed under us; retry against the new snapshot. *)
          remove ()
  in
  remove ()

(* Attempt a non-blocking token consumption for [user_agent]. Returns
   [false] both when no bucket exists and when the bucket lacks tokens. *)
let try_consume t ~user_agent amount =
  match StringMap.find_opt user_agent (Atomic.get t) with
  | Some data ->
      Token_bucket.consume data.bucket amount
  | None ->
      false

(* Report the current token count of [user_agent]'s bucket, or [None] when
   no bucket is configured for that client. *)
let peek t ~user_agent =
  Atomic.get t
  |> StringMap.find_opt user_agent
  |> Option.map (fun data -> Token_bucket.peek data.bucket)

(* The callback should return quickly - if it is a longer task it is
   responsible for creating a thread to do the task *)
(* NOTE(review): in the fast path the callback runs on the caller's thread
   while [process_queue_lock] is held; a callback that re-enters [submit]
   for the same user agent would deadlock — confirm callers avoid this. *)
let submit t ~user_agent ~callback amount =
  let map = Atomic.get t in
  match StringMap.find_opt user_agent map with
  | None ->
      (* No bucket configured for this client: run unthrottled. *)
      callback ()
  | Some {bucket; process_queue; process_queue_lock; worker_thread_cond; _} ->
      with_lock process_queue_lock (fun () ->
          (* Fast path: nothing is queued ahead of us and enough tokens are
             available now, so run inline and preserve FIFO ordering. *)
          if Queue.is_empty process_queue && Token_bucket.consume bucket amount
          then
            callback ()
          else
            (* The worker only waits on the condition while the queue is
               empty, so only the first enqueued item needs a signal. *)
            let need_signal = Queue.is_empty process_queue in
            Queue.add (amount, callback) process_queue ;
            if need_signal then Condition.signal worker_thread_cond
      )

(* Blocking variant of [submit]: enqueue [callback] under rate limiting and
   wait until it has run, returning its result. A mutex/condition pair plus
   a result slot turns the asynchronous completion into a synchronous one. *)
let submit_sync t ~user_agent ~callback amount =
  let mutex = Mutex.create () in
  let cond = Condition.create () in
  let slot = ref None in
  let deliver () =
    let value = callback () in
    Mutex.lock mutex ;
    slot := Some value ;
    Condition.signal cond ;
    Mutex.unlock mutex
  in
  submit t ~user_agent ~callback:deliver amount ;
  Mutex.lock mutex ;
  (* Loop around the wait: [Condition.wait] may wake spuriously, so only a
     filled slot counts as completion. *)
  let rec await () =
    match !slot with
    | Some value ->
        value
    | None ->
        Condition.wait cond mutex ;
        await ()
  in
  let value = await () in
  Mutex.unlock mutex ;
  value
51 changes: 51 additions & 0 deletions ocaml/libs/rate-limit/bucket_table.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
(*
* Copyright (C) 2025 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(** Table mapping client identifiers to their token buckets for rate
    limiting. The table is safe for concurrent use by multiple threads. *)
type t

val create : unit -> t
(** [create ()] creates a new empty bucket table. *)

val add_bucket :
  t -> user_agent:string -> burst_size:float -> fill_rate:float -> bool
(** [add_bucket table ~user_agent ~burst_size ~fill_rate] adds a token bucket
    for the given user agent and returns [true] on success. Returns [false]
    if a bucket already exists, or if the bucket configuration is invalid,
    e.g. negative/zero fill rate. *)

val mem : t -> user_agent:string -> bool
(** [mem table ~user_agent] returns whether [user_agent] has an associated
    token bucket in the bucket table. *)

val peek : t -> user_agent:string -> float option
(** [peek table ~user_agent] returns the current token count for the user
    agent without consuming any tokens, or [None] if no bucket exists. *)

val delete_bucket : t -> user_agent:string -> unit
(** [delete_bucket table ~user_agent] removes the bucket for the user agent
    and stops its associated worker thread. No-op if no bucket exists. *)

val try_consume : t -> user_agent:string -> float -> bool
(** [try_consume table ~user_agent amount] attempts to consume tokens without
    blocking. Returns [true] on success, [false] if insufficient tokens or if
    no bucket exists for [user_agent]. *)

val submit : t -> user_agent:string -> callback:(unit -> unit) -> float -> unit
(** [submit table ~user_agent ~callback amount] submits a callback to be
    executed under rate limiting. If tokens are immediately available and no
    callbacks are queued, the callback runs synchronously on the caller's
    thread. Otherwise, it is enqueued and will be executed by a worker thread
    when tokens become available. Returns immediately in that case. If no
    bucket exists for [user_agent], the callback runs immediately,
    unthrottled. The callback should return quickly; longer tasks should
    spawn their own thread. *)

val submit_sync : t -> user_agent:string -> callback:(unit -> 'a) -> float -> 'a
(** [submit_sync table ~user_agent ~callback amount] submits a callback to be
    executed under rate limiting and blocks the calling thread until it
    completes, returning the callback's result. *)
7 changes: 7 additions & 0 deletions ocaml/libs/rate-limit/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(library
(name rate_limit)
(public_name rate-limit)
(libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock)
)


4 changes: 4 additions & 0 deletions ocaml/libs/rate-limit/test/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(tests
(names test_token_bucket test_bucket_table)
(package rate-limit)
(libraries rate_limit alcotest qcheck-core qcheck-alcotest mtime mtime.clock.os fmt xapi-log threads.posix))
Loading
Loading