Skip to content

Commit 75392bb

Browse files
committed
Native STOMP: rewrite process config/state, add limits to parser, and just backup
1 parent a3712db commit 75392bb

File tree

14 files changed

+755
-601
lines changed

14 files changed

+755
-601
lines changed

deps/rabbit/src/rabbit_confirms.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
-opaque state() :: #?MODULE{}.
3131

3232
-export_type([
33-
state/0
33+
state/0,
34+
mx/0
3435
]).
3536

3637
-spec init() -> state().

deps/rabbitmq_stomp/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ load("@rules_erlang//:dialyze.bzl", "dialyze", "plt")
44
load(
55
"//:rabbitmq.bzl",
66
"BROKER_VERSION_REQUIREMENTS_ANY",
7-
"ENABLE_FEATURE_MAYBE_EXPR",
87
"RABBITMQ_DIALYZER_OPTS",
98
"assert_suites",
109
"broker_for_integration_suites",
@@ -108,7 +107,6 @@ eunit(
108107
":test_src_rabbit_stomp_client_beam",
109108
":test_src_rabbit_stomp_publish_test_beam",
110109
],
111-
erl_extra_args = [ENABLE_FEATURE_MAYBE_EXPR],
112110
target = ":test_erlang_app",
113111
)
114112

deps/rabbitmq_stomp/include/rabbit_stomp.hrl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,16 @@
99
default_passcode,
1010
force_default_creds = false,
1111
implicit_connect,
12-
ssl_cert_login}).
12+
ssl_cert_login,
13+
max_header_length,
14+
max_headers,
15+
max_body_length}).
16+
1317

1418
-define(SUPPORTED_VERSIONS, ["1.0", "1.1", "1.2"]).
1519

20+
21+
1622
-define(INFO_ITEMS,
1723
[conn_name,
1824
name,
@@ -56,3 +62,5 @@
5662
garbage_collection,
5763
state,
5864
timeout]).
65+
66+
-type send_fun() :: fun ((async | sync, iodata()) -> ok | {atom(), any()}).

deps/rabbitmq_stomp/include/rabbit_stomp_frame.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,8 @@
66
%%
77

88
-record(stomp_frame, {command, headers, body_iolist_rev}).
9+
10+
-record(stomp_parser_config, {max_header_length = 1024*100,
11+
max_headers = 1000,
12+
max_body_length = 1024*1024*100}).
13+
-define(DEFAULT_STOMP_PARSER_CONFIG, #stomp_parser_config{}).

deps/rabbitmq_stomp/src/rabbit_stomp.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020

2121
-define(DEFAULT_CONFIGURATION,
2222
#stomp_configuration{
23-
default_login = undefined,
24-
default_passcode = undefined,
25-
implicit_connect = false,
26-
ssl_cert_login = false}).
23+
default_login = undefined,
24+
default_passcode = undefined,
25+
implicit_connect = false,
26+
ssl_cert_login = false,
27+
max_header_length = 1024*100,
28+
max_headers = 1000,
29+
max_body_length = 1024*1024*100}).
2730

