A generic batching server for erlang / elixir
gen_batch_server
is a stateful generic server similar to gen_server
that instead of processing incoming requests
one by one gathers them into batches before the are passed to the behaviour
implementation.
Batches are processed either when the erlang process mailbox has no further
messages to batch or
when the number of messages in the current batch reaches the maximum batch size
limit.
gen_batch_server
tries to trade latency for throughput by automatically growing the max batch
size limit when message ingress is high and shrinks it down again as ingress
reduces.
This behaviour makes it suitable for use as a data sink, proxy or other
kind of aggregator that benefit from processing messages in batches. Examples
would be a log writer that needs to flush messages to disk using file:sync/1
without undue delay or a metrics sink that aggregates metrics from multiple
processes and writes them to an external service. It could also be beneficial
to use gen_batch_server
to proxy a bunch processes that want to update
some resource (such as a dets
table) that doesn't handle casts.
Types:
Name = {local,Name} | {global,GlobalName} | {via,Module,ViaName}
Mod = module()
Args = term()
Opt = {debug, Dbgs} | {min_batch_size | max_batch_size, non_neg_integer()}
Opts = [Opt]
Opts = [term()]
Result = {ok,Pid} | ignore | {error,Error}
Creates a gen_batch_server
as part of a supervision tree. The minimum and
maximum batch sizes that control the bounds of the batch sizes that are processed
can be be controlled using the min_batch_size
(default: 32)
and max_batch_size
(default: 8192) options.
Types:
ServerRef = pid() | {Name :: atom(), node()} | Name :: atom()
Request = term()
Sends an asynchronous request to the gen_batch_server returning
ok
immediately.
The request tuple ({cast, Request}
) is included in the list of operations passed
to Module:handle_batch/2
.
Types:
ServerRef = pid() | {Name :: atom(), node()} | Name :: atom()
Batch = [term()]
Sends an asynchronous batch of requests to the gen_batch_server returning
ok
immediately. The batch is appended in order to the current gen_batch_server batch.
Types:
ServerRef = pid() | {Name :: atom(), node()} | Name :: atom()
Request = term()
Reply = term()
Timeout = non_neg_integer() | infinity.
Sends an synchronous request to the gen_batch_server returning
returning the
response provided for the operation by Module:handle_batch/2
. The timeout
is optional and defaults to 5000ms.
Types:
Args = term()
Result = {ok, State} | {stop, Reason}
State = term(),
Reason = term()
Called whenever a gen_batch_server
is started with the arguments provided
to start_link/4
.
Types:
Batch = [Op]
UserOp = term(),
Op = {cast, UserOp} |
{call, from(), UserOp} |
{info, UserOp}.
Result = {ok, State} | {ok, Actions, State} | {stop, Reason}
State = term()
From = {Pid :: pid(), Tag :: reference()}
Action = {reply, From, Msg}
Actions = [Action]
Reason = term()
Called whenever a new batch is ready for processing. The implementation can
optionally return a list of reply actions used to reply to call
operations.
Types:
Reason = term()
Result = term()
State = term(),
Optional. Called whenever a gen_batch_server
is terminating.
Types:
Result = term()
State = term(),
Optional. Used to provide a custom formatting of the user state.
(c) Pivotal Software Inc., 2018-Present.