Skip to content

Commit 805688c

Browse files
Merge pull request #7836 from rabbitmq/mergify/bp/v3.11.x/pr-7835
Unblock group of consumers on super stream partition (backport #7765) (backport #7835)
2 parents 5be4233 + ac09e33 commit 805688c

6 files changed

+427
-124
lines changed

deps/rabbit/BUILD.bazel

+3
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,9 @@ rabbitmq_suite(
800800
deps = [
801801
"//deps/rabbit_common:erlang_app",
802802
],
803+
runtime_deps = [
804+
"@meck//:erlang_app",
805+
],
803806
)
804807

805808
rabbitmq_integration_suite(

deps/rabbit/src/rabbit_core_ff.erl

+8
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@
111111
depends_on => [stream_queue]
112112
}}).
113113

114+
-rabbit_feature_flag(
115+
{stream_sac_coordinator_unblock_group,
116+
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
117+
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
118+
stability => stable,
119+
depends_on => [stream_single_active_consumer]
120+
}}).
121+
114122
%% -------------------------------------------------------------------
115123
%% Direct exchange routing v2.
116124
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+82-21
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
253253
of
254254
{value, Consumer} ->
255255
G1 = remove_from_group(Consumer, Group0),
256-
handle_consumer_removal(G1, Consumer);
256+
handle_consumer_removal(G1, Consumer, Stream, ConsumerName);
257257
false ->
258258
{Group0, []}
259259
end,
@@ -269,19 +269,24 @@ apply(#command_activate_consumer{vhost = VirtualHost,
269269
stream = Stream,
270270
consumer_name = ConsumerName},
271271
#?MODULE{groups = StreamGroups0} = State0) ->
272+
rabbit_log:debug("Activating consumer on ~tp, group ~p",
273+
[Stream, ConsumerName]),
272274
{G, Eff} =
273275
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
274276
undefined ->
275-
rabbit_log:warning("trying to activate consumer in group ~p, but "
277+
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
276278
"the group does not longer exist",
277279
[{VirtualHost, Stream, ConsumerName}]),
278280
{undefined, []};
279281
Group ->
280282
#consumer{pid = Pid, subscription_id = SubId} =
281283
evaluate_active_consumer(Group),
284+
rabbit_log:debug("New active consumer on ~tp, group ~tp " ++
285+
"is ~tp from ~tp",
286+
[Stream, ConsumerName, SubId, Pid]),
282287
Group1 =
283288
update_consumer_state_in_group(Group, Pid, SubId, true),
284-
{Group1, [notify_consumer_effect(Pid, SubId, true)]}
289+
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
285290
end,
286291
StreamGroups1 =
287292
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
@@ -521,7 +526,8 @@ do_register_consumer(VirtualHost,
521526
Effects =
522527
case Active of
523528
true ->
524-
[notify_consumer_effect(ConnectionPid, SubscriptionId, Active)];
529+
[notify_consumer_effect(ConnectionPid, SubscriptionId,
530+
Stream, ConsumerName, Active)];
525531
_ ->
526532
[]
527533
end,
@@ -549,7 +555,8 @@ do_register_consumer(VirtualHost,
549555
active = true},
550556
G1 = add_to_group(Consumer0, Group0),
551557
{G1,
552-
[notify_consumer_effect(ConnectionPid, SubscriptionId, true)]};
558+
[notify_consumer_effect(ConnectionPid, SubscriptionId,
559+
Stream, ConsumerName, true)]};
553560
_G ->
554561
%% whatever the current state is, the newcomer will be passive
555562
Consumer0 =
@@ -568,18 +575,28 @@ do_register_consumer(VirtualHost,
568575
%% the current active stays the same
569576
{G1, []};
570577
_ ->
578+
rabbit_log:debug("SAC consumer registration: " ++
579+
"active consumer change on stream ~tp, group ~tp. " ++
580+
"Notifying ~tp from ~tp it is no longer active.",
581+
[Stream, ConsumerName, ActSubId, ActPid]),
571582
%% there's a change, telling the active it's not longer active
572583
{update_consumer_state_in_group(G1,
573584
ActPid,
574585
ActSubId,
575586
false),
576587
[notify_consumer_effect(ActPid,
577588
ActSubId,
589+
Stream,
590+
ConsumerName,
578591
false,
579592
true)]}
580593
end;
581594
false ->
582-
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
595+
rabbit_log:debug("SAC consumer registration: no active consumer on stream ~tp, group ~tp. " ++
596+
"Likely waiting for a response from former active consumer.",
597+
[Stream, ConsumerName]),
598+
%% no active consumer in the (non-empty) group,
599+
%% we are waiting for the reply of a former active
583600
{G1, []}
584601
end
585602
end,
@@ -593,27 +610,27 @@ do_register_consumer(VirtualHost,
593610
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
594611
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.
595612

596-
handle_consumer_removal(#group{consumers = []} = G, _) ->
613+
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
597614
{G, []};
598615
handle_consumer_removal(#group{partition_index = -1} = Group0,
599-
Consumer) ->
616+
Consumer, Stream, ConsumerName) ->
600617
case Consumer of
601618
#consumer{active = true} ->
602619
%% this is the active consumer we remove, computing the new one
603620
Group1 = compute_active_consumer(Group0),
604621
case lookup_active_consumer(Group1) of
605622
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
606623
%% creating the side effect to notify the new active consumer
607-
{Group1, [notify_consumer_effect(Pid, SubId, true)]};
624+
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]};
608625
_ ->
609626
%% no active consumer found in the group, nothing to do
610627
{Group1, []}
611628
end;
612629
#consumer{active = false} ->
613-
%% not the active consumer, nothing to do."),
630+
%% not the active consumer, nothing to do.
614631
{Group0, []}
615632
end;
616-
handle_consumer_removal(Group0, Consumer) ->
633+
handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
617634
case lookup_active_consumer(Group0) of
618635
{value,
619636
#consumer{pid = ActPid, subscription_id = ActSubId} =
@@ -623,40 +640,81 @@ handle_consumer_removal(Group0, Consumer) ->
623640
%% the current active stays the same
624641
{Group0, []};
625642
_ ->
643+
rabbit_log:debug("SAC consumer removal: " ++
644+
"active consumer change on stream ~tp, group ~tp. " ++
645+
"Notifying ~tp from ~tp it is no longer active.",
646+
[Stream, ConsumerName, ActSubId, ActPid]),
647+
626648
%% there's a change, telling the active it's not longer active
627649
{update_consumer_state_in_group(Group0,
628650
ActPid,
629651
ActSubId,
630652
false),
631-
[notify_consumer_effect(ActPid, ActSubId, false, true)]}
653+
[notify_consumer_effect(ActPid, ActSubId,
654+
Stream, ConsumerName, false, true)]}
632655
end;
633656
false ->
634657
case Consumer#consumer.active of
635658
true ->
636659
%% the active one is going away, picking a new one
637660
#consumer{pid = P, subscription_id = SID} =
638661
evaluate_active_consumer(Group0),
662+
rabbit_log:debug("SAC consumer removal: " ++
663+
"active consumer change on stream ~tp, group ~tp. " ++
664+
"Notifying ~tp from ~tp it is the new active consumer.",
665+
[Stream, ConsumerName, SID, P]),
639666
{update_consumer_state_in_group(Group0, P, SID, true),
640-
[notify_consumer_effect(P, SID, true)]};
667+
[notify_consumer_effect(P, SID,
668+
Stream, ConsumerName, true)]};
641669
false ->
642-
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
670+
rabbit_log:debug("SAC consumer removal: no active consumer on stream ~tp, group ~tp. " ++
671+
"Likely waiting for a response from former active consumer.",
672+
[Stream, ConsumerName]),
673+
%% no active consumer in the (non-empty) group,
674+
%% we are waiting for the reply of a former active
643675
{Group0, []}
644676
end
645677
end.
646678

