diff --git a/BUILD.bazel b/BUILD.bazel
index ffab183c1118..4e26b1ab14cf 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -76,6 +76,12 @@ rabbitmq_home(
     plugins = PLUGINS,
 )
 
+rabbitmq_run_command(
+    name = "add-node",
+    rabbitmq_run = ":rabbitmq-run",
+    subcommand = "add-node",
+)
+
 rabbitmq_run(
     name = "rabbitmq-run",
     home = ":broker-home",
diff --git a/MODULE.bazel b/MODULE.bazel
index ac501018db39..9a48c6af6873 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -279,11 +279,13 @@ erlang_package.hex_package(
     version = "0.2.1",
 )
 
-erlang_package.hex_package(
-    name = "ra",
+# TODO(review): temporary pin to a fork branch of ra. A branch is a moving
+# target, so switch back to a released hex package (previously 2.5.1) or pin
+# an exact commit before this is merged.
+erlang_package.git_package(
+    branch = "sunge/handle_status_callback",
+    repository = "SimonUnge/ra",
     build_file = "@rabbitmq-server//bazel:BUILD.ra",
-    sha256 = "13b03f02cf6c1837c527edd4a953f0c09da0abad0af6985b64bfd66943c4c5c3",
-    version = "2.5.1",
 )
 
 erlang_package.hex_package(
diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema
index 07643b050a16..ebba2361d6aa 100644
--- a/deps/rabbit/priv/schema/rabbit.schema
+++ b/deps/rabbit/priv/schema/rabbit.schema
@@ -776,6 +776,13 @@ end}.
     {datatype, [integer, {list, string}]}
 ]}.
 
