Skip to content

Organize ra_counters by Ra system #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli

dep_gen_batch_server = hex 0.8.9
dep_aten = hex 0.6.0
dep_seshat = hex 0.6.0
dep_seshat = git https://github.com/rabbitmq/seshat main
DEPS = aten gen_batch_server seshat

TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy
Expand Down
34 changes: 25 additions & 9 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@
register_external_log_reader/1,
member_overview/1,
member_overview/2,
% deprecated
key_metrics/1,
key_metrics/2
key_metrics/2,
key_metrics/3
]).

%% xref should pick these up
Expand Down Expand Up @@ -763,8 +765,7 @@ overview(System) ->
wal := Wal}} = Config,
#{node => node(),
servers => ra_directory:overview(System),
%% TODO:filter counter keys by system
counters => ra_counters:overview(),
counters => ra_counters:overview(System),
wal => #{status => lists:nth(5, element(4, sys:get_status(Wal))),
open_mem_tables => ets:info(OpenTbls, size)
},
Expand Down Expand Up @@ -1211,17 +1212,31 @@ member_overview(ServerId) ->
member_overview(ServerId, Timeout) ->
ra_server_proc:local_state_query(ServerId, overview, Timeout).

%% @doc Returns a map of key metrics about a Ra member
%%
%% For backwards compatibility, since key_metrics/2 can
%% call key_metrics on a remote node during a rolling upgrade.
%% In the past, ra_counters were all under the ra namespace
%% and were not scoped by a Ra system name.
%%
%% @param ServerId the Ra server to obtain key metrics for
%% DEPRECATED: use key_metrics/2
%% @end
key_metrics(ServerId) ->
key_metrics(ra, ServerId, ?DEFAULT_TIMEOUT).

%% @doc Returns a map of key metrics about a Ra member
%%
%% The keys and values may vary depending on what state
%% the member is in. This function will never call into the
%% Ra process itself so is likely to return swiftly even
%% when the Ra process is busy (such as when it is recovering)
%%
%% @param System the system name
%% @param ServerId the Ra server to obtain key metrics for
%% @end
key_metrics(ServerId) ->
key_metrics(ServerId, ?DEFAULT_TIMEOUT).
key_metrics(System, ServerId) ->
key_metrics(System, ServerId, ?DEFAULT_TIMEOUT).

%% @doc Returns a map of key metrics about a Ra member
%%
Expand All @@ -1230,18 +1245,19 @@ key_metrics(ServerId) ->
%% Ra process itself so is likely to return swiftly even
%% when the Ra process is busy (such as when it is recovering)
%%
%% @param System the system name
%% @param ServerId the Ra server to obtain key metrics for
%% @param Timeout The time to wait for the server to reply
%% @end
key_metrics({Name, N} = ServerId, _Timeout) when N == node() ->
key_metrics(System, {Name, N} = ServerId, _Timeout) when N == node() ->
Fields = [last_applied,
commit_index,
snapshot_index,
last_written_index,
last_index,
commit_latency,
term],
Counters = case ra_counters:counters(ServerId, Fields) of
Counters = case ra_counters:counters(System, ServerId, Fields) of
undefined ->
#{};
C -> C
Expand All @@ -1260,8 +1276,8 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() ->
membership => Membership}
end
end;
key_metrics({_, N} = ServerId, Timeout) ->
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout).
key_metrics(System, {_, N} = ServerId, Timeout) ->
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [System, ServerId], Timeout).

%% internal

Expand Down
5 changes: 3 additions & 2 deletions src/ra_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

-include_lib("eunit/include/eunit.hrl").

-define(BENCH_SYSTEM_NAME, default).
-define(PIPE_SIZE, 500).
-define(DATA_SIZE, 256).

Expand Down Expand Up @@ -146,7 +147,7 @@ start(Name, Nodes) when is_atom(Name) ->
initial_members => ServerIds,
machine => {module, ?MODULE, #{}}}
end || N <- Nodes],
ra:start_cluster(default, Configs).
ra:start_cluster(?BENCH_SYSTEM_NAME, Configs).

prepare() ->
_ = application:ensure_all_started(ra),
Expand Down Expand Up @@ -202,7 +203,7 @@ spawn_client(Parent, Leader, Num, DataSize, Pipe, Counter) ->
print_metrics(Name) ->
io:format("Node: ~w~n", [node()]),
io:format("metrics ~p~n", [ets:lookup(ra_metrics, Name)]),
io:format("counters ~p~n", [ra_counters:overview()]).
io:format("counters ~p~n", [ra_counters:overview(?BENCH_SYSTEM_NAME)]).



Expand Down
56 changes: 31 additions & 25 deletions src/ra_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,53 @@
-include("ra.hrl").

-export([
init/0,
new/2,
fetch/1,
init/1,
new/3,
fetch/2,
overview/0,
overview/1,
counters/2,
delete/1
overview/2,
counters/3,
delete/2
]).

-type name() :: term().


-spec init() -> ok.
init() ->
-spec init(atom()) -> ok.
init(Namespace) ->
_ = application:ensure_all_started(seshat),
_ = seshat:new_group(ra),
_ = seshat:new_group(Namespace),
persistent_term:put(?FIELDSPEC_KEY, ?RA_COUNTER_FIELDS),
ok.