647-
notify_consumer_effect(Pid, SubId, Active) ->
648-
notify_consumer_effect(Pid, SubId, Active, false).
679+
message_type() ->
680+
case has_unblock_group_support() of
681+
true ->
682+
map;
683+
false ->
684+
tuple
685+
end.
686+
687+
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
688+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
649689

650-
notify_consumer_effect(Pid, SubId, Active, false = _SteppingDown) ->
690+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) ->
691+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()).
692+
693+
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) ->
651694
mod_call_effect(Pid,
652695
{sac,
653-
{{subscription_id, SubId}, {active, Active},
696+
{{subscription_id, SubId},
697+
{active, Active},
654698
{extra, []}}});
655-
notify_consumer_effect(Pid, SubId, Active, true = _SteppingDown) ->
699+
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) ->
656700
mod_call_effect(Pid,
657701
{sac,
658-
{{subscription_id, SubId}, {active, Active},
659-
{extra, [{stepping_down, true}]}}}).
702+
{{subscription_id, SubId},
703+
{active, Active},
704+
{extra, [{stepping_down, true}]}}});
705+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) ->
706+
mod_call_effect(Pid,
707+
{sac, #{subscription_id => SubId,
708+
stream => Stream,
709+
consumer_name => Name,
710+
active => Active}});
711+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = _SteppingDown, map) ->
712+
mod_call_effect(Pid,
713+
{sac, #{subscription_id => SubId,
714+
stream => Stream,
715+
consumer_name => Name,
716+
active => Active,
717+
stepping_down => true}}).
660718

661719
maybe_create_group(VirtualHost,
662720
Stream,
@@ -768,3 +826,6 @@ send_message(ConnectionPid, Msg) ->
768826

769827
is_ff_enabled() ->
770828
rabbit_feature_flags:is_enabled(stream_single_active_consumer).
829+
830+
has_unblock_group_support() ->
831+
rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group).

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

+27-16
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@ end_per_group(_Group, _Config) ->
5252
ok.
5353

5454
init_per_testcase(_TestCase, Config) ->
55+
ok = meck:new(rabbit_feature_flags),
56+
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
5557
Config.
5658

