1
+ {-# LANGUAGE BlockArguments #-}
1
2
{-# LANGUAGE DataKinds #-}
2
3
{-# LANGUAGE FlexibleContexts #-}
3
4
{-# LANGUAGE GADTs #-}
4
5
{-# LANGUAGE KindSignatures #-}
6
+ {-# LANGUAGE LambdaCase #-}
5
7
{-# LANGUAGE NamedFieldPuns #-}
6
8
{-# LANGUAGE RankNTypes #-}
7
9
{-# LANGUAGE ScopedTypeVariables #-}
@@ -34,7 +36,7 @@ module Ouroboros.Network.PeerSelection.PeerStateActions
34
36
import Control.Applicative (Alternative )
35
37
import Control.Concurrent.Class.MonadSTM.Strict
36
38
import Control.Exception (SomeAsyncException (.. ), assert )
37
- import Control.Monad (when , (<=<) )
39
+ import Control.Monad (join , when , (<=<) )
38
40
import Control.Monad.Class.MonadAsync
39
41
import Control.Monad.Class.MonadFork
40
42
import Control.Monad.Class.MonadThrow
@@ -622,14 +624,10 @@ withPeerStateActions PeerStateActionsArguments {
622
624
then return False
623
625
else writeTVar stateVar newState >> return True
624
626
625
- isNotCoolingOrCold :: StrictTVar m PeerStatus -> STM m Bool
626
- isNotCoolingOrCold stateVar =
627
- (> PeerCooling ) <$> readTVar stateVar
628
-
629
627
peerMonitoringLoop
630
628
:: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b
631
629
-> m ()
632
- peerMonitoringLoop pch@ PeerConnectionHandle { pchConnectionId, pchPeerStatus, pchAppHandles } = do
630
+ peerMonitoringLoop pch@ PeerConnectionHandle { pchConnectionId, pchPeerStatus, pchAppHandles, pchPromotedHotVar } = do
633
631
-- A first-to-finish synchronisation on all the bundles; As a result
634
632
-- this is a first-to-finish synchronisation between all the
635
633
-- mini-protocols runs toward the given peer.
@@ -671,15 +669,18 @@ withPeerStateActions PeerStateActionsArguments {
671
669
672
670
Just (WithSomeProtocolTemperature (WithHot MiniProtocolError {})) -> do
673
671
-- current `pchPeerStatus` must be 'HotPeer'
674
- state <- atomically $ do
675
- peerState <- readTVar pchPeerStatus
676
- _ <- updateUnlessCoolingOrCold pchPeerStatus PeerCooling
677
- return peerState
672
+ (pchPromotedHot, state) <- atomically $ do
673
+ (,) <$> stateTVar pchPromotedHotVar (, Nothing ) <*> readTVar pchPeerStatus <* updateUnlessCoolingOrCold pchPeerStatus PeerCooling
674
+ case pchPromotedHot of
675
+ Just t1 -> do
676
+ dt <- diffTime <$> getMonotonicTime <*> pure t1
677
+ traceWith spsTracer (PeerHotDuration pchConnectionId dt)
678
+ Nothing -> pure ()
678
679
case state of
679
680
PeerCold -> return ()
680
681
PeerCooling -> return ()
681
- hotOrWarm -> assert (hotOrWarm == PeerHot ) $
682
- traceWith spsTracer (PeerStatusChanged (HotToCooling pchConnectionId))
682
+ hotOrInDemotion -> assert (hotOrInDemotion >= PeerWarmWait ) $
683
+ traceWith spsTracer (PeerStatusChanged (HotToCooling pchConnectionId))
683
684
peerMonitoringLoop pch
684
685
Just (WithSomeProtocolTemperature (WithWarm MiniProtocolError {})) -> do
685
686
-- current `pchPeerStatus` must be 'WarmPeer'
@@ -689,13 +690,21 @@ withPeerStateActions PeerStateActionsArguments {
689
690
Just (WithSomeProtocolTemperature (WithEstablished MiniProtocolError {})) -> do
690
691
-- update 'pchPeerStatus' and log (as the two other transition to
691
692
-- cold state.
692
- state <- atomically $ do
693
- peerState <- readTVar pchPeerStatus
694
- _ <- updateUnlessCoolingOrCold pchPeerStatus PeerCooling
695
- pure peerState
693
+ (state, pchPromotedHot) <- atomically $ do
694
+ state <- readTVar pchPeerStatus <* updateUnlessCoolingOrCold pchPeerStatus PeerCooling
695
+ case state of
696
+ PeerHot -> (state,) <$> stateTVar pchPromotedHotVar (,Nothing )
697
+ _otherwise -> return (state, Nothing )
698
+ case pchPromotedHot of
699
+ Just t1 -> do
700
+ dt <- diffTime <$> getMonotonicTime <*> pure t1
701
+ traceWith spsTracer (PeerHotDuration pchConnectionId dt)
702
+ Nothing -> pure ()
703
+
696
704
case state of
697
- PeerCold -> return ()
698
- PeerCooling -> return ()
705
+ PeerCold -> return ()
706
+ PeerCooling -> return ()
707
+ PeerWarmWait -> return () -- ^ the relevant trace will be performed by deactivatePeerConnection
699
708
PeerWarm -> traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId))
700
709
PeerHot -> traceWith spsTracer (PeerStatusChanged (HotToCooling pchConnectionId))
701
710
peerMonitoringLoop pch
@@ -919,35 +928,22 @@ withPeerStateActions PeerStateActionsArguments {
919
928
pchPeerStatus,
920
929
pchAppHandles,
921
930
pchPromotedHotVar } = do
922
- -- quiesce warm peer protocols and set hot ones in 'Continue' mode.
923
- wasWarm <- atomically $ do
924
- -- if the peer is cold we can't activate it.
925
- notCold <- isNotCoolingOrCold pchPeerStatus
926
- when notCold $ do
927
- writeTVar (getControlVar SingHot pchAppHandles) Continue
928
- writeTVar (getControlVar SingWarm pchAppHandles) Quiesce
929
- return notCold
930
- when (not wasWarm) $ do
931
- traceWith spsTracer (PeerStatusChangeFailure
932
- (WarmToHot pchConnectionId)
933
- ActiveCold )
934
- throwIO $ ColdActivationException pchConnectionId
935
-
936
- -- start hot peer protocols
937
- startProtocols SingHot isBigLedgerPeer connHandle
938
-
939
- -- Only set the status to PeerHot if the peer isn't PeerCold.
940
- -- This can happen asynchronously between the check above and now.
941
- wasWarm' <- atomically $ updateUnlessCoolingOrCold pchPeerStatus PeerHot
942
- if wasWarm'
943
- then do
944
- atomically . writeTVar pchPromotedHotVar . (Just $! ) =<< getMonotonicTime
945
- traceWith spsTracer (PeerStatusChanged (WarmToHot pchConnectionId))
946
- else do
947
- traceWith spsTracer (PeerStatusChangeFailure
948
- (WarmToHot pchConnectionId)
949
- ActiveCold )
950
- throwIO $ ColdActivationException pchConnectionId
931
+ join . atomically $ do
932
+ peerStatus <- readTVar pchPeerStatus
933
+ case peerStatus of
934
+ PeerWarm -> do
935
+ writeTVar (getControlVar SingHot pchAppHandles) Continue
936
+ writeTVar (getControlVar SingWarm pchAppHandles) Quiesce
937
+ writeTVar pchPeerStatus PeerHot
938
+ return $ do
939
+ startProtocols SingHot isBigLedgerPeer connHandle
940
+ atomically . writeTVar pchPromotedHotVar . (Just $! ) =<< getMonotonicTime
941
+ traceWith spsTracer (PeerStatusChanged (WarmToHot pchConnectionId))
942
+ _otherwise -> return $ do
943
+ traceWith spsTracer (PeerStatusChangeFailure
944
+ (WarmToHot pchConnectionId)
945
+ (ActiveCold peerStatus))
946
+ throwIO $ ColdActivationException pchConnectionId
951
947
952
948
953
949
-- Take a hot peer and demote it to a warm one.
@@ -960,75 +956,65 @@ withPeerStateActions PeerStateActionsArguments {
960
956
pchAppHandles,
961
957
pchPromotedHotVar
962
958
} = do
963
- wasCold <- atomically $ do
964
- notCold <- isNotCoolingOrCold pchPeerStatus
965
- when notCold $ do
966
- writeTVar (getControlVar SingHot pchAppHandles) Terminate
967
- writeTVar (getControlVar SingWarm pchAppHandles) Continue
968
- return (not notCold)
969
- when wasCold $ do
970
- -- The governor attempted to demote an already cold peer.
971
- traceWith spsTracer (PeerStatusChangeFailure
972
- (HotToWarm pchConnectionId)
973
- ActiveCold )
974
- throwIO $ ColdDeactivationException pchConnectionId
975
-
976
-
977
- -- Hot protocols should stop within 'spsDeactivateTimeout'.
978
- res <-
979
- timeout spsDeactivateTimeout
980
- (atomically $ awaitAllResults SingHot pchAppHandles)
981
-
982
- pchPromotedHot <- atomically . stateTVar pchPromotedHotVar $ (, Nothing )
983
- case pchPromotedHot of
984
- Just t1 -> do
985
- dt <- diffTime <$> getMonotonicTime <*> pure t1
986
- traceWith spsTracer (PeerHotDuration pchConnectionId dt)
987
- Nothing -> pure ()
988
-
989
- case res of
990
- Nothing -> do
991
- Mux. stop pchMux
992
- atomically (writeTVar pchPeerStatus PeerCooling )
993
- traceWith spsTracer (PeerStatusChangeFailure
994
- (HotToCooling pchConnectionId)
995
- TimeoutError )
996
- throwIO (DeactivationTimeout pchConnectionId)
997
-
998
- -- some of the hot mini-protocols errored
999
- Just (SomeErrored errs) -> do
1000
- -- we don't need to notify the connection manager, we can instead
1001
- -- relay on mux property: if any of the mini-protocols errors, mux
1002
- -- throws an exception as well.
1003
- atomically (writeTVar pchPeerStatus PeerCooling )
1004
- traceWith spsTracer (PeerStatusChangeFailure
1005
- (HotToCooling pchConnectionId)
1006
- (ApplicationFailure errs))
1007
- throwIO (MiniProtocolExceptions errs)
1008
-
1009
- -- all hot mini-protocols succeeded
1010
- Just (AllSucceeded results) -> do
1011
- -- we don't notify the connection manager as this connection is still
1012
- -- useful to the outbound governor (warm peer).
1013
- wasWarm <- atomically $ do
1014
- -- Only set the status to PeerWarm if the peer isn't cold
1015
- -- (can happen asynchronously).
1016
- notCold <- updateUnlessCoolingOrCold pchPeerStatus PeerWarm
1017
- when notCold $ do
1018
- -- We need to update hot protocols to indicate that they are not
1019
- -- running. Preserve the results returned by their previous
1020
- -- execution.
1021
- modifyTVar (getMiniProtocolsVar SingHot pchAppHandles)
1022
- (\ _ -> Map. map (pure . NotRunning . Right ) results)
1023
- return notCold
1024
-
1025
- if wasWarm
1026
- then traceWith spsTracer (PeerStatusChanged (HotToWarm pchConnectionId))
1027
- else do
1028
- traceWith spsTracer (PeerStatusChangeFailure
1029
- (HotToWarm pchConnectionId)
1030
- ActiveCold )
1031
- throwIO $ ColdDeactivationException pchConnectionId
959
+ join . atomically $ do
960
+ peerStatus <- readTVar pchPeerStatus
961
+ case peerStatus of
962
+ PeerHot -> do
963
+ writeTVar (getControlVar SingHot pchAppHandles) Terminate
964
+ writeTVar (getControlVar SingWarm pchAppHandles) Continue
965
+ pchPromotedHot <- stateTVar pchPromotedHotVar (, Nothing )
966
+ writeTVar pchPeerStatus PeerWarmWait
967
+ return $ do
968
+ -- Hot protocols should stop within 'spsDeactivateTimeout'.
969
+ res <- timeout spsDeactivateTimeout
970
+ $ atomically $ do
971
+ res <- awaitAllResults SingHot pchAppHandles
972
+ res <$ case res of
973
+ AllSucceeded results -> do
974
+ modifyTVar (getMiniProtocolsVar SingHot pchAppHandles)
975
+ (\ _ -> Map. map (pure . NotRunning . Right ) results)
976
+ writeTVar pchPeerStatus PeerWarm
977
+ SomeErrored _ ->
978
+ void $ updateUnlessCoolingOrCold pchPeerStatus PeerCooling
979
+
980
+ case pchPromotedHot of
981
+ Just t1 -> do
982
+ dt <- diffTime <$> getMonotonicTime <*> pure t1
983
+ traceWith spsTracer (PeerHotDuration pchConnectionId dt)
984
+ Nothing -> pure ()
985
+
986
+ case res of
987
+ Nothing -> do
988
+ Mux. stop pchMux
989
+ void . atomically $ updateUnlessCoolingOrCold pchPeerStatus PeerCooling
990
+ traceWith spsTracer (PeerStatusChangeFailure
991
+ (HotToCooling pchConnectionId)
992
+ TimeoutError )
993
+ throwIO (DeactivationTimeout pchConnectionId)
994
+ Just (SomeErrored errs) -> do
995
+ traceWith spsTracer (PeerStatusChangeFailure
996
+ (HotToCooling pchConnectionId)
997
+ (ApplicationFailure errs))
998
+ throwIO (MiniProtocolExceptions errs)
999
+ Just (AllSucceeded {}) -> do
1000
+ traceWith spsTracer (PeerStatusChanged (HotToWarm pchConnectionId))
1001
+
1002
+ -- either the peer monitoring loop or peer selection demotion lost the race
1003
+ PeerWarmWait -> do
1004
+ peerStatus' <- readTVar pchPeerStatus
1005
+ check (peerStatus' /= PeerWarmWait )
1006
+ return $ do
1007
+ case peerStatus' of
1008
+ PeerWarm -> return () -- ^ successful demotion by the winner
1009
+ -- in this case the race winner traces the error
1010
+ _otherwise -> throwIO $ ColdDeactivationException pchConnectionId
1011
+
1012
+ _otherwise ->
1013
+ return $ do
1014
+ traceWith spsTracer (PeerStatusChangeFailure
1015
+ (HotToWarm pchConnectionId)
1016
+ (ActiveCold peerStatus))
1017
+ throwIO $ ColdDeactivationException pchConnectionId
1032
1018
1033
1019
1034
1020
closePeerConnection :: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b
@@ -1041,64 +1027,56 @@ withPeerStateActions PeerStateActionsArguments {
1041
1027
pchMux,
1042
1028
pchPromotedHotVar
1043
1029
} = do
1044
- atomically $ do
1030
+ peerStatus <- atomically $ do
1045
1031
writeTVar (getControlVar SingWarm pchAppHandles) Terminate
1046
1032
writeTVar (getControlVar SingEstablished pchAppHandles) Terminate
1047
1033
writeTVar (getControlVar SingHot pchAppHandles) Terminate
1034
+ readTVar pchPeerStatus <* updateUnlessCoolingOrCold pchPeerStatus PeerCooling
1035
+
1036
+ case peerStatus of
1037
+ ps@ PeerCooling -> return ps
1038
+ ps@ PeerCold -> return ps
1039
+ _otherwise -> do
1040
+ res <-
1041
+ timeout spsCloseConnectionTimeout
1042
+ (atomically $
1043
+ (\ a b c -> a <> b <> c)
1044
+ -- note: we use last to finish on hot, warm and
1045
+ -- established mini-protocols since 'closePeerConnection'
1046
+ -- is also used by asynchronous demotions, not just
1047
+ -- /warm → cold/ transition.
1048
+ <$> awaitAllResults SingHot pchAppHandles
1049
+ <*> awaitAllResults SingWarm pchAppHandles
1050
+ <*> awaitAllResults SingEstablished pchAppHandles)
1051
+
1052
+ PeerCooling <$ case res of
1053
+ Nothing -> do
1054
+ -- timeout fired
1055
+ Mux. stop pchMux
1056
+ traceWith spsTracer (PeerStatusChangeFailure
1057
+ (WarmToCooling pchConnectionId)
1058
+ TimeoutError )
1059
+
1060
+ Just (SomeErrored errs) -> do
1061
+ -- some mini-protocol errored
1062
+ --
1063
+ -- we don't need to notify the connection manager, we can instead
1064
+ -- rely on mux property: if any of the mini-protocols errors, mux
1065
+ -- throws an exception as well.
1066
+ traceWith spsTracer (PeerStatusChangeFailure
1067
+ (WarmToCooling pchConnectionId)
1068
+ (ApplicationFailure errs))
1069
+ throwIO (MiniProtocolExceptions errs)
1070
+
1071
+ Just AllSucceeded {} -> do
1072
+ -- all mini-protocols terminated cleanly
1073
+ --
1074
+ -- 'unregisterOutboundConnection' could only fail to demote the peer if
1075
+ -- connection manager would simultaneously promote it, but this is not
1076
+ -- possible.
1077
+ _ <- releaseOutboundConnection spsConnectionManager pchConnectionId
1078
+ traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId))
1048
1079
1049
- res <-
1050
- timeout spsCloseConnectionTimeout
1051
- (atomically $
1052
- (\ a b c -> a <> b <> c)
1053
- -- note: we use last to finish on hot, warm and
1054
- -- established mini-protocols since 'closePeerConnection'
1055
- -- is also used by asynchronous demotions, not just
1056
- -- /warm → cold/ transition.
1057
- <$> awaitAllResults SingHot pchAppHandles
1058
- <*> awaitAllResults SingWarm pchAppHandles
1059
- <*> awaitAllResults SingEstablished pchAppHandles)
1060
-
1061
-
1062
- pchPromotedHot <- atomically . stateTVar pchPromotedHotVar $ (, Nothing )
1063
- case pchPromotedHot of
1064
- Just t1 -> do
1065
- dt <- diffTime <$> getMonotonicTime <*> pure t1
1066
- traceWith spsTracer (PeerHotDuration pchConnectionId dt)
1067
- Nothing -> pure ()
1068
-
1069
- wasWarm <- atomically (updateUnlessCoolingOrCold pchPeerStatus PeerCooling )
1070
- case res of
1071
- Nothing -> do
1072
- -- timeout fired
1073
- Mux. stop pchMux
1074
- when wasWarm $
1075
- traceWith spsTracer (PeerStatusChangeFailure
1076
- (WarmToCooling pchConnectionId)
1077
- TimeoutError )
1078
- readTVarIO pchPeerStatus
1079
-
1080
- Just (SomeErrored errs) -> do
1081
- -- some mini-protocol errored
1082
- --
1083
- -- we don't need to notify the connection manager, we can instead
1084
- -- rely on mux property: if any of the mini-protocols errors, mux
1085
- -- throws an exception as well.
1086
- when wasWarm $
1087
- traceWith spsTracer (PeerStatusChangeFailure
1088
- (WarmToCooling pchConnectionId)
1089
- (ApplicationFailure errs))
1090
- throwIO (MiniProtocolExceptions errs)
1091
-
1092
- Just AllSucceeded {} -> do
1093
- -- all mini-protocols terminated cleanly
1094
- --
1095
- -- 'unregisterOutboundConnection' could only fail to demote the peer if
1096
- -- connection manager would simultaneously promote it, but this is not
1097
- -- possible.
1098
- when wasWarm $ do
1099
- _ <- releaseOutboundConnection spsConnectionManager pchConnectionId
1100
- traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId))
1101
- readTVarIO pchPeerStatus
1102
1080
1103
1081
--
1104
1082
-- Utilities
@@ -1203,7 +1181,7 @@ data FailureType versionNumber =
1203
1181
| HandleFailure ! SomeException
1204
1182
| MuxStoppedFailure
1205
1183
| TimeoutError
1206
- | ActiveCold
1184
+ | ActiveCold ! PeerStatus
1207
1185
| ApplicationFailure ! [MiniProtocolException ]
1208
1186
deriving Show
1209
1187
0 commit comments