diff --git a/docs/network-spec/miniprotocols.tex b/docs/network-spec/miniprotocols.tex index 23678d64b17..deb49dfc2ee 100644 --- a/docs/network-spec/miniprotocols.tex +++ b/docs/network-spec/miniprotocols.tex @@ -872,7 +872,7 @@ \subsection{Timeouts per state} \header{state} & \header{timeout} \\\hline \StIdle & \texttt{3673}s \\ \StCanAwait & \texttt{10}s \\ - \StMustReply & random between \texttt{135}s and \texttt{269}s \\ + \StMustReply & random between \texttt{601}s and \texttt{911}s \\ \StIntersect & \texttt{10}s \\ \end{tabular} \caption{timeouts per state} diff --git a/ouroboros-network-api/src/Ouroboros/Network/PeerSelection/LedgerPeers/Type.hs b/ouroboros-network-api/src/Ouroboros/Network/PeerSelection/LedgerPeers/Type.hs index 553378d0e74..ef711b3bb3f 100644 --- a/ouroboros-network-api/src/Ouroboros/Network/PeerSelection/LedgerPeers/Type.hs +++ b/ouroboros-network-api/src/Ouroboros/Network/PeerSelection/LedgerPeers/Type.hs @@ -221,7 +221,7 @@ newtype AccPoolStakeCoded = AccPoolStakeCoded AccPoolStake data IsBigLedgerPeer = IsBigLedgerPeer | IsNotBigLedgerPeer - deriving Eq + deriving (Eq, Show) -- | Return ledger state information and ledger peers. -- diff --git a/ouroboros-network/CHANGELOG.md b/ouroboros-network/CHANGELOG.md index c711a9f391c..d4e4b5e7135 100644 --- a/ouroboros-network/CHANGELOG.md +++ b/ouroboros-network/CHANGELOG.md @@ -6,6 +6,11 @@ ### Non-breaking changes +* Limit the number of faulures to 5 before a peer that isn't a localroot, bootstrap peer or public root peer is forgotten. +* Decrease the time blockfetch waits for chainsync to exit in case of an error +* Increase the timeout for chainsync in state StMustReply to between 601 and 911 seconds. +* Ensure timeout to enter safe mode when enabling bootstrap peers is respected + ## 0.21.4.0 -- 2025-10-05 ### Non-breaking changes diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Testnet/Cardano.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Testnet/Cardano.hs index 6a78d2f7f80..3b061dbac80 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Testnet/Cardano.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Testnet/Cardano.hs @@ -1292,6 +1292,8 @@ prop_peer_selection_action_trace_coverage defaultBearerInfo diffScript = "PeerMonitoringResult " ++ show wspt peerSelectionActionsTraceMap (AcquireConnectionError e) = "AcquireConnectionError " ++ show e + peerSelectionActionsTraceMap (PeerHotDuration _id _dt) = + "PeerHotDuration" eventsSeenNames = map peerSelectionActionsTraceMap events @@ -2378,7 +2380,7 @@ prop_diffusion_target_established_local ioSimTrace traceNumber = (fromMaybe Set.empty) . Signal.fromEvents . Signal.selectEvents - (\case TracePromoteColdFailed _ _ peer _ _ -> + (\case TracePromoteColdFailed _ _ peer _ _ _ -> Just (Set.singleton peer) --TODO: what about TraceDemoteWarmDone ? -- these are also not immediate candidates @@ -3018,7 +3020,7 @@ prop_diffusion_async_demotions ioSimTrace traceNumber = Just $ Stop failures where failures = Set.singleton peeraddr - TracePromoteColdFailed _ _ peeraddr _ _ -> + TracePromoteColdFailed _ _ peeraddr _ _ _ -> Just $ Stop failures where failures = Set.singleton peeraddr @@ -3030,7 +3032,7 @@ prop_diffusion_async_demotions ioSimTrace traceNumber = Just $ Stop failures where failures = Set.singleton peeraddr - TracePromoteColdBigLedgerPeerFailed _ _ peeraddr _ _ -> + TracePromoteColdBigLedgerPeerFailed _ _ peeraddr _ _ _ -> Just $ Stop failures where failures = Set.singleton peeraddr @@ -3765,7 +3767,7 @@ prop_diffusion_peer_selection_actions_no_dodgy_traces ioSimTrace traceNumber = $ evs' numOfActiveColdErrors = length . filter (\case - (PeerStatusChangeFailure HotToWarm{} ActiveCold) + (PeerStatusChangeFailure HotToWarm{} ActiveCold{}) -> True _ -> False) $ evs' @@ -3790,7 +3792,7 @@ prop_diffusion_peer_selection_actions_no_dodgy_traces ioSimTrace traceNumber = . map (\case ev@( WithTime _ (PeerStatusChangeFailure (HotToWarm _) TimeoutError) - , WithTime _ (PeerStatusChangeFailure (HotToWarm _) ActiveCold) + , WithTime _ (PeerStatusChangeFailure (HotToWarm _) ActiveCold{}) ) -> counterexample (show ev) $ counterexample (unlines $ map show peerSelectionActionsEvents) @@ -3860,7 +3862,8 @@ prop_diffusion_peer_selection_actions_no_dodgy_traces ioSimTrace traceNumber = WithTime _ (PeerStatusChangeFailure type_ _) -> getConnId type_ WithTime _ (PeerMonitoringError connId _) -> Just connId WithTime _ (PeerMonitoringResult connId _) -> Just connId - WithTime _ (AcquireConnectionError _) -> Nothing) + WithTime _ (AcquireConnectionError _) -> Nothing + WithTime _ (PeerHotDuration connId _) -> Just connId) $ peerSelectionActionsEvents ) @@ -4750,4 +4753,3 @@ showBucket size a | a < size , show (a `div` size * size + size) , ")" ] - diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs index b03f90bda17..5c9b22f6c82 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection.hs @@ -2514,7 +2514,7 @@ prop_governor_target_established_below (MaxTime maxTime) env = (fromMaybe Set.empty) . Signal.fromEvents . Signal.selectEvents - (\case TracePromoteColdFailed _ _ peer _ _ -> + (\case TracePromoteColdFailed _ _ peer _ _ _ -> --TODO: the environment does not yet cause this to happen -- it requires synchronous failure in the establish action Just $! Set.singleton peer @@ -2623,7 +2623,7 @@ prop_governor_target_established_big_ledger_peers_below (MaxTime maxTime) env = (fromMaybe Set.empty) . Signal.fromEvents . Signal.selectEvents - (\case TracePromoteColdBigLedgerPeerFailed _ _ peer _ _ -> + (\case TracePromoteColdBigLedgerPeerFailed _ _ peer _ _ _ -> --TODO: the environment does not yet cause this to happen -- it requires synchronous failure in the establish action Just (Set.singleton peer) @@ -3261,7 +3261,7 @@ prop_governor_target_established_local (MaxTime maxTime) env = (fromMaybe Set.empty) . Signal.fromEvents . Signal.selectEvents - (\case TracePromoteColdFailed _ _ peer _ _ -> + (\case TracePromoteColdFailed _ _ peer _ _ _ -> --TODO: the environment does not yet cause this to happen -- it requires synchronous failure in the establish action Just (Set.singleton peer) @@ -4108,6 +4108,8 @@ _governorFindingPublicRoots targetNumberOfRootPeers readDomains readUseBootstrap policyPeerShareBatchWaitTime = 0, -- seconds policyPeerShareOverallTimeout = 0, -- seconds policyPeerShareActivationDelay = 2, -- seconds + policyMaxConnectionRetries = 5, + policyClearFailCountDelay = 120, --seconds policyErrorDelay = 0 -- seconds } pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr) diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/Cardano/MockEnvironment.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/Cardano/MockEnvironment.hs index 92f65b971dd..0baa4ad8354 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/Cardano/MockEnvironment.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/PeerSelection/Cardano/MockEnvironment.hs @@ -723,6 +723,8 @@ mockPeerSelectionPolicy GovernorMockEnvironment { policyPeerShareBatchWaitTime = 3, -- seconds policyPeerShareOverallTimeout = 10, -- seconds policyPeerShareActivationDelay = 300, -- seconds + policyMaxConnectionRetries = 5, + policyClearFailCountDelay = 120, -- seconds policyErrorDelay = 10 -- seconds } @@ -751,64 +753,64 @@ tracerTracePeerSelection = contramap f tracerTestTraceEvent -- make the tracer strict f :: TracePeerSelection extraState extraFlags extraPeers PeerAddr -> TestTraceEvent extraState extraFlags extraPeers extraCounters - f a@(TraceLocalRootPeersChanged !_ !_) = GovernorEvent a - f a@(TraceTargetsChanged !_ !_) = GovernorEvent a - f a@(TracePublicRootsRequest !_ !_) = GovernorEvent a - f a@(TracePublicRootsResults !_ !_ !_) = GovernorEvent a - f a@(TracePublicRootsFailure !_ !_ !_) = GovernorEvent a - f a@(TraceForgetColdPeers !_ !_ !_) = GovernorEvent a - f a@(TraceBigLedgerPeersRequest !_ !_) = GovernorEvent a - f a@(TraceBigLedgerPeersResults !_ !_ !_) = GovernorEvent a - f a@(TraceBigLedgerPeersFailure !_ !_ !_) = GovernorEvent a - f a@(TraceForgetBigLedgerPeers !_ !_ !_) = GovernorEvent a - f a@(TracePickInboundPeers !_ !_ !_ !_) = GovernorEvent a - f a@(TracePeerShareRequests !_ !_ !_ !_ !_) = GovernorEvent a - f a@(TracePeerShareResults !_) = GovernorEvent a - f a@(TracePeerShareResultsFiltered !_) = GovernorEvent a - f a@(TracePromoteColdPeers !_ !_ !_) = GovernorEvent a - f a@(TracePromoteColdLocalPeers !_ !_) = GovernorEvent a - f a@(TracePromoteColdFailed !_ !_ !_ !_ !_) = GovernorEvent a - f a@(TracePromoteColdDone !_ !_ !_) = GovernorEvent a - f a@(TracePromoteColdBigLedgerPeers !_ !_ !_) = GovernorEvent a - f a@(TracePromoteColdBigLedgerPeerFailed !_ !_ !_ !_ !_) = GovernorEvent a - f a@(TracePromoteColdBigLedgerPeerDone !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmPeers !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmLocalPeers !_ !_) = GovernorEvent a - f a@(TracePromoteWarmFailed !_ !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmDone !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmAborted !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmBigLedgerPeers !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmBigLedgerPeerFailed !_ !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmBigLedgerPeerDone !_ !_ !_) = GovernorEvent a - f a@(TracePromoteWarmBigLedgerPeerAborted !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteWarmPeers !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteWarmFailed !_ !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteWarmDone !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteWarmBigLedgerPeers !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteWarmBigLedgerPeerFailed !_ !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteWarmBigLedgerPeerDone !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteHotPeers !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteLocalHotPeers !_ !_) = GovernorEvent a - f a@(TraceDemoteHotFailed !_ !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteHotDone !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteHotBigLedgerPeers !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteHotBigLedgerPeerFailed !_ !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteHotBigLedgerPeerDone !_ !_ !_) = GovernorEvent a - f a@(TraceDemoteAsynchronous !_) = GovernorEvent a - f a@(TraceDemoteLocalAsynchronous !_) = GovernorEvent a - f a@(TraceDemoteBigLedgerPeersAsynchronous !_) = GovernorEvent a - f a@TraceGovernorWakeup = GovernorEvent a - f a@(TraceChurnWait !_) = GovernorEvent a - f a@(TraceChurnMode !_) = GovernorEvent a - f a@(TraceLedgerStateJudgementChanged !_) = GovernorEvent a - f a@TraceOnlyBootstrapPeers = GovernorEvent a - f a@TraceBootstrapPeersFlagChangedWhilstInSensitiveState = GovernorEvent a - f a@(TraceUseBootstrapPeersChanged !_) = GovernorEvent a - f a@(TraceOutboundGovernorCriticalFailure !_) = GovernorEvent a - f a@(TraceDebugState !_ !_) = GovernorEvent a - f a@(TraceChurnAction !_ !_ !_) = GovernorEvent a - f a@(TraceChurnTimeout !_ !_ !_) = GovernorEvent a - f a@(TraceVerifyPeerSnapshot !_) = GovernorEvent a + f a@(TraceLocalRootPeersChanged !_ !_) = GovernorEvent a + f a@(TraceTargetsChanged !_ !_) = GovernorEvent a + f a@(TracePublicRootsRequest !_ !_) = GovernorEvent a + f a@(TracePublicRootsResults !_ !_ !_) = GovernorEvent a + f a@(TracePublicRootsFailure !_ !_ !_) = GovernorEvent a + f a@(TraceForgetColdPeers !_ !_ !_) = GovernorEvent a + f a@(TraceBigLedgerPeersRequest !_ !_) = GovernorEvent a + f a@(TraceBigLedgerPeersResults !_ !_ !_) = GovernorEvent a + f a@(TraceBigLedgerPeersFailure !_ !_ !_) = GovernorEvent a + f a@(TraceForgetBigLedgerPeers !_ !_ !_) = GovernorEvent a + f a@(TracePickInboundPeers !_ !_ !_ !_) = GovernorEvent a + f a@(TracePeerShareRequests !_ !_ !_ !_ !_) = GovernorEvent a + f a@(TracePeerShareResults !_) = GovernorEvent a + f a@(TracePeerShareResultsFiltered !_) = GovernorEvent a + f a@(TracePromoteColdPeers !_ !_ !_) = GovernorEvent a + f a@(TracePromoteColdLocalPeers !_ !_) = GovernorEvent a + f a@(TracePromoteColdFailed !_ !_ !_ !_ !_ !_) = GovernorEvent a + f a@(TracePromoteColdDone !_ !_ !_) = GovernorEvent a + f a@(TracePromoteColdBigLedgerPeers !_ !_ !_) = GovernorEvent a + f a@(TracePromoteColdBigLedgerPeerFailed !_ !_ !_ !_ !_ !_) = GovernorEvent a + f a@(TracePromoteColdBigLedgerPeerDone !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmPeers !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmLocalPeers !_ !_) = GovernorEvent a + f a@(TracePromoteWarmFailed !_ !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmDone !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmAborted !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmBigLedgerPeers !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmBigLedgerPeerFailed !_ !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmBigLedgerPeerDone !_ !_ !_) = GovernorEvent a + f a@(TracePromoteWarmBigLedgerPeerAborted !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteWarmPeers !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteWarmFailed !_ !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteWarmDone !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteWarmBigLedgerPeers !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteWarmBigLedgerPeerFailed !_ !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteWarmBigLedgerPeerDone !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteHotPeers !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteLocalHotPeers !_ !_) = GovernorEvent a + f a@(TraceDemoteHotFailed !_ !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteHotDone !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteHotBigLedgerPeers !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteHotBigLedgerPeerFailed !_ !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteHotBigLedgerPeerDone !_ !_ !_) = GovernorEvent a + f a@(TraceDemoteAsynchronous !_) = GovernorEvent a + f a@(TraceDemoteLocalAsynchronous !_) = GovernorEvent a + f a@(TraceDemoteBigLedgerPeersAsynchronous !_) = GovernorEvent a + f a@TraceGovernorWakeup = GovernorEvent a + f a@(TraceChurnWait !_) = GovernorEvent a + f a@(TraceChurnMode !_) = GovernorEvent a + f a@(TraceLedgerStateJudgementChanged !_) = GovernorEvent a + f a@TraceOnlyBootstrapPeers = GovernorEvent a + f a@TraceBootstrapPeersFlagChangedWhilstInSensitiveState = GovernorEvent a + f a@(TraceUseBootstrapPeersChanged !_) = GovernorEvent a + f a@(TraceOutboundGovernorCriticalFailure !_) = GovernorEvent a + f a@(TraceDebugState !_ !_) = GovernorEvent a + f a@(TraceChurnAction !_ !_ !_) = GovernorEvent a + f a@(TraceChurnTimeout !_ !_ !_) = GovernorEvent a + f a@(TraceVerifyPeerSnapshot !_) = GovernorEvent a tracerDebugPeerSelection :: Tracer (IOSim s) (DebugPeerSelection Cardano.ExtraState PeerTrustable (Cardano.ExtraPeers PeerAddr) PeerAddr) tracerDebugPeerSelection = GovernorDebug `contramap` tracerTestTraceEvent diff --git a/ouroboros-network/src/Ouroboros/Cardano/Network/Diffusion/Handlers.hs b/ouroboros-network/src/Ouroboros/Cardano/Network/Diffusion/Handlers.hs index e836b556ff2..c7789baa1a4 100644 --- a/ouroboros-network/src/Ouroboros/Cardano/Network/Diffusion/Handlers.hs +++ b/ouroboros-network/src/Ouroboros/Cardano/Network/Diffusion/Handlers.hs @@ -39,6 +39,8 @@ sigUSR1Handler -> PeerSharing -> STM IO UseBootstrapPeers -> STM IO LedgerStateJudgement + -> (peerconn -> STM IO (Maybe Time)) + -- ^ return time when an active peer was promoted to a hot peer. -> ConnectionManager muxMode socket ntnAddr handle handleError IO -> StrictTVar IO (PeerSelectionState @@ -49,7 +51,7 @@ sigUSR1Handler -> IO () #ifdef POSIX sigUSR1Handler tracersExtra getUseLedgerPeers ownPeerSharing getBootstrapPeers - getLedgerStateJudgement connectionManager dbgStateVar metrics = do + getLedgerStateJudgement getPromotedHotTime connectionManager dbgStateVar metrics = do _ <- Signals.installHandler Signals.sigUSR1 (Signals.Catch @@ -66,7 +68,7 @@ sigUSR1Handler tracersExtra getUseLedgerPeers ownPeerSharing getBootstrapPeers useBootstrapPeers <*> readTVar dbgStateVar - let dbgState = makeDebugPeerSelectionState ps up bp lsj am + dbgState <- makeDebugPeerSelectionState ps up bp lsj am getPromotedHotTime now traceWith (dtConnectionManagerTracer tracersExtra) (TrState state) @@ -77,5 +79,5 @@ sigUSR1Handler tracersExtra getUseLedgerPeers ownPeerSharing getBootstrapPeers Nothing return () #else -sigUSR1Handler _ _ _ _ _ _ _ _ = pure () +sigUSR1Handler _ _ _ _ _ _ _ _ _ = pure () #endif diff --git a/ouroboros-network/src/Ouroboros/Cardano/Network/PeerSelection/Governor/Monitor.hs b/ouroboros-network/src/Ouroboros/Cardano/Network/PeerSelection/Governor/Monitor.hs index 46409b0a88c..ce106b18d9f 100644 --- a/ouroboros-network/src/Ouroboros/Cardano/Network/PeerSelection/Governor/Monitor.hs +++ b/ouroboros-network/src/Ouroboros/Cardano/Network/PeerSelection/Governor/Monitor.hs @@ -632,6 +632,7 @@ waitForSystemToQuiesce st@PeerSelectionState{ , extraState = cpst@Cardano.ExtraState { Cardano.ledgerStateJudgement , Cardano.bootstrapPeersFlag + , Cardano.bootstrapPeersTimeout , Cardano.hasOnlyBootstrapPeers } } @@ -668,4 +669,4 @@ waitForSystemToQuiesce st@PeerSelectionState{ } } } - | otherwise = GuardedSkip Nothing + | otherwise = GuardedSkip bootstrapPeersTimeout diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientRegistry.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientRegistry.hs index 141d132f5ed..a53a9734cef 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientRegistry.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/ClientRegistry.hs @@ -91,8 +91,11 @@ bracketFetchClient (FetchClientRegistry ctxVar fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry) _version peer action = do ksVar <- newEmptyTMVarIO - bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst) + fst <$> generalBracket (register ksVar) (unregister ksVar) (action . fst) where + onExceptionTimeout :: DiffTime + onExceptionTimeout = 1 + register :: StrictTMVar m () -> m ( FetchClientContext header block m , (ThreadId m, StrictTMVar m ()) ) @@ -157,11 +160,15 @@ bracketFetchClient (FetchClientRegistry ctxVar ) unregister :: StrictTMVar m () - -> FetchClientContext header block m - -> (ThreadId m, StrictTMVar m ()) + -> ( FetchClientContext header block m + , (ThreadId m, StrictTMVar m ()) ) + -> ExitCase a -> m () - unregister ksVar FetchClientContext { fetchClientCtxStateVars = stateVars } - (tid, doneVar) = uninterruptibleMask $ \unmask -> do + unregister ksVar (FetchClientContext { fetchClientCtxStateVars = stateVars }, + (tid, doneVar)) exitCase = uninterruptibleMask $ \unmask -> do + let timeoutLimit = case exitCase of + ExitCaseSuccess _ -> deactivateTimeout + _ -> onExceptionTimeout dead <- do -- Signal we are shutting down dieFast <- atomically $ do @@ -183,7 +190,7 @@ bracketFetchClient (FetchClientRegistry ctxVar else do -- Give the sync client a chance to exit cleanly before killing it. res <- onException - (unmask $ timeout deactivateTimeout $ atomically $ readTMVar doneVar) + (unmask $ timeout timeoutLimit $ atomically $ readTMVar doneVar) (-- no time to wait, die die die! uninterruptibleMask_ $ do throwTo tid AsyncCancelled diff --git a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs index 5f086f097f6..530a0056e51 100644 --- a/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs +++ b/ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs @@ -991,10 +991,10 @@ fetchRequestDecisions fetchDecisionPolicy fetchMode chains = nActivePeers :: Set peer nActivePeers = Set.fromList - . map snd - . filter (\(inFlight, _) -> inFlight > 0) - . map (\(_, _, PeerFetchInFlight{peerFetchReqsInFlight}, _, p, _) -> - (peerFetchReqsInFlight, p)) + . map (\(_,_,e) -> e) + . filter (\(s, inFlight, _) -> inFlight > 0 && s /= PeerFetchStatusShutdown) + . map (\(_, s, PeerFetchInFlight{peerFetchReqsInFlight}, _, p, _) -> + (s, peerFetchReqsInFlight, p)) $ chains -- Order the peers based on current PeerGSV. The top performing peers will be @@ -1009,6 +1009,7 @@ fetchRequestDecisions fetchDecisionPolicy fetchMode chains = . take (fromIntegral maxConcurrentFetchPeers) . sortBy (\a b -> comparePeerGSV nActivePeers (peerSalt fetchDecisionPolicy) a b) . map (\(_, _, _, gsv, p, _) -> (gsv, p)) + . filter (\(_, s, _, _, _, _) -> s /= PeerFetchStatusShutdown) $ chains maxConcurrentFetchPeers :: Word diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index 2835412d0b5..fb652f71e77 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -16,8 +16,9 @@ module Ouroboros.Network.Diffusion , run ) where -import Control.Concurrent.Class.MonadSTM.Strict (StrictTVar) +import Control.Concurrent.Class.MonadSTM.Strict (STM, StrictTVar) import Control.Exception (Exception, IOException) +import Control.Monad.Class.MonadTime.SI import Data.Functor (void) import Network.DNS (Resolver) import Network.Socket (Socket) @@ -31,6 +32,7 @@ import Ouroboros.Network.NodeToNode (NodeToNodeVersion, NodeToNodeVersionData, RemoteAddress) import Ouroboros.Network.PeerSelection.Governor.Types (PeerSelectionState) import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics) +import Ouroboros.Network.PeerSelection.PeerStateActions (getPromotedHotTime) -- | Promoted data types. -- @@ -111,7 +113,10 @@ run :: forall (p2p :: P2P) extraArgs extraState extraDebugState extraFlags , Exception exception ) => (forall mode x y. - P2P.NodeToNodeConnectionManager mode Socket + ( P2P.NodeToNodePeerConnectionHandle mode RemoteAddress + NodeToNodeVersionData IO x y + -> STM IO (Maybe Time)) + -> P2P.NodeToNodeConnectionManager mode Socket RemoteAddress NodeToNodeVersionData NodeToNodeVersion IO x y -> StrictTVar IO @@ -148,7 +153,7 @@ run sigUSR1Signal (P2PApplicationsExtra appsExtra) = void $ P2P.run - sigUSR1Signal tracers tracersExtra + (sigUSR1Signal getPromotedHotTime) tracers tracersExtra args argsExtra apps appsExtra run _ tracers (NonP2PTracers tracersExtra) diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs index cc0f396cf56..5c85b595a03 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Policies.hs @@ -44,13 +44,13 @@ closeConnectionTimeout = 120 -- | Chain sync `mustReplayTimeout` lower bound. -- minChainSyncTimeout :: DiffTime -minChainSyncTimeout = 135 +minChainSyncTimeout = 601 -- | Chain sync `mustReplayTimeout` upper bound. -- maxChainSyncTimeout :: DiffTime -maxChainSyncTimeout = 269 +maxChainSyncTimeout = 911 -- | Churn timeouts after 60s trying to establish a connection. -- @@ -126,6 +126,8 @@ simplePeerSelectionPolicy rngVar metrics errorDelay = PeerSelectionPolicy { policyPeerShareBatchWaitTime = 3, -- seconds policyPeerShareOverallTimeout = 10, -- seconds policyPeerShareActivationDelay = 300, -- seconds + policyMaxConnectionRetries = 5, + policyClearFailCountDelay = 120, -- seconds policyErrorDelay = ExitPolicy.repromoteDelay errorDelay } diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs index 2c8486eab8a..d5ec3007f76 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor.hs @@ -665,7 +665,7 @@ peerSelectionGovernorLoop tracer -- Make sure preBlocking set is in the right place preBlocking policy actions st - <> Monitor.connections actions st + <> Monitor.connections actions policy st <> Monitor.jobs jobPool st -- This job monitors for changes in big ledger peer snapshot file (eg. reload) -- and copies it into the governor's private state. When a change is detected, diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/ActivePeers.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/ActivePeers.hs index 2036d58a205..33b90792ade 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/ActivePeers.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/ActivePeers.hs @@ -439,8 +439,12 @@ jobPromoteWarmPeer -> peerconn -> Job () m (Completion m extraState extraDebugState extraFlags extraPeers peeraddr peerconn) -jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {activatePeerConnection}} - PeerSelectionPolicy { policyErrorDelay } +jobPromoteWarmPeer PeerSelectionActions{ peerStateActions = PeerStateActions {activatePeerConnection} + , extraPeersAPI = PublicExtraPeersAPI { memberExtraPeers + , differenceExtraPeers }} + PeerSelectionPolicy { policyErrorDelay + , policyMaxConnectionRetries + , policyClearFailCountDelay } peeraddr isBigLedgerPeer peerconn = Job job handler () "promoteWarmPeer" where @@ -463,6 +467,7 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act -- When promotion fails we set the peer as cold. Completion $ \st@PeerSelectionState { publicRootPeers, + localRootPeers, activePeers, establishedPeers, knownPeers, @@ -481,20 +486,25 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act establishedPeers (fuzz, stdGen') = randomR (-2, 2 :: Double) stdGen delay = realToFrac fuzz + policyErrorDelay - knownPeers' = if peeraddr `KnownPeers.member` knownPeers - then KnownPeers.setConnectTimes - (Map.singleton - peeraddr - (delay `addTime` now)) - $ snd $ KnownPeers.incrementFailCount - peeraddr - knownPeers - else + (knownPeers', forgottenPeers) = + if peeraddr `KnownPeers.member` knownPeers + then KnownPeers.reportFailures + now + policyMaxConnectionRetries + (Set.singleton peeraddr) + (\p -> LocalRootPeers.member p localRootPeers || + (memberExtraPeers p (PublicRootPeers.getExtraPeers publicRootPeers))) + (\_ _ -> delay) + knownPeers + else -- Apparently the governor can remove -- the peer we failed to promote from the -- set of known peers before we can process -- the failure. - knownPeers + (knownPeers, Set.empty) + publicRootPeers' = PublicRootPeers.difference differenceExtraPeers + publicRootPeers + forgottenPeers in Decision { decisionTrace = if peeraddr `Set.member` bigLedgerPeersSet @@ -513,6 +523,7 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act inProgressPromoteWarm = Set.delete peeraddr (inProgressPromoteWarm st), knownPeers = knownPeers', + publicRootPeers = publicRootPeers', establishedPeers = establishedPeers', stdGen = stdGen' }, @@ -542,14 +553,18 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act return $ Completion $ \st@PeerSelectionState { publicRootPeers, activePeers, + knownPeers, targets = PeerSelectionTargets { targetNumberOfActivePeers } } - _now -> + now -> let bigLedgerPeersSet = PublicRootPeers.getBigLedgerPeers publicRootPeers in if peeraddr `EstablishedPeers.member` establishedPeers st - then let activePeers' = Set.insert peeraddr activePeers in + then let activePeers' = Set.insert peeraddr activePeers + knownPeers' = KnownPeers.setClearFailCountTime + peeraddr (policyClearFailCountDelay `addTime` now) + knownPeers in Decision { decisionTrace = if peeraddr `Set.member` bigLedgerPeersSet then [TracePromoteWarmBigLedgerPeerDone @@ -566,7 +581,8 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act decisionState = st { activePeers = activePeers', inProgressPromoteWarm = Set.delete peeraddr - (inProgressPromoteWarm st) + (inProgressPromoteWarm st), + knownPeers = knownPeers' }, decisionJobs = [] } diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/EstablishedPeers.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/EstablishedPeers.hs index bc6e137a817..efb9e722403 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/EstablishedPeers.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/EstablishedPeers.hs @@ -476,11 +476,16 @@ jobPromoteColdPeer PeerSelectionActions { peerStateActions = PeerStateActions {establishPeerConnection}, peerConnToPeerSharing, extraPeersAPI = PublicExtraPeersAPI { - extraPeersToSet + extraPeersToSet, + memberExtraPeers, + differenceExtraPeers }, extraStateToExtraCounters } - PeerSelectionPolicy { policyPeerShareActivationDelay } + PeerSelectionPolicy { + policyPeerShareActivationDelay, + policyMaxConnectionRetries + } peeraddr isBigLedgerPeer diffusionMode = Job job handler () "promoteColdPeer" where @@ -489,6 +494,7 @@ jobPromoteColdPeer PeerSelectionActions { peeraddr peerconn) handler e = return $ Completion $ \st@PeerSelectionState { + localRootPeers, publicRootPeers, stdGen, targets = PeerSelectionTargets { @@ -497,25 +503,32 @@ jobPromoteColdPeer PeerSelectionActions { } } now -> - let (failCount, knownPeers') = KnownPeers.incrementFailCount - peeraddr - (knownPeers st) - (fuzz, stdGen') = randomR (-2, 2 :: Double) stdGen + let (fuzz, stdGen') = randomR (-2, 2 :: Double) stdGen -- exponential backoff: 5s, 10s, 20s, 40s, 80s, 160s. - delay :: DiffTime - delay = realToFrac fuzz + delay :: Int -> DiffTime + delay failCount = realToFrac fuzz + fromIntegral ( baseColdPeerRetryDiffTime * 2 ^ (pred failCount `min` maxColdPeerRetryBackoff) ) bigLedgerPeersSet = PublicRootPeers.getBigLedgerPeers publicRootPeers - - st' = st { knownPeers = KnownPeers.setConnectTimes - (Map.singleton - peeraddr - (delay `addTime` now)) - knownPeers', + (knownPeers', forgottenPeers) = + KnownPeers.reportFailures + now + policyMaxConnectionRetries + (Set.singleton peeraddr) + (\p -> LocalRootPeers.member p localRootPeers || + (memberExtraPeers p (PublicRootPeers.getExtraPeers publicRootPeers))) + (const delay) + (knownPeers st) + publicRootPeers' = PublicRootPeers.difference differenceExtraPeers publicRootPeers + forgottenPeers + (delayVal, forgotten) = case KnownPeers.lookupConnectTime peeraddr knownPeers' of + Nothing -> (0, True) + Just t -> (t `diffTime` now, False) + st' = st { knownPeers = knownPeers', + publicRootPeers = publicRootPeers', inProgressPromoteCold = Set.delete peeraddr (inProgressPromoteCold st), stdGen = stdGen' @@ -528,12 +541,12 @@ jobPromoteColdPeer PeerSelectionActions { targetNumberOfEstablishedBigLedgerPeers (case cs' of PeerSelectionCounters { numberOfEstablishedBigLedgerPeers = a } -> a) - peeraddr delay e] + peeraddr delayVal e forgotten] else [TracePromoteColdFailed targetNumberOfEstablishedPeers (case cs' of PeerSelectionCounters { numberOfEstablishedPeers = a } -> a) - peeraddr delay e], + peeraddr delayVal e forgotten], decisionState = st', decisionJobs = [] } @@ -576,10 +589,7 @@ jobPromoteColdPeer PeerSelectionActions { ) (Set.singleton peeraddr) $ KnownPeers.setSuccessfulConnectionFlag (Set.singleton peeraddr) - $ KnownPeers.clearTepidFlag peeraddr $ - KnownPeers.resetFailCount - peeraddr - knownPeers + $ KnownPeers.clearTepidFlag peeraddr knownPeers bigLedgerPeersSet = PublicRootPeers.getBigLedgerPeers publicRootPeers st' = st { establishedPeers = establishedPeers', diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs index 74b9ee56af3..8f631197524 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Monitor.hs @@ -116,11 +116,15 @@ jobs jobPool st = connections :: forall m extraState extraDebugState extraFlags extraPeers extraAPI extraCounters peeraddr peerconn. (MonadSTM m, Ord peeraddr) => PeerSelectionActions extraState extraFlags extraPeers extraAPI extraCounters peeraddr peerconn m + -> PeerSelectionPolicy peeraddr m -> PeerSelectionState extraState extraFlags extraPeers peeraddr peerconn -> Guarded (STM m) (TimedDecision m extraState extraDebugState extraFlags extraPeers peeraddr peerconn) connections PeerSelectionActions{ peerStateActions = PeerStateActions {monitorPeerConnection} + , extraPeersAPI = PublicExtraPeersAPI { memberExtraPeers + , differenceExtraPeers } } + PeerSelectionPolicy { policyMaxConnectionRetries } st@PeerSelectionState { publicRootPeers, localRootPeers, @@ -178,14 +182,21 @@ connections PeerSelectionActions{ -- Asynchronous transition to cold peer can only be -- a result of a failure. - knownPeers' = KnownPeers.setConnectTimes - ( (\(_, a) -> ExitPolicy.repromoteDelay (fromMaybe 0 a) `addTime` now) - <$> demotedToCold - ) - . Set.foldr' - ((snd .) . KnownPeers.incrementFailCount) - (knownPeers st) - $ Map.keysSet demotedToCold + (knownPeers', forgottenPeers) = KnownPeers.reportFailures + now + policyMaxConnectionRetries + (Map.keysSet demotedToCold) + ( \p -> LocalRootPeers.member p localRootPeers || + (memberExtraPeers p (PublicRootPeers.getExtraPeers publicRootPeers)) + ) + (\p _ -> + case Map.lookup p demotedToCold of + Nothing -> 0 + Just (_, a) -> ExitPolicy.repromoteDelay (fromMaybe 0 a) + ) + (knownPeers st) + publicRootPeers' = PublicRootPeers.difference differenceExtraPeers publicRootPeers + forgottenPeers (localDemotions, nonLocalDemotions) = Map.partitionWithKey (\peer _ -> peer `LocalRootPeers.member` localRootPeers) @@ -215,6 +226,7 @@ connections PeerSelectionActions{ activePeers = activePeers', establishedPeers = establishedPeers', knownPeers = knownPeers', + publicRootPeers = publicRootPeers', -- When promoting a warm peer, it might happen -- that the connection will break (or one of the diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs index 56be58c8480..dfe1f6dc375 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/Governor/Types.hs @@ -212,6 +212,10 @@ data PeerSelectionPolicy peeraddr m = PeerSelectionPolicy { -- allowed to take policyPeerShareActivationDelay :: !DiffTime, -- ^ Delay until we consider a peer suitable for peer sharing + policyMaxConnectionRetries :: !Int, + -- ^ Maximum cold promotion attempts for non localroots, and bootstrap peers, + policyClearFailCountDelay :: !DiffTime, + -- ^ Time a peer has to have been hot before clearing its fail counter. -- | Re-promote delay, passed from `ExitPolicy`. -- @@ -770,18 +774,24 @@ data DebugPeerSelectionState extraState extraFlags extraPeers peeraddr = dpssUpstreamyness :: !(Map peeraddr Int), dpssFetchynessBlocks :: !(Map peeraddr Int), dpssAssociationMode :: !AssociationMode, + dpssHotDurations :: !(Map peeraddr (IsBigLedgerPeer, DiffTime)), dpssExtraState :: !extraState } deriving Show makeDebugPeerSelectionState - :: PeerSelectionState extraState extraFlags extraPeers peeraddr peerconn + :: (Ord peeraddr, MonadSTM m) + => PeerSelectionState extraState extraFlags extraPeers peeraddr peerconn -> Map peeraddr Int -> Map peeraddr Int -> extraDebugState -> AssociationMode - -> DebugPeerSelectionState extraDebugState extraFlags extraPeers peeraddr -makeDebugPeerSelectionState PeerSelectionState {..} up bp es am = - DebugPeerSelectionState { + -> (peerconn -> STM m (Maybe Time)) + -> Time + -> m (DebugPeerSelectionState extraDebugState extraFlags extraPeers peeraddr) +makeDebugPeerSelectionState PeerSelectionState {..} up bp es am getPromotedHotTime now = do + let activeMap = EstablishedPeers.toMap establishedPeers `Map.restrictKeys` activePeers + dpssHotDurations <- Map.traverseMaybeWithKey getDiffTimes activeMap + return DebugPeerSelectionState { dpssTargets = targets , dpssLocalRootPeers = localRootPeers , dpssPublicRootPeers = publicRootPeers @@ -803,8 +813,20 @@ makeDebugPeerSelectionState PeerSelectionState {..} up bp es am = , dpssUpstreamyness = up , dpssFetchynessBlocks = bp , dpssAssociationMode = am + , dpssHotDurations , dpssExtraState = es } + where + getDiffTimes peeraddr peerconn = do + t1 <- atomically $ getPromotedHotTime peerconn + case t1 of + Nothing -> return Nothing + Just t1' -> + let !dt = now `diffTime` t1' + in if Set.member peeraddr (PublicRootPeers.getBigLedgerPeers publicRootPeers) + then return . Just $ (IsBigLedgerPeer, dt) + else return . Just $ (IsNotBigLedgerPeer, dt) + -- | Public 'PeerSelectionState' that can be accessed by Peer Sharing -- mechanisms without any problem. @@ -1698,8 +1720,8 @@ data TracePeerSelection extraDebugState extraFlags extraPeers peeraddr = | TracePromoteColdPeers Int Int (Set peeraddr) -- | target local established, actual local established, selected peers | TracePromoteColdLocalPeers [(WarmValency, Int)] (Set peeraddr) - -- promotion, reason - | TracePromoteColdFailed Int Int peeraddr DiffTime SomeException + -- promotion, reason, forgotten? + | TracePromoteColdFailed Int Int peeraddr DiffTime SomeException Bool -- | target established, actual established, peer | TracePromoteColdDone Int Int peeraddr @@ -1707,8 +1729,8 @@ data TracePeerSelection extraDebugState extraFlags extraPeers peeraddr = -- peers, selected peers | TracePromoteColdBigLedgerPeers Int Int (Set peeraddr) -- | target established big ledger peers, actual established big ledger - -- peers, peer, delay until next promotion, reason - | TracePromoteColdBigLedgerPeerFailed Int Int peeraddr DiffTime SomeException + -- peers, peer, delay until next promotion, reason, forgotten? + | TracePromoteColdBigLedgerPeerFailed Int Int peeraddr DiffTime SomeException Bool -- | target established big ledger peers, actual established big ledger -- peers, peer | TracePromoteColdBigLedgerPeerDone Int Int peeraddr diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerStateActions.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerStateActions.hs index 25a0a176da3..6abbc6adcad 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerStateActions.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/PeerStateActions.hs @@ -1,7 +1,9 @@ +{-# LANGUAGE BlockArguments #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -17,6 +19,7 @@ module Ouroboros.Network.PeerSelection.PeerStateActions -- * Create PeerStateActions PeerStateActionsArguments (..) , PeerConnectionHandle + , getPromotedHotTime , withPeerStateActions , pchPeerSharing -- * Exceptions @@ -33,10 +36,11 @@ module Ouroboros.Network.PeerSelection.PeerStateActions import Control.Applicative (Alternative) import Control.Concurrent.Class.MonadSTM.Strict import Control.Exception (SomeAsyncException (..), assert) -import Control.Monad (when, (<=<)) +import Control.Monad (join, when, (<=<)) import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadThrow +import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Concurrent.JobPool (Job (..), JobPool) @@ -426,13 +430,25 @@ awaitAllResults tok bundle = do -- together with their state 'StrictTVar's. -- data PeerConnectionHandle (muxMode :: Mux.Mode) responderCtx peerAddr versionData bytes m a b = PeerConnectionHandle { - pchConnectionId :: ConnectionId peerAddr, - pchPeerStatus :: StrictTVar m PeerStatus, - pchMux :: Mux.Mux muxMode m, - pchAppHandles :: TemperatureBundle (ApplicationHandle muxMode responderCtx peerAddr bytes m a b), - pchVersionData :: !versionData + pchConnectionId :: !(ConnectionId peerAddr), + pchPeerStatus :: !(StrictTVar m PeerStatus), + pchMux :: !(Mux.Mux muxMode m), + pchAppHandles :: !(TemperatureBundle (ApplicationHandle muxMode responderCtx peerAddr bytes m a b)), + pchVersionData :: !versionData, + pchPromotedHotVar :: !(StrictTVar m (Maybe Time)) } + +-- | Retrieve the time the remote peer has been promoted to hot state +-- or Nothing if either the peer was not promoted or is being currently demoted +-- +getPromotedHotTime :: (MonadSTM m) + => PeerConnectionHandle muxMode responderCtx peerAddr versionData bytes m a b + -> STM m (Maybe Time) +getPromotedHotTime PeerConnectionHandle { pchPromotedHotVar } = + readTVar pchPromotedHotVar + + mkInitiatorContext :: MonadSTM m => SingProtocolTemperature pt -> IsBigLedgerPeer @@ -617,14 +633,25 @@ withPeerStateActions PeerStateActionsArguments { then return False else writeTVar stateVar newState >> return True - isNotCoolingOrCold :: StrictTVar m PeerStatus -> STM m Bool - isNotCoolingOrCold stateVar = - (> PeerCooling) <$> readTVar stateVar + tracePeerHotDuration + :: PeerConnectionHandle muxMode responderCtx peerAddr versionData bytes m a b + -> m () + tracePeerHotDuration PeerConnectionHandle { pchConnectionId, pchPromotedHotVar } = do + pchPromotedHot <- atomically $ stateTVar pchPromotedHotVar (, Nothing) + case pchPromotedHot of + Just t1 -> do + dt <- (`diffTime` t1) <$> getMonotonicTime + traceWith spsTracer (PeerHotDuration pchConnectionId dt) + Nothing -> pure () peerMonitoringLoop :: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b -> m () - peerMonitoringLoop pch@PeerConnectionHandle { pchConnectionId, pchPeerStatus, pchAppHandles } = do + peerMonitoringLoop pch@PeerConnectionHandle { + pchConnectionId, + pchPeerStatus, + pchAppHandles + } = do -- A first-to-finish synchronisation on all the bundles; As a result -- this is a first-to-finish synchronisation between all the -- mini-protocols runs toward the given peer. @@ -729,6 +756,7 @@ withPeerStateActions PeerStateActionsArguments { -- Nothing -> + tracePeerHotDuration pch >> traceWith spsTracer (PeerStatusChanged (CoolingToCold pchConnectionId)) establishPeerConnection :: JobPool () m (Maybe SomeException) @@ -764,7 +792,8 @@ withPeerStateActions PeerStateActionsArguments { writeTVar (projectBundle SingWarm controlMessageBundle) Continue writeTVar (projectBundle SingEstablished controlMessageBundle) Continue - awaitVarBundle <- atomically $ mkAwaitVars muxBundle + awaitVarBundle <- atomically $ mkAwaitVars muxBundle + pchPromotedHotVar <- newTVarIO Nothing let connHandle = PeerConnectionHandle { @@ -775,7 +804,8 @@ withPeerStateActions PeerStateActionsArguments { muxBundle controlMessageBundle awaitVarBundle, - pchVersionData = versionData + pchVersionData = versionData, + pchPromotedHotVar } startProtocols SingWarm isBigLedgerPeer connHandle @@ -792,9 +822,10 @@ withPeerStateActions PeerStateActionsArguments { Just SomeAsyncException {} -> Nothing Nothing -> Just e) (\e -> do - atomically $ do + atomically do waitForOutboundDemotion spsConnectionManager connId writeTVar peerStateVar PeerCold + tracePeerHotDuration connHandle traceWith spsTracer (PeerMonitoringError connId e) throwIO e) (peerMonitoringLoop connHandle $> Nothing)) @@ -909,106 +940,87 @@ withPeerStateActions PeerStateActionsArguments { connHandle@PeerConnectionHandle { pchConnectionId, pchPeerStatus, - pchAppHandles } = do - -- quiesce warm peer protocols and set hot ones in 'Continue' mode. - wasWarm <- atomically $ do - -- if the peer is cold we can't activate it. - notCold <- isNotCoolingOrCold pchPeerStatus - when notCold $ do - writeTVar (getControlVar SingHot pchAppHandles) Continue - writeTVar (getControlVar SingWarm pchAppHandles) Quiesce - return notCold - when (not wasWarm) $ do - traceWith spsTracer (PeerStatusChangeFailure - (WarmToHot pchConnectionId) - ActiveCold) - throwIO $ ColdActivationException pchConnectionId - - -- start hot peer protocols + pchAppHandles, + pchPromotedHotVar } = do + join . atomically $ do + peerStatus <- readTVar pchPeerStatus + case peerStatus of + PeerWarm -> do + writeTVar (getControlVar SingHot pchAppHandles) Continue + writeTVar (getControlVar SingWarm pchAppHandles) Quiesce + writeTVar pchPeerStatus PeerHot + return $ pure () + + _otherwise -> return do + traceWith spsTracer (PeerStatusChangeFailure + (WarmToHot pchConnectionId) + (ActiveCold peerStatus)) + throwIO $ ColdActivationException pchConnectionId startProtocols SingHot isBigLedgerPeer connHandle - - -- Only set the status to PeerHot if the peer isn't PeerCold. - -- This can happen asynchronously between the check above and now. - wasWarm' <- atomically $ updateUnlessCoolingOrCold pchPeerStatus PeerHot - if wasWarm' - then traceWith spsTracer (PeerStatusChanged (WarmToHot pchConnectionId)) - else do - traceWith spsTracer (PeerStatusChangeFailure - (WarmToHot pchConnectionId) - ActiveCold) - throwIO $ ColdActivationException pchConnectionId - + atomically . writeTVar pchPromotedHotVar . (Just $!) =<< getMonotonicTime + traceWith spsTracer (PeerStatusChanged (WarmToHot pchConnectionId)) -- Take a hot peer and demote it to a warm one. deactivatePeerConnection :: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b -> m () deactivatePeerConnection - PeerConnectionHandle { + pch@PeerConnectionHandle { pchConnectionId, pchPeerStatus, pchMux, pchAppHandles } = do - wasCold <- atomically $ do - notCold <- isNotCoolingOrCold pchPeerStatus - when notCold $ do - writeTVar (getControlVar SingHot pchAppHandles) Terminate - writeTVar (getControlVar SingWarm pchAppHandles) Continue - return (not notCold) - when wasCold $ do - -- The governor attempted to demote an already cold peer. - traceWith spsTracer (PeerStatusChangeFailure - (HotToWarm pchConnectionId) - ActiveCold) - throwIO $ ColdDeactivationException pchConnectionId - - - -- Hot protocols should stop within 'spsDeactivateTimeout'. - res <- - timeout spsDeactivateTimeout - (atomically $ awaitAllResults SingHot pchAppHandles) - case res of - Nothing -> do - Mux.stop pchMux - atomically (writeTVar pchPeerStatus PeerCooling) - traceWith spsTracer (PeerStatusChangeFailure - (HotToCooling pchConnectionId) - TimeoutError) - throwIO (DeactivationTimeout pchConnectionId) - - -- some of the hot mini-protocols errored - Just (SomeErrored errs) -> do - -- we don't need to notify the connection manager, we can instead - -- relay on mux property: if any of the mini-protocols errors, mux - -- throws an exception as well. - atomically (writeTVar pchPeerStatus PeerCooling) - traceWith spsTracer (PeerStatusChangeFailure - (HotToCooling pchConnectionId) - (ApplicationFailure errs)) - throwIO (MiniProtocolExceptions errs) - - -- all hot mini-protocols succeeded - Just (AllSucceeded results) -> do - -- we don't notify the connection manager as this connection is still - -- useful to the outbound governor (warm peer). - wasWarm <- atomically $ do - -- Only set the status to PeerWarm if the peer isn't cold - -- (can happen asynchronously). - notCold <- updateUnlessCoolingOrCold pchPeerStatus PeerWarm - when notCold $ do - -- We need to update hot protocols to indicate that they are not - -- running. Preserve the results returned by their previous - -- execution. - modifyTVar (getMiniProtocolsVar SingHot pchAppHandles) - (\_ -> Map.map (pure . NotRunning . Right) results) - return notCold - - if wasWarm - then traceWith spsTracer (PeerStatusChanged (HotToWarm pchConnectionId)) - else do - traceWith spsTracer (PeerStatusChangeFailure - (HotToWarm pchConnectionId) - ActiveCold) - throwIO $ ColdDeactivationException pchConnectionId + join . atomically $ do + peerStatus <- readTVar pchPeerStatus + case peerStatus of + PeerHot -> do + writeTVar (getControlVar SingHot pchAppHandles) Terminate + writeTVar (getControlVar SingWarm pchAppHandles) Continue + return do + -- Hot protocols should stop within 'spsDeactivateTimeout'. + res <- + timeout spsDeactivateTimeout + $ join . atomically $ do + res <- awaitAllResults SingHot pchAppHandles + case res of + AllSucceeded results -> do + modifyTVar (getMiniProtocolsVar SingHot pchAppHandles) + (\_ -> Map.map (pure . NotRunning . Right) results) + stateTVar pchPeerStatus \case + PeerHot -> ( traceWith spsTracer (PeerStatusChanged + (HotToWarm pchConnectionId)) + , PeerWarm) + x -> (pure () , x) + SomeErrored errs -> + stateTVar pchPeerStatus \status -> + if status <= PeerCooling then + (throwIO (MiniProtocolExceptions errs), status) + else ( traceWith spsTracer (PeerStatusChangeFailure + (HotToCooling pchConnectionId) + (ApplicationFailure errs)) + >> throwIO (MiniProtocolExceptions errs) + , PeerCooling) + + case res of + Nothing -> do + Mux.stop pchMux + trace <- atomically $ updateUnlessCoolingOrCold pchPeerStatus PeerCooling + when trace do + traceWith spsTracer (PeerStatusChangeFailure + (HotToCooling pchConnectionId) + TimeoutError) + throwIO (DeactivationTimeout pchConnectionId) + Just _ -> tracePeerHotDuration pch + + -- we could genuinly hit this case due to a race between 'peerMonitoringLoop' + -- and peer selection demotion job + PeerWarm -> return $ pure () + + _otherwise -> + return $ do + traceWith spsTracer (PeerStatusChangeFailure + (HotToWarm pchConnectionId) + (ActiveCold peerStatus)) + throwIO $ ColdDeactivationException pchConnectionId closePeerConnection :: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b @@ -1020,57 +1032,55 @@ withPeerStateActions PeerStateActionsArguments { pchAppHandles, pchMux } = do - atomically $ do + peerStatus <- atomically do writeTVar (getControlVar SingWarm pchAppHandles) Terminate writeTVar (getControlVar SingEstablished pchAppHandles) Terminate writeTVar (getControlVar SingHot pchAppHandles) Terminate - - res <- - timeout spsCloseConnectionTimeout - (atomically $ - (\a b c -> a <> b <> c) - -- note: we use last to finish on hot, warm and - -- established mini-protocols since 'closePeerConnection' - -- is also used by asynchronous demotions, not just - -- /warm → cold/ transition. - <$> awaitAllResults SingHot pchAppHandles - <*> awaitAllResults SingWarm pchAppHandles - <*> awaitAllResults SingEstablished pchAppHandles) - case res of - Nothing -> do - -- timeout fired - Mux.stop pchMux - wasWarm <- atomically (updateUnlessCoolingOrCold pchPeerStatus PeerCooling) - when wasWarm $ - traceWith spsTracer (PeerStatusChangeFailure - (WarmToCooling pchConnectionId) - TimeoutError) - readTVarIO pchPeerStatus - - Just (SomeErrored errs) -> do - -- some mini-protocol errored - -- - -- we don't need to notify the connection manager, we can instead - -- rely on mux property: if any of the mini-protocols errors, mux - -- throws an exception as well. - wasWarm <- atomically (updateUnlessCoolingOrCold pchPeerStatus PeerCooling) - when wasWarm $ - traceWith spsTracer (PeerStatusChangeFailure - (WarmToCooling pchConnectionId) - (ApplicationFailure errs)) - throwIO (MiniProtocolExceptions errs) - - Just AllSucceeded {} -> do - -- all mini-protocols terminated cleanly - -- - -- 'unregisterOutboundConnection' could only fail to demote the peer if - -- connection manager would simultaneously promote it, but this is not - -- possible. - wasWarm <- atomically (updateUnlessCoolingOrCold pchPeerStatus PeerCooling) - when wasWarm $ do - _ <- releaseOutboundConnection spsConnectionManager pchConnectionId - traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId)) - readTVarIO pchPeerStatus + readTVar pchPeerStatus <* updateUnlessCoolingOrCold pchPeerStatus PeerCooling + + case peerStatus of + PeerCooling -> return peerStatus + PeerCold -> return peerStatus + _otherwise -> do + res <- + timeout spsCloseConnectionTimeout + (atomically $ + (\a b c -> a <> b <> c) + -- note: we use last to finish on hot, warm and + -- established mini-protocols since 'closePeerConnection' + -- is also used by asynchronous demotions, not just + -- /warm → cold/ transition. + <$> awaitAllResults SingHot pchAppHandles + <*> awaitAllResults SingWarm pchAppHandles + <*> awaitAllResults SingEstablished pchAppHandles) + + PeerCooling <$ case res of + Nothing -> do + -- timeout fired + Mux.stop pchMux + traceWith spsTracer (PeerStatusChangeFailure + (WarmToCooling pchConnectionId) + TimeoutError) + + Just (SomeErrored errs) -> do + -- some mini-protocol errored + -- + -- we don't need to notify the connection manager, we can instead + -- rely on mux property: if any of the mini-protocols errors, mux + -- throws an exception as well. + traceWith spsTracer (PeerStatusChangeFailure + (WarmToCooling pchConnectionId) + (ApplicationFailure errs)) + throwIO (MiniProtocolExceptions errs) + + Just AllSucceeded {} -> do + -- all mini-protocols terminated cleanly + -- + -- 'unregisterOutboundConnection' could only fail to demote the peer if + -- connection manager would simultaneously promote it, but this is not + -- possible. + _ <- releaseOutboundConnection spsConnectionManager pchConnectionId + traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId)) -- -- Utilities @@ -1175,7 +1185,7 @@ data FailureType versionNumber = | HandleFailure !SomeException | MuxStoppedFailure | TimeoutError - | ActiveCold + | ActiveCold !PeerStatus | ApplicationFailure ![MiniProtocolException] deriving Show @@ -1203,4 +1213,5 @@ data PeerSelectionActionsTrace peerAddr vNumber = | PeerMonitoringError (ConnectionId peerAddr) SomeException | PeerMonitoringResult (ConnectionId peerAddr) (Maybe (WithSomeProtocolTemperature FirstToFinishResult)) | AcquireConnectionError SomeException + | PeerHotDuration (ConnectionId peerAddr) DiffTime deriving Show diff --git a/ouroboros-network/src/Ouroboros/Network/PeerSelection/State/KnownPeers.hs b/ouroboros-network/src/Ouroboros/Network/PeerSelection/State/KnownPeers.hs index 537a439f3ca..5d3ddc21c59 100644 --- a/ouroboros-network/src/Ouroboros/Network/PeerSelection/State/KnownPeers.hs +++ b/ouroboros-network/src/Ouroboros/Network/PeerSelection/State/KnownPeers.hs @@ -21,8 +21,7 @@ module Ouroboros.Network.PeerSelection.State.KnownPeers , member -- * Special operations , setCurrentTime - , incrementFailCount - , resetFailCount + , reportFailures , lookupFailCount , lookupTepidFlag , setTepidFlag @@ -30,8 +29,11 @@ module Ouroboros.Network.PeerSelection.State.KnownPeers , setSuccessfulConnectionFlag -- ** Tracking when we can (re)connect , minConnectTime - , setConnectTimes , availableToConnect + , lookupConnectTime + -- ** Tracking when we can clear fail counter + , minClearFailCountTime + , setClearFailCountTime -- ** Selecting peers to ask , canPeerShareRequest , getPeerSharingRequestPeers @@ -73,18 +75,22 @@ data KnownPeers peeraddr = KnownPeers { -- | All the known peers. -- - allPeers :: !(Map peeraddr KnownPeerInfo), + allPeers :: !(Map peeraddr KnownPeerInfo), -- | The subset of known peers that we would be allowed to try to -- establish a connection to now. This is because we have not connected -- with them before or because any failure backoff time has expired. -- - availableToConnect :: !(Set peeraddr), + availableToConnect :: !(Set peeraddr), -- | The subset of known peers that we cannot connect to for the moment. -- It keeps track of the next time we are allowed to make the next -- connection attempt. - nextConnectTimes :: !(OrdPSQ peeraddr Time ()) + nextConnectTimes :: !(OrdPSQ peeraddr Time ()), + + -- | The subset of known peers that are currently hot, has failed previously + -- and we should reset that fail counter some time in the future. + clearFailCountTimes :: !(OrdPSQ peeraddr Time ()) } deriving (Eq, Show) @@ -186,9 +192,10 @@ alterKnownPeerInfo (peerSharing, peerAdvertise) peerLookupResult = empty :: KnownPeers peeraddr empty = KnownPeers { - allPeers = Map.empty, - availableToConnect = Set.empty, - nextConnectTimes = PSQ.empty + allPeers = Map.empty, + availableToConnect = Set.empty, + nextConnectTimes = PSQ.empty, + clearFailCountTimes = PSQ.empty } size :: KnownPeers peeraddr -> Int @@ -259,9 +266,9 @@ alter f ks knownPeers@KnownPeers { newNextConnectTimes = Set.foldl' (flip PSQ.delete) nextConnectTimes ks in knownPeers { - allPeers = newAllPeers - , availableToConnect = newAvailableToConnect - , nextConnectTimes = newNextConnectTimes + allPeers = newAllPeers + , availableToConnect = newAvailableToConnect + , nextConnectTimes = newNextConnectTimes } delete :: Ord peeraddr @@ -272,7 +279,8 @@ delete peeraddrs knownPeers@KnownPeers { allPeers, availableToConnect, - nextConnectTimes + nextConnectTimes, + clearFailCountTimes } = knownPeers { allPeers = @@ -282,7 +290,10 @@ delete peeraddrs Set.difference availableToConnect peeraddrs, nextConnectTimes = - List.foldl' (flip PSQ.delete) nextConnectTimes peeraddrs + List.foldl' (flip PSQ.delete) nextConnectTimes peeraddrs, + + clearFailCountTimes = + List.foldl' (flip PSQ.delete) clearFailCountTimes peeraddrs } @@ -296,13 +307,15 @@ setCurrentTime :: Ord peeraddr -> KnownPeers peeraddr setCurrentTime now knownPeers@KnownPeers { availableToConnect, - nextConnectTimes + nextConnectTimes, + clearFailCountTimes } = - let knownPeers' = - knownPeers { + let knownPeers' = List.foldl' (\kp (p,_,_) -> resetFailCount p kp) + (knownPeers { availableToConnect = availableToConnect', - nextConnectTimes = nextConnectTimes' - } + nextConnectTimes = nextConnectTimes', + clearFailCountTimes = clearFailCountTimes' + }) nowClear in assert (invariant knownPeers') knownPeers' where (nowAvailableToConnect, nextConnectTimes') = @@ -312,18 +325,21 @@ setCurrentTime now knownPeers@KnownPeers { availableToConnect <> Set.fromList [ peeraddr | (peeraddr, _, _) <- nowAvailableToConnect ] + (nowClear, clearFailCountTimes') = + PSQ.atMostView now clearFailCountTimes incrementFailCount :: Ord peeraddr => peeraddr -> KnownPeers peeraddr -> (Int, KnownPeers peeraddr) -incrementFailCount peeraddr knownPeers@KnownPeers{allPeers} = +incrementFailCount peeraddr knownPeers@KnownPeers{allPeers, clearFailCountTimes} = assert (peeraddr `Map.member` allPeers) $ let allPeers' = Map.update (Just . incr) peeraddr allPeers in ( -- since the `peeraddr` is assumed to be part of `allPeers` the `Map.!` -- is safe knownPeerFailCount (allPeers' Map.! peeraddr) - , knownPeers { allPeers = allPeers' } + , knownPeers { allPeers = allPeers' + , clearFailCountTimes = PSQ.delete peeraddr clearFailCountTimes } ) where incr kpi = kpi { knownPeerFailCount = knownPeerFailCount kpi + 1 } @@ -405,30 +421,103 @@ minConnectTime KnownPeers { nextConnectTimes } fn = | otherwise -> go psq' Nothing -> Nothing +lookupConnectTime :: Ord peeraddr + => peeraddr + -> KnownPeers peeraddr + -> Maybe Time +lookupConnectTime peeraddr KnownPeers { nextConnectTimes } = + fst <$> PSQ.lookup peeraddr nextConnectTimes -setConnectTimes :: Ord peeraddr - => Map peeraddr Time --TODO: make this a single entry - -> KnownPeers peeraddr - -> KnownPeers peeraddr -setConnectTimes times - knownPeers@KnownPeers { - allPeers, - availableToConnect, - nextConnectTimes - } = - assert (all (`Map.member` allPeers) (Map.keysSet times)) $ - let knownPeers' = knownPeers { + +-- | Report failures for a set of peers. +-- +-- Returns the new KnowPeers and a set of peers that where forgotten. +-- +reportFailures :: Ord peeraddr + => Time + -- ^ current time + -> Int + -- ^ Max number of failures before a peer is forgotten + -> Set peeraddr + -- ^ peers with failure + -> (peeraddr -> Bool) + -- ^ do we have to remember the peer? + -> (peeraddr -> Int -> DiffTime) + -- ^ calculate delay from failure count + -> KnownPeers peeraddr + -> (KnownPeers peeraddr, Set peeraddr) +reportFailures now + maxFail + peers + unforgetable + calcDelay + knownPeers@KnownPeers { + allPeers, + availableToConnect, + nextConnectTimes + } = + assert (all (`Map.member` allPeers) peers) $ + + -- increment fail counts + let (peers', knownPeers') = Set.foldl' incFail (Map.empty, knownPeers) peers + -- filter out peers with too high fail count + (peers'', forgets) = Map.partitionWithKey partFn peers' + -- calculate reconnect times + times = Map.mapWithKey (\p fc -> (calcDelay p fc) `addTime` now) peers'' + -- set next connect times. + knownPeers'' = delete (Map.keysSet forgets) knownPeers' { availableToConnect = availableToConnect Set.\\ Map.keysSet times, - nextConnectTimes = Map.foldlWithKey' (\psq peeraddr time -> PSQ.insert peeraddr time () psq) nextConnectTimes times } - in assert (invariant knownPeers') knownPeers' + in assert (invariant knownPeers'') (knownPeers'', Map.keysSet forgets) + where + + incFail (m,kp) p = + let (fc, kp') = incrementFailCount p kp in + (Map.insert p fc m, kp') + + partFn p _ = + case Map.lookup p allPeers of + Just k -> if knownPeerFailCount k < maxFail || unforgetable p + then True + else False + Nothing -> False -- Impossible + +----------------------------------- +-- Tracking when we should clear the fail counter +-- +minClearFailCountTime :: Ord peeraddr + => KnownPeers peeraddr + -> Maybe Time +minClearFailCountTime KnownPeers { clearFailCountTimes } = + go clearFailCountTimes + where + go psq = case PSQ.minView psq of + Just (_, t, _, _) -> Just t + Nothing -> Nothing + + +setClearFailCountTime :: Ord peeraddr + => peeraddr + -> Time + -> KnownPeers peeraddr + -> KnownPeers peeraddr +setClearFailCountTime peer time + knownPeers@KnownPeers { + allPeers, + clearFailCountTimes + } = + assert (Map.member peer allPeers ) $ + let knownPeers' = knownPeers { + clearFailCountTimes = PSQ.insert peer time () clearFailCountTimes + } + in assert (invariant knownPeers') knownPeers' --------------------------------- -- Selecting peers to ask