+{mapping, "default_policies.operator.$id.target_group_size", "rabbit.default_policies.operator", [
+    {include_default, 1},
+    {commented, 1},
+    {validators, ["non_zero_positive_integer"]},
+    {datatype, integer}
+]}.
+
 {translation, "rabbit.default_policies.operator", fun(Conf) ->
     Props = rabbit_cuttlefish:aggregate_props(
         Conf,
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index eba9be0d3528..cb1b5964baea 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -26,6 +26,8 @@
          tick/2,
          overview/1,
 
+         eval_members/3,
+
          get_checked_out/4,
          %% versioning
          version/0,
@@ -899,6 +901,14 @@ tick(Ts, #?MODULE{cfg = #cfg{name = _Name,
             [{aux, {handle_tick, [QName, overview(State), all_nodes(State)]}}]
     end.
 
+%% Takes the ra cluster name from the leader's server id and the queue
+%% resource from the machine config, then delegates the membership decision
+%% to rabbit_quorum_queue:eval_members/3.
+%% NOTE(review): presumably called by the forked ra as a machine callback - confirm.
+eval_members({ClusterName, _} = _Leader, Cluster,
+             #?MODULE{cfg = #cfg{resource = QName}} = _State) ->
+    rabbit_quorum_queue:eval_members(ClusterName, Cluster, QName).
+
 -spec overview(state()) -> map().
 overview(#?MODULE{consumers = Cons,
                   enqueuers = Enqs,
diff --git a/deps/rabbit/src/rabbit_process.erl b/deps/rabbit/src/rabbit_process.erl
index 0fe093ff7fe8..1b38032f37cc 100644
--- a/deps/rabbit/src/rabbit_process.erl
+++ b/deps/rabbit/src/rabbit_process.erl
@@ -88,6 +88,6 @@ is_registered_process_alive(Name) ->
 
 is_process_hibernated(Pid) when is_pid(Pid) ->
     {current_function,{erlang,hibernate,3}} == erlang:process_info(Pid, current_function);
-is_process_hibernated(_) ->
+is_process_hibernated(Pid) when is_tuple(Pid) ->
     %% some queue types, eg QQs, have a tuple as a Pid, but they are never hibernated
     false.
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index aeceae7caf8f..9b931784a90d 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -8,6 +8,8 @@
 -module(rabbit_quorum_queue).
 
 -behaviour(rabbit_queue_type).
+-behaviour(rabbit_policy_validator).
+-behaviour(rabbit_policy_merge_strategy).
 
 -export([init/1,
          close/1,
@@ -30,7 +32,7 @@
 -export([cluster_state/1, status/2]).
 -export([update_consumer_handler/8, update_consumer/9]).
 -export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/3, spawn_deleter/1]).
+-export([become_leader/2, handle_tick/3, spawn_deleter/1, eval_members/3]).
 -export([rpc_delete_metrics/1]).
 -export([format/1]).
 -export([open_files/1]).
@@ -65,6 +67,7 @@
          is_compatible/3,
          declare/2,
          is_stateful/0]).
+-export([validate_policy/1, merge_policy_value/3]).
 
 -import(rabbit_queue_type_util, [args_policy_lookup/3,
                                  qname_to_internal_name/1]).
@@ -111,6 +114,41 @@
 -define(ADD_MEMBER_TIMEOUT, 5000).
 -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
 
+%% Defaults for the member evaluation settings read in make_ra_conf/2.
+%% NOTE(review): presumably milliseconds - confirm against the ra fork.
+-define(EVAL_MEMBERS_TIMEOUT, (60000 * 60)).
+-define(EVAL_MEMBERS_EVENT_TIMEOUT, 60000).
+
+
+%%----------- QQ policies ---------------------------------------------------
+
+-rabbit_boot_step(
+   {?MODULE,
+    [{description, "QQ policy validation"},
+     {mfa, {rabbit_registry, register,
+            [policy_validator, <<"target-group-size">>, ?MODULE]}},
+     {mfa, {rabbit_registry, register,
+            [operator_policy_validator, <<"target-group-size">>, ?MODULE]}},
+     {mfa, {rabbit_registry, register,
+            [policy_merge_strategy, <<"target-group-size">>, ?MODULE]}},
+     {requires, rabbit_registry},
+     {enables, recovery}]}).
+
+%% rabbit_policy_validator callback: the policy is valid only when
+%% "target-group-size" is present in Args and is a positive integer.
+validate_policy(Args) ->
+    case proplists:get_value(<<"target-group-size">>, Args, none) of
+        Size when is_integer(Size), Size > 0 ->
+            ok;
+        Other ->
+            {error, "~tp is not a valid qq target count value", [Other]}
+    end.
+
+%% rabbit_policy_merge_strategy callback: the third argument (OpVal, presumably
+%% the operator policy value) always wins over the second.
+merge_policy_value(<<"target-group-size">>, _Val, OpVal) ->
+    OpVal.
+
 %%----------- rabbit_queue_type ---------------------------------------------
 
 -spec is_enabled() -> boolean().
@@ -177,7 +208,7 @@ start_cluster(Q) ->
     Arguments = amqqueue:get_arguments(Q),
     Opts = amqqueue:get_options(Q),
     ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
-    QuorumSize = get_default_quorum_initial_group_size(Arguments),
+    QuorumSize = get_default_quorum_initial_group_size(Arguments, Q),
     RaName = case qname_to_internal_name(QName) of
                  {ok, A} ->
                      A;
@@ -193,11 +224,7 @@ start_cluster(Q) ->
                      [QuorumSize, rabbit_misc:rs(QName), Leader]),
     case rabbit_amqqueue:internal_declare(NewQ1, false) of
         {created, NewQ} ->
-            TickTimeout = application:get_env(rabbit, quorum_tick_interval,
-                                              ?TICK_TIMEOUT),
-            SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
-                                                   ?SNAPSHOT_INTERVAL),
-            RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
+            RaConfs = [make_ra_conf(NewQ, ServerId)
                        || ServerId <- members(NewQ)],
             try erpc_call(Leader, ra, start_cluster,
                           [?RA_SYSTEM, RaConfs, ?START_CLUSTER_TIMEOUT],
@@ -550,6 +577,119 @@ reductions(Name) ->
             0
     end.
 
+%% Works out whether the membership of the quorum queue QName should change.
+%% Cluster is matched as a list of {Name, Node} server ids. Members on nodes
+%% that rabbit_nodes:list_members/0 no longer reports are removed first; growing
+%% the queue is only considered when there is nothing to remove.
+%% Returns {add_members, _, Cluster}, {remove_members, _, Cluster} or
+%% 'undefined' when there is nothing to do.
+eval_members(ClusterName, Cluster, QName) ->
+    MemberNodes = [N || {_, N} <- Cluster],
+    case MemberNodes -- rabbit_nodes:list_members() of
+        [] ->
+            add_member_effects(ClusterName, Cluster, QName, MemberNodes);
+        Remove ->
+            remove_member_effects(ClusterName, Cluster, QName, Remove)
+    end.
+
+%% Proposes new members while the queue has fewer members than its target
+%% size and there are running nodes that do not host a member yet.
+add_member_effects(ClusterName, Cluster, QName, MemberNodes) ->
+    case rabbit_amqqueue:lookup(QName) of
+        {ok, Q} ->
+            Missing = get_target_size(Q) - length(MemberNodes),
+            Candidates = rabbit_nodes:list_running() -- MemberNodes,
+            case Candidates of
+                [_ | _] when Missing > 0 ->
+                    NodesToAdd = lists:sublist(grow_order_sort(Candidates),
+                                               Missing),
+                    create_add_member_effects(ClusterName, Cluster,
+                                              Q, QName, NodesToAdd);
+                _ ->
+                    rabbit_log:debug("CALLED: NOOP ~p~n",[QName]),
+                    undefined
+            end;
+        {error, not_found} ->
+            %% The queue record is gone (e.g. deleted concurrently): nothing to grow.
+            undefined
+    end.
+
+%% Target member count from the "target-group-size" policy; 0 when the policy
+%% is not set, in which case add_member_effects/4 never grows the queue.
+get_target_size(Q) ->
+    case rabbit_policy:get(<<"target-group-size">>, Q) of
+        undefined ->
+            0;
+        N ->
+            N
+    end.
+
+%% Builds the add_members result: one {{ServerId, RaConf}, ResultFun} entry per
+%% node in New.
+create_add_member_effects(ClusterName, Cluster, Q, QName, New) ->
+    rabbit_log:debug("CALLED: WILL ADD ~p for Q ~p~n",[New, QName]),
+    NewMembers = [make_add_member_effect(Q, QName, {ClusterName, N}) || N <- New],
+    {add_members, NewMembers, Cluster}.
+
+%% Pairs the new member's {ServerId, RaConf} with a ResultFun that, given
+%% {ok, _, Leader}, adds Node to the queue's type state and stores Leader as
+%% the queue pid. NOTE(review): presumably ra applies ResultFun - confirm.
+make_add_member_effect(Q, QName, {_ClusterName, Node} = ServerId) ->
+    Conf = make_ra_conf(Q, ServerId),
+    ResultFun = fun({ok, _, Leader}) ->
+                        Fun = fun(Q1) ->
+                                      Q2 = update_type_state(
+                                             Q1, fun(#{nodes := Nodes} = Ts) ->
+                                                         Ts#{nodes => [Node | Nodes]}
+                                                 end),
+                                      amqqueue:set_pid(Q2, Leader)
+                              end,
+                        _ = rabbit_amqqueue:update(QName, Fun)
+                end,
+    {{ServerId, Conf}, ResultFun}.
+
+%% Orders candidate nodes by the number of non-crashed quorum queues that
+%% already have a member on them, fewest first.
+%% Just an idea to pick the 'right' node. In the future we may have
+%% availability zone filters etc. for picking a node.
+%% TODO Reuse logic from `rabbit_queue_location:select_leader_and_followers` instead
+grow_order_sort(Nodes) ->
+    %% List the quorum queues once rather than once per candidate node.
+    MemberLists = [rabbit_amqqueue:get_quorum_nodes(Q)
+                   || Q <- rabbit_amqqueue:list_by_type(quorum),
+                      amqqueue:get_state(Q) =/= crashed],
+    QueueLenFun =
+        fun(Node) ->
+                length([M || M <- MemberLists, lists:member(Node, M)])
+        end,
+    NodeWithQLen = lists:keysort(
+                     2,
+                     [{Node, QueueLenFun(Node)} || Node <- Nodes]),
+    [N || {N,_} <- NodeWithQLen].
+
+%% Builds the remove_members result: one {ServerId, ResultFun} entry per node
+%% in RemovedFromCluster.
+remove_member_effects(ClusterName, Cluster, QName, RemovedFromCluster) ->
+    rabbit_log:debug("CALLED: WILL REMOVE ~p~n",[RemovedFromCluster]),
+    RemoveMembers = [make_remove_member_effect(QName, {ClusterName, N}) ||
+                        N <- RemovedFromCluster],
+    {remove_members, RemoveMembers, Cluster}.
+
+%% Pairs ServerId with a ResultFun that, on {ok, _, _}, drops Node from the nodes list.
+make_remove_member_effect(QName, {_ClusterName, Node} = ServerId) ->
+    ResultFun = fun({ok, _, _}) ->
+                        Fun = fun(Q1) ->
+                                      update_type_state(
+                                        Q1,
+                                        fun(#{nodes := Nodes} = Ts) ->
+                                                Ts#{nodes => lists:delete(Node,
+                                                                          Nodes)}
+                                        end)
+                              end,
+                        _ = rabbit_amqqueue:update(QName, Fun)
+                end,
+    {ServerId, ResultFun}.
+
 is_recoverable(Q) ->
     Node = node(),
     Nodes = get_nodes(Q),
@@ -1089,11 +1206,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
     %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
     ServerId = {RaName, Node},
     Members = members(Q),
-    TickTimeout = application:get_env(rabbit, quorum_tick_interval,
-                                      ?TICK_TIMEOUT),
-    SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
-                                           ?SNAPSHOT_INTERVAL),
-    Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval),
+    Conf = make_ra_conf(Q, ServerId),
     case ra:start_server(?RA_SYSTEM, Conf) of
         ok ->
             case ra:add_member(Members, ServerId, Timeout) of
