Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Commit

Permalink
Merging bug24079 to default
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Sackman committed May 27, 2011
2 parents 104da76 + 3e7f075 commit 737ccbf
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 104 deletions.
33 changes: 19 additions & 14 deletions include/amqp_client.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,25 @@

-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).

-record(amqp_params, {username = <<"guest">>,
password = <<"guest">>,
virtual_host = <<"/">>,
host = "localhost",
port = ?PROTOCOL_PORT,
node = node(),
channel_max = 0,
frame_max = 0,
heartbeat = 0,
ssl_options = none,
auth_mechanisms = [fun amqp_auth_mechanisms:plain/3,
fun amqp_auth_mechanisms:amqplain/3],
adapter_info = none,
client_properties = []}).
-record(amqp_params_network, {username = <<"guest">>,
password = <<"guest">>,
virtual_host = <<"/">>,
host = "localhost",
port = ?PROTOCOL_PORT,
channel_max = 0,
frame_max = 0,
heartbeat = 0,
ssl_options = none,
auth_mechanisms =
[fun amqp_auth_mechanisms:plain/3,
fun amqp_auth_mechanisms:amqplain/3],
client_properties = []}).

-record(amqp_params_direct, {username = <<"guest">>,
virtual_host = <<"/">>,
node = node(),
adapter_info = none,
client_properties = []}).

-record(adapter_info, {address = unknown,
port = unknown,
Expand Down
8 changes: 4 additions & 4 deletions src/amqp_auth_mechanisms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@

plain(none, _, init) ->
{<<"PLAIN">>, []};
plain(none, #amqp_params{username = Username,
password = Password}, _State) ->
plain(none, #amqp_params_network{username = Username,
password = Password}, _State) ->
{<<0, Username/binary, 0, Password/binary>>, _State}.

amqplain(none, _, init) ->
{<<"AMQPLAIN">>, []};
amqplain(none, #amqp_params{username = Username,
password = Password}, _State) ->
amqplain(none, #amqp_params_network{username = Username,
password = Password}, _State) ->
LoginTable = [{<<"LOGIN">>, longstr, Username},
{<<"PASSWORD">>, longstr, Password}],
{rabbit_binary_generator:generate_table(LoginTable), _State}.
Expand Down
51 changes: 24 additions & 27 deletions src/amqp_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,30 @@
-include("amqp_client.hrl").

-export([open_channel/1, open_channel/2]).
-export([start/1, start/2]).
-export([start/1]).
-export([close/1, close/3]).
-export([info/2, info_keys/1, info_keys/0]).

%%---------------------------------------------------------------------------
%% Type Definitions
%%---------------------------------------------------------------------------

%% @type amqp_params() = #amqp_params{}.
%% @type adapter_info() = #adapter_info{}.
%% @type amqp_params_direct() = #amqp_params_direct{}.
%% As defined in amqp_client.hrl. It contains the following fields:
%% <ul>
%% <li>username :: binary() - The name of a user registered with the broker,
%% defaults to &lt;&lt;guest"&gt;&gt;</li>
%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
%% defaults to &lt;&lt;"/"&gt;&gt;</li>
%% <li>node :: atom() - The node the broker runs on (direct only)</li>
%% <li>adapter_info :: adapter_info() - Extra management information for if
%% this connection represents a non-AMQP network connection.</li>
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
%% client properties to be sent to the server, defaults to []</li>
%% </ul>
%%
%% @type amqp_params_network() = #amqp_params_network{}.
%% As defined in amqp_client.hrl. It contains the following fields:
%% <ul>
%% <li>username :: binary() - The name of a user registered with the broker,
Expand All @@ -91,7 +106,6 @@
%% defaults to "localhost" (network only)</li>
%% <li>port :: integer() - The port the broker is listening on,
%% defaults to 5672 (network only)</li>
%% <li>node :: atom() - The node the broker runs on (direct only)</li>
%% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
%% defaults to 0</li>
%% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
Expand All @@ -104,43 +118,26 @@
%% client properties to be sent to the server, defaults to []</li>
%% </ul>