-spec new(name(), seshat:fields_spec()) ->
-spec new(atom(), name(), seshat:fields_spec()) ->
counters:counters_ref().
new(Name, FieldsSpec) ->
seshat:new(ra, Name, FieldsSpec).
new(Namespace, Name, FieldsSpec) ->
seshat:new(Namespace, Name, FieldsSpec).

-spec fetch(name()) -> undefined | counters:counters_ref().
fetch(Name) ->
seshat:fetch(ra, Name).
-spec fetch(atom(), name()) -> undefined | counters:counters_ref().
fetch(Namespace, Name) ->
seshat:fetch(Namespace, Name).

-spec delete(term()) -> ok.
delete(Name) ->
seshat:delete(ra, Name).
-spec delete(atom(), term()) -> ok.
delete(Namespace, Name) ->
seshat:delete(Namespace, Name).

-spec overview() -> #{name() => #{atom() => non_neg_integer()}}.
overview() ->
seshat:overview(ra).
%% TODO - this should return counters for all systems
seshat:overview(quorum_queues).

-spec overview(name()) -> #{atom() => non_neg_integer()}.
overview(Name) ->
seshat:overview(ra, Name).
-spec overview(atom()) -> #{seshat:name() => #{atom() => non_neg_integer()}}.
overview(Namespace) ->
seshat:overview(Namespace).

-spec counters(name(), [atom()]) ->
#{atom() => non_neg_integer()} | undefined.
counters(Name, Fields) ->
seshat:counters(ra, Name, Fields).
-spec overview(atom(), name()) -> undefined | #{seshat:name() => integer()}.
overview(Namespace, Name) ->
seshat:overview(Namespace, Name).

-spec counters(atom(), name(), [atom()]) ->
#{seshat:name() => non_neg_integer()} | undefined.
counters(Namespace, Name, Fields) ->
seshat:counters(Namespace, Name, Fields).
2 changes: 1 addition & 1 deletion src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ init([#{data_dir := DataDir,
name := SegWriterName,
system := System} = Conf]) ->
process_flag(trap_exit, true),
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
CRef = ra_counters:new(System, SegWriterName, ?COUNTER_FIELDS),
SegmentConf = maps:get(segment_conf, Conf, #{}),
maybe_upgrade_segment_file_names(System, DataDir),
{ok, #state{system = System,
Expand Down
8 changes: 5 additions & 3 deletions src/ra_log_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ init([#{data_dir := DataDir,


make_wal_conf(#{data_dir := DataDir,
name := _System,
name := System,
names := #{wal := WalName,
segment_writer := SegWriterName} = Names} = Cfg) ->
segment_writer := SegWriterName} = Names0} = Cfg) ->
Names = Names0#{system => System},
WalDir = case Cfg of
#{wal_data_dir := D} -> D;
_ -> DataDir
Expand All @@ -73,7 +74,8 @@ make_wal_conf(#{data_dir := DataDir,
MinBinVheapSize = maps:get(wal_min_bin_vheap_size, Cfg,
?MIN_BIN_VHEAP_SIZE),
MinHeapSize = maps:get(wal_min_heap_size, Cfg, ?MIN_HEAP_SIZE),
#{name => WalName,
#{system => System,
name => WalName,
names => Names,
dir => WalDir,
segment_writer => SegWriterName,
Expand Down
10 changes: 6 additions & 4 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,21 @@ init(#{dir := Dir} = Conf0) ->
garbage_collect := Gc,
min_heap_size := MinHeapSize,
min_bin_vheap_size := MinBinVheapSize,
system := System,
names := #{wal := WalName,
open_mem_tbls := MemTablesName} = Names} =
open_mem_tbls := MemTablesName} = Names0} =
merge_conf_defaults(Conf0),
?NOTICE("WAL: ~ts init, mem-tables table name: ~w",
[WalName, MemTablesName]),
process_flag(trap_exit, true),
% given ra_log_wal is effectively a fan-in sink it is likely that it will
% at times receive large number of messages from a large number of
% writers
Names = Names0#{system => System},
process_flag(message_queue_data, off_heap),
process_flag(min_bin_vheap_size, MinBinVheapSize),
process_flag(min_heap_size, MinHeapSize),
CRef = ra_counters:new(WalName, ?COUNTER_FIELDS),
CRef = ra_counters:new(System, WalName, ?COUNTER_FIELDS),
Conf = #conf{dir = Dir,
segment_writer = SegWriter,
compute_checksums = ComputeChecksums,
Expand Down Expand Up @@ -325,7 +327,7 @@ terminate(Reason, State) ->
format_status(#state{conf = #conf{write_strategy = Strat,
sync_method = SyncMeth,
compute_checksums = Cs,
names = #{wal := WalName},
names = #{system := System, wal := WalName},
max_size_bytes = MaxSize},
writers = Writers,
wal = #wal{file_size = FSize,
Expand All @@ -337,7 +339,7 @@ format_status(#state{conf = #conf{write_strategy = Strat,
filename => filename:basename(Fn),
current_size => FSize,
max_size_bytes => MaxSize,
counters => ra_counters:overview(WalName)
counters => ra_counters:overview(System, WalName)
}.

