Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit fe553ed

Browse files
committed Apr 16, 2025
Implement node purge in stream SAC coordinator
1 parent 43dcb43 commit fe553ed

4 files changed

+246
-56
lines changed
 

‎deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -812,7 +812,7 @@ members() ->
812812
end
813813
end.
814814

815-
maybe_resize_coordinator_cluster(MachineVersion) ->
815+
maybe_resize_coordinator_cluster(LeaderPid, MachineVersion) ->
816816
spawn(fun() ->
817817
RabbitIsRunning = rabbit:is_running(),
818818
case members() of
@@ -848,27 +848,29 @@ maybe_resize_coordinator_cluster(MachineVersion) ->
848848
remove_member(Leader, Members, Old)
849849
end,
850850
maybe_handle_stale_nodes(MemberNodes, RabbitNodes,
851+
LeaderPid,
851852
MachineVersion);
852-
_ ->
853+
_ ->
853854
ok
854855
end
855856
end).
856857

857858
maybe_handle_stale_nodes(MemberNodes, ExpectedNodes,
858-
MachineVersion) when MachineVersion > 4 ->
859+
LeaderPid, MachineVersion) when MachineVersion > 4 ->
859860
case MemberNodes -- ExpectedNodes of
860861
[] ->
861862
ok;
862863
Stale when length(ExpectedNodes) > 0 ->
863864
rabbit_log:debug("Stale nodes detected in stream SAC "
864865
"coordinator: ~w. Purging state.",
865866
[Stale]),
866-
%% TODO SAC pipeline command to purge state from stale nodes
867+
Mod = sac_module(MachineVersion),
868+
ra:pipeline_command(LeaderPid, Mod:make_purge_nodes(Stale)),
867869
ok;
868870
_ ->
869871
ok
870872
end;
871-
maybe_handle_stale_nodes(_, _, _) ->
873+
maybe_handle_stale_nodes(_, _, _, _) ->
872874
ok.
873875

874876
add_member(Members, Node) ->
@@ -945,8 +947,9 @@ init_aux(_Name) ->
945947
%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
946948
handle_aux(leader, _, maybe_resize_coordinator_cluster,
947949
#aux{resizer = undefined} = Aux, RaAux) ->
950+
Leader = ra_aux:leader_id(RaAux),
948951
MachineVersion = ra_aux:effective_machine_version(RaAux),
949-
Pid = maybe_resize_coordinator_cluster(MachineVersion),
952+
Pid = maybe_resize_coordinator_cluster(Leader, MachineVersion),
950953
{no_reply, Aux#aux{resizer = Pid}, RaAux, [{monitor, process, aux, Pid}]};
951954
handle_aux(leader, _, maybe_resize_coordinator_cluster,
952955
AuxState, RaAux) ->

‎deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 58 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,8 @@
2121
-opaque command() :: #command_register_consumer{} |
2222
#command_unregister_consumer{} |
2323
#command_activate_consumer{} |
24-
#command_connection_reconnected{}.
24+
#command_connection_reconnected{} |
25+
#command_purge_nodes{}.
2526

2627
-opaque state() :: #?MODULE{}.
2728

@@ -47,6 +48,7 @@
4748
group_consumers/5,
4849
overview/1,
4950
import_state/2]).
51+
-export([make_purge_nodes/1]).
5052

5153
%% exported for unit tests only
5254
-ifdef(TEST).
@@ -84,25 +86,13 @@ register_consumer(VirtualHost,
8486
ConnectionPid,
8587
Owner,
8688
SubscriptionId) ->
87-
process_command({sac,
88-
#command_register_consumer{vhost =
89-
VirtualHost,
90-
stream =
91-
Stream,
92-
partition_index
93-
=
94-
PartitionIndex,
95-
consumer_name
96-
=
97-
ConsumerName,
98-
connection_pid
99-
=
100-
ConnectionPid,
101-
owner =
102-
Owner,
103-
subscription_id
104-
=
105-
SubscriptionId}}).
89+
process_command(#command_register_consumer{vhost = VirtualHost,
90+
stream = Stream,
91+
partition_index = PartitionIndex,
92+
consumer_name = ConsumerName,
93+
connection_pid = ConnectionPid,
94+
owner = Owner,
95+
subscription_id = SubscriptionId}).
10696