@@ -1573,12 +1686,18 @@ quorum_ctag(Other) ->
 maybe_send_reply(_ChPid, undefined) -> ok;
 maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
 
-get_default_quorum_initial_group_size(Arguments) ->
-    case rabbit_misc:table_lookup(Arguments, <<"x-quorum-initial-group-size">>) of
-        undefined ->
+get_default_quorum_initial_group_size(Arguments, Q) ->
+    PolicyValue = rabbit_policy:get(<<"target-group-size">>, Q),
+    ArgValue = rabbit_misc:table_lookup(Arguments, <<"x-quorum-initial-group-size">>),
+    case {ArgValue, PolicyValue} of
+        {undefined, undefined} ->
             application:get_env(rabbit, quorum_cluster_size, 3);
-        {_Type, Val} ->
-            Val
+        {undefined, V} ->
+            V;
+        {{_Type, V}, undefined} ->
+            V;
+        {{_Type, ArgV}, PolV} ->
+            max(ArgV, PolV)
     end.
 
 %% member with the current leader first
@@ -1590,7 +1709,16 @@ members(Q) when ?amqqueue_is_quorum(Q) ->
 format_ra_event(ServerId, Evt, QRef) ->
     {'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
 
-make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
+make_ra_conf(Q, ServerId) ->
+    TickTimeout = application:get_env(rabbit, quorum_tick_interval,
+                                      ?TICK_TIMEOUT),
+    SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
+                                           ?SNAPSHOT_INTERVAL),
+    %% TODO(review): decide whether the two eval-members timeouts below should stay configurable via the rabbit app env.
+    MemberEvalTimeout = application:get_env(rabbit, quorum_eval_members_timeout,
+                                            ?EVAL_MEMBERS_TIMEOUT),
+    MemberEvalEventTimeout = application:get_env(rabbit, quorum_eval_members_event_timeout,
+                                                 ?EVAL_MEMBERS_EVENT_TIMEOUT),
     QName = amqqueue:get_name(Q),
     RaMachine = ra_machine(Q),
     [{ClusterName, _} | _] = Members = members(Q),