%% Internal
Expand Down
1 change: 0 additions & 1 deletion src/ra_metrics_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ init([]) ->
{write_concurrency, true},
public],
_ = ets:new(ra_log_metrics, [set | TableFlags]),
ok = ra_counters:init(),
ok = ra_leaderboard:init(),

%% Table for ra processes to record their current snapshot index so that
Expand Down
16 changes: 9 additions & 7 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,13 @@ init(Config) ->
{ok, recover, State, [{next_event, cast, go}]}.

do_init(#{id := Id,
cluster_name := ClusterName} = Config0) ->
cluster_name := ClusterName,
system_config := #{name := System}} = Config0) ->
Key = ra_lib:ra_server_id_to_local_name(Id),
true = ets:insert(ra_state, {Key, init, unknown}),
process_flag(trap_exit, true),
Config = #{counter := Counter,
system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(Id),
system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(System, Id),
Config0),
MsgQData = maps:get(message_queue_data, SysConf, off_heap),
MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf,
Expand Down Expand Up @@ -1048,7 +1049,8 @@ handle_event(_EventType, EventContent, StateName, State) ->

terminate(Reason, StateName,
#state{conf = #conf{name = Key, cluster_name = ClusterName},
server_state = ServerState = #{cfg := #cfg{metrics_key = MetricsKey}}} = State) ->
server_state = ServerState = #{cfg := #cfg{metrics_key = MetricsKey,
system_config = #{name := System}}}} = State) ->
?DEBUG("~ts: terminating with ~w in state ~w",
[log_id(State), Reason, StateName]),
#{names := #{server_sup := SrvSup,
Expand All @@ -1068,7 +1070,7 @@ terminate(Reason, StateName,
catch ra_directory:unregister_name(Names, UId),
_ = ra_server:terminate(ServerState, Reason),
catch ra_log_meta:delete_sync(MetaName, UId),
catch ra_counters:delete(Id),
catch ra_counters:delete(System, Id),
Self = self(),
%% we have to terminate the child spec from the supervisor as it
%% won't do this automatically, even for transient children
Expand Down Expand Up @@ -1768,10 +1770,10 @@ gen_statem_safe_call(ServerId, Msg, Timeout) ->
do_state_query(QueryName, #state{server_state = State}) ->
ra_server:state_query(QueryName, State).

config_defaults(ServerId) ->
Counter = case ra_counters:fetch(ServerId) of
config_defaults(System, ServerId) ->
Counter = case ra_counters:fetch(System, ServerId) of
undefined ->
ra_counters:new(ServerId,
ra_counters:new(System, ServerId,
{persistent_term, ?FIELDSPEC_KEY});
C ->
C
Expand Down
2 changes: 1 addition & 1 deletion src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ delete_server_rpc(System, RaName) ->
catch ets:delete(ra_metrics, RaName),
catch ets:delete(ra_state, RaName),
catch ets:delete(ra_open_file_metrics, Pid),
catch ra_counters:delete({RaName, node()}),
catch ra_counters:delete(System, {RaName, node()}),
catch ra_leaderboard:clear(RaName),
ok
end.
Expand Down
9 changes: 6 additions & 3 deletions src/ra_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
stop_default/0
]).

-type names() :: #{wal := atom(),
-type names() :: #{system := atom(),
wal := atom(),
wal_sup := atom(),
log_sup := atom(),
log_ets := atom(),
Expand Down Expand Up @@ -146,7 +147,8 @@ default_config() ->
low_priority_commands_flush_size => LowPriorityCommandsFlushSize,
low_priority_commands_in_memory_size => LowPriorityInMemSize,
machine_upgrade_strategy => MachineUpgradeStrategy,
names => #{wal => ra_log_wal,
names => #{system => default,
wal => ra_log_wal,
wal_sup => ra_log_wal_sup,
log_sup => ra_log_sup,
log_ets => ra_log_ets,
Expand All @@ -159,7 +161,8 @@ default_config() ->
}}.

derive_names(SysName) when is_atom(SysName) ->
#{wal => derive(SysName, <<"log_wal">>),
#{system => SysName,
wal => derive(SysName, <<"log_wal">>),
wal_sup => derive(SysName, <<"log_wal_sup">>),
log_sup => derive(SysName, <<"log_sup">>),
log_ets => derive(SysName, <<"log_ets">>),
Expand Down
1 change: 1 addition & 0 deletions src/ra_system_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ init(#{data_dir := DataDir,
start => {ra_server_sup_sup, start_link, [Cfg]}},
Recover = #{id => ra_system_recover,
start => {ra_system_recover, start_link, [maps:get(name, Cfg)]}},
ra_counters:init(Name),
{ok, {SupFlags, [Ets, RaLogSup, RaServerSupSup, Recover]}};
{error, Code} ->
?ERR("Failed to create Ra data directory at '~ts', file system operation error: ~p", [DataDir, Code]),
Expand Down
Loading