10797
-spec unregister_consumer(binary(),
10898
binary(),
@@ -115,35 +105,24 @@ unregister_consumer(VirtualHost,
115105
ConsumerName,
116106
ConnectionPid,
117107
SubscriptionId) ->
118-
process_command({sac,
119-
#command_unregister_consumer{vhost =
120-
VirtualHost,
121-
stream =
122-
Stream,
123-
consumer_name
124-
=
125-
ConsumerName,
126-
connection_pid
127-
=
128-
ConnectionPid,
129-
subscription_id
130-
=
131-
SubscriptionId}}).
108+
process_command(#command_unregister_consumer{vhost = VirtualHost,
109+
stream = Stream,
110+
consumer_name = ConsumerName,
111+
connection_pid = ConnectionPid,
112+
subscription_id = SubscriptionId}).
132113

133114
-spec activate_consumer(binary(), binary(), binary()) -> ok.
134115
activate_consumer(VH, Stream, Name) ->
135-
process_command({sac,
136-
#command_activate_consumer{vhost =VH,
137-
stream = Stream,
138-
consumer_name= Name}}).
116+
process_command(#command_activate_consumer{vhost =VH,
117+
stream = Stream,
118+
consumer_name= Name}).
139119

140120
-spec connection_reconnected(connection_pid()) -> ok.
141121
connection_reconnected(Pid) ->
142-
process_command({sac,
143-
#command_connection_reconnected{pid = Pid}}).
122+
process_command(#command_connection_reconnected{pid = Pid}).
144123

145124
process_command(Cmd) ->
146-
case rabbit_stream_coordinator:process_command(Cmd) of
125+
case rabbit_stream_coordinator:process_command(wrap_cmd(Cmd)) of
147126
{ok, Res, _} ->
148127
Res;
149128
{error, _} = Err ->
@@ -152,6 +131,10 @@ process_command(Cmd) ->
152131
Err
153132
end.
154133

134+
-spec wrap_cmd(command()) -> {sac, command()}.
135+
wrap_cmd(Cmd) ->
136+
{sac, Cmd}.
137+
155138
%% return the current groups for a given virtual host
156139
-spec consumer_groups(binary(), [atom()]) ->
157140
{ok,
@@ -306,8 +289,31 @@ apply(#command_connection_reconnected{pid = Pid},
306289
handle_group_connection_reconnected(Pid, St, Eff, G)
307290
end, {State0, []}, Groups0),
308291

292+
{State1, ok, Eff};
293+
apply(#command_purge_nodes{nodes = Nodes}, State0) ->
294+
{State1, Eff} = lists:foldl(fun(N, {S0, Eff0}) ->
295+
{S1, Eff1} = purge_node(N, S0),
296+
{S1, Eff1 ++ Eff0}
297+
end, {State0, []}, Nodes),
309298
{State1, ok, Eff}.
310299

300+
purge_node(Node, #?MODULE{groups = Groups0} = State0) ->
301+
PidsGroups =
302+
maps:fold(fun(K, #group{consumers = Consumers}, Acc) ->
303+
lists:foldl(fun(#consumer{pid = Pid}, AccIn)
304+
when node(Pid) =:= Node ->
305+
PG0 = maps:get(Pid, AccIn, #{}),
306+
PG1 = PG0#{K => true},
307+
AccIn#{Pid => PG1};
308+
(_, AccIn) ->
309+
AccIn
310+
end, Acc, Consumers)
311+
end, #{}, Groups0),
312+
maps:fold(fun(Pid, Groups, {S0, Eff0}) ->
313+
{S1, Eff1} = handle_connection_down0(Pid, S0, Groups),
314+
{S1, Eff1 ++ Eff0}
315+
end, {State0, []}, PidsGroups).
316+
311317
handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
312318
Eff0, {VH, S, Name} = K) ->
313319
%% TODO sac: handle forgotten_active case (reconciliate state with current active)
@@ -573,6 +579,7 @@ ensure_monitors(#command_connection_reconnected{pid = Pid},
573579
Monitors#{Pid => sac},
574580
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
575581
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
582+
%% TODO sac: ensure the pid-group mapping after purge_nodes?
576583
{State0, Monitors, Effects}.
577584

578585
-spec handle_connection_down(connection_pid(), state()) ->
@@ -584,11 +591,14 @@ handle_connection_down(Pid,
584591
{State0, []};
585592
{Groups, PidsGroups1} ->
586593
State1 = State0#?MODULE{pids_groups = PidsGroups1},
587-
maps:fold(fun(G, _, Acc) ->
588-
handle_group_after_connection_down(Pid, Acc, G)
589-
end, {State1, []}, Groups)
594+
handle_connection_down0(Pid, State1, Groups)
590595
end.
591596

597+
handle_connection_down0(Pid, State, Groups) ->
598+
maps:fold(fun(G, _, Acc) ->
599+
handle_group_after_connection_down(Pid, Acc, G)
600+
end, {State, []}, Groups).
601+
592602
-spec handle_connection_node_disconnected(connection_pid(), state()) ->
593603
{state(), ra_machine:effects()}.
594604
handle_connection_node_disconnected(ConnPid,
@@ -730,6 +740,10 @@ import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
730740
#?MODULE{groups = map_to_groups(Groups),
731741
pids_groups = map_to_pids_groups(PidsGroups)}.
732742

743+
- spec make_purge_nodes([node()]) -> command().
744+
make_purge_nodes(Nodes) ->
745+
wrap_cmd(#command_purge_nodes{nodes = Nodes}).
746+
733747
map_to_groups(Groups) when is_map(Groups) ->
734748
maps:fold(fun(K, V, Acc) ->
735749
Acc#{K => map_to_group(V)}

‎deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -64,3 +64,5 @@
6464
consumer_name :: consumer_name()}).
6565
-record(command_connection_reconnected,
6666
{pid :: connection_pid()}).
67+
-record(command_purge_nodes,
68+
{nodes :: [node()]}).

‎deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 177 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -646,7 +646,7 @@ connection_reconnected_simple_disconnected_becomes_connected_test(_) ->
646646
Groups0 = #{GId => Group},
647647
State0 = state(Groups0),
648648

649-
Cmd = connection_reconnection_command(Pid0),
649+
Cmd = connection_reconnected_command(Pid0),
650650
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
651651

652652
assertHasGroup(GId, cgroup([consumer(Pid0, 0, {connected, active}),
@@ -670,7 +670,7 @@ connection_reconnected_simple_active_should_be_first_test(_) ->
670670
Groups0 = #{GId => Group},
671671
State0 = state(Groups0),
672672

673-
Cmd = connection_reconnection_command(Pid0),
673+
Cmd = connection_reconnected_command(Pid0),
674674
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
675675

676676
assertHasGroup(GId, cgroup([consumer(Pid1, 1, {connected, active}),
@@ -692,7 +692,7 @@ connection_reconnected_super_disconnected_becomes_connected_test(_) ->
692692
Groups0 = #{GId => Group},
693693
State0 = state(Groups0),
694694

695-
Cmd = connection_reconnection_command(Pid0),
695+
Cmd = connection_reconnected_command(Pid0),
696696
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
697697

698698
assertHasGroup(GId, cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
@@ -724,7 +724,7 @@ forget_connection_simple_disconnected_becomes_forgotten_test(_) ->
724724
assertSendMessageEffect(Pid1, 1, stream(), name(), true, Eff),
725725
ok.
726726

727-
forget_connection_super_disconnected_becomes_forgotten_test(_) ->
727+
forget_connection_super_stream_disconnected_becomes_forgotten_test(_) ->
728728
Pid0 = spawn(fun() -> ok end),
729729
Pid1 = spawn(fun() -> ok end),
730730
Pid2 = spawn(fun() -> ok end),
@@ -936,8 +936,153 @@ handle_connection_node_disconnected_super_stream_disconn_active_block_rebalancin
936936
assertNodeDisconnectedTimerEffect(Pid0, Eff),
937937
ok.
938938

939+
connection_reconnected_simple_disconn_active_block_rebalancing_test(_) ->
940+
Pid0 = spawn(fun() -> ok end),
941+
Pid1 = spawn(fun() -> ok end),
942+
Pid2 = spawn(fun() -> ok end),
943+
GId = group_id(),
944+
Group = cgroup([consumer(Pid0, 0, {disconnected, waiting}),
945+
consumer(Pid1, 0, {disconnected, active}),
946+
consumer(Pid2, 0, {connected, waiting})]),
947+
948+
Groups0 = #{GId => Group},
949+
State0 = state(Groups0),
950+
Cmd = connection_reconnected_command(Pid0),
951+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
952+
953+
assertHasGroup(GId, cgroup([consumer(Pid1, 0, {disconnected, active}),
954+
consumer(Pid0, 0, {connected, waiting}),
955+
consumer(Pid2, 0, {connected, waiting})]),
956+
Groups1),
957+
assertEmpty(Eff),
958+
ok.
959+
960+
connection_reconnected_super_stream_disconn_active_block_rebalancing_test(_) ->
961+
Pid0 = spawn(fun() -> ok end),
962+
Pid1 = spawn(fun() -> ok end),
963+
Pid2 = spawn(fun() -> ok end),
964+
GId = group_id(),
965+
Group = cgroup(1, [consumer(Pid0, 0, {disconnected, active}),
966+
consumer(Pid1, 0, {disconnected, waiting}),
967+
consumer(Pid2, 0, {connected, waiting})]),
968+
969+
Groups0 = #{GId => Group},
970+
State0 = state(Groups0),
971+
Cmd = connection_reconnected_command(Pid1),
972+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
973+
974+
assertHasGroup(GId, cgroup(1, [consumer(Pid0, 0, {disconnected, active}),
975+
consumer(Pid1, 0, {connected, waiting}),
976+
consumer(Pid2, 0, {connected, waiting})]),
977+
Groups1),
978+
assertEmpty(Eff),
979+
ok.
980+
981+
forget_connection_simple_disconn_active_block_rebalancing_test(_) ->
982+
Pid0 = spawn(fun() -> ok end),
983+
Pid1 = spawn(fun() -> ok end),
984+
Pid2 = spawn(fun() -> ok end),
985+
GId = group_id(),
986+
Group = cgroup([consumer(Pid0, {disconnected, waiting}),
987+
consumer(Pid1, {connected, waiting}),
988+
consumer(Pid2, {disconnected, active})]),
989+
990+
Groups0 = #{GId => Group},
991+
State0 = state(Groups0),
992+
993+
{#?STATE{groups = Groups1}, Eff} = ?MOD:forget_connection(Pid0, State0),
994+
995+
assertHasGroup(GId, cgroup([consumer(Pid2, {disconnected, active}),
996+
consumer(Pid0, {forgotten, waiting}),
997+
consumer(Pid1, {connected, waiting})]),
998+
Groups1),
999+
assertEmpty(Eff),
1000+
ok.
1001+
1002+
forget_connection_super_stream_disconn_active_block_rebalancing_test(_) ->
1003+
Pid0 = spawn(fun() -> ok end),
1004+
Pid1 = spawn(fun() -> ok end),
1005+
Pid2 = spawn(fun() -> ok end),
1006+
GId = group_id(),
1007+
Group = cgroup(1, [consumer(Pid0, {disconnected, waiting}),
1008+
consumer(Pid1, {connected, waiting}),
1009+
consumer(Pid2, {disconnected, active})]),
1010+
1011+
Groups0 = #{GId => Group},
1012+
State0 = state(Groups0),
1013+
1014+
{#?STATE{groups = Groups1}, Eff} = ?MOD:forget_connection(Pid0, State0),
1015+
1016+
assertHasGroup(GId, cgroup(1, [consumer(Pid0, {forgotten, waiting}),
1017+
consumer(Pid1, {connected, waiting}),
1018+
consumer(Pid2, {disconnected, active})]),
1019+
Groups1),
1020+
assertEmpty(Eff),
1021+
ok.
1022+
1023+
purge_nodes_test(_) ->
1024+
N0 = node(),
1025+
{ok, N1Pid, N1} = peer:start(#{
1026+
name => ?FUNCTION_NAME,
1027+
connection => standard_io,
1028+
shutdown => close
1029+
}),
1030+
1031+
N0P0 = spawn(N0, fun() -> ok end),
1032+
N0P1 = spawn(N0, fun() -> ok end),
1033+
N0P2 = spawn(N0, fun() -> ok end),
1034+
N1P0 = spawn(N1, fun() -> ok end),
1035+
N1P1 = spawn(N1, fun() -> ok end),
1036+
N1P2 = spawn(N1, fun() -> ok end),
1037+
1038+
S0 = <<"s0">>,
1039+
S1 = <<"s1">>,
1040+
S2 = <<"s2">>,
1041+
1042+
GId0 = group_id(S0),
1043+
GId1 = group_id(S1),
1044+
GId2 = group_id(S2),
1045+
1046+
Group0 = cgroup([consumer(N1P0, {disconnected, active}),
1047+
consumer(N0P1, {connected, waiting}),
1048+
consumer(N0P2, {connected, waiting})]),
1049+
1050+
Group1 = cgroup(1, [consumer(N1P1, {disconnected, waiting}),
1051+
consumer(N1P2, {disconnected, active}),
1052+
consumer(N0P0, {connected, waiting})]),
1053+
1054+
Group2 = cgroup([consumer(N0P0, {connected, active}),
1055+
consumer(N0P1, {connected, waiting}),
1056+
consumer(N0P2, {connected, waiting})]),
1057+
1058+
1059+
State0 = state(#{GId0 => Group0, GId1 => Group1, GId2 => Group2}),
1060+
Cmd = purge_nodes_command([N1]),
1061+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
1062+
1063+
assertSize(3, Groups1),
1064+
assertHasGroup(GId0, cgroup([consumer(N0P1, {connected, active}),
1065+
consumer(N0P2, {connected, waiting})]),
1066+
Groups1),
1067+
assertHasGroup(GId1, cgroup(1, [consumer(N0P0, {connected, active})]),
1068+
Groups1),
1069+
assertHasGroup(GId2, cgroup([consumer(N0P0, {connected, active}),
1070+
consumer(N0P1, {connected, waiting}),
1071+
consumer(N0P2, {connected, waiting})]),
1072+
Groups1),
1073+
1074+
assertSize(2, Eff),
1075+
assertContainsSendMessageEffect(N0P1, S0, true, Eff),
1076+
assertContainsSendMessageEffect(N0P0, S1, true, Eff),
1077+
1078+
_ = peer:stop(N1Pid),
1079+
ok.
1080+
9391081
group_id() ->
940-
{<<"/">>, stream(), name()}.
1082+
group_id(stream()).
1083+
1084+
group_id(S) ->
1085+
{<<"/">>, S, name()}.
9411086

9421087
stream() ->
9431088
<<"sO">>.
@@ -964,6 +1109,9 @@ assertHasGroup(GroupId, Group, Groups) ->
9641109
G = maps:get(GroupId, Groups),
9651110
?assertEqual(Group, G).
9661111

1112+
consumer(Pid, Status) ->
1113+
consumer(Pid, 0, Status).
1114+
9671115
consumer(Pid, SubId, {Connectivity, Status}) ->
9681116
#consumer{pid = Pid,
9691117
subscription_id = SubId,
@@ -1015,9 +1163,32 @@ activate_consumer_command(Stream, ConsumerName) ->
10151163
stream = Stream,
10161164
consumer_name = ConsumerName}.
10171165

1018-
connection_reconnection_command(Pid) ->
1166+
connection_reconnected_command(Pid) ->
10191167
#command_connection_reconnected{pid = Pid}.
10201168

1169+
purge_nodes_command(Nodes) ->
1170+
#command_purge_nodes{nodes = Nodes}.
1171+
1172+
1173+
assertContainsSendMessageEffect(Pid, Stream, Active, Effects) ->
1174+
assertContainsSendMessageEffect(Pid, 0, Stream, name(), Active, Effects).
1175+
1176+
assertContainsSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active,
1177+
Effects) ->
1178+
Contains = lists:any(fun(Eff) ->
1179+
Eff =:= {mod_call,
1180+
rabbit_stream_sac_coordinator,
1181+
send_message,
1182+
[Pid,
1183+
{sac,
1184+
#{subscription_id => SubId,
1185+
stream => Stream,
1186+
consumer_name => ConsumerName,
1187+
active => Active}}]}
1188+
end, Effects),
1189+
?assert(Contains).
1190+
1191+
10211192
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
10221193
?assertEqual({mod_call,
10231194
rabbit_stream_sac_coordinator,

0 commit comments

Comments
 (0)
Please sign in to comment.