@@ -1606,6 +1734,8 @@ make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
       log_init_args => #{uid => UId,
                          snapshot_interval => SnapshotInterval},
       tick_timeout => TickTimeout,
+      eval_members_timeout => MemberEvalTimeout,
+      eval_members_event_timeout => MemberEvalEventTimeout,
       machine => RaMachine,
       ra_event_formatter => Formatter}.
 
diff --git a/rabbitmq_run.bzl b/rabbitmq_run.bzl
index b2e5debae1e9..2db4d8088ab4 100644
--- a/rabbitmq_run.bzl
+++ b/rabbitmq_run.bzl
@@ -126,6 +126,7 @@ rabbitmq_run_command_private = rule(
             "start-background-broker",
             "stop-node",
             "start-cluster",
+            "add-node",
             "stop-cluster",
         ]),
     },
diff --git a/scripts/bazel/rabbitmq-run.sh b/scripts/bazel/rabbitmq-run.sh
index 608aa08bdd44..15e9c7bb9f5a 100755
--- a/scripts/bazel/rabbitmq-run.sh
+++ b/scripts/bazel/rabbitmq-run.sh
@@ -170,6 +170,9 @@ for arg in "$@"; do
         start-cluster)
             CMD="$arg"
             ;;
+        add-node)
+            CMD="$arg"
+            ;;
         stop-cluster)
             CMD="$arg"
             ;;
