diff --git a/Makefile b/Makefile index 1a25938b..0d2cc732 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli dep_gen_batch_server = hex 0.8.9 dep_aten = hex 0.6.0 -dep_seshat = hex 0.6.0 +dep_seshat = git https://github.com/rabbitmq/seshat main DEPS = aten gen_batch_server seshat TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy diff --git a/src/ra.erl b/src/ra.erl index 7296c720..ad73a77c 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -81,8 +81,10 @@ register_external_log_reader/1, member_overview/1, member_overview/2, + % deprecated key_metrics/1, - key_metrics/2 + key_metrics/2, + key_metrics/3 ]). %% xref should pick these up @@ -763,8 +765,7 @@ overview(System) -> wal := Wal}} = Config, #{node => node(), servers => ra_directory:overview(System), - %% TODO:filter counter keys by system - counters => ra_counters:overview(), + counters => ra_counters:overview(System), wal => #{status => lists:nth(5, element(4, sys:get_status(Wal))), open_mem_tables => ets:info(OpenTbls, size) }, @@ -1211,6 +1212,19 @@ member_overview(ServerId) -> member_overview(ServerId, Timeout) -> ra_server_proc:local_state_query(ServerId, overview, Timeout). +%% @doc Returns a map of key metrics about a Ra member +%% +%% For backwards compatibility, since key_metrics/2 can +%% call key_metrics on a remote node during a rolling upgrade. +%% In the past, ra_counters were all under the ra namespace +%% and were not scoped by a Ra system name. +%% +%% @param ServerId the Ra server to obtain key metrics for +%% DEPRECATED: use key_metrics/2 +%% @end +key_metrics(ServerId) -> + key_metrics(ra, ServerId, ?DEFAULT_TIMEOUT). + %% @doc Returns a map of key metrics about a Ra member %% %% The keys and values may vary depending on what state @@ -1218,10 +1232,11 @@ member_overview(ServerId, Timeout) -> %% Ra process itself so is likely to return swiftly even %% when the Ra process is busy (such as when it is recovering) %% +%% @param System the system name %% @param ServerId the Ra server to obtain key metrics for %% @end -key_metrics(ServerId) -> - key_metrics(ServerId, ?DEFAULT_TIMEOUT). +key_metrics(System, ServerId) -> + key_metrics(System, ServerId, ?DEFAULT_TIMEOUT). %% @doc Returns a map of key metrics about a Ra member %% @@ -1230,10 +1245,11 @@ key_metrics(ServerId) -> %% Ra process itself so is likely to return swiftly even %% when the Ra process is busy (such as when it is recovering) %% +%% @param System the system name %% @param ServerId the Ra server to obtain key metrics for %% @param Timeout The time to wait for the server to reply %% @end -key_metrics({Name, N} = ServerId, _Timeout) when N == node() -> +key_metrics(System, {Name, N} = ServerId, _Timeout) when N == node() -> Fields = [last_applied, commit_index, snapshot_index, @@ -1241,7 +1257,7 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() -> last_index, commit_latency, term], - Counters = case ra_counters:counters(ServerId, Fields) of + Counters = case ra_counters:counters(System, ServerId, Fields) of undefined -> #{}; C -> C @@ -1260,8 +1276,8 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() -> membership => Membership} end end; -key_metrics({_, N} = ServerId, Timeout) -> - erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout). +key_metrics(System, {_, N} = ServerId, Timeout) -> + erpc:call(N, ?MODULE, ?FUNCTION_NAME, [System, ServerId], Timeout). %% internal diff --git a/src/ra_bench.erl b/src/ra_bench.erl index 56fff922..bddb88f4 100644 --- a/src/ra_bench.erl +++ b/src/ra_bench.erl @@ -15,6 +15,7 @@ -include_lib("eunit/include/eunit.hrl"). +-define(BENCH_SYSTEM_NAME, default). -define(PIPE_SIZE, 500). -define(DATA_SIZE, 256). @@ -146,7 +147,7 @@ start(Name, Nodes) when is_atom(Name) -> initial_members => ServerIds, machine => {module, ?MODULE, #{}}} end || N <- Nodes], - ra:start_cluster(default, Configs). + ra:start_cluster(?BENCH_SYSTEM_NAME, Configs). prepare() -> _ = application:ensure_all_started(ra), @@ -202,7 +203,7 @@ spawn_client(Parent, Leader, Num, DataSize, Pipe, Counter) -> print_metrics(Name) -> io:format("Node: ~w~n", [node()]), io:format("metrics ~p~n", [ets:lookup(ra_metrics, Name)]), - io:format("counters ~p~n", [ra_counters:overview()]). + io:format("counters ~p~n", [ra_counters:overview(?BENCH_SYSTEM_NAME)]). diff --git a/src/ra_counters.erl b/src/ra_counters.erl index 080b4572..5cb681d6 100644 --- a/src/ra_counters.erl +++ b/src/ra_counters.erl @@ -8,47 +8,53 @@ -include("ra.hrl"). -export([ - init/0, - new/2, - fetch/1, + init/1, + new/3, + fetch/2, overview/0, overview/1, - counters/2, - delete/1 + overview/2, + counters/3, + delete/2 ]). -type name() :: term(). --spec init() -> ok. -init() -> +-spec init(atom()) -> ok. +init(Namespace) -> _ = application:ensure_all_started(seshat), - _ = seshat:new_group(ra), + _ = seshat:new_group(Namespace), persistent_term:put(?FIELDSPEC_KEY, ?RA_COUNTER_FIELDS), ok. --spec new(name(), seshat:fields_spec()) -> +-spec new(atom(), name(), seshat:fields_spec()) -> counters:counters_ref(). -new(Name, FieldsSpec) -> - seshat:new(ra, Name, FieldsSpec). +new(Namespace, Name, FieldsSpec) -> + seshat:new(Namespace, Name, FieldsSpec). --spec fetch(name()) -> undefined | counters:counters_ref(). -fetch(Name) -> - seshat:fetch(ra, Name). +-spec fetch(atom(), name()) -> undefined | counters:counters_ref(). +fetch(Namespace, Name) -> + seshat:fetch(Namespace, Name). --spec delete(term()) -> ok. -delete(Name) -> - seshat:delete(ra, Name). +-spec delete(atom(), term()) -> ok. +delete(Namespace, Name) -> + seshat:delete(Namespace, Name). -spec overview() -> #{name() => #{atom() => non_neg_integer()}}. overview() -> - seshat:overview(ra). + %% TODO - this should return counters for all systems + seshat:overview(quorum_queues). --spec overview(name()) -> #{atom() => non_neg_integer()}. -overview(Name) -> - seshat:overview(ra, Name). +-spec overview(atom()) -> #{seshat:name() => #{atom() => non_neg_integer()}}. +overview(Namespace) -> + seshat:overview(Namespace). --spec counters(name(), [atom()]) -> - #{atom() => non_neg_integer()} | undefined. -counters(Name, Fields) -> - seshat:counters(ra, Name, Fields). +-spec overview(atom(), name()) -> undefined | #{seshat:name() => integer()}. +overview(Namespace, Name) -> + seshat:overview(Namespace, Name). + +-spec counters(atom(), name(), [atom()]) -> + #{seshat:name() => non_neg_integer()} | undefined. +counters(Namespace, Name, Fields) -> + seshat:counters(Namespace, Name, Fields). diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index f6ab0478..ce7e554e 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -115,7 +115,7 @@ init([#{data_dir := DataDir, name := SegWriterName, system := System} = Conf]) -> process_flag(trap_exit, true), - CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS), + CRef = ra_counters:new(System, SegWriterName, ?COUNTER_FIELDS), SegmentConf = maps:get(segment_conf, Conf, #{}), maybe_upgrade_segment_file_names(System, DataDir), {ok, #state{system = System, diff --git a/src/ra_log_sup.erl b/src/ra_log_sup.erl index de0b4676..ffd445b5 100644 --- a/src/ra_log_sup.erl +++ b/src/ra_log_sup.erl @@ -52,9 +52,10 @@ init([#{data_dir := DataDir, make_wal_conf(#{data_dir := DataDir, - name := _System, + name := System, names := #{wal := WalName, - segment_writer := SegWriterName} = Names} = Cfg) -> + segment_writer := SegWriterName} = Names0} = Cfg) -> + Names = Names0#{system => System}, WalDir = case Cfg of #{wal_data_dir := D} -> D; _ -> DataDir @@ -73,7 +74,8 @@ make_wal_conf(#{data_dir := DataDir, MinBinVheapSize = maps:get(wal_min_bin_vheap_size, Cfg, ?MIN_BIN_VHEAP_SIZE), MinHeapSize = maps:get(wal_min_heap_size, Cfg, ?MIN_HEAP_SIZE), - #{name => WalName, + #{system => System, + name => WalName, names => Names, dir => WalDir, segment_writer => SegWriterName, diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index 87aa87e6..26c83605 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -266,8 +266,9 @@ init(#{dir := Dir} = Conf0) -> garbage_collect := Gc, min_heap_size := MinHeapSize, min_bin_vheap_size := MinBinVheapSize, + system := System, names := #{wal := WalName, - open_mem_tbls := MemTablesName} = Names} = + open_mem_tbls := MemTablesName} = Names0} = merge_conf_defaults(Conf0), ?NOTICE("WAL: ~ts init, mem-tables table name: ~w", [WalName, MemTablesName]), @@ -275,10 +276,11 @@ init(#{dir := Dir} = Conf0) -> % given ra_log_wal is effectively a fan-in sink it is likely that it will % at times receive large number of messages from a large number of % writers + Names = Names0#{system => System}, process_flag(message_queue_data, off_heap), process_flag(min_bin_vheap_size, MinBinVheapSize), process_flag(min_heap_size, MinHeapSize), - CRef = ra_counters:new(WalName, ?COUNTER_FIELDS), + CRef = ra_counters:new(System, WalName, ?COUNTER_FIELDS), Conf = #conf{dir = Dir, segment_writer = SegWriter, compute_checksums = ComputeChecksums, @@ -325,7 +327,7 @@ terminate(Reason, State) -> format_status(#state{conf = #conf{write_strategy = Strat, sync_method = SyncMeth, compute_checksums = Cs, - names = #{wal := WalName}, + names = #{system := System, wal := WalName}, max_size_bytes = MaxSize}, writers = Writers, wal = #wal{file_size = FSize, @@ -337,7 +339,7 @@ format_status(#state{conf = #conf{write_strategy = Strat, filename => filename:basename(Fn), current_size => FSize, max_size_bytes => MaxSize, - counters => ra_counters:overview(WalName) + counters => ra_counters:overview(System, WalName) }. %% Internal diff --git a/src/ra_metrics_ets.erl b/src/ra_metrics_ets.erl index df2ca7b2..4a3f6e95 100644 --- a/src/ra_metrics_ets.erl +++ b/src/ra_metrics_ets.erl @@ -38,7 +38,6 @@ init([]) -> {write_concurrency, true}, public], _ = ets:new(ra_log_metrics, [set | TableFlags]), - ok = ra_counters:init(), ok = ra_leaderboard:init(), %% Table for ra processes to record their current snapshot index so that diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 1c725cbd..0f8aa6d0 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -311,12 +311,13 @@ init(Config) -> {ok, recover, State, [{next_event, cast, go}]}. do_init(#{id := Id, - cluster_name := ClusterName} = Config0) -> + cluster_name := ClusterName, + system_config := #{name := System}} = Config0) -> Key = ra_lib:ra_server_id_to_local_name(Id), true = ets:insert(ra_state, {Key, init, unknown}), process_flag(trap_exit, true), Config = #{counter := Counter, - system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(Id), + system_config := #{names := Names} = SysConf} = maps:merge(config_defaults(System, Id), Config0), MsgQData = maps:get(message_queue_data, SysConf, off_heap), MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf, @@ -1048,7 +1049,8 @@ handle_event(_EventType, EventContent, StateName, State) -> terminate(Reason, StateName, #state{conf = #conf{name = Key, cluster_name = ClusterName}, - server_state = ServerState = #{cfg := #cfg{metrics_key = MetricsKey}}} = State) -> + server_state = ServerState = #{cfg := #cfg{metrics_key = MetricsKey, + system_config = #{name := System}}}} = State) -> ?DEBUG("~ts: terminating with ~w in state ~w", [log_id(State), Reason, StateName]), #{names := #{server_sup := SrvSup, @@ -1068,7 +1070,7 @@ terminate(Reason, StateName, catch ra_directory:unregister_name(Names, UId), _ = ra_server:terminate(ServerState, Reason), catch ra_log_meta:delete_sync(MetaName, UId), - catch ra_counters:delete(Id), + catch ra_counters:delete(System, Id), Self = self(), %% we have to terminate the child spec from the supervisor as it %% won't do this automatically, even for transient children @@ -1768,10 +1770,10 @@ gen_statem_safe_call(ServerId, Msg, Timeout) -> do_state_query(QueryName, #state{server_state = State}) -> ra_server:state_query(QueryName, State). -config_defaults(ServerId) -> - Counter = case ra_counters:fetch(ServerId) of +config_defaults(System, ServerId) -> + Counter = case ra_counters:fetch(System, ServerId) of undefined -> - ra_counters:new(ServerId, + ra_counters:new(System, ServerId, {persistent_term, ?FIELDSPEC_KEY}); C -> C diff --git a/src/ra_server_sup_sup.erl b/src/ra_server_sup_sup.erl index dcfc964e..8c3bf0cd 100644 --- a/src/ra_server_sup_sup.erl +++ b/src/ra_server_sup_sup.erl @@ -188,7 +188,7 @@ delete_server_rpc(System, RaName) -> catch ets:delete(ra_metrics, RaName), catch ets:delete(ra_state, RaName), catch ets:delete(ra_open_file_metrics, Pid), - catch ra_counters:delete({RaName, node()}), + catch ra_counters:delete(System, {RaName, node()}), catch ra_leaderboard:clear(RaName), ok end. diff --git a/src/ra_system.erl b/src/ra_system.erl index 68a0664e..1e49bc56 100644 --- a/src/ra_system.erl +++ b/src/ra_system.erl @@ -15,7 +15,8 @@ stop_default/0 ]). --type names() :: #{wal := atom(), +-type names() :: #{system := atom(), + wal := atom(), wal_sup := atom(), log_sup := atom(), log_ets := atom(), @@ -146,7 +147,8 @@ default_config() -> low_priority_commands_flush_size => LowPriorityCommandsFlushSize, low_priority_commands_in_memory_size => LowPriorityInMemSize, machine_upgrade_strategy => MachineUpgradeStrategy, - names => #{wal => ra_log_wal, + names => #{system => default, + wal => ra_log_wal, wal_sup => ra_log_wal_sup, log_sup => ra_log_sup, log_ets => ra_log_ets, @@ -159,7 +161,8 @@ default_config() -> }}. derive_names(SysName) when is_atom(SysName) -> - #{wal => derive(SysName, <<"log_wal">>), + #{system => SysName, + wal => derive(SysName, <<"log_wal">>), wal_sup => derive(SysName, <<"log_wal_sup">>), log_sup => derive(SysName, <<"log_sup">>), log_ets => derive(SysName, <<"log_ets">>), diff --git a/src/ra_system_sup.erl b/src/ra_system_sup.erl index ab00ccdb..addd37ed 100644 --- a/src/ra_system_sup.erl +++ b/src/ra_system_sup.erl @@ -40,6 +40,7 @@ init(#{data_dir := DataDir, start => {ra_server_sup_sup, start_link, [Cfg]}}, Recover = #{id => ra_system_recover, start => {ra_system_recover, start_link, [maps:get(name, Cfg)]}}, + ra_counters:init(Name), {ok, {SupFlags, [Ets, RaLogSup, RaServerSupSup, Recover]}}; {error, Code} -> ?ERR("Failed to create Ra data directory at '~ts', file system operation error: ~p", [DataDir, Code]), diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 15f681e0..7d3f3b84 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -558,7 +558,7 @@ nonvoter_catches_up(Config) -> ?assertMatch(#{Group := #{membership := promotable}}, rpc:call(NodeC, ra_directory, overview, [?SYS])), ?assertMatch(#{membership := promotable}, - ra:key_metrics(C)), + ra:key_metrics(?SYS, C)), ?assertMatch({ok, #{membership := promotable}, _}, ra:member_overview(C)), @@ -570,7 +570,7 @@ nonvoter_catches_up(Config) -> ?assertMatch(#{Group := #{membership := voter}}, rpc:call(NodeC, ra_directory, overview, [?SYS])), ?assertMatch(#{membership := voter}, - ra:key_metrics(C)), + ra:key_metrics(?SYS, C)), stop_peers(Peers), ok. @@ -594,7 +594,7 @@ nonvoter_catches_up_after_restart(Config) -> ?assertMatch(#{Group := #{membership := promotable}}, rpc:call(NodeC, ra_directory, overview, [?SYS])), ?assertMatch(#{membership := promotable}, - ra:key_metrics(C)), + ra:key_metrics(?SYS, C)), ?assertMatch({ok, #{membership := promotable}, _}, ra:member_overview(C)), ok = ra:stop_server(?SYS, C), @@ -608,7 +608,7 @@ nonvoter_catches_up_after_restart(Config) -> ?assertMatch(#{Group := #{membership := voter}}, rpc:call(NodeC, ra_directory, overview, [?SYS])), ?assertMatch(#{membership := voter}, - ra:key_metrics(C)), + ra:key_metrics(?SYS, C)), stop_peers(Peers), ok. @@ -632,7 +632,7 @@ nonvoter_catches_up_after_leader_restart(Config) -> ?assertMatch(#{Group := #{membership := promotable}}, rpc:call(NodeC, ra_directory, overview, [?SYS])), ?assertMatch(#{membership := promotable}, - ra:key_metrics(C)), + ra:key_metrics(?SYS, C)), ?assertMatch({ok, #{membership := promotable}, _}, ra:member_overview(C)), ok = ra:stop_server(?SYS, Leader), @@ -646,7 +646,7 @@ nonvoter_catches_up_after_leader_restart(Config) -> ?assertMatch(#{Group := #{membership := voter}}, rpc:call(NodeC, ra_directory, overview, [?SYS])), ?assertMatch(#{membership := voter}, - ra:key_metrics(C)), + ra:key_metrics(?SYS, C)), stop_peers(Peers), ok. @@ -669,7 +669,7 @@ key_metrics(Config) -> timer:sleep(100), TestId = lists:last(Started), ok = ra:stop_server(?SYS, TestId), - StoppedMetrics = ra:key_metrics(TestId), + StoppedMetrics = ra:key_metrics(?SYS, TestId), ct:pal("StoppedMetrics ~p", [StoppedMetrics]), ?assertMatch(#{state := noproc, last_applied := LA, @@ -683,12 +683,12 @@ key_metrics(Config) -> {ok, _, _} = ra:process_command(Leader, {data, Data}), await_condition( fun () -> - Metrics = ra:key_metrics(TestId), + Metrics = ra:key_metrics(?SYS, TestId), ct:pal("FollowerMetrics ~p", [Metrics]), follower == maps:get(state, Metrics) end, 200), [begin - M = ra:key_metrics(S), + M = ra:key_metrics(?SYS, S), ct:pal("Metrics ~p", [M]), ?assertMatch(#{state := _, last_applied := LA, @@ -827,7 +827,7 @@ recover_from_checkpoint(Config) -> checkpoints_promoted := 0 } when B > 0, ct_rpc:call(N, ra_counters, counters, - [ServerId, CounterKeys])) + [?SYS, ServerId, CounterKeys])) end || {_, N} = ServerId <- ServerIds], @@ -867,7 +867,7 @@ recover_from_checkpoint(Config) -> checkpoints_promoted := 1 } when B > 0, ct_rpc:call(N, ra_counters, counters, - [ServerId, CounterKeys])) + [?SYS, ServerId, CounterKeys])) end || {_, N} = ServerId <- ServerIds], %% Restart the servers: the servers should be able to recover from the %% snapshot which was promoted from a checkpoint. @@ -1194,13 +1194,13 @@ stopped_wal_causes_leader_change(Config, RecoverStrat) -> %% kill the wal until the system crashes and the current member is terminated %% and another leader is elected - #{term := Term} = ra:key_metrics(Follower), + #{term := Term} = ra:key_metrics(?SYS, Follower), await_condition(fun () -> WalPid = ct_rpc:call(LeaderNode, erlang, whereis, [ra_log_wal]), true = ct_rpc:call(LeaderNode, erlang, exit, [WalPid, kill]), - #{term := T} = ra:key_metrics(Follower), + #{term := T} = ra:key_metrics(?SYS, Follower), T > Term andalso (begin P = ct_rpc:call(LeaderNode, erlang, whereis, [LeaderName]),% [ra_log_wal]), diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 146a237d..a438a32b 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -163,7 +163,7 @@ pipeline_commands(Config) -> ?assertMatch(#{last_index := I, last_applied := I, last_written_index := I, - commit_index := I}, ra:key_metrics(N1)), + commit_index := I}, ra:key_metrics(?SYS, N1)), terminate_cluster([N1]). stop_server_idemp(Config) -> diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 6c5816c2..fc0acc5d 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -9,6 +9,8 @@ %% %% +-define(SYS, default). + all() -> [ {group, random}, @@ -251,8 +253,8 @@ delete_during_segment_flush(Config) -> ok. read_one(Config) -> - ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), - Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}), + ra_counters:new(?SYS, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS), + Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?SYS, ?FUNCTION_NAME)}), Log1 = append_n(1, 2, 1, Log0), % Log1 = ra_log:append({1, 1, <<1:64/integer>>}, Log0), % ensure the written event is delivered @@ -260,7 +262,7 @@ read_one(Config) -> {[_], Log} = ra_log_take(1, 1, Log2), % read out of range #{?FUNCTION_NAME := #{read_mem_table := M1, - read_segment := M2}} = ra_counters:overview(), + read_segment := M2}} = ra_counters:overview(?SYS), % read two entries ?assertEqual(1, M1 + M2), ra_log:close(Log), @@ -286,8 +288,8 @@ take_after_overwrite_and_init(Config) -> validate_sequential_fold(Config) -> - ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), - Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME), + ra_counters:new(?SYS, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS), + Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?SYS, ?FUNCTION_NAME), max_open_segments => 2}), % write 1000 entries Log1 = append_and_roll(1, 500, 1, Log0), @@ -311,7 +313,7 @@ validate_sequential_fold(Config) -> #{?FUNCTION_NAME := #{read_mem_table := M1, open_segments := 2, %% as this is the max - read_segment := M4} = O} = ra_counters:overview(), + read_segment := M4} = O} = ra_counters:overview(?SYS), ct:pal("counters ~p", [O]), ?assertEqual(1000, M1 + M4), @@ -319,8 +321,8 @@ validate_sequential_fold(Config) -> ok. validate_reads_for_overlapped_writes(Config) -> - ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), - Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME) + ra_counters:new(?SYS, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS), + Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?SYS, ?FUNCTION_NAME) }), % write a segment and roll 1 - 299 - term 1 Log1 = write_and_roll(1, 300, 1, Log0), @@ -342,11 +344,11 @@ validate_reads_for_overlapped_writes(Config) -> Log8 = validate_fold(200, 550, 2, Log7), #{?FUNCTION_NAME := #{read_mem_table := M1, - read_segment := M2}} = ra_counters:overview(), + read_segment := M2}} = ra_counters:overview(?SYS), ?assertEqual(550, M1 + M2), ra_log:close(Log8), %% re open to test init with overlapping segments - Log = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}), + Log = ra_log_init(Config, #{counter => ra_counters:fetch(?SYS, ?FUNCTION_NAME)}), ra_log:close(Log), ok. @@ -642,9 +644,9 @@ writes_lower_than_snapshot_index_are_dropped(Config) -> ok. updated_segment_can_be_read(Config) -> - ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), + ra_counters:new(?SYS, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS), Log0 = ra_log_init(Config, - #{counter => ra_counters:fetch(?FUNCTION_NAME), + #{counter => ra_counters:fetch(?SYS, ?FUNCTION_NAME), min_snapshot_interval => 1}), %% append a few entries Log2 = append_and_roll(1, 5, 1, Log0), @@ -660,7 +662,7 @@ updated_segment_can_be_read(Config) -> ?assertEqual(15, length(Entries1)), ct:pal("Entries: ~p", [Entries]), ct:pal("Entries1: ~p", [Entries1]), - ct:pal("Counters ~p", [ra_counters:overview(?FUNCTION_NAME)]), + ct:pal("Counters ~p", [ra_counters:overview(?SYS, ?FUNCTION_NAME)]), ?assertEqual(15, length(Entries1)), % l18 = length(Entries1), ok. diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 002089a9..49e53034 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -59,10 +59,10 @@ init_per_testcase(TestCase, Config) -> logger:set_primary_config(level, all), PrivDir = ?config(priv_dir, Config), Dir = filename:join(PrivDir, TestCase), - SysCfg = ra_system:default_config(), + #{name := System} = SysCfg = ra_system:default_config(), ra_system:store(SysCfg), _ = ra_log_ets:start_link(SysCfg), - ra_counters:init(), + ra_counters:init(System), UId = atom_to_binary(TestCase, utf8), ok = ra_directory:register_name(default, UId, self(), undefined, TestCase, TestCase), diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index a538a52a..b25a126e 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -81,7 +81,7 @@ init_per_group(Group, Config) -> SysCfg = (ra_system:default_config())#{data_dir => Dir}, ra_system:store(SysCfg), ra_directory:init(?SYS), - ra_counters:init(), + ra_counters:init(?SYS), % application:ensure_all_started(lg), {SyncMethod, WriteStrat} = case Group of @@ -106,12 +106,13 @@ init_per_testcase(TestCase, Config) -> Sys = ?config(sys_cfg, Config), Dir = filename:join([PrivDir, G, M, TestCase]), {ok, Ets} = ra_log_ets:start_link(Sys), - ra_counters:init(), + ra_counters:init(?SYS), UId = atom_to_binary(TestCase, utf8), ok = ra_directory:register_name(default, UId, self(), undefined, TestCase, TestCase), Names = maps:get(names, Sys), WalConf = #{dir => Dir, + system => ?SYS, name => ra_log_wal, names => Names, write_strategy => G,