Skip to content

Commit 177855c

Browse files
committed
WIP: Introduce Ra worker process - first draft
Responsibilities: * Writing snapshots * Writing and promoting checkpoints * Compaction
1 parent a9d5065 commit 177855c

14 files changed

+420
-227
lines changed

src/ra.hrl

+2
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@
9494
%% A member of the cluster from which replies should be sent.
9595
-type ra_reply_from() :: leader | local | {member, ra_server_id()}.
9696

97+
%% A module, function name and argument list triple.
-type mfargs() :: {M :: module(), F :: atom(), A :: [term()]}.
98+
9799
-define(RA_PROTO_VERSION, 1).
98100
%% the protocol version should be incremented whenever extensions need to be
99101
%% done to the core protocol records (below). It is only ever exchanged by the

src/ra_bench.erl

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
% profile/0,
2626
% stop_profile/0
27+
start/2,
2728

2829
prepare/0,
2930
run/3,

src/ra_log.erl

+20-14
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,12 @@ handle_event({snapshot_written, {Idx, Term} = Snap, SnapKind},
774774
ra_snapshot:directory(SnapState, SnapKind),
775775
Snap}],
776776
{State0, Effects};
777+
handle_event({snapshot_error, Snap, SnapKind, Error},
778+
#?MODULE{cfg =#cfg{log_id = LogId},
779+
snapshot_state = SnapState0} = State0) ->
780+
?INFO("~ts: snapshot error for ~w ~s ", [LogId, Snap, SnapKind]),
781+
SnapState = ra_snapshot:handle_error(Snap, Error, SnapState0),
782+
{State0#?MODULE{snapshot_state = SnapState}, []};
777783
handle_event({resend_write, Idx},
778784
#?MODULE{cfg =#cfg{log_id = LogId}} = State) ->
779785
% resend missing entries from mem tables.
@@ -891,7 +897,7 @@ suggest_snapshot(SnapKind, Idx, Cluster, MacVersion, MacState,
891897
promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
892898
snapshot_state = SnapState0} = State) ->
893899
case ra_snapshot:pending(SnapState0) of
894-
{_WriterPid, _IdxTerm, snapshot} ->
900+
{_IdxTerm, snapshot} ->
895901
%% If we're currently writing a snapshot, skip promoting a
896902
%% checkpoint.
897903
{State, []};
@@ -1086,24 +1092,24 @@ read_config(Dir) ->
10861092
delete_everything(#?MODULE{cfg = #cfg{uid = UId,
10871093
names = Names,
10881094
directory = Dir},
1089-
snapshot_state = SnapState} = Log) ->
1095+
snapshot_state = _SnapState} = Log) ->
10901096
_ = close(Log),
10911097
%% if there is a snapshot process pending it could cause the directory
10921098
%% deletion to fail. NOTE(review): the code that used to kill the pending
%% snapshot process is commented out below, so nothing is killed here any more
10931099
ok = ra_log_ets:delete_mem_tables(Names, UId),
10941100
catch ets:delete(ra_log_snapshot_state, UId),
1095-
case ra_snapshot:pending(SnapState) of
1096-
{Pid, _, _} ->
1097-
case is_process_alive(Pid) of
1098-
true ->
1099-
exit(Pid, kill),
1100-
ok;
1101-
false ->
1102-
ok
1103-
end;
1104-
_ ->
1105-
ok
1106-
end,
1101+
% case ra_snapshot:pending(SnapState) of
1102+
% {Pid, _, _} ->
1103+
% case is_process_alive(Pid) of
1104+
% true ->
1105+
% exit(Pid, kill),
1106+
% ok;
1107+
% false ->
1108+
% ok
1109+
% end;
1110+
% _ ->
1111+
% ok
1112+
% end,
11071113
try ra_lib:recursive_delete(Dir) of
11081114
ok -> ok
11091115
catch

src/ra_server.erl

