Skip to content

Commit 13d9905

Browse files
committed
Add stream.read_ahead_limit config option
1 parent 470c158 commit 13d9905

File tree

4 files changed

+48
-3
lines changed

4 files changed

+48
-3
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2779,6 +2779,11 @@ end}.
27792779
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
27802780
[{datatype, {enum, [true, false]}}]}.
27812781

2782+
{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [
2783+
{datatype, [integer, string]},
2784+
{validators, ["is_supported_information_unit"]}
2785+
]}.
2786+
27822787
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27832788
{datatype, [binary]}
27842789
]}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
-export([format_osiris_event/2]).
5050
-export([update_stream_conf/2]).
5151
-export([readers/1]).
52-
-export([read_ahead_on/0]).
52+
-export([read_ahead_on/0, read_ahead_limit/0]).
5353

5454
-export([parse_offset_arg/1,
5555
filter_spec/1]).
@@ -465,7 +465,8 @@ begin_stream(#stream_client{name = QName,
465465
Tag, Offset, Mode, AckRequired, Filter, Options0)
466466
when is_pid(LocalPid) ->
467467
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
468-
Options1 = Options0#{read_ahead => read_ahead_on()},
468+
Options1 = Options0#{read_ahead => read_ahead_on(),
469+
read_ahead_limit => read_ahead_limit()},
469470
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
470471
NextOffset = osiris_log:next_offset(Seg0) - 1,
471472
osiris:register_offset_listener(LocalPid, NextOffset),
@@ -1522,3 +1523,21 @@ queue_vm_ets() ->
15221523

15231524
read_ahead_on() ->
15241525
application:get_env(rabbit, stream_read_ahead, true).
1526+
1527+
-spec read_ahead_limit() -> integer() | undefined.
1528+
read_ahead_limit() ->
1529+
case application:get_env(rabbit, stream_read_ahead_limit, undefined) of
1530+
undefined ->
1531+
undefined;
1532+
Bytes when is_integer(Bytes) ->
1533+
Bytes;
1534+
Limit when is_list(Limit) ->
1535+
case rabbit_resource_monitor_misc:parse_information_unit(Limit) of
1536+
{ok, ParsedLimit} ->
1537+
ParsedLimit;
1538+
{error, parse_error} ->
1539+
?LOG_ERROR("Unable to parse stream read ahead limit value "
1540+
"~tp", [Limit]),
1541+
undefined
1542+
end
1543+
end.

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,26 @@ credential_validator.regexp = ^abc\\d+",
12531253
[{rabbit, [
12541254
{stream_read_ahead, false}
12551255
]}],
1256+
[]},
1257+
1258+
%%
1259+
%% Stream read limit
1260+
%%
1261+
{stream_read_ahead_limit_bytes,
1262+
"
1263+
stream.read_ahead_limit = 8192
1264+
",
1265+
[{rabbit, [
1266+
{stream_read_ahead_limit, 8192}
1267+
]}],
1268+
[]},
1269+
{stream_read_ahead_limit_information_unit,
1270+
"
1271+
stream.read_ahead_limit = 8KiB
1272+
",
1273+
[{rabbit, [
1274+
{stream_read_ahead_limit, "8KiB"}
1275+
]}],
12561276
[]}
12571277

12581278
].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2814,7 +2814,8 @@ init_reader(ConnectionTransport,
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
28152815
Options0 = #{transport => ConnectionTransport,
28162816
chunk_selector => get_chunk_selector(Properties),
2817-
read_ahead => rabbit_stream_queue:read_ahead_on()},
2817+
read_ahead => rabbit_stream_queue:read_ahead_on(),
2818+
read_ahead_limit => rabbit_stream_queue:read_ahead_limit()},
28182819

28192820
Options1 = maps:merge(Options0,
28202821
rabbit_stream_utils:filter_spec(Properties)),

0 commit comments

Comments
 (0)