diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index ac8b9f2a4ba9..d8e7013ca02b 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -265,23 +265,29 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) -> %% A disposition will be notified to the sender by a message of the %% following stucture: %% {amqp10_disposition, {accepted | rejected, DeliveryTag}} --spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg(). +-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()], boolean()) -> amqp10_msg(). new(DeliveryTag, Bin, Settled) when is_binary(Bin) -> Body = [#'v1_0.data'{content = Bin}], new(DeliveryTag, Body, Settled); new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types - #amqp10_msg{ - transfer = #'v1_0.transfer'{ - delivery_tag = {binary, DeliveryTag}, - settled = Settled, - message_format = {uint, ?MESSAGE_FORMAT}}, - %% This lib is safe by default. - header = #'v1_0.header'{durable = true}, - body = Body}. + Transfer = #'v1_0.transfer'{ + delivery_tag = {binary, DeliveryTag}, + settled = Settled, + message_format = {uint, ?MESSAGE_FORMAT}}, + case is_amqp10_body(Body) orelse (not is_list(Body)) of + true -> + #amqp10_msg{ + transfer = Transfer, + %% This lib is safe by default. + header = #'v1_0.header'{durable = true}, + body = Body}; + false -> + from_amqp_records([Transfer | Body]) + end. %% @doc Create a new settled amqp10 message using the specified delivery tag %% and body. --spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg(). +-spec new(delivery_tag(), amqp10_body() | binary() | [amqp10_client_types:amqp10_msg_record()]) -> amqp10_msg(). new(DeliveryTag, Body) -> new(DeliveryTag, Body, false). @@ -462,3 +468,19 @@ uint(B) -> {uint, B}. has_value(undefined) -> false; has_value(_) -> true. + +is_amqp10_body(#'v1_0.amqp_value'{}) -> + true; +is_amqp10_body(List) when is_list(List) -> + lists:all(fun(#'v1_0.data'{}) -> + true; + (_) -> + false + end, List) orelse + lists:all(fun(#'v1_0.amqp_sequence'{}) -> + true; + (_) -> + false + end, List); +is_amqp10_body(_) -> + false. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index caa2024fa1e9..d50b7a69858d 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2440,7 +2440,6 @@ incoming_link_transfer( validate_message_size(PayloadSize, MaxMessageSize), rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize), messages_received(Settled), - Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of {ok, X, RoutingKeys, Mc1, PermCache} -> diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 1740e7aad2a1..c9bcbf7e3f23 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -12,6 +12,7 @@ -behaviour(rabbit_shovel_behaviour). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbit/include/mc.hrl"). -include("rabbit_shovel.hrl"). -export([ @@ -33,7 +34,7 @@ ack/3, nack/3, status/1, - forward/4 + forward/3 ]). %% Function references should not be stored on the metadata store. @@ -169,8 +170,8 @@ forward_pending(State) -> case pop_pending(State) of empty -> State; - {{Tag, Props, Payload}, S} -> - S2 = do_forward(Tag, Props, Payload, S), + {{Tag, Mc}, S} -> + S2 = do_forward(Tag, Mc, S), S3 = control_throttle(S2), case is_blocked(S3) of true -> @@ -183,91 +184,50 @@ forward_pending(State) -> end end. -forward(IncomingTag, Props, Payload, State) -> +forward(IncomingTag, Mc, State) -> case is_blocked(State) of true -> %% We are blocked by client-side flow-control and/or %% `connection.blocked` message from the destination %% broker. Simply cache the forward. - PendingEntry = {IncomingTag, Props, Payload}, + PendingEntry = {IncomingTag, Mc}, add_pending(PendingEntry, State); false -> - State1 = do_forward(IncomingTag, Props, Payload, State), + State1 = do_forward(IncomingTag, Mc, State), control_throttle(State1) end. -do_forward(IncomingTag, Props, Payload, +do_forward(IncomingTag, Mc0, State0 = #{dest := #{props_fun := {M, F, Args}, current := {_, _, DstUri}, fields_fun := {Mf, Ff, Argsf}}}) -> SrcUri = rabbit_shovel_behaviour:source_uri(State0), % do publish - Exchange = maps:get(exchange, Props, undefined), - RoutingKey = maps:get(routing_key, Props, undefined), + Exchange = mc:exchange(Mc0), + RoutingKey = case mc:routing_keys(Mc0) of + [RK | _] -> RK; + Any -> Any + end, Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey}, Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]), - Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]), + Mc = mc:convert(mc_amqpl, Mc0), + {Props, Payload} = rabbit_basic_common:from_content(mc:protocol_state(Mc)), + Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]), payload = Payload}, publish(IncomingTag, Method1, Msg1, State0). -props_from_map(Map) -> - #'P_basic'{content_type = maps:get(content_type, Map, undefined), - content_encoding = maps:get(content_encoding, Map, undefined), - headers = maps:get(headers, Map, undefined), - delivery_mode = maps:get(delivery_mode, Map, undefined), - priority = maps:get(priority, Map, undefined), - correlation_id = maps:get(correlation_id, Map, undefined), - reply_to = maps:get(reply_to, Map, undefined), - expiration = maps:get(expiration, Map, undefined), - message_id = maps:get(message_id, Map, undefined), - timestamp = maps:get(timestamp, Map, undefined), - type = maps:get(type, Map, undefined), - user_id = maps:get(user_id, Map, undefined), - app_id = maps:get(app_id, Map, undefined), - cluster_id = maps:get(cluster_id, Map, undefined)}. - -map_from_props(#'P_basic'{content_type = Content_type, - content_encoding = Content_encoding, - headers = Headers, - delivery_mode = Delivery_mode, - priority = Priority, - correlation_id = Correlation_id, - reply_to = Reply_to, - expiration = Expiration, - message_id = Message_id, - timestamp = Timestamp, - type = Type, - user_id = User_id, - app_id = App_id, - cluster_id = Cluster_id}) -> - lists:foldl(fun({_K, undefined}, Acc) -> Acc; - ({K, V}, Acc) -> Acc#{K => V} - end, #{}, [{content_type, Content_type}, - {content_encoding, Content_encoding}, - {headers, Headers}, - {delivery_mode, Delivery_mode}, - {priority, Priority}, - {correlation_id, Correlation_id}, - {reply_to, Reply_to}, - {expiration, Expiration}, - {message_id, Message_id}, - {timestamp, Timestamp}, - {type, Type}, - {user_id, User_id}, - {app_id, App_id}, - {cluster_id, Cluster_id} - ]). - handle_source(#'basic.consume_ok'{}, State) -> State; handle_source({#'basic.deliver'{delivery_tag = Tag, exchange = Exchange, routing_key = RoutingKey}, #amqp_msg{props = Props0, payload = Payload}}, State) -> - Props = (map_from_props(Props0))#{exchange => Exchange, - routing_key => RoutingKey}, + Content = rabbit_basic_common:build_content(Props0, Payload), + Msg0 = mc:init(mc_amqpl, Content, #{}), + Msg1 = mc:set_annotation(?ANN_ROUTING_KEYS, [RoutingKey], Msg0), + Msg = mc:set_annotation(?ANN_EXCHANGE, Exchange, Msg1), % forward to destination - rabbit_shovel_behaviour:forward(Tag, Props, Payload, State); + rabbit_shovel_behaviour:forward(Tag, Msg, State); handle_source({'EXIT', Conn, Reason}, #{source := #{current := {Conn, _, _}}}) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 37e8b1dd34b6..038b71f92267 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -9,6 +9,7 @@ -behaviour(rabbit_shovel_behaviour). +-include_lib("rabbit/include/mc.hrl"). -include("rabbit_shovel.hrl"). -export([ @@ -30,7 +31,7 @@ ack/3, nack/3, status/1, - forward/4 + forward/3 ]). -import(rabbit_misc, [pget/2, pget/3]). @@ -184,10 +185,12 @@ dest_endpoint(#{shovel_type := dynamic, -spec handle_source(Msg :: any(), state()) -> not_handled | state() | {stop, any()}. -handle_source({amqp10_msg, _LinkRef, Msg}, State) -> - Tag = amqp10_msg:delivery_id(Msg), - Payload = amqp10_msg:body_bin(Msg), - rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State); +handle_source({amqp10_msg, _LinkRef, Msg0}, State) -> + Tag = amqp10_msg:delivery_id(Msg0), + [_ | Rest] = amqp10_msg:to_amqp_records(Msg0), + Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]), + Msg = mc:init(mc_amqp, Bin, #{}), + rabbit_shovel_behaviour:forward(Tag, Msg, State); handle_source({amqp10_event, {connection, Conn, opened}}, State = #{source := #{current := #{conn := Conn}}}) -> State; @@ -260,8 +263,8 @@ handle_dest({amqp10_event, {link, Link, credited}}, %% we have credit so can begin to forward State = State0#{dest => Dst#{link_state => credited, pending => []}}, - lists:foldl(fun ({A, B, C}, S) -> - forward(A, B, C, S) + lists:foldl(fun ({A, B}, S) -> + forward(A, B, S) end, State, lists:reverse(Pend)); handle_dest({amqp10_event, {link, Link, _Evt}}, State= #{dest := #{current := #{link := Link}}}) -> @@ -315,27 +318,26 @@ status(_) -> %% Destination not yet connected ignore. --spec forward(Tag :: tag(), Props :: #{atom() => any()}, - Payload :: binary(), state()) -> +-spec forward(Tag :: tag(), Mc :: mc:state(), state()) -> state() | {stop, any()}. -forward(_Tag, _Props, _Payload, +forward(_Tag, _Mc, #{source := #{remaining_unacked := 0}} = State) -> State; -forward(Tag, Props, Payload, +forward(Tag, Mc, #{dest := #{current := #{link_state := attached}, pending := Pend0} = Dst} = State) -> %% simply cache the forward oo - Pend = [{Tag, Props, Payload} | Pend0], + Pend = [{Tag, Mc} | Pend0], State#{dest => Dst#{pending => {Pend}}}; -forward(Tag, Props, Payload, +forward(Tag, Msg0, #{dest := #{current := #{link := Link}, unacked := Unacked} = Dst, ack_mode := AckMode} = State) -> OutTag = rabbit_data_coercion:to_binary(Tag), - Msg0 = new_message(OutTag, Payload, State), - Msg = add_timestamp_header( - State, set_message_properties( - Props, add_forward_headers(State, Msg0))), + Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)), + Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]), + Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm), + Msg = update_amqp10_message(Msg2, mc:exchange(Msg0), mc:routing_keys(Msg0), State), case send_msg(Link, Msg) of ok -> rabbit_shovel_behaviour:decr_remaining_unacked( @@ -364,14 +366,15 @@ send_msg(Link, Msg) -> end end. -new_message(Tag, Payload, #{ack_mode := AckMode, - dest := #{properties := Props, - application_properties := AppProps, - message_annotations := MsgAnns}}) -> - Msg0 = amqp10_msg:new(Tag, Payload, AckMode =/= on_confirm), +update_amqp10_message(Msg0, Exchange, RK, #{dest := #{properties := Props, + application_properties := AppProps0, + message_annotations := MsgAnns}} = State) -> Msg1 = amqp10_msg:set_properties(Props, Msg0), - Msg = amqp10_msg:set_message_annotations(MsgAnns, Msg1), - amqp10_msg:set_application_properties(AppProps, Msg). + Msg2 = amqp10_msg:set_message_annotations(MsgAnns, Msg1), + AppProps = AppProps0#{<<"exchange">> => Exchange, + <<"routing_key">> => RK}, + Msg = amqp10_msg:set_application_properties(AppProps, Msg2), + add_timestamp_header(State, add_forward_headers(State, Msg)). add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) -> P =#{creation_time => os:system_time(milli_seconds)}, @@ -379,58 +382,9 @@ add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) -> add_timestamp_header(_, Msg) -> Msg. add_forward_headers(#{dest := #{cached_forward_headers := Props}}, Msg) -> - amqp10_msg:set_application_properties(Props, Msg); + amqp10_msg:set_application_properties(Props, Msg); add_forward_headers(_, Msg) -> Msg. -set_message_properties(Props, Msg) -> - %% this is effectively special handling properties from amqp 0.9.1 - maps:fold( - fun(content_type, Ct, M) -> - amqp10_msg:set_properties( - #{content_type => to_binary(Ct)}, M); - (content_encoding, Ct, M) -> - amqp10_msg:set_properties( - #{content_encoding => to_binary(Ct)}, M); - (delivery_mode, 2, M) -> - amqp10_msg:set_headers(#{durable => true}, M); - (delivery_mode, 1, M) -> - % by default the durable flag is false - M; - (priority, P, M) when is_integer(P) -> - amqp10_msg:set_headers(#{priority => P}, M); - (correlation_id, Ct, M) -> - amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M); - (reply_to, Ct, M) -> - amqp10_msg:set_properties(#{reply_to => to_binary(Ct)}, M); - (message_id, Ct, M) -> - amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M); - (timestamp, Ct, M) -> - amqp10_msg:set_properties(#{creation_time => Ct}, M); - (user_id, Ct, M) -> - amqp10_msg:set_properties(#{user_id => Ct}, M); - (headers, Headers0, M) when is_list(Headers0) -> - %% AMPQ 0.9.1 are added as applicatin properties - %% TODO: filter headers to make safe - Headers = lists:foldl( - fun ({K, _T, V}, Acc) -> - case is_amqp10_compat(V) of - true -> - Acc#{to_binary(K) => V}; - false -> - Acc - end - end, #{}, Headers0), - amqp10_msg:set_application_properties(Headers, M); - (Key, Value, M) -> - case is_amqp10_compat(Value) of - true -> - amqp10_msg:set_application_properties( - #{to_binary(Key) => Value}, M); - false -> - M - end - end, Msg, Props). - gen_unique_name(Pre0, Post0) -> Pre = to_binary(Pre0), Post = to_binary(Post0), @@ -441,8 +395,3 @@ bin_to_hex(Bin) -> <<<= 10 -> N -10 + $a; true -> N + $0 end>> || <> <= Bin>>. - -is_amqp10_compat(T) -> - is_binary(T) orelse - is_number(T) orelse - is_boolean(T). diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index 823dd481e9dc..c9304f1516f9 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -24,7 +24,7 @@ dest_protocol/1, source_endpoint/1, dest_endpoint/1, - forward/4, + forward/3, ack/3, nack/3, status/1, @@ -80,8 +80,7 @@ -callback ack(Tag :: tag(), Multi :: boolean(), state()) -> state(). -callback nack(Tag :: tag(), Multi :: boolean(), state()) -> state(). --callback forward(Tag :: tag(), Props :: #{atom() => any()}, - Payload :: binary(), state()) -> +-callback forward(Tag :: tag(), Msg :: mc:state(), state()) -> state() | {stop, any()}. -callback status(state()) -> rabbit_shovel_status:shovel_status(). @@ -142,10 +141,10 @@ source_endpoint(#{source := #{module := Mod}} = State) -> dest_endpoint(#{dest := #{module := Mod}} = State) -> Mod:dest_endpoint(State). --spec forward(tag(), #{atom() => any()}, binary(), state()) -> +-spec forward(tag(), mc:state(), state()) -> state() | {stop, any()}. -forward(Tag, Props, Payload, #{dest := #{module := Mod}} = State) -> - Mod:forward(Tag, Props, Payload, State). +forward(Tag, Msg, #{dest := #{module := Mod}} = State) -> + Mod:forward(Tag, Msg, State). -spec ack(tag(), boolean(), state()) -> state(). ack(Tag, Multi, #{source := #{module := Mod}} = State) -> diff --git a/deps/rabbitmq_shovel/test/amqp10_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_SUITE.erl index 937d37037cd3..3683dad53ac1 100644 --- a/deps/rabbitmq_shovel/test/amqp10_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_SUITE.erl @@ -118,6 +118,7 @@ amqp10_destination(Config, AckMode) -> receive {amqp10_msg, Receiver, InMsg} -> + ct:pal("GOT ~p", [InMsg]), [<<42>>] = amqp10_msg:body(InMsg), #{content_type := ?UNSHOVELLED, content_encoding := ?UNSHOVELLED, @@ -129,10 +130,12 @@ amqp10_destination(Config, AckMode) -> % creation_time := Timestamp } = amqp10_msg:properties(InMsg), #{<<"routing_key">> := ?TO_SHOVEL, - <<"type">> := ?UNSHOVELLED, + <<"exchange">> := ?EXCHANGE, <<"header1">> := 1, <<"header2">> := <<"h2">> } = amqp10_msg:application_properties(InMsg), + #{<<"x-basic-type">> := ?UNSHOVELLED + } = amqp10_msg:message_annotations(InMsg), #{durable := true} = amqp10_msg:headers(InMsg), ok after ?TIMEOUT -> diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index 639045c76ae7..8870e76b7e8b 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -28,7 +28,8 @@ groups() -> autodelete_amqp091_dest_on_publish, simple_amqp10_dest, simple_amqp10_src, - amqp091_to_amqp10_with_dead_lettering + amqp091_to_amqp10_with_dead_lettering, + amqp10_to_amqp091_application_properties ]}, {with_map_config, [], [ simple, @@ -153,6 +154,7 @@ test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) -> <<"message-ann-value">>}] end}]), Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>), + ct:pal("GOT ~p", [Msg]), AppProps = amqp10_msg:application_properties(Msg), ?assertMatch((#{user_id := <<"guest">>, creation_time := _}), @@ -193,6 +195,42 @@ simple_amqp10_src(Config) -> ok end). +amqp10_to_amqp091_application_properties(Config) -> + MapConfig = ?config(map_config, Config), + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), + + MsgSent = amqp10_msg:set_application_properties( + #{<<"key">> => <<"value">>}, + amqp10_msg:set_headers( + #{durable => true}, + amqp10_msg:new(<<"tag1">>, <<"hello">>, false))), + + Msg = publish_expect_msg(Sess, Src, Dest, MsgSent), + AppProps = amqp10_msg:application_properties(Msg), + ct:pal("MSG ~p", [Msg]), + + ?assertMatch(#{<<"key">> := <<"value">>}, + AppProps), + ok + end). + change_definition(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), @@ -257,8 +295,8 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> {<<"ack-mode">>, AckMode} ]), await_autodelete(Config, <<"test">>), - expect_count(Session, Dest, <<"hello">>, ExpDest), - expect_count(Session, Src, <<"hello">>, ExpSrc) + expect_count(Session, Dest, ExpDest), + expect_count(Session, Src, ExpSrc) end. autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) -> @@ -277,8 +315,8 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) -> {<<"ack-mode">>, AckMode} ]), await_autodelete(Config, <<"test">>), - expect_count(Session, Dest, <<"hello">>, ExpDest), - expect_count(Session, Src, <<"hello">>, ExpSrc) + expect_count(Session, Dest, ExpDest), + expect_count(Session, Src, ExpSrc) end. autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> @@ -297,8 +335,8 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> {<<"ack-mode">>, AckMode} ]), await_autodelete(Config, <<"test">>), - expect_count(Session, Dest, <<"hello">>, ExpDest), - expect_count(Session, Src, <<"hello">>, ExpSrc) + expect_count(Session, Dest, ExpDest), + expect_count(Session, Src, ExpSrc) end. %%---------------------------------------------------------------------------- @@ -323,6 +361,15 @@ publish(Sender, Tag, Payload) when is_binary(Payload) -> exit(publish_disposition_not_received) end. +publish(Sender, Msg) -> + ok = amqp10_client:send_msg(Sender, Msg), + Tag = amqp10_msg:delivery_tag(Msg), + receive + {amqp10_disposition, {accepted, Tag}} -> ok + after 3000 -> + exit(publish_disposition_not_received) + end. + publish_expect(Session, Source, Dest, Tag, Payload) -> LinkName = <<"dynamic-sender-", Dest/binary>>, {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source, @@ -330,7 +377,16 @@ publish_expect(Session, Source, Dest, Tag, Payload) -> ok = await_amqp10_event(link, Sender, attached), publish(Sender, Tag, Payload), amqp10_client:detach_link(Sender), - expect_one(Session, Dest, Payload). + expect_one(Session, Dest). + +publish_expect_msg(Session, Source, Dest, Msg) -> + LinkName = <<"dynamic-sender-", Dest/binary>>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + publish(Sender, Msg), + amqp10_client:detach_link(Sender), + expect_one(Session, Dest). await_amqp10_event(On, Ref, Evt) -> receive @@ -339,17 +395,17 @@ await_amqp10_event(On, Ref, Evt) -> exit({amqp10_event_timeout, On, Ref, Evt}) end. -expect_one(Session, Dest, Payload) -> +expect_one(Session, Dest) -> LinkName = <<"dynamic-receiver-", Dest/binary>>, {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, Dest, settled, unsettled_state), ok = amqp10_client:flow_link_credit(Receiver, 1, never), - Msg = expect(Receiver, Payload), + Msg = expect(Receiver), amqp10_client:detach_link(Receiver), Msg. -expect(Receiver, _Payload) -> +expect(Receiver) -> receive {amqp10_msg, Receiver, InMsg} -> InMsg @@ -379,7 +435,7 @@ publish_count(Session, Address, Payload, Count) -> end || I <- lists:seq(1, Count)], amqp10_client:detach_link(Sender). -expect_count(Session, Address, Payload, Count) -> +expect_count(Session, Address, Count) -> {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"dynamic-receiver", Address/binary>>, @@ -387,7 +443,7 @@ expect_count(Session, Address, Payload, Count) -> unsettled_state), ok = amqp10_client:flow_link_credit(Receiver, Count, never), [begin - expect(Receiver, Payload) + expect(Receiver) end || _ <- lists:seq(1, Count)], expect_empty(Session, Address), amqp10_client:detach_link(Receiver). diff --git a/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl index 9550c1b74309..834813fe6aea 100644 --- a/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl @@ -62,8 +62,8 @@ end_per_testcase(_TestCase, _Config) -> amqp_encoded_data_list(_Config) -> meck:new(rabbit_shovel_behaviour, [passthrough]), meck:expect(rabbit_shovel_behaviour, forward, - fun (_, _, Pay, S) -> - ?assert(erlang:is_binary(Pay)), + fun (_, Msg, S) -> + ?assert(mc:is(Msg)), S end), %% fake some shovel state @@ -83,8 +83,8 @@ amqp_encoded_data_list(_Config) -> amqp_encoded_amqp_value(_Config) -> meck:new(rabbit_shovel_behaviour, [passthrough]), meck:expect(rabbit_shovel_behaviour, forward, - fun (_, _, Pay, S) -> - ?assert(erlang:is_binary(Pay)), + fun (_, Msg, S) -> + ?assert(mc:is(Msg)), S end), %% fake some shovel state