+23-20
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@
178178
{notify, #{pid() => [term()]}} |
179179
%% used for tracking valid leader messages
180180
{record_leader_msg, ra_server_id()} |
181-
start_election_timeout.
181+
start_election_timeout |
182+
%% work item for the ra worker process; the third element is the ErrFun that
%% ra_server_proc:handle_effect/5 passes on to ra_worker:queue_work/3
{bg_work, fun(() -> ok) | mfargs(), fun()}.
182183

183184
-type effects() :: [effect()].
184185

@@ -232,7 +233,8 @@
232233
counter => counters:counters_ref(),
233234
membership => ra_membership(),
234235
system_config => ra_system:config(),
235-
has_changed => boolean()
236+
has_changed => boolean(),
237+
parent => term() %% the supervisor
236238
}.
237239

238240
-type ra_server_info_key() :: machine_version | atom().
@@ -1542,8 +1544,8 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15421544
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
15431545
[LogId, Num, ChunkFlag, SnapIndex, SnapTerm]),
15441546
SnapState0 = ra_log:snapshot_state(Log0),
1545-
{ok, SnapState} = ra_snapshot:accept_chunk(Data, Num, ChunkFlag,
1546-
SnapState0),
1547+
{ok, SnapState, Effs0} = ra_snapshot:accept_chunk(Data, Num, ChunkFlag,
1548+
SnapState0),
15471549
Reply = #install_snapshot_result{term = CurTerm,
15481550
last_term = SnapTerm,
15491551
last_index = SnapIndex},
@@ -1597,11 +1599,12 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
15971599
%% it was the last snapshot chunk so we can revert back to
15981600
%% follower status
15991601
{follower, persist_last_applied(State), [{reply, Reply} |
1600-
Effs ++ SnapInstalledEffs]};
1602+
Effs0 ++ Effs ++
1603+
SnapInstalledEffs]};
16011604
next ->
16021605
Log = ra_log:set_snapshot_state(SnapState, Log0),
16031606
State = update_term(Term, State0#{log => Log}),
1604-
{receive_snapshot, State, [{reply, Reply}]}
1607+
{receive_snapshot, State, [{reply, Reply} | Effs0]}
16051608
end;
16061609
handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg,
16071610
#{current_term := CurTerm,
@@ -2295,20 +2298,20 @@ handle_down(RaftState, snapshot_sender, Pid, Info,
22952298
"~ts: Snapshot sender process ~w exited with ~W",
22962299
[LogId, Pid, Info, 10]),
22972300
{leader, peer_snapshot_process_exited(Pid, State), []};
2298-
handle_down(RaftState, snapshot_writer, Pid, Info,
2299-
#{cfg := #cfg{log_id = LogId}, log := Log0} = State)
2300-
when is_pid(Pid) ->
2301-
case Info of
2302-
noproc -> ok;
2303-
normal -> ok;
2304-
_ ->
2305-
?WARN("~ts: Snapshot write process ~w exited with ~w",
2306-
[LogId, Pid, Info])
2307-
end,
2308-
SnapState0 = ra_log:snapshot_state(Log0),
2309-
SnapState = ra_snapshot:handle_down(Pid, Info, SnapState0),
2310-
Log = ra_log:set_snapshot_state(SnapState, Log0),
2311-
{RaftState, State#{log => Log}, []};
2301+
% handle_down(RaftState, snapshot_writer, Pid, Info,
2302+
% #{cfg := #cfg{log_id = LogId}, log := Log0} = State)
2303+
% when is_pid(Pid) ->
2304+
% case Info of
2305+
% noproc -> ok;
2306+
% normal -> ok;
2307+
% _ ->
2308+
% ?WARN("~ts: Snapshot write process ~w exited with ~w",
2309+
% [LogId, Pid, Info])
2310+
% end,
2311+
% SnapState0 = ra_log:snapshot_state(Log0),
2312+
% SnapState = ra_snapshot:handle_error(Pid, Info, SnapState0),
2313+
% Log = ra_log:set_snapshot_state(SnapState, Log0),
2314+
% {RaftState, State#{log => Log}, []};
23122315
handle_down(RaftState, log, Pid, Info, #{log := Log0} = State) ->
23132316
{Log, Effects} = ra_log:handle_event({down, Pid, Info}, Log0),
23142317
{RaftState, State#{log => Log}, Effects};

src/ra_server_proc.erl

+29-11
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@
146146
receive_snapshot_timeout = ?DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT :: non_neg_integer(),
147147
install_snap_rpc_timeout :: non_neg_integer(),
148148
aten_poll_interval = 1000 :: non_neg_integer(),
149-
counter :: undefined | counters:counters_ref()
149+
counter :: undefined | counters:counters_ref(),
150+
worker_pid :: pid()
150151
}).
151152

152153
-record(state, {conf :: #conf{},
@@ -301,17 +302,13 @@ multi_statem_call([ServerId | ServerIds], Msg, Errs, Timeout) ->
301302
%%%===================================================================
302303

303304
init(#{reply_to := ReplyTo} = Config) ->
304-
%% we have a reply to key, perform init async
305305
{ok, post_init, maps:remove(reply_to, Config),
306-
[{next_event, internal, {go, ReplyTo}}]};
307-
init(Config) ->
308-
%% no reply_to key, must have been started by an older node run synchronous
309-
%% init
310-
State = do_init(Config),
311-
{ok, recover, State, [{next_event, cast, go}]}.
306+
[{next_event, internal, {go, ReplyTo}}]}.
312307

313308
do_init(#{id := Id,
314-
cluster_name := ClusterName} = Config0) ->
309+
parent := ParentPid,
310+
cluster_name := ClusterName} = Config0)
311+
when is_pid(ParentPid) ->
315312
Key = ra_lib:ra_server_id_to_local_name(Id),
316313
true = ets:insert(ra_state, {Key, init, unknown}),
317314
process_flag(trap_exit, true),
@@ -362,6 +359,16 @@ do_init(#{id := Id,
362359
ReceiveSnapshotTimeout = maps:get(receive_snapshot_timeout, SysConf,
363360
?DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT),
364361
AtenPollInt = application:get_env(aten, poll_interval, 1000),
362+
%% TODO: full error handling
363+
WorkerPid = case ra_server_sup:start_ra_worker(ParentPid, Config) of
364+
{ok, P} -> P;
365+
{error, {already_started, P}} ->
366+
P
367+
end,
368+
ra_env:configure_logger(logger),
369+
%% monitor worker process, it is easier to handle than linking as we're
370+
%% already processing all downs
371+
_ = monitor(process, WorkerPid),
365372
State = #state{conf = #conf{log_id = LogId,
366373
cluster_name = ClusterName,
367374
name = Key,
@@ -373,7 +380,8 @@ do_init(#{id := Id,
373380
install_snap_rpc_timeout = InstallSnapRpcTimeout,
374381
receive_snapshot_timeout = ReceiveSnapshotTimeout,
375382
aten_poll_interval = AtenPollInt,
376-
counter = Counter},
383+
counter = Counter,
384+
worker_pid = WorkerPid},
377385
low_priority_commands = ra_ets_queue:new(),
378386
server_state = ServerState},
379387
ok = net_kernel:monitor_nodes(true, [nodedown_reason]),
@@ -1513,7 +1521,7 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}},
15131521
SS = ra_server:update_peer(To, #{status => disconnected}, SS0),
15141522
{State0#state{server_state = SS}, Actions}
15151523
end;
1516-
handle_effect(_, {delete_snapshot, Dir, SnapshotRef}, _, State0, Actions) ->
1524+
handle_effect(_, {delete_snapshot, Dir, SnapshotRef}, _, State0, Actions) ->
15171525
%% delete snapshots in separate process
15181526
_ = spawn(fun() ->
15191527
ra_snapshot:delete(Dir, SnapshotRef)
@@ -1604,6 +1612,11 @@ handle_effect(follower, {record_leader_msg, _LeaderId}, _, State0, Actions) ->
16041612
handle_effect(_, {record_leader_msg, _LeaderId}, _, State0, Actions) ->
16051613
%% non follower states don't need to reset state timeout after an effect
16061614
{State0, Actions};
1615+
handle_effect(_, {bg_work, FunOrMfa, ErrFun}, _,
1616+
#state{conf = #conf{worker_pid = WorkerPid}} = State0, Actions) ->
1617+
%% queue the work item with the ra worker process; state and actions are unchanged
1618+
ra_worker:queue_work(WorkerPid, FunOrMfa, ErrFun),
1619+
{State0, Actions};
16071620
handle_effect(_, _, _, State0, Actions) ->
16081621
{State0, Actions}.
16091622

@@ -2018,6 +2031,11 @@ handle_node_status_change(Node, Status, InfoList, RaftState,
20182031
monitors = Monitors}),
20192032
{keep_state, State, Actions}.
20202033

2034+
handle_process_down(Pid, Info, _RaftState,
2035+
#state{conf = #conf{worker_pid = Pid}} = State) ->
2036+
?WARN("~ts: worker exited with ~w",
2037+
[log_id(State), Info]),
2038+
{stop, Info, State};
20212039
handle_process_down(Pid, Info, RaftState,
20222040
#state{monitors = Monitors0,
20232041
pending_notifys = Nots,

src/ra_server_sup.erl

+22-10
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
%% API functions
1313
-export([start_link/1]).
14+
-export([start_ra_worker/2]).
1415

1516
%% Supervisor callbacks
1617
-export([init/1]).
@@ -20,28 +21,39 @@
2021
%%%===================================================================
2122

2223
start_link(Config) ->
23-
supervisor:start_link(?MODULE, [Config]).
24+
supervisor:start_link(?MODULE, Config).
25+
26+
%% Adds a `ra_worker' child to the supervisor `SupPid'.
%% The child is a transient worker started with ra_worker:start_link(Config).
%% Only accepts a pid for `SupPid' and a map for `Config'; the result of
%% supervisor:start_child/2 is returned unchanged.
-spec start_ra_worker(pid(), ra_server:config()) ->
27+
supervisor:startchild_ret().
28+
start_ra_worker(SupPid, Config)
29+
when is_pid(SupPid) andalso
30+
is_map(Config) ->
31+
%% child spec uses the fixed id `ra_worker'
RaWorker = #{id => ra_worker,
32+
type => worker,
33+
restart => transient,
34+
start => {ra_worker, start_link, [Config]}},
35+
supervisor:start_child(SupPid, RaWorker).
2436

2537
%%%===================================================================
2638
%%% Supervisor callbacks
2739
%%%===================================================================
2840

2941
%%--------------------------------------------------------------------
3042

31-
init([Config0]) ->
43+
init(Config0) ->
3244
Id = maps:get(id, Config0),
3345
Config = Config0#{parent => self()},
3446
Name = ra_lib:ra_server_id_to_local_name(Id),
35-
SupFlags = #{strategy => one_for_one,
47+
SupFlags = #{strategy => one_for_all,
3648
intensity => 2,
3749
period => 5},
38-
ChildSpec = #{id => Name,
39-
type => worker,
40-
% needs to be transient as may shut itself down by returning
41-
% {stop, normal, State}
42-
restart => transient,
43-
start => {ra_server_proc, start_link, [Config]}},
44-
{ok, {SupFlags, [ChildSpec]}}.
50+
RaServer = #{id => Name,
51+
type => worker,
52+
% needs to be transient as may shut itself down by returning
53+
% {stop, normal, State}
54+
restart => transient,
55+
start => {ra_server_proc, start_link, [Config]}},
56+
{ok, {SupFlags, [RaServer]}}.
4557

4658
%%%===================================================================
4759
%%% Internal functions

0 commit comments

Comments
 (0)