%%---------------------------------------------------------------------------
%% Starting a connection
%%---------------------------------------------------------------------------

%% @spec (Type) -> {ok, Connection} | {error, Error}
%% where
%% Type = network | direct
%% Connection = pid()
%% @doc Starts a connection to an AMQP server. Use network type to connect
%% to a remote AMQP server - default connection settings are used, meaning that
%% the server is expected to be at localhost:5672, with a vhost of "/"
%% authorising a user guest/guest. Use direct type for a direct connection to
%% a RabbitMQ server, assuming that the server is running in the same process
%% space, and with a default set of amqp_params. If a different host, port,
%% vhost or credential set is required, start/2 should be used.
start(Type) ->
start(Type, #amqp_params{}).

%% @spec (Type, amqp_params()) -> {ok, Connection} | {error, Error}
%% @spec (Params) -> {ok, Connection} | {error, Error}
%% where
%% Type = network | direct
%% Params = amqp_params_network() | amqp_params_direct()
%% Connection = pid()
%% @doc Starts a connection to an AMQP server. Use network type to connect
%% to a remote AMQP server or direct type for a direct connection to
%% @doc Starts a connection to an AMQP server. Use network params to connect
%% to a remote AMQP server or direct params for a direct connection to
%% a RabbitMQ server, assuming that the server is running in the same process
%% space.
start(Type, AmqpParams) ->
start(AmqpParams) ->
case amqp_client:start() of
ok -> ok;
{error, {already_started, amqp_client}} -> ok;
{error, _} = E -> throw(E)
end,
{ok, _Sup, Connection} =
amqp_sup:start_connection_sup(
Type, case Type of direct -> amqp_direct_connection;
network -> amqp_network_connection
end, AmqpParams),
{ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams),
amqp_gen_connection:connect(Connection).

%%---------------------------------------------------------------------------
Expand Down
9 changes: 7 additions & 2 deletions src/amqp_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@

-behaviour(supervisor2).

-export([start_link/3]).
-export([start_link/1]).
-export([init/1]).

%%---------------------------------------------------------------------------
%% Interface
%%---------------------------------------------------------------------------

start_link(Type, Module, AmqpParams) ->
start_link(AmqpParams) ->
{ok, Sup} = supervisor2:start_link(?MODULE, []),
{Type, Module} =
case AmqpParams of
#amqp_params_direct{} -> {direct, amqp_direct_connection};
#amqp_params_network{} -> {network, amqp_network_connection}
end,
SChMF = start_channels_manager_fun(Sup, Type),
SIF = start_infrastructure_fun(Sup, Type),
{ok, Connection} = supervisor2:start_child(
Expand Down
19 changes: 10 additions & 9 deletions src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ terminate(_Reason, #state{node = Node}) ->
i(type, _State) -> direct;
i(pid, _State) -> self();
%% AMQP Params
i(user, #state{params = P}) -> P#amqp_params.username;
i(vhost, #state{params = P}) -> P#amqp_params.virtual_host;
i(client_properties, #state{params = P}) -> P#amqp_params.client_properties;
i(user, #state{params = P}) -> P#amqp_params_direct.username;
i(vhost, #state{params = P}) -> P#amqp_params_direct.virtual_host;
i(client_properties, #state{params = P}) ->
P#amqp_params_direct.client_properties;
%% Optional adapter info
i(protocol, #state{adapter_info = I}) -> I#adapter_info.protocol;
i(address, #state{adapter_info = I}) -> I#adapter_info.address;
Expand All @@ -90,17 +91,17 @@ info_keys() ->
infos(Items, State) ->
[{Item, i(Item, State)} || Item <- Items].

connect(Params = #amqp_params{username = Username,
password = Pass,
node = Node,
adapter_info = Info,
virtual_host = VHost}, SIF, _ChMgr, State) ->
connect(Params = #amqp_params_direct{username = Username,
node = Node,
adapter_info = Info,
virtual_host = VHost},
SIF, _ChMgr, State) ->
State1 = State#state{node = Node,
vhost = VHost,
params = Params,
adapter_info = ensure_adapter_info(Info)},
case rpc:call(Node, rabbit_direct, connect,
[Username, Pass, VHost, ?PROTOCOL,
[Username, VHost, ?PROTOCOL,
infos(?CREATION_EVENT_KEYS, State1)]) of
{ok, {User, ServerProperties}} ->
{ok, Collector} = SIF(),
Expand Down
30 changes: 16 additions & 14 deletions src/amqp_network_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,19 @@ info_keys() ->
%% Handshake
%%---------------------------------------------------------------------------

connect(AmqpParams = #amqp_params{ssl_options = none,
host = Host,
port = Port}, SIF, ChMgr, State) ->
connect(AmqpParams = #amqp_params_network{ssl_options = none,
host = Host,
port = Port},
SIF, ChMgr, State) ->
case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
{ok, Sock} -> try_handshake(AmqpParams, SIF, ChMgr,
State#state{sock = Sock});
{error, _} = E -> E
end;
connect(AmqpParams = #amqp_params{ssl_options = SslOpts,
host = Host,
port = Port}, SIF, ChMgr, State) ->
connect(AmqpParams = #amqp_params_network{ssl_options = SslOpts,
host = Host,
port = Port},
SIF, ChMgr, State) ->
rabbit_misc:start_applications([crypto, public_key, ssl]),
case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
{ok, Sock} ->
Expand Down Expand Up @@ -139,16 +141,16 @@ start_infrastructure(SIF, ChMgr, State = #state{sock = Sock}) ->
{ok, {_MainReader, _AState, Writer, SHF}} = SIF(Sock, ChMgr),
{SHF, State#state{writer0 = Writer}}.

network_handshake(AmqpParams, SHF, State0) ->
network_handshake(AmqpParams = #amqp_params_network{virtual_host = VHost},
SHF, State0) ->
Start = #'connection.start'{server_properties = ServerProperties,
mechanisms = Mechanisms} =
handshake_recv('connection.start'),
ok = check_version(Start),
Tune = login(AmqpParams, Mechanisms, State0),
{TuneOk, ChannelMax, State1} = tune(Tune, AmqpParams, SHF, State0),
do2(TuneOk, State1),
do2(#'connection.open'{virtual_host = AmqpParams#amqp_params.virtual_host},
State1),
do2(#'connection.open'{virtual_host = VHost}, State1),
Params = {ServerProperties, ChannelMax, State1},
case handshake_recv('connection.open_ok') of
#'connection.open_ok'{} -> {ok, Params};
Expand All @@ -169,9 +171,9 @@ check_version(#'connection.start'{version_major = Major,
tune(#'connection.tune'{channel_max = ServerChannelMax,
frame_max = ServerFrameMax,
heartbeat = ServerHeartbeat},
#amqp_params{channel_max = ClientChannelMax,
frame_max = ClientFrameMax,
heartbeat = ClientHeartbeat}, SHF, State) ->
#amqp_params_network{channel_max = ClientChannelMax,
frame_max = ClientFrameMax,
heartbeat = ClientHeartbeat}, SHF, State) ->
[ChannelMax, Heartbeat, FrameMax] =
lists:zipwith(fun (Client, Server) when Client =:= 0; Server =:= 0 ->
lists:max([Client, Server]);
Expand All @@ -192,8 +194,8 @@ start_heartbeat(SHF, #state{sock = Sock, heartbeat = Heartbeat}) ->
ReceiveFun = fun () -> Connection ! heartbeat_timeout end,
SHF(Sock, Heartbeat, SendFun, Heartbeat, ReceiveFun).

login(Params = #amqp_params{auth_mechanisms = ClientMechanisms,
client_properties = UserProps},
login(Params = #amqp_params_network{auth_mechanisms = ClientMechanisms,
client_properties = UserProps},
ServerMechanismsStr, State) ->
ServerMechanisms = string:tokens(binary_to_list(ServerMechanismsStr), " "),
case [{N, S, F} || F <- ClientMechanisms,
Expand Down
6 changes: 3 additions & 3 deletions src/amqp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

-behaviour(supervisor2).

-export([start_link/0, start_connection_sup/3]).
-export([start_link/0, start_connection_sup/1]).
-export([init/1]).

%%---------------------------------------------------------------------------
Expand All @@ -31,8 +31,8 @@
start_link() ->
supervisor2:start_link({local, amqp_sup}, ?MODULE, []).

start_connection_sup(Type, Module, AmqpParams) ->
supervisor2:start_child(amqp_sup, [Type, Module, AmqpParams]).
start_connection_sup(AmqpParams) ->
supervisor2:start_child(amqp_sup, [AmqpParams]).

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
Expand Down
20 changes: 10 additions & 10 deletions test/direct_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,13 @@ channel_death_test() ->
negative_test_util:channel_death_test(new_connection()).

non_existent_user_test() ->
negative_test_util:non_existent_user_test(fun new_connection/1).

invalid_password_test() ->
negative_test_util:invalid_password_test(fun new_connection/1).
negative_test_util:non_existent_user_test(fun new_connection/3).

non_existent_vhost_test() ->
negative_test_util:non_existent_vhost_test(fun new_connection/1).
negative_test_util:non_existent_vhost_test(fun new_connection/3).

no_permission_test() ->
negative_test_util:no_permission_test(fun new_connection/1).
negative_test_util:no_permission_test(fun new_connection/3).

command_invalid_over_channel_test() ->
negative_test_util:command_invalid_over_channel_test(new_connection()).
Expand All @@ -136,12 +133,15 @@ command_invalid_over_channel_test() ->
%%---------------------------------------------------------------------------

new_connection() ->
new_connection(#amqp_params{}).
new_connection(#amqp_params_direct{}).

new_connection(Username, _Password, VHost) ->
new_connection(#amqp_params_direct{username = Username,
virtual_host = VHost}).

new_connection(AmqpParams) ->
case amqp_connection:start(
direct,
AmqpParams#amqp_params{node = rabbit_misc:makenode(rabbit)}) of
Node = rabbit_misc:makenode(rabbit),
case amqp_connection:start(AmqpParams#amqp_params_direct{node = Node}) of
{ok, Conn} -> Conn;
{error, _} = E -> E
end.
Expand Down
24 changes: 13 additions & 11 deletions test/negative_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,22 @@ assert_down_with_error(MonitorRef, CodeAtom) ->
end.

non_existent_user_test(StartConnectionFun) ->
Params = #amqp_params{username = test_util:uuid(),
password = test_util:uuid()},
?assertMatch({error, auth_failure}, StartConnectionFun(Params)).
?assertMatch({error, auth_failure}, StartConnectionFun(test_util:uuid(),
test_util:uuid(),
test_util:uuid())).

invalid_password_test(StartConnectionFun) ->
Params = #amqp_params{username = <<"guest">>,
password = test_util:uuid()},
?assertMatch({error, auth_failure}, StartConnectionFun(Params)).
?assertMatch({error, auth_failure}, StartConnectionFun(<<"guest">>,
test_util:uuid(),
test_util:uuid())).

non_existent_vhost_test(StartConnectionFun) ->
Params = #amqp_params{virtual_host = test_util:uuid()},
?assertMatch({error, access_refused}, StartConnectionFun(Params)).
?assertMatch({error, access_refused}, StartConnectionFun(<<"guest">>,
<<"guest">>,
test_util:uuid())).

no_permission_test(StartConnectionFun) ->
Params = #amqp_params{username = <<"test_user_no_perm">>,
password = <<"test_user_no_perm">>},
?assertMatch({error, access_refused}, StartConnectionFun(Params)).
?assertMatch({error, access_refused},
StartConnectionFun(<<"test_user_no_perm">>,
<<"test_user_no_perm">>,
<<"/">>)).
Loading

0 comments on commit 737ccbf

Please sign in to comment.