5759
end_per_testcase(_TestCase, _Config) ->
60+
meck:unload(),
5861
ok.
5962

6063
simple_sac_test(_) ->
@@ -71,7 +74,7 @@ simple_sac_test(_) ->
7174
rabbit_stream_sac_coordinator:apply(Command0, State0),
7275
?assert(Active1),
7376
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
74-
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
77+
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
7578

7679
Command1 =
7780
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
@@ -107,7 +110,7 @@ simple_sac_test(_) ->
107110
?assertEqual([consumer(ConnectionPid, 1, true),
108111
consumer(ConnectionPid, 2, false)],
109112
Consumers4),
110-
assertSendMessageEffect(ConnectionPid, 1, true, Effects4),
113+
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
111114

112115
Command4 =
113116
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
@@ -116,7 +119,7 @@ simple_sac_test(_) ->
116119
ok, Effects5} =
117120
rabbit_stream_sac_coordinator:apply(Command4, State4),
118121
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
119-
assertSendMessageEffect(ConnectionPid, 2, true, Effects5),
122+
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
120123

121124
Command5 =
122125
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
@@ -141,7 +144,7 @@ super_stream_partition_sac_test(_) ->
141144
rabbit_stream_sac_coordinator:apply(Command0, State0),
142145
?assert(Active1),
143146
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
144-
assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
147+
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
145148

146149
Command1 =
147150
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
@@ -155,7 +158,7 @@ super_stream_partition_sac_test(_) ->
155158
?assertEqual([consumer(ConnectionPid, 0, false),
156159
consumer(ConnectionPid, 1, false)],
157160
Consumers2),
158-
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Effects2),
161+
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2),
159162

160163
Command2 = activate_consumer_command(Stream, ConsumerName),
161164
{#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
@@ -167,7 +170,7 @@ super_stream_partition_sac_test(_) ->
167170
?assertEqual([consumer(ConnectionPid, 0, false),
168171
consumer(ConnectionPid, 1, true)],
169172
Consumers3),
170-
assertSendMessageEffect(ConnectionPid, 1, true, Effects3),
173+
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
171174

172175
Command3 =
173176
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
@@ -197,7 +200,7 @@ super_stream_partition_sac_test(_) ->
197200
consumer(ConnectionPid, 2, false)],
198201
Consumers5),
199202

200-
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Effects5),
203+
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5),
201204

202205
Command5 = activate_consumer_command(Stream, ConsumerName),
203206
{#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} =
@@ -208,7 +211,7 @@ super_stream_partition_sac_test(_) ->
208211
?assertEqual([consumer(ConnectionPid, 1, false),
209212
consumer(ConnectionPid, 2, true)],
210213
Consumers6),
211-
assertSendMessageEffect(ConnectionPid, 2, true, Effects6),
214+
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
212215

213216
Command6 =
214217
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
@@ -310,7 +313,9 @@ ensure_monitors_test(_) ->
310313
ok.
311314

312315
handle_connection_down_test(_) ->
313-
GroupId = {<<"/">>, <<"stream">>, <<"app">>},
316+
Stream = <<"stream">>,
317+
ConsumerName = <<"app">>,
318+
GroupId = {<<"/">>, Stream, ConsumerName},
314319
Pid0 = self(),
315320
Pid1 = spawn(fun() -> ok end),
316321
Group =
@@ -326,7 +331,7 @@ handle_connection_down_test(_) ->
326331
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
327332
assertSize(1, PidsGroups1),
328333
assertSize(1, maps:get(Pid1, PidsGroups1)),
329-
assertSendMessageEffect(Pid1, 1, true, Effects1),
334+
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
330335
?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
331336
Groups1),
332337
{#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
@@ -397,22 +402,28 @@ activate_consumer_command(Stream, ConsumerName) ->
397402
stream = Stream,
398403
consumer_name = ConsumerName}.
399404

400-
assertSendMessageEffect(Pid, SubId, Active, [Effect]) ->
405+
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
401406
?assertEqual({mod_call,
402407
rabbit_stream_sac_coordinator,
403408
send_message,
404409
[Pid,
405410
{sac,
406-
{{subscription_id, SubId}, {active, Active},
407-
{extra, []}}}]},
411+
#{subscription_id => SubId,
412+
stream => Stream,
413+
consumer_name => ConsumerName,
414+
active => Active}
415+
}]},
408416
Effect).
409417

410-
assertSendMessageSteppingDownEffect(Pid, SubId, [Effect]) ->
418+
assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) ->
411419
?assertEqual({mod_call,
412420
rabbit_stream_sac_coordinator,
413421
send_message,
414422
[Pid,
415423
{sac,
416-
{{subscription_id, SubId}, {active, false},
417-
{extra, [{stepping_down, true}]}}}]},
424+
#{subscription_id => SubId,
425+
stream => Stream,
426+
consumer_name => ConsumerName,
427+
active => false,
428+
stepping_down => true}}]},
418429
Effect).

0 commit comments

Comments
 (0)