Skip to content

Commit e174ab0

Browse files
Merge pull request #14891 from rabbitmq/mergify/bp/v4.2.x/pr-14890
Shovel: swap queue for lqueue (backport #14890)
2 parents c063881 + f89dfe5 commit e174ab0

File tree

4 files changed

+20
-19
lines changed

4 files changed

+20
-19
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,16 +362,16 @@ status(_) ->
362362
running.
363363

364364
pending_count(#{dest := Dest}) ->
365-
Pending = maps:get(pending, Dest, queue:new()),
366-
queue:len(Pending).
365+
Pending = maps:get(pending, Dest, lqueue:new()),
366+
lqueue:len(Pending).
367367

368368
add_pending(Elem, State = #{dest := Dest}) ->
369-
Pending = maps:get(pending, Dest, queue:new()),
370-
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
369+
Pending = maps:get(pending, Dest, lqueue:new()),
370+
State#{dest => Dest#{pending => lqueue:in(Elem, Pending)}}.
371371

372372
pop_pending(State = #{dest := Dest}) ->
373-
Pending = maps:get(pending, Dest, queue:new()),
374-
case queue:out(Pending) of
373+
Pending = maps:get(pending, Dest, lqueue:new()),
374+
case lqueue:out(Pending) of
375375
{empty, _} ->
376376
empty;
377377
{{value, Elem}, Pending2} ->

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ status(State) ->
476476
end.
477477

478478
pending_count(#{dest := #{pending_delivery := Pending}}) ->
479-
queue:len(Pending);
479+
lqueue:len(Pending);
480480
pending_count(_) ->
481481
0.
482482

@@ -947,12 +947,12 @@ is_blocked(_) ->
947947
false.
948948

949949
add_pending_delivery(Elem, State = #{dest := Dest}) ->
950-
Pending = maps:get(pending_delivery, Dest, queue:new()),
951-
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.
950+
Pending = maps:get(pending_delivery, Dest, lqueue:new()),
951+
State#{dest => Dest#{pending_delivery => lqueue:in(Elem, Pending)}}.
952952

953953
pop_pending_delivery(State = #{dest := Dest}) ->
954-
Pending = maps:get(pending_delivery, Dest, queue:new()),
955-
case queue:out(Pending) of
954+
Pending = maps:get(pending_delivery, Dest, lqueue:new()),
955+
case lqueue:out(Pending) of
956956
{empty, _} ->
957957
empty;
958958
{{value, Elem}, Pending2} ->

deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ init_per_suite(Config0) ->
108108
"source_queue_down",
109109
"dest_queue_down",
110110
"inbound_link_detached",
111-
"not_found"
111+
"not_found",
112+
"dependent process"
112113
]}
113114
]),
114115
rabbit_ct_helpers:run_setup_steps(
@@ -318,7 +319,7 @@ missing_src_queue_with_src_predeclared(Config) ->
318319
Ch, #'queue.bind'{queue = Dest,
319320
exchange = <<"dest-ex">>,
320321
routing_key = <<"dest-key">>}),
321-
322+
322323
set_param_nowait(Config,
323324
?PARAM, ?config(shovel_args, Config) ++
324325
[{<<"src-queue">>, Src},
@@ -327,7 +328,7 @@ missing_src_queue_with_src_predeclared(Config) ->
327328
{<<"dest-exchange-key">>, <<"dest-key">>}]),
328329
await_shovel(Config, 0, ?PARAM, terminated),
329330
expect_missing_queue(Ch, Src),
330-
331+
331332
with_amqp091_ch(
332333
Config,
333334
fun(Ch2) ->
@@ -368,7 +369,7 @@ missing_dest_queue_with_dest_predeclared(Config) ->
368369
fun(Ch2) ->
369370
amqp_channel:call(
370371
Ch2, #'queue.declare'{queue = Dest,
371-
durable = true}),
372+
durable = true}),
372373
await_shovel(Config, 0, ?PARAM, running),
373374
amqp091_publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>)
374375
end)
@@ -392,15 +393,15 @@ missing_src_queue_without_src_predeclared(Config) ->
392393
Ch, #'queue.bind'{queue = Dest,
393394
exchange = <<"dest-ex">>,
394395
routing_key = <<"dest-key">>}),
395-
396+
396397
set_param_nowait(Config, ?PARAM,
397398
?config(shovel_args, Config) ++
398399
[{<<"src-queue">>, Src},
399400
{<<"dest-exchange">>, <<"dest-ex">>},
400401
{<<"dest-exchange-key">>, <<"dest-key">>}]),
401402
await_shovel(Config, 0, ?PARAM, terminated),
402403
expect_missing_queue(Ch, Src),
403-
404+
404405
with_amqp091_ch(
405406
Config,
406407
fun(Ch2) ->

deps/rabbitmq_shovel/test/pending_count_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ amqp091_pending_count_empty_queue(_Config) ->
6969

7070
amqp091_pending_count_with_messages(_Config) ->
7171
%% Test that pending_count returns correct count when messages are pending
72-
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
72+
PendingQueue = lqueue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
7373
State = #{dest => #{pending => PendingQueue}},
7474
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).
7575

7676
amqp091_pending_count_after_drain(_Config) ->
7777
%% Test that pending_count returns 0 after messages are drained
78-
EmptyQueue = queue:new(),
78+
EmptyQueue = lqueue:new(),
7979
State = #{dest => #{pending => EmptyQueue}},
8080
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).
8181

0 commit comments

Comments
 (0)