2831
start(normal, []) ->
2932
Config = parse_configuration(),
@@ -75,7 +78,11 @@ parse_configuration() ->
7578
{ok, SSLLogin} = application:get_env(ssl_cert_login),
7679
{ok, ImplicitConnect} = application:get_env(implicit_connect),
7780
Conf = Conf0#stomp_configuration{ssl_cert_login = SSLLogin,
78-
implicit_connect = ImplicitConnect},
81+
implicit_connect = ImplicitConnect,
82+
max_headers = application:get_env(max_headers, ?DEFAULT_CONFIGURATION#stomp_configuration.max_headers),
83+
max_header_length = application:get_env(max_header_length, ?DEFAULT_CONFIGURATION#stomp_configuration.max_header_length),
84+
max_body_length = application:get_env(max_body_length, ?DEFAULT_CONFIGURATION#stomp_configuration.max_body_length)},
85+
7986
report_configuration(Conf),
8087
Conf.
8188

deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl

Lines changed: 100 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,16 @@
1010
-include("rabbit_stomp_frame.hrl").
1111
-include("rabbit_stomp_headers.hrl").
1212

13-
-export([parse/2, initial_state/0]).
13+
-export([parse/2, initial_state/0, initial_state/1]).
1414
-export([header/2, header/3,
1515
boolean_header/2, boolean_header/3,
1616
integer_header/2, integer_header/3,
1717
binary_header/2, binary_header/3]).
1818
-export([stream_offset_header/1, stream_filter_header/1]).
1919
-export([serialize/1, serialize/2]).
2020

21-
initial_state() -> none.
21+
initial_state() -> {none, ?DEFAULT_STOMP_PARSER_CONFIG}.
22+
initial_state(Config) -> {none, Config}.
2223

2324
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2425
%% STOMP 1.1 frames basic syntax
@@ -73,11 +74,29 @@ initial_state() -> none.
7374
-define(COLON_ESC, $c).
7475
-define(CR_ESC, $r).
7576

77+
-define(COMMAND_TREE,
78+
#{$S => #{$E => #{$N => #{$D => 'SEND'}},
79+
$U => #{$B => #{$S => #{$C => #{$R => #{$I => #{$B => #{$E => 'SUBSCRIBE'}}}}}}},
80+
$T => #{$O => #{$M => #{$P => 'STOMP'}}}},
81+
$U => #{$N => #{$S => #{$U => #{$B => #{$S => #{$C => #{$R => #{$I => #{$B => #{$E => 'UNSUBSCRIBE'}}}}}}}}}},
82+
$B => #{$E => #{$G => #{$I => #{$N => 'BEGIN'}}}},
83+
$C => #{$O => #{$M => #{$M => #{$I => #{$T => 'COMMIT'}}},
84+
$N => #{$N => #{$E => #{$C => #{$T => {'CONNECT',
85+
#{$E => #{$D => 'CONNECTED'}}}}}}}}},
86+
$A => #{$B => #{$O => #{$R => #{$T => 'ABORT'}}},
87+
$C => #{$K => 'ACK'}},
88+
$N => #{$A => #{$C => #{$K => 'NACK'}}},
89+
$D => #{$I => #{$S => #{$C => #{$O => #{$N => #{$N => #{$E => #{$C => #{$T => 'DISCONNECT'}}}}}}}}},
90+
$M => #{$E => #{$S => #{$S => #{$A => #{$G => #{$E => 'MESSAGE'}}}}}},
91+
$R => #{$E => #{$C => #{$E => #{$I => #{$P => #{$T => 'RECEIPT'}}}}}},
92+
$E => #{$R => #{$R => #{$O => #{$R => 'ERROR'}}}}}).
93+
7694
%% parser state
77-
-record(state, {acc, cmd, hdrs, hdrname}).
95+
-record(state, {acc, cmd, cmd_tree = ?COMMAND_TREE, hdrs, hdrname, hdrl = 0,
96+
config}).
7897

7998
parse(Content, {resume, Continuation}) -> Continuation(Content);
80-
parse(Content, none ) -> parser(Content, noframe, #state{}).
99+
parse(Content, {none, Config} ) -> parser(Content, noframe, #state{config=Config}).
81100

82101
more(Continuation) -> {more, {resume, Continuation}}.
83102

@@ -103,14 +122,40 @@ parser( Rest, headers , State) -> goto(headers, hdrname,
103122
parser(<<?COLON, Rest/binary>>, hdrname , State) -> goto(hdrname, hdrvalue, Rest, State);
104123
parser(<<?LF, Rest/binary>>, hdrname , State) -> goto(hdrname, headers, Rest, State);
105124
parser(<<?LF, Rest/binary>>, hdrvalue, State) -> goto(hdrvalue, headers, Rest, State);
125+
parser(<<Ch:8, Rest/binary>>, command , #state{cmd_tree = {_, CmdTree}} = State) ->
126+
case maps:get(Ch, CmdTree, undefined) of
127+
undefined -> {error, unknown_command};
128+
NewCmdTree -> parser(Rest, command, State#state{cmd_tree = NewCmdTree})
129+
end;
130+
parser(<<Ch:8, Rest/binary>>, command , #state{cmd_tree = #{} = CmdTree} = State) ->
131+
case maps:get(Ch, CmdTree, undefined) of
132+
undefined -> {error, unknown_command};
133+
NewCmdTree -> parser(Rest, command, State#state{cmd_tree = NewCmdTree})
134+
end;
135+
parser(<<_Ch:8, _Rest/binary>>, command , _) ->
136+
{error, unknown_command};
106137
%% accumulate
138+
%% parser(<<Ch:8, Rest/binary>>, Term = hdrname , State = #state{config = #stomp_parser_config{max_header_length = MaxHeaderLength}}) ->
139+
140+
%% parser(Rest, Term, accum(Ch, State));
141+
%% parser(<<Ch:8, Rest/binary>>, Term = hdrvalue , State = #state{config = #stomp_parser_config{max_header_length = MaxHeaderLength}}) ->
142+
%% parser(Rest, Term, accum(Ch, State));
107143
parser(<<Ch:8, Rest/binary>>, Term , State) -> parser(Rest, Term, accum(Ch, State)).
108144

109145
%% state transitions
110-
goto(noframe, command, Rest, State ) -> parser(Rest, command, State#state{acc = []});
111-
goto(command, headers, Rest, State = #state{acc = Acc} ) -> parser(Rest, headers, State#state{cmd = lists:reverse(Acc), hdrs = []});
112-
goto(headers, body, Rest, #state{cmd = Cmd, hdrs = Hdrs}) -> parse_body(Rest, #stomp_frame{command = Cmd, headers = Hdrs});
113-
goto(headers, hdrname, Rest, State ) -> parser(Rest, hdrname, State#state{acc = []});
146+
goto(noframe, command, Rest, State ) -> parser(Rest, command, State#state{acc = [], cmd_tree = ?COMMAND_TREE});
147+
goto(command, headers, Rest, State = #state{cmd_tree = Command}) when is_atom(Command) ->
148+
parser(Rest, headers, State#state{cmd = Command, hdrs = []});
149+
goto(command, headers, Rest, State = #state{cmd_tree = {Command, _}}) when is_atom(Command) ->
150+
parser(Rest, headers, State#state{cmd = Command, hdrs = []});
151+
goto(command, headers, _Rest, _State)->
152+
{error, unknown_command};
153+
goto(headers, body, Rest, State ) -> parse_body(Rest, State);
154+
goto(headers, hdrname, Rest, State = #state{hdrs = Headers, config = #stomp_parser_config{max_headers = MaxHeaders}}) ->
155+
case length(Headers) == MaxHeaders of
156+
true -> {error, {max_headeres, MaxHeaders}};
157+
_ -> parser(Rest, hdrname, State#state{acc = []})
158+
end;
114159
goto(hdrname, hdrvalue, Rest, State = #state{acc = Acc} ) -> parser(Rest, hdrvalue, State#state{acc = [], hdrname = lists:reverse(Acc)});
115160
goto(hdrname, headers, _Rest, #state{acc = Acc} ) -> {error, {header_no_value, lists:reverse(Acc)}}; % badly formed header -- fatal error
116161
goto(hdrvalue, headers, Rest, State = #state{acc = Acc, hdrs = Headers, hdrname = HdrName}) ->
@@ -140,28 +185,51 @@ insert_header(Headers, Name, Value) ->
140185
false -> [{Name, Value} | Headers]
141186
end.
142187

143-
parse_body(Content, Frame = #stomp_frame{command = Command}) ->
144-
case Command of
145-
"SEND" -> parse_body(Content, Frame, [], integer_header(Frame, ?HEADER_CONTENT_LENGTH, unknown));
146-
_ -> parse_body(Content, Frame, [], unknown)
188+
parse_body(Content, State) ->
189+
#state{cmd = Cmd, hdrs = Hdrs, config = #stomp_parser_config{max_body_length = MaxBodyLength}} = State,
190+
Frame = #stomp_frame{command = Cmd, headers = Hdrs},
191+
case Cmd of
192+
'SEND' ->
193+
case integer_header(Frame, ?HEADER_CONTENT_LENGTH, unknown) of
194+
ContentLength when is_integer(ContentLength) and (ContentLength > MaxBodyLength) ->
195+
{error, {max_body_length, ContentLength}};
196+
ContentLength when is_integer(ContentLength) ->
197+
parse_known_body(Content, Frame, [], ContentLength);
198+
_ ->
199+
parse_unknown_body(Content, Frame, [], MaxBodyLength)
200+
end;
201+
_ ->
202+
parse_unknown_body(Content, Frame, [], MaxBodyLength)
203+
end.
204+
205+
-define(MORE_BODY(Content, Frame, Chunks, Remaining),
206+
Chunks1 = finalize_chunk(Content, Chunks),
207+
more(fun(Rest) -> ?FUNCTION_NAME(Rest, Frame, Chunks1, Remaining) end)).
208+
209+
parse_unknown_body(Content, Frame, Chunks, Remaining) ->
210+
case firstnull(Content) of
211+
-1 ->
212+
ChunkSize = byte_size(Content),
213+
case ChunkSize > Remaining of
214+
true -> {error, {max_body_length, unknown}};
215+
false -> ?MORE_BODY(Content, Frame, Chunks, Remaining - ChunkSize)
216+
end;
217+
Pos ->
218+
case Pos > Remaining of
219+
true -> {error, {max_body_length, unknown}};
220+
false -> finish_body(Content, Frame, Chunks, Pos)
221+
end
147222
end.
148223

149-
parse_body(Content, Frame, Chunks, unknown) ->
150-
parse_body2(Content, Frame, Chunks, case firstnull(Content) of
151-
-1 -> {more, unknown};
152-
Pos -> {done, Pos}
153-
end);
154-
parse_body(Content, Frame, Chunks, Remaining) ->
224+
parse_known_body(Content, Frame, Chunks, Remaining) ->
155225
Size = byte_size(Content),
156-
parse_body2(Content, Frame, Chunks, case Remaining >= Size of
157-
true -> {more, Remaining - Size};
158-
false -> {done, Remaining}
159-
end).
160-
161-
parse_body2(Content, Frame, Chunks, {more, Left}) ->
162-
Chunks1 = finalize_chunk(Content, Chunks),
163-
more(fun(Rest) -> parse_body(Rest, Frame, Chunks1, Left) end);
164-
parse_body2(Content, Frame, Chunks, {done, Pos}) ->
226+
case Remaining >= Size of
227+
true ->
228+
?MORE_BODY(Content, Frame, Chunks, Remaining - Size);
229+
false -> finish_body(Content, Frame, Chunks, Remaining)
230+
end.
231+
232+
finish_body(Content, Frame, Chunks, Pos) ->
165233
<<Chunk:Pos/binary, 0, Rest/binary>> = Content,
166234
Body = finalize_chunk(Chunk, Chunks),
167235
{ok, Frame#stomp_frame{body_iolist_rev = Body}, Rest}.
@@ -251,7 +319,7 @@ serialize(#stomp_frame{command = Command,
251319
headers = Headers,
252320
body_iolist_rev = BodyFragments}, false) ->
253321
Len = iolist_size(BodyFragments),
254-
[Command, ?LF,
322+
[serialize_command(Command), ?LF,
255323
lists:map(fun serialize_header/1,
256324
lists:keydelete(?HEADER_CONTENT_LENGTH, 1, Headers)),
257325
if
@@ -263,6 +331,10 @@ serialize(#stomp_frame{command = Command,
263331
_ -> lists:reverse(BodyFragments)
264332
end, 0].
265333

334+
serialize_command(Command) when is_atom(Command) ->
335+
atom_to_binary(Command, utf8);
336+
serialize_command(Command) -> Command.
337+
266338
serialize_header({K, V}) when is_integer(V) -> hdr(escape(K), integer_to_list(V));
267339
serialize_header({K, V}) when is_boolean(V) -> hdr(escape(K), boolean_to_list(V));
268340
serialize_header({K, V}) when is_list(V) -> hdr(escape(K), escape(V)).

0 commit comments

Comments
 (0)