@@ -24,7 +24,7 @@ all() ->
24
24
{group , rabbitmq },
25
25
{group , rabbitmq_strict },
26
26
{group , activemq },
27
- % {group, ibmmq}
27
+ % {group, ibmmq}
28
28
{group , activemq_no_anon },
29
29
{group , mock }
30
30
].
@@ -62,11 +62,10 @@ groups() ->
62
62
]}
63
63
].
64
64
65
- test () ->
65
+ test () ->
66
66
[
67
67
open_close_connection ,
68
- basic_roundtrip ,
69
- set_sender_capabilities
68
+ basic_roundtrip
70
69
].
71
70
72
71
shared () ->
@@ -115,9 +114,9 @@ stop_amqp10_client_app(Config) ->
115
114
% % -------------------------------------------------------------------
116
115
117
116
init_per_group (rabbitmq , Config0 ) ->
118
- Config = rabbit_ct_helpers :set_config (Config0 ,
119
- {sasl , {plain , <<" guest" >>, <<" guest" >>}}),
117
+ Config = rabbit_ct_helpers :set_config (Config0 ,{sasl , {plain , <<" guest" >>, <<" guest" >>}}),
120
118
rabbit_ct_helpers :run_steps (Config , rabbit_ct_broker_helpers :setup_steps ());
119
+
121
120
init_per_group (rabbitmq_strict , Config0 ) ->
122
121
Config = rabbit_ct_helpers :set_config (Config0 ,
123
122
{sasl , {plain , <<" guest" >>, <<" guest" >>}}),
@@ -131,9 +130,10 @@ init_per_group(activemq, Config0) ->
131
130
activemq_ct_helpers :setup_steps (" activemq.xml" ));
132
131
133
132
init_per_group (ibmmq , Config0 ) ->
134
- rabbit_ct_helpers :set_config (Config0 , [ {rmq_hostname , " localhost" },
133
+ NodeConfig = [{tcp_port_amqp , 5672 }],
134
+ rabbit_ct_helpers :set_config (Config0 , [ {rmq_nodes , [NodeConfig ]},
135
+ {rmq_hostname , " localhost" },
135
136
{tcp_hostname_amqp , " localhost" },
136
- {tcp_port_amqp , 5677 },
137
137
{sasl , {plain , <<" app" >>, <<" passw0rd" >>}} ]);
138
138
init_per_group (activemq_no_anon , Config0 ) ->
139
139
Config = rabbit_ct_helpers :set_config (
@@ -149,7 +149,9 @@ init_per_group(azure, Config) ->
149
149
{sb_port , 5671 }
150
150
]);
151
151
init_per_group (mock , Config ) ->
152
- rabbit_ct_helpers :set_config (Config , [{tcp_port_amqp , 25000 },
152
+ rabbit_ct_helpers :set_config (Config , [{mock_port , 25000 },
153
+ {tcp_port_amqp , 25000 },
154
+ {mock_host , " localhost" },
153
155
{tcp_hostname_amqp , " localhost" },
154
156
{sasl , none }
155
157
]).
@@ -169,9 +171,11 @@ end_per_group(_, Config) ->
169
171
% % -------------------------------------------------------------------
170
172
171
173
init_per_testcase (_Test , Config ) ->
174
+ ct :log (" Setting per test case" ),
172
175
case lists :keyfind (mock_port , 1 , Config ) of
173
176
{_ , Port } ->
174
177
M = mock_server :start (Port ),
178
+ ct :log (" Setting mock server" ),
175
179
rabbit_ct_helpers :set_config (Config , {mock_server , M });
176
180
_ -> Config
177
181
end .
@@ -186,22 +190,21 @@ end_per_testcase(_Test, Config) ->
186
190
187
191
open_close_connection (Config ) ->
188
192
Hostname = ? config (rmq_hostname , Config ),
189
- Port = ? config (tcp_port_amqp , Config ),
190
- % % Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
193
+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
191
194
% % an address list
192
195
OpnConf = #{addresses => [Hostname ],
193
196
port => Port ,
194
197
notify => self (),
195
198
container_id => <<" open_close_connection_container" >>,
196
199
sasl => ? config (sasl , Config )},
197
- {ok , Connection } = amqp10_client :open_connection (Hostname , Port ),
198
200
{ok , Connection2 } = amqp10_client :open_connection (OpnConf ),
199
201
receive
200
- {amqp10_event , {connection , Connection2 , opened }} -> ok
202
+ {amqp10_event , {connection , Connection2 , opened }} -> ct : log ( " connection opened " ), ok
201
203
after 5000 -> exit (connection_timeout )
202
204
end ,
205
+ ct :log (" Closing connection ..." ),
203
206
ok = amqp10_client :close_connection (Connection2 ),
204
- ok = amqp10_client : close_connection ( Connection ).
207
+ ct : log ( " Closed connection . " ).
205
208
206
209
open_connection_plain_sasl (Config ) ->
207
210
Hostname = ? config (rmq_hostname , Config ),
@@ -270,7 +273,7 @@ basic_roundtrip(Config) ->
270
273
application :start (sasl ),
271
274
Hostname = ? config (rmq_hostname , Config ),
272
275
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
273
- OpenConf = #{address => Hostname , port => Port , sasl => anon },
276
+ OpenConf = #{address => Hostname , port => Port , sasl => ? config ( sasl , Config ) },
274
277
roundtrip (OpenConf ).
275
278
276
279
basic_roundtrip_tls (Config ) ->
@@ -337,20 +340,40 @@ roundtrip_large_messages(Config) ->
337
340
DataMb = rand :bytes (1024 * 1024 ),
338
341
Data8Mb = rand :bytes (8 * 1024 * 1024 ),
339
342
Data64Mb = rand :bytes (64 * 1024 * 1024 ),
340
- ok = roundtrip (OpenConf , DataKb ),
341
- ok = roundtrip (OpenConf , DataMb ),
342
- ok = roundtrip (OpenConf , Data8Mb ),
343
- ok = roundtrip (OpenConf , Data64Mb ).
343
+ ok = roundtrip (OpenConf , [{body , DataKb }]),
344
+ ok = roundtrip (OpenConf , [{body , DataMb }]),
345
+ ok = roundtrip (OpenConf , [{body , Data8Mb }]),
346
+ ok = roundtrip (OpenConf , [{body , Data64Mb }]).
347
+
348
+ basic_roundtrip_ibmmq (Config ) ->
349
+ application :start (sasl ),
350
+ Hostname = ? config (rmq_hostname , Config ),
351
+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
352
+ OpenConf = #{address => Hostname , port => Port , sasl => ? config (sasl , Config )},
353
+ roundtrip (OpenConf , [{body , <<" banana" >>}, {destination , <<" DEV.QUEUE.1" >>}]).
344
354
345
355
roundtrip (OpenConf ) ->
346
- roundtrip (OpenConf , << " banana " >> ).
356
+ roundtrip (OpenConf , [] ).
347
357
348
- roundtrip (OpenConf , Body ) ->
358
+ roundtrip (OpenConf , Args ) ->
359
+ Body = proplists :get_value (body , Args , <<" banana" >>),
360
+ Destination = proplists :get_value (destination , Args , <<" test1" >>),
349
361
{ok , Connection } = amqp10_client :open_connection (OpenConf ),
350
362
{ok , Session } = amqp10_client :begin_session (Connection ),
351
- {ok , Sender } = amqp10_client :attach_sender_link (
352
- Session , <<" banana-sender" >>, <<" test1" >>, settled , unsettled_state ),
353
- await_link (Sender , credited , link_credit_timeout ),
363
+ ct :log (" Session attached " ),
364
+ SenderAttachArgs = #{name => <<" banana-sender" >>,
365
+ role => {sender , #{address => Destination ,
366
+ durable => unsettled_state ,
367
+ capabilities => <<" queue" >>}},
368
+ snd_settle_mode => settled ,
369
+ rcv_settle_mode => first ,
370
+ filter => #{},
371
+ properties => #{}
372
+ },
373
+ {ok , Sender } = amqp10_client :attach_link (Session , SenderAttachArgs ),
374
+ % %await_link(Sender, credited, link_credit_timeout),
375
+ await_link (Sender , attached , attached_timeout ),
376
+ ct :log (" Sender attached " ),
354
377
355
378
Now = os :system_time (millisecond ),
356
379
Props = #{creation_time => Now ,
@@ -369,8 +392,17 @@ roundtrip(OpenConf, Body) ->
369
392
await_link (Sender , {detached , normal }, link_detach_timeout ),
370
393
371
394
{error , link_not_found } = amqp10_client :detach_link (Sender ),
372
- {ok , Receiver } = amqp10_client :attach_receiver_link (
373
- Session , <<" banana-receiver" >>, <<" test1" >>, settled , unsettled_state ),
395
+ ReceiverAttachArgs = #{
396
+ name => <<" banana-receiver" >>,
397
+ role => {receiver , #{address => Destination ,
398
+ durable => unsettled_state ,
399
+ capabilities => <<" queue" >>}, self ()},
400
+ snd_settle_mode => settled ,
401
+ rcv_settle_mode => first ,
402
+ filter => #{},
403
+ properties => #{}
404
+ },
405
+ {ok , Receiver } = amqp10_client :attach_link (Session , ReceiverAttachArgs ),
374
406
{ok , OutMsg } = amqp10_client :get_msg (Receiver , 4 * 60_000 ),
375
407
ok = amqp10_client :end_session (Session ),
376
408
ok = amqp10_client :close_connection (Connection ),
@@ -384,14 +416,17 @@ roundtrip(OpenConf, Body) ->
384
416
ok .
385
417
386
418
filtered_roundtrip (OpenConf ) ->
387
- filtered_roundtrip (OpenConf , <<" banana" >>).
419
+ filtered_roundtrip (OpenConf , []).
420
+
421
+ filtered_roundtrip (OpenConf , Args ) ->
422
+ Body = proplists :get_value (body , Args , <<" banana" >>),
423
+ Destination = proplists :get_value (destination , Args , <<" test1" >>),
388
424
389
- filtered_roundtrip (OpenConf , Body ) ->
390
425
{ok , Connection } = amqp10_client :open_connection (OpenConf ),
391
426
{ok , Session } = amqp10_client :begin_session (Connection ),
392
427
{ok , Sender } = amqp10_client :attach_sender_link (Session ,
393
428
<<" default-sender" >>,
394
- << " test1 " >> ,
429
+ Destination ,
395
430
settled ,
396
431
unsettled_state ),
397
432
await_link (Sender , credited , link_credit_timeout ),
@@ -403,7 +438,7 @@ filtered_roundtrip(OpenConf, Body) ->
403
438
404
439
{ok , DefaultReceiver } = amqp10_client :attach_receiver_link (Session ,
405
440
<<" default-receiver" >>,
406
- << " test1 " >> ,
441
+ Destination ,
407
442
settled ,
408
443
unsettled_state ),
409
444
ok = amqp10_client :send_msg (Sender , Msg1 ),
@@ -421,7 +456,7 @@ filtered_roundtrip(OpenConf, Body) ->
421
456
422
457
{ok , FilteredReceiver } = amqp10_client :attach_receiver_link (Session ,
423
458
<<" filtered-receiver" >>,
424
- << " test1 " >> ,
459
+ Destination ,
425
460
settled ,
426
461
unsettled_state ,
427
462
#{<<" apache.org:selector-filter:string" >> => <<" amqp.annotation.x-opt-enqueuedtimeutc > " , Now2Binary /binary >>}),
@@ -727,7 +762,7 @@ subscribe_with_auto_flow_unsettled(Config) ->
727
762
ok = amqp10_client :end_session (Session ),
728
763
ok = amqp10_client :close_connection (Connection ).
729
764
730
-
765
+
731
766
insufficient_credit (Config ) ->
732
767
Hostname = ? config (mock_host , Config ),
733
768
Port = ? config (mock_port , Config ),
@@ -837,7 +872,7 @@ set_receiver_capabilities(Config) ->
837
872
838
873
% Hostname = ?config(mock_host, Config),
839
874
% Port = ?config(mock_port, Config),
840
-
875
+
841
876
OpenStep = fun ({0 = Ch , # 'v1_0.open' {}, _Pay }) ->
842
877
{Ch , [# 'v1_0.open' {container_id = {utf8 , <<" mock" >>}}]}
843
878
end ,
@@ -948,7 +983,7 @@ set_sender_capabilities(Config) ->
948
983
capabilities => <<" capability-1" >>}},
949
984
snd_settle_mode => mixed ,
950
985
rcv_settle_mode => first },
951
- {ok , Sender } = amqp10_client :attach_link (Session , AttachArgs ),
986
+ {ok , Sender } = amqp10_client :attach_link (Session , AttachArgs ),
952
987
await_link (Sender , attached , attached_timeout ),
953
988
Msg = amqp10_msg :new (<<" mock-tag" >>, <<" banana" >>, true ),
954
989
{error , insufficient_credit } = amqp10_client :send_msg (Sender , Msg ),
@@ -961,7 +996,7 @@ set_sender_capabilities(Config) ->
961
996
set_sender_sync_capabilities (Config ) ->
962
997
Hostname = ? config (tcp_hostname_amqp , Config ),
963
998
Port = ? config (tcp_port_amqp , Config ),
964
-
999
+
965
1000
OpenStep = fun ({0 = Ch , # 'v1_0.open' {}, _Pay }) ->
966
1001
{Ch , [# 'v1_0.open' {container_id = {utf8 , <<" mock" >>}}]}
967
1002
end ,
0 commit comments