9
9
10
10
-behaviour (rabbit_shovel_behaviour ).
11
11
12
+ -include_lib (" rabbit/include/mc.hrl" ).
12
13
-include (" rabbit_shovel.hrl" ).
13
14
14
15
-export ([
30
31
ack /3 ,
31
32
nack /3 ,
32
33
status /1 ,
33
- forward /4
34
+ forward /3
34
35
]).
35
36
36
37
-import (rabbit_misc , [pget /2 , pget /3 ]).
@@ -184,10 +185,12 @@ dest_endpoint(#{shovel_type := dynamic,
184
185
185
186
-spec handle_source (Msg :: any (), state ()) ->
186
187
not_handled | state () | {stop , any ()}.
187
- handle_source ({amqp10_msg , _LinkRef , Msg }, State ) ->
188
- Tag = amqp10_msg :delivery_id (Msg ),
189
- Payload = amqp10_msg :body_bin (Msg ),
190
- rabbit_shovel_behaviour :forward (Tag , #{}, Payload , State );
188
+ handle_source ({amqp10_msg , _LinkRef , Msg0 }, State ) ->
189
+ Tag = amqp10_msg :delivery_id (Msg0 ),
190
+ [_ | Rest ] = amqp10_msg :to_amqp_records (Msg0 ),
191
+ Bin = iolist_to_binary ([amqp10_framing :encode_bin (D ) || D <- Rest ]),
192
+ Msg = mc :init (mc_amqp , Bin , #{}),
193
+ rabbit_shovel_behaviour :forward (Tag , Msg , State );
191
194
handle_source ({amqp10_event , {connection , Conn , opened }},
192
195
State = #{source := #{current := #{conn := Conn }}}) ->
193
196
State ;
@@ -260,8 +263,8 @@ handle_dest({amqp10_event, {link, Link, credited}},
260
263
% % we have credit so can begin to forward
261
264
State = State0 #{dest => Dst #{link_state => credited ,
262
265
pending => []}},
263
- lists :foldl (fun ({A , B , C }, S ) ->
264
- forward (A , B , C , S )
266
+ lists :foldl (fun ({A , B }, S ) ->
267
+ forward (A , B , S )
265
268
end , State , lists :reverse (Pend ));
266
269
handle_dest ({amqp10_event , {link , Link , _Evt }},
267
270
State = #{dest := #{current := #{link := Link }}}) ->
@@ -315,27 +318,26 @@ status(_) ->
315
318
% % Destination not yet connected
316
319
ignore .
317
320
318
- -spec forward (Tag :: tag (), Props :: #{atom () => any ()},
319
- Payload :: binary (), state ()) ->
321
+ -spec forward (Tag :: tag (), Mc :: mc :state (), state ()) ->
320
322
state () | {stop , any ()}.
321
- forward (_Tag , _Props , _Payload ,
323
+ forward (_Tag , _Mc ,
322
324
#{source := #{remaining_unacked := 0 }} = State ) ->
323
325
State ;
324
- forward (Tag , Props , Payload ,
326
+ forward (Tag , Mc ,
325
327
#{dest := #{current := #{link_state := attached },
326
328
pending := Pend0 } = Dst } = State ) ->
327
329
% % simply cache the forward oo
328
- Pend = [{Tag , Props , Payload } | Pend0 ],
330
+ Pend = [{Tag , Mc } | Pend0 ],
329
331
State #{dest => Dst #{pending => {Pend }}};
330
- forward (Tag , Props , Payload ,
332
+ forward (Tag , Msg0 ,
331
333
#{dest := #{current := #{link := Link },
332
334
unacked := Unacked } = Dst ,
333
335
ack_mode := AckMode } = State ) ->
334
336
OutTag = rabbit_data_coercion :to_binary (Tag ),
335
- Msg0 = new_message ( OutTag , Payload , State ),
336
- Msg = add_timestamp_header (
337
- State , set_message_properties (
338
- Props , add_forward_headers ( State , Msg0 )) ),
337
+ Msg1 = mc : protocol_state ( mc : convert ( mc_amqp , Msg0 ) ),
338
+ Records = lists : flatten ([ amqp10_framing : decode_bin ( iolist_to_binary ( S )) || S <- Msg1 ]),
339
+ Msg2 = amqp10_msg : new ( OutTag , Records , AckMode =/= on_confirm ),
340
+ Msg = update_amqp10_message ( Msg2 , mc : exchange ( Msg0 ), mc : routing_keys ( Msg0 ), State ),
339
341
case send_msg (Link , Msg ) of
340
342
ok ->
341
343
rabbit_shovel_behaviour :decr_remaining_unacked (
@@ -364,73 +366,25 @@ send_msg(Link, Msg) ->
364
366
end
365
367
end .
366
368
367
- new_message (Tag , Payload , #{ack_mode := AckMode ,
368
- dest := #{properties := Props ,
369
- application_properties := AppProps ,
370
- message_annotations := MsgAnns }}) ->
371
- Msg0 = amqp10_msg :new (Tag , Payload , AckMode =/= on_confirm ),
369
+ update_amqp10_message (Msg0 , Exchange , RK , #{dest := #{properties := Props ,
370
+ application_properties := AppProps0 ,
371
+ message_annotations := MsgAnns }} = State ) ->
372
372
Msg1 = amqp10_msg :set_properties (Props , Msg0 ),
373
- Msg = amqp10_msg :set_message_annotations (MsgAnns , Msg1 ),
374
- amqp10_msg :set_application_properties (AppProps , Msg ).
373
+ Msg2 = amqp10_msg :set_message_annotations (MsgAnns , Msg1 ),
374
+ AppProps = AppProps0 #{<<" exchange" >> => Exchange ,
375
+ <<" routing_key" >> => RK },
376
+ Msg = amqp10_msg :set_application_properties (AppProps , Msg2 ),
377
+ add_timestamp_header (State , add_forward_headers (State , Msg )).
375
378
376
379
add_timestamp_header (#{dest := #{add_timestamp_header := true }}, Msg ) ->
377
380
P = #{creation_time => os :system_time (milli_seconds )},
378
381
amqp10_msg :set_properties (P , Msg );
379
382
add_timestamp_header (_ , Msg ) -> Msg .
380
383
381
384
add_forward_headers (#{dest := #{cached_forward_headers := Props }}, Msg ) ->
382
- amqp10_msg :set_application_properties (Props , Msg );
385
+ amqp10_msg :set_application_properties (Props , Msg );
383
386
add_forward_headers (_ , Msg ) -> Msg .
384
387
385
- set_message_properties (Props , Msg ) ->
386
- % % this is effectively special handling properties from amqp 0.9.1
387
- maps :fold (
388
- fun (content_type , Ct , M ) ->
389
- amqp10_msg :set_properties (
390
- #{content_type => to_binary (Ct )}, M );
391
- (content_encoding , Ct , M ) ->
392
- amqp10_msg :set_properties (
393
- #{content_encoding => to_binary (Ct )}, M );
394
- (delivery_mode , 2 , M ) ->
395
- amqp10_msg :set_headers (#{durable => true }, M );
396
- (delivery_mode , 1 , M ) ->
397
- % by default the durable flag is false
398
- M ;
399
- (priority , P , M ) when is_integer (P ) ->
400
- amqp10_msg :set_headers (#{priority => P }, M );
401
- (correlation_id , Ct , M ) ->
402
- amqp10_msg :set_properties (#{correlation_id => to_binary (Ct )}, M );
403
- (reply_to , Ct , M ) ->
404
- amqp10_msg :set_properties (#{reply_to => to_binary (Ct )}, M );
405
- (message_id , Ct , M ) ->
406
- amqp10_msg :set_properties (#{message_id => to_binary (Ct )}, M );
407
- (timestamp , Ct , M ) ->
408
- amqp10_msg :set_properties (#{creation_time => Ct }, M );
409
- (user_id , Ct , M ) ->
410
- amqp10_msg :set_properties (#{user_id => Ct }, M );
411
- (headers , Headers0 , M ) when is_list (Headers0 ) ->
412
- % % AMPQ 0.9.1 are added as applicatin properties
413
- % % TODO: filter headers to make safe
414
- Headers = lists :foldl (
415
- fun ({K , _T , V }, Acc ) ->
416
- case is_amqp10_compat (V ) of
417
- true ->
418
- Acc #{to_binary (K ) => V };
419
- false ->
420
- Acc
421
- end
422
- end , #{}, Headers0 ),
423
- amqp10_msg :set_application_properties (Headers , M );
424
- (Key , Value , M ) ->
425
- case is_amqp10_compat (Value ) of
426
- true ->
427
- amqp10_msg :set_application_properties (
428
- #{to_binary (Key ) => Value }, M );
429
- false ->
430
- M
431
- end
432
- end , Msg , Props ).
433
-
434
388
gen_unique_name (Pre0 , Post0 ) ->
435
389
Pre = to_binary (Pre0 ),
436
390
Post = to_binary (Post0 ),
@@ -441,8 +395,3 @@ bin_to_hex(Bin) ->
441
395
<<<<if N >= 10 -> N - 10 + $a ;
442
396
true -> N + $0 end >>
443
397
|| <<N :4 >> <= Bin >>.
444
-
445
- is_amqp10_compat (T ) ->
446
- is_binary (T ) orelse
447
- is_number (T ) orelse
448
- is_boolean (T ).
0 commit comments