@@ -279,6 +282,31 @@ case $CMD in
             fi
         done
         ;;
+    add-node)
+        n=${NODE_NUM:=3}
+        NODE0=${NODE_LEADER:="rabbit-0"}
+        setup_node_env "$n"
+        RABBITMQ_NODE_PORT=$((5672 + n)) \
+        RABBITMQ_SERVER_START_ARGS=" \
+            -rabbit loopback_users [] \
+            -rabbitmq_management listener [{port,$((15672 + n))}] \
+            -rabbitmq_mqtt tcp_listeners [$((1883 + n))] \
+            -rabbitmq_web_mqtt tcp_config [{port,$((1893 + n))}] \
+            -rabbitmq_web_mqtt_examples listener [{port,$((1903 + n))}] \
+            -rabbitmq_stomp tcp_listeners [$((61613 + n))] \
+            -rabbitmq_web_stomp tcp_config [{port,$((61623 + n))}] \
+            -rabbitmq_web_stomp_examples listener [{port,$((61633 + n))}] \
+            -rabbitmq_prometheus tcp_config [{port,$((15692 + n))}] \
+            -rabbitmq_stream tcp_listeners [$((5552 + n))]" \
+            "$RABBITMQ_SERVER" \
+                > "$RABBITMQ_LOG_BASE"/startup_log \
+                2> "$RABBITMQ_LOG_BASE"/startup_err &
+
+        await_startup
+        "$RABBITMQCTL" -n "$RABBITMQ_NODENAME" stop_app
+        "$RABBITMQCTL" -n "$RABBITMQ_NODENAME" join_cluster "$NODE0"
+        "$RABBITMQCTL" -n "$RABBITMQ_NODENAME" start_app
+        ;;
     stop-cluster)
         nodes=${NODES:=3}
         for ((n=nodes-1; n >= 0; n--))