Skip to content

WIP 4.2: Native STOMP #9141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_confirms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
-opaque state() :: #?MODULE{}.

-export_type([
state/0
state/0,
mx/0
]).

-spec init() -> state().
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_stomp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ define PROJECT_ENV
{passcode, <<"guest">>}]},
{default_vhost, <<"/">>},
{default_topic_exchange, <<"amq.topic">>},
{default_nack_requeue, true},
{default_nack_requeue, true},
{ssl_cert_login, false},
{implicit_connect, false},
{tcp_listeners, [61613]},
Expand Down
28 changes: 25 additions & 3 deletions deps/rabbitmq_stomp/include/rabbit_stomp.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,28 @@
default_passcode,
force_default_creds = false,
implicit_connect,
ssl_cert_login}).
ssl_cert_login,
max_header_length,
max_headers,
max_body_length}).


-define(SUPPORTED_VERSIONS, ["1.0", "1.1", "1.2"]).



-define(INFO_ITEMS,
[conn_name,
name,
user,
connection,
connection_state,
session_id,
channel,
version,
implicit_connect,
auth_login,
auth_mechanism,
peer_addr,
%% peer_addr,
host,
port,
peer_host,
Expand All @@ -42,3 +49,18 @@
-define(STOMP_GUIDE_URL, <<"https://rabbitmq.com/docs/stomp">>).

-define(DEFAULT_MAX_FRAME_SIZE, 4 * 1024 * 1024).

-define(SIMPLE_METRICS,
[pid,
recv_oct,
send_oct,
reductions]).
-define(OTHER_METRICS,
[recv_cnt,
send_cnt,
send_pend,
garbage_collection,
state,
timeout]).

-type send_fun() :: fun ((async | sync, iodata()) -> ok | {atom(), any()}).
7 changes: 6 additions & 1 deletion deps/rabbitmq_stomp/include/rabbit_stomp_frame.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-record(stomp_frame, {command, headers, body_iolist}).
-record(stomp_frame, {command, headers, body_iolist_rev}).

-record(stomp_parser_config, {max_header_length = 1024*100,
max_headers = 1000,
max_body_length = 1024*1024*100}).
-define(DEFAULT_STOMP_PARSER_CONFIG, #stomp_parser_config{}).
17 changes: 17 additions & 0 deletions deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,20 @@
?HEADER_EXCLUSIVE,
?HEADER_PERSISTENT
]).


-define(QUEUE_PREFIX, "/queue").
-define(TOPIC_PREFIX, "/topic").
-define(EXCHANGE_PREFIX, "/exchange").
-define(AMQQUEUE_PREFIX, "/amq/queue").
-define(TEMP_QUEUE_PREFIX, "/temp-queue").
%% reply queues names can have slashes in the content so no further
%% parsing happens.
-define(REPLY_QUEUE_PREFIX, "/reply-queue/").

%%-------------------------------------------------

-define(DEST_PREFIXES, [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX,
?AMQQUEUE_PREFIX, ?REPLY_QUEUE_PREFIX]).

-define(ALL_DEST_PREFIXES, [?TEMP_QUEUE_PREFIX | ?DEST_PREFIXES]).
18 changes: 13 additions & 5 deletions deps/rabbitmq_stomp/src/rabbit_stomp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

-define(DEFAULT_CONFIGURATION,
#stomp_configuration{
default_login = undefined,
default_passcode = undefined,
implicit_connect = false,
ssl_cert_login = false}).
default_login = undefined,
default_passcode = undefined,
implicit_connect = false,
ssl_cert_login = false,
max_header_length = 1024*100,
max_headers = 1000,
max_body_length = 1024*1024*100}).

start(normal, []) ->
Config = parse_configuration(),
Listeners = parse_listener_configuration(),
rabbit_global_counters:init([{protocol, stomp}]),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd vote to differentiate by STOMP versions, i.e. 1.0, 1.1, 1.2 since that's what we currently do for MQTT (3.1, 3.1.1, 5.0).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are the global counters per protocol and queue type initialised? Are they missing?

