@@ -579,8 +579,13 @@ leader(info, {Status, Node, InfoList}, State0)
579579 when Status =:= nodedown orelse
580580 Status =:= nodeup ->
581581 handle_node_status_change (Node , Status , InfoList , ? FUNCTION_NAME , State0 );
582- leader (info , {update_peer , PeerId , Update }, State0 ) ->
583- State = update_peer (PeerId , Update , State0 ),
582+ leader (info , {unsuspend_peer , PeerId }, State0 ) ->
583+ State = case ra_server :peer_status (PeerId , State0 # state .server_state ) of
584+ suspended ->
585+ update_peer (PeerId , #{status => normal }, State0 );
586+ _ ->
587+ State0
588+ end ,
584589 {keep_state , State , []};
585590leader (_ , tick_timeout , State0 ) ->
586591 {State1 , RpcEffs } = make_rpcs (State0 ),
@@ -1393,13 +1398,16 @@ handle_effects(RaftState, Effects0, EvtType, State0, Actions0) ->
13931398 {State , lists :reverse (Actions )}.
13941399
13951400handle_effect (_RaftState , {send_rpc , To , Rpc }, _ ,
1396- # state {conf = Conf } = State0 , Actions ) ->
1401+ # state {conf = Conf ,
1402+ server_state = SS } = State0 , Actions ) ->
13971403 % fully qualified use only so that we can mock it for testing
13981404 % TODO: review / refactor to remove the mod call here
1405+ PeerStatus = ra_server :peer_status (To , SS ),
13991406 case ? MODULE :send_rpc (To , Rpc , State0 ) of
14001407 ok ->
14011408 {State0 , Actions };
1402- nosuspend ->
1409+ nosuspend when PeerStatus == normal ->
1410+ incr_counter (Conf , ? C_RA_SRV_MSGS_SENT , 1 ),
14031411 % % update peer status to suspended and spawn a process
14041412 % % to send the rpc without nosuspend so that it will block until
14051413 % % the data can get through
@@ -1410,11 +1418,13 @@ handle_effect(_RaftState, {send_rpc, To, Rpc}, _,
14101418 % % the peer status back to normal
14111419 ok = gen_statem :cast (To , Rpc ),
14121420 incr_counter (Conf , ? C_RA_SRV_MSGS_SENT , 1 ),
1413- Self ! {update_peer , To , #{ status => normal } }
1421+ Self ! {unsuspend_peer , To }
14141422 end ),
1415- ? DEBUG (" ~ts : temporarily suspending peer ~w due to full distribution buffer" ,
1416- [log_id (State0 ), To ]),
1423+ ? DEBUG (" ~ts : temporarily suspending peer ~w due to full distribution buffer ~W " ,
1424+ [log_id (State0 ), To , Rpc , 5 ]),
14171425 {update_peer (To , #{status => suspended }, State0 ), Actions };
1426+ nosuspend ->
1427+ {State0 , Actions };
14181428 noconnect ->
14191429 % % for noconnects just allow it to pipeline and catch up later
14201430 {State0 , Actions }
@@ -1976,6 +1986,8 @@ send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
19761986 Result = read_chunks_and_send_rpc (RPC , To , ReadState , 1 ,
19771987 ChunkSize , InstallTimeout ,
19781988 SnapState ),
1989+ ? DEBUG (" ~ts : sending snapshot to ~w completed" ,
1990+ [LogId , To ]),
19791991 ok = gen_statem :cast (Id , {To , Result })
19801992 end .
19811993
0 commit comments