Result = rabbit_stomp_sup:start_link(Listeners, Config),
EMPid = case rabbit_event:start_link() of
{ok, Pid} -> Pid;
Expand Down Expand Up @@ -74,7 +78,11 @@ parse_configuration() ->
{ok, SSLLogin} = application:get_env(ssl_cert_login),
{ok, ImplicitConnect} = application:get_env(implicit_connect),
Conf = Conf0#stomp_configuration{ssl_cert_login = SSLLogin,
implicit_connect = ImplicitConnect},
implicit_connect = ImplicitConnect,
max_headers = application:get_env(max_headers, ?DEFAULT_CONFIGURATION#stomp_configuration.max_headers),
max_header_length = application:get_env(max_header_length, ?DEFAULT_CONFIGURATION#stomp_configuration.max_header_length),
max_body_length = application:get_env(max_body_length, ?DEFAULT_CONFIGURATION#stomp_configuration.max_body_length)},

report_configuration(Conf),
Conf.

Expand Down
139 changes: 107 additions & 32 deletions deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
-include("rabbit_stomp_frame.hrl").
-include("rabbit_stomp_headers.hrl").

-export([parse/2, initial_state/0]).
-export([parse/2, initial_state/0, initial_state/1]).
-export([header/2, header/3,
boolean_header/2, boolean_header/3,
integer_header/2, integer_header/3,
binary_header/2, binary_header/3]).
-export([stream_offset_header/1, stream_filter_header/1]).
-export([serialize/1, serialize/2]).

initial_state() -> none.
initial_state() -> {none, ?DEFAULT_STOMP_PARSER_CONFIG}.
initial_state(Config) -> {none, Config}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% STOMP 1.1 frames basic syntax
Expand Down Expand Up @@ -73,11 +74,29 @@ initial_state() -> none.
-define(COLON_ESC, $c).
-define(CR_ESC, $r).

-define(COMMAND_TREE,
#{$S => #{$E => #{$N => #{$D => 'SEND'}},
$U => #{$B => #{$S => #{$C => #{$R => #{$I => #{$B => #{$E => 'SUBSCRIBE'}}}}}}},
$T => #{$O => #{$M => #{$P => 'STOMP'}}}},
$U => #{$N => #{$S => #{$U => #{$B => #{$S => #{$C => #{$R => #{$I => #{$B => #{$E => 'UNSUBSCRIBE'}}}}}}}}}},
$B => #{$E => #{$G => #{$I => #{$N => 'BEGIN'}}}},
$C => #{$O => #{$M => #{$M => #{$I => #{$T => 'COMMIT'}}},
$N => #{$N => #{$E => #{$C => #{$T => {'CONNECT',
#{$E => #{$D => 'CONNECTED'}}}}}}}}},
$A => #{$B => #{$O => #{$R => #{$T => 'ABORT'}}},
$C => #{$K => 'ACK'}},
$N => #{$A => #{$C => #{$K => 'NACK'}}},
$D => #{$I => #{$S => #{$C => #{$O => #{$N => #{$N => #{$E => #{$C => #{$T => 'DISCONNECT'}}}}}}}}},
$M => #{$E => #{$S => #{$S => #{$A => #{$G => #{$E => 'MESSAGE'}}}}}},
$R => #{$E => #{$C => #{$E => #{$I => #{$P => #{$T => 'RECEIPT'}}}}}},
$E => #{$R => #{$R => #{$O => #{$R => 'ERROR'}}}}}).

%% parser state
-record(state, {acc, cmd, hdrs, hdrname}).
-record(state, {acc, cmd, cmd_tree = ?COMMAND_TREE, hdrs, hdrname, hdrl = 0,
config}).

parse(Content, {resume, Continuation}) -> Continuation(Content);
parse(Content, none ) -> parser(Content, noframe, #state{}).
parse(Content, {none, Config} ) -> parser(Content, noframe, #state{config=Config}).

more(Continuation) -> {more, {resume, Continuation}}.

Expand All @@ -103,14 +122,40 @@ parser( Rest, headers , State) -> goto(headers, hdrname,
parser(<<?COLON, Rest/binary>>, hdrname , State) -> goto(hdrname, hdrvalue, Rest, State);
parser(<<?LF, Rest/binary>>, hdrname , State) -> goto(hdrname, headers, Rest, State);
parser(<<?LF, Rest/binary>>, hdrvalue, State) -> goto(hdrvalue, headers, Rest, State);
parser(<<Ch:8, Rest/binary>>, command , #state{cmd_tree = {_, CmdTree}} = State) ->
case maps:get(Ch, CmdTree, undefined) of
undefined -> {error, unknown_command};
NewCmdTree -> parser(Rest, command, State#state{cmd_tree = NewCmdTree})
end;
parser(<<Ch:8, Rest/binary>>, command , #state{cmd_tree = #{} = CmdTree} = State) ->
case maps:get(Ch, CmdTree, undefined) of
undefined -> {error, unknown_command};
NewCmdTree -> parser(Rest, command, State#state{cmd_tree = NewCmdTree})
end;
parser(<<_Ch:8, _Rest/binary>>, command , _) ->
{error, unknown_command};
%% accumulate
%% parser(<<Ch:8, Rest/binary>>, Term = hdrname , State = #state{config = #stomp_parser_config{max_header_length = MaxHeaderLength}}) ->

%% parser(Rest, Term, accum(Ch, State));
%% parser(<<Ch:8, Rest/binary>>, Term = hdrvalue , State = #state{config = #stomp_parser_config{max_header_length = MaxHeaderLength}}) ->
%% parser(Rest, Term, accum(Ch, State));
parser(<<Ch:8, Rest/binary>>, Term , State) -> parser(Rest, Term, accum(Ch, State)).

%% state transitions
goto(noframe, command, Rest, State ) -> parser(Rest, command, State#state{acc = []});
goto(command, headers, Rest, State = #state{acc = Acc} ) -> parser(Rest, headers, State#state{cmd = lists:reverse(Acc), hdrs = []});
goto(headers, body, Rest, #state{cmd = Cmd, hdrs = Hdrs}) -> parse_body(Rest, #stomp_frame{command = Cmd, headers = Hdrs});
goto(headers, hdrname, Rest, State ) -> parser(Rest, hdrname, State#state{acc = []});
goto(noframe, command, Rest, State ) -> parser(Rest, command, State#state{acc = [], cmd_tree = ?COMMAND_TREE});
goto(command, headers, Rest, State = #state{cmd_tree = Command}) when is_atom(Command) ->
parser(Rest, headers, State#state{cmd = Command, hdrs = []});
goto(command, headers, Rest, State = #state{cmd_tree = {Command, _}}) when is_atom(Command) ->
parser(Rest, headers, State#state{cmd = Command, hdrs = []});
goto(command, headers, _Rest, _State)->
{error, unknown_command};
goto(headers, body, Rest, State ) -> parse_body(Rest, State);
goto(headers, hdrname, Rest, State = #state{hdrs = Headers, config = #stomp_parser_config{max_headers = MaxHeaders}}) ->
case length(Headers) == MaxHeaders of
true -> {error, {max_headeres, MaxHeaders}};
_ -> parser(Rest, hdrname, State#state{acc = []})
end;
goto(hdrname, hdrvalue, Rest, State = #state{acc = Acc} ) -> parser(Rest, hdrvalue, State#state{acc = [], hdrname = lists:reverse(Acc)});
goto(hdrname, headers, _Rest, #state{acc = Acc} ) -> {error, {header_no_value, lists:reverse(Acc)}}; % badly formed header -- fatal error
goto(hdrvalue, headers, Rest, State = #state{acc = Acc, hdrs = Headers, hdrname = HdrName}) ->
Expand Down Expand Up @@ -140,31 +185,54 @@ insert_header(Headers, Name, Value) ->
false -> [{Name, Value} | Headers]
end.

parse_body(Content, Frame = #stomp_frame{command = Command}) ->
case Command of
"SEND" -> parse_body(Content, Frame, [], integer_header(Frame, ?HEADER_CONTENT_LENGTH, unknown));
_ -> parse_body(Content, Frame, [], unknown)
parse_body(Content, State) ->
#state{cmd = Cmd, hdrs = Hdrs, config = #stomp_parser_config{max_body_length = MaxBodyLength}} = State,
Frame = #stomp_frame{command = Cmd, headers = Hdrs},
case Cmd of
'SEND' ->
case integer_header(Frame, ?HEADER_CONTENT_LENGTH, unknown) of
ContentLength when is_integer(ContentLength) and (ContentLength > MaxBodyLength) ->
{error, {max_body_length, ContentLength}};
ContentLength when is_integer(ContentLength) ->
parse_known_body(Content, Frame, [], ContentLength);
_ ->
parse_unknown_body(Content, Frame, [], MaxBodyLength)
end;
_ ->
parse_unknown_body(Content, Frame, [], MaxBodyLength)
end.

parse_body(Content, Frame, Chunks, unknown) ->
parse_body2(Content, Frame, Chunks, case firstnull(Content) of
-1 -> {more, unknown};
Pos -> {done, Pos}
end);
parse_body(Content, Frame, Chunks, Remaining) ->
-define(MORE_BODY(Content, Frame, Chunks, Remaining),
Chunks1 = finalize_chunk(Content, Chunks),
more(fun(Rest) -> ?FUNCTION_NAME(Rest, Frame, Chunks1, Remaining) end)).

parse_unknown_body(Content, Frame, Chunks, Remaining) ->
case firstnull(Content) of
-1 ->
ChunkSize = byte_size(Content),
case ChunkSize > Remaining of
true -> {error, {max_body_length, unknown}};
false -> ?MORE_BODY(Content, Frame, Chunks, Remaining - ChunkSize)
end;
Pos ->
case Pos > Remaining of
true -> {error, {max_body_length, unknown}};
false -> finish_body(Content, Frame, Chunks, Pos)
end
end.

parse_known_body(Content, Frame, Chunks, Remaining) ->
Size = byte_size(Content),
parse_body2(Content, Frame, Chunks, case Remaining >= Size of
true -> {more, Remaining - Size};
false -> {done, Remaining}
end).

parse_body2(Content, Frame, Chunks, {more, Left}) ->
Chunks1 = finalize_chunk(Content, Chunks),
more(fun(Rest) -> parse_body(Rest, Frame, Chunks1, Left) end);
parse_body2(Content, Frame, Chunks, {done, Pos}) ->
case Remaining >= Size of
true ->
?MORE_BODY(Content, Frame, Chunks, Remaining - Size);
false -> finish_body(Content, Frame, Chunks, Remaining)
end.

finish_body(Content, Frame, Chunks, Pos) ->
<<Chunk:Pos/binary, 0, Rest/binary>> = Content,
Body = lists:reverse(finalize_chunk(Chunk, Chunks)),
{ok, Frame#stomp_frame{body_iolist = Body}, Rest}.
Body = finalize_chunk(Chunk, Chunks),
{ok, Frame#stomp_frame{body_iolist_rev = Body}, Rest}.

finalize_chunk(<<>>, Chunks) -> Chunks;
finalize_chunk(Chunk, Chunks) -> [Chunk | Chunks].
Expand Down Expand Up @@ -249,16 +317,23 @@ serialize(Frame, true) ->
serialize(Frame, false) ++ [?LF];
serialize(#stomp_frame{command = Command,
headers = Headers,
body_iolist = BodyFragments}, false) ->
body_iolist_rev = BodyFragments}, false) ->
Len = iolist_size(BodyFragments),
[Command, ?LF,
[serialize_command(Command), ?LF,
lists:map(fun serialize_header/1,
lists:keydelete(?HEADER_CONTENT_LENGTH, 1, Headers)),
if
Len > 0 -> [?HEADER_CONTENT_LENGTH ++ ":", integer_to_list(Len), ?LF];
true -> []
end,
?LF, BodyFragments, 0].
?LF, case BodyFragments of
_ when is_binary(BodyFragments) -> BodyFragments;
_ -> lists:reverse(BodyFragments)
end, 0].

serialize_command(Command) when is_atom(Command) ->
atom_to_binary(Command, utf8);
serialize_command(Command) -> Command.

serialize_header({K, V}) when is_integer(V) -> hdr(escape(K), integer_to_list(V));
serialize_header({K, V}) when is_boolean(V) -> hdr(escape(K), boolean_to_list(V));
Expand Down
Loading