@@ -34,7 +34,7 @@ module Ouroboros.Network.PeerSelection.PeerStateActions
34
34
import Control.Applicative (Alternative )
35
35
import Control.Concurrent.Class.MonadSTM.Strict
36
36
import Control.Exception (SomeAsyncException (.. ), assert )
37
- import Control.Monad (when , (<=<) )
37
+ import Control.Monad (join , when , (<=<) )
38
38
import Control.Monad.Class.MonadAsync
39
39
import Control.Monad.Class.MonadFork
40
40
import Control.Monad.Class.MonadThrow
@@ -622,10 +622,6 @@ withPeerStateActions PeerStateActionsArguments {
622
622
then return False
623
623
else writeTVar stateVar newState >> return True
624
624
625
- isNotCoolingOrCold :: StrictTVar m PeerStatus -> STM m Bool
626
- isNotCoolingOrCold stateVar =
627
- (> PeerCooling ) <$> readTVar stateVar
628
-
629
625
peerMonitoringLoop
630
626
:: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b
631
627
-> m ()
@@ -672,9 +668,7 @@ withPeerStateActions PeerStateActionsArguments {
672
668
Just (WithSomeProtocolTemperature (WithHot MiniProtocolError {})) -> do
673
669
-- current `pchPeerStatus` must be 'HotPeer'
674
670
state <- atomically $ do
675
- peerState <- readTVar pchPeerStatus
676
- _ <- updateUnlessCoolingOrCold pchPeerStatus PeerCooling
677
- return peerState
671
+ readTVar pchPeerStatus <* updateUnlessCoolingOrCold pchPeerStatus PeerCooling
678
672
case state of
679
673
PeerCold -> return ()
680
674
PeerCooling -> return ()
@@ -690,12 +684,11 @@ withPeerStateActions PeerStateActionsArguments {
690
684
-- update 'pchPeerStatus' and log (as the two other transition to
691
685
-- cold state.
692
686
state <- atomically $ do
693
- peerState <- readTVar pchPeerStatus
694
- _ <- updateUnlessCoolingOrCold pchPeerStatus PeerCooling
695
- pure peerState
687
+ readTVar pchPeerStatus <* updateUnlessCoolingOrCold pchPeerStatus PeerCooling
696
688
case state of
697
- PeerCold -> return ()
698
- PeerCooling -> return ()
689
+ PeerCold -> return ()
690
+ PeerCooling -> return ()
691
+ PeerWarmWait -> return () -- ^ the relevant trace will be performed by deactivatePeerConnection
699
692
PeerWarm -> traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId))
700
693
PeerHot -> traceWith spsTracer (PeerStatusChanged (HotToCooling pchConnectionId))
701
694
peerMonitoringLoop pch
@@ -919,35 +912,22 @@ withPeerStateActions PeerStateActionsArguments {
919
912
pchPeerStatus,
920
913
pchAppHandles,
921
914
pchPromotedHotVar } = do
922
- -- quiesce warm peer protocols and set hot ones in 'Continue' mode.
923
- wasWarm <- atomically $ do
924
- -- if the peer is cold we can't activate it.
925
- notCold <- isNotCoolingOrCold pchPeerStatus
926
- when notCold $ do
927
- writeTVar (getControlVar SingHot pchAppHandles) Continue
928
- writeTVar (getControlVar SingWarm pchAppHandles) Quiesce
929
- return notCold
930
- when (not wasWarm) $ do
931
- traceWith spsTracer (PeerStatusChangeFailure
932
- (WarmToHot pchConnectionId)
933
- ActiveCold )
934
- throwIO $ ColdActivationException pchConnectionId
935
-
936
- -- start hot peer protocols
937
- startProtocols SingHot isBigLedgerPeer connHandle
938
-
939
- -- Only set the status to PeerHot if the peer isn't PeerCold.
940
- -- This can happen asynchronously between the check above and now.
941
- wasWarm' <- atomically $ updateUnlessCoolingOrCold pchPeerStatus PeerHot
942
- if wasWarm'
943
- then do
944
- atomically . writeTVar pchPromotedHotVar . (Just $! ) =<< getMonotonicTime
945
- traceWith spsTracer (PeerStatusChanged (WarmToHot pchConnectionId))
946
- else do
947
- traceWith spsTracer (PeerStatusChangeFailure
948
- (WarmToHot pchConnectionId)
949
- ActiveCold )
950
- throwIO $ ColdActivationException pchConnectionId
915
+ join . atomically $ do
916
+ peerStatus <- readTVar pchPeerStatus
917
+ case peerStatus of
918
+ PeerWarm -> do
919
+ writeTVar (getControlVar SingHot pchAppHandles) Continue
920
+ writeTVar (getControlVar SingWarm pchAppHandles) Quiesce
921
+ writeTVar pchPeerStatus PeerHot
922
+ return $ do
923
+ startProtocols SingHot isBigLedgerPeer connHandle
924
+ atomically . writeTVar pchPromotedHotVar . (Just $! ) =<< getMonotonicTime
925
+ traceWith spsTracer (PeerStatusChanged (WarmToHot pchConnectionId))
926
+ _otherwise -> return $ do
927
+ traceWith spsTracer (PeerStatusChangeFailure
928
+ (WarmToHot pchConnectionId)
929
+ (ActiveCold peerStatus))
930
+ throwIO $ ColdActivationException pchConnectionId
951
931
952
932
953
933
-- Take a hot peer and demote it to a warm one.
@@ -960,75 +940,64 @@ withPeerStateActions PeerStateActionsArguments {
960
940
pchAppHandles,
961
941
pchPromotedHotVar
962
942
} = do
963
- wasCold <- atomically $ do
964
- notCold <- isNotCoolingOrCold pchPeerStatus
965
- when notCold $ do
966
- writeTVar (getControlVar SingHot pchAppHandles) Terminate
967
- writeTVar (getControlVar SingWarm pchAppHandles) Continue
968
- return (not notCold)
969
- when wasCold $ do
970
- -- The governor attempted to demote an already cold peer.
971
- traceWith spsTracer (PeerStatusChangeFailure
972
- (HotToWarm pchConnectionId)
973
- ActiveCold )
974
- throwIO $ ColdDeactivationException pchConnectionId
975
-
976
-
977
- -- Hot protocols should stop within 'spsDeactivateTimeout'.
978
- res <-
979
- timeout spsDeactivateTimeout
980
- (atomically $ awaitAllResults SingHot pchAppHandles)
981
-
982
- pchPromotedHot <- atomically . stateTVar pchPromotedHotVar $ (, Nothing )
983
- case pchPromotedHot of
984
- Just t1 -> do
985
- dt <- diffTime <$> getMonotonicTime <*> pure t1
986
- traceWith spsTracer (PeerHotDuration pchConnectionId dt)
987
- Nothing -> pure ()
988
-
989
- case res of
990
- Nothing -> do
991
- Mux. stop pchMux
992
- atomically (writeTVar pchPeerStatus PeerCooling )
993
- traceWith spsTracer (PeerStatusChangeFailure
994
- (HotToCooling pchConnectionId)
995
- TimeoutError )
996
- throwIO (DeactivationTimeout pchConnectionId)
997
-
998
- -- some of the hot mini-protocols errored
999
- Just (SomeErrored errs) -> do
1000
- -- we don't need to notify the connection manager, we can instead
1001
- -- relay on mux property: if any of the mini-protocols errors, mux
1002
- -- throws an exception as well.
1003
- atomically (writeTVar pchPeerStatus PeerCooling )
1004
- traceWith spsTracer (PeerStatusChangeFailure
1005
- (HotToCooling pchConnectionId)
1006
- (ApplicationFailure errs))
1007
- throwIO (MiniProtocolExceptions errs)
1008
-
1009
- -- all hot mini-protocols succeeded
1010
- Just (AllSucceeded results) -> do
1011
- -- we don't notify the connection manager as this connection is still
1012
- -- useful to the outbound governor (warm peer).
1013
- wasWarm <- atomically $ do
1014
- -- Only set the status to PeerWarm if the peer isn't cold
1015
- -- (can happen asynchronously).
1016
- notCold <- updateUnlessCoolingOrCold pchPeerStatus PeerWarm
1017
- when notCold $ do
1018
- -- We need to update hot protocols to indicate that they are not
1019
- -- running. Preserve the results returned by their previous
1020
- -- execution.
1021
- modifyTVar (getMiniProtocolsVar SingHot pchAppHandles)
1022
- (\ _ -> Map. map (pure . NotRunning . Right ) results)
1023
- return notCold
1024
-
1025
- if wasWarm
1026
- then traceWith spsTracer (PeerStatusChanged (HotToWarm pchConnectionId))
1027
- else do
1028
- traceWith spsTracer (PeerStatusChangeFailure
1029
- (HotToWarm pchConnectionId)
1030
- ActiveCold )
1031
- throwIO $ ColdDeactivationException pchConnectionId
943
+ join . atomically $ do
944
+ peerStatus <- readTVar pchPeerStatus
945
+ case peerStatus of
946
+ PeerHot -> do
947
+ writeTVar (getControlVar SingHot pchAppHandles) Terminate
948
+ writeTVar (getControlVar SingWarm pchAppHandles) Continue
949
+ writeTVar pchPeerStatus PeerWarmWait
950
+ pchPromotedHot <- stateTVar pchPromotedHotVar (, Nothing )
951
+ return $ do
952
+ -- Hot protocols should stop within 'spsDeactivateTimeout'.
953
+ res <- timeout spsDeactivateTimeout
954
+ $ atomically $ do
955
+ res <- awaitAllResults SingHot pchAppHandles
956
+ res <$ case res of
957
+ AllSucceeded results -> do
958
+ modifyTVar (getMiniProtocolsVar SingHot pchAppHandles)
959
+ (\ _ -> Map. map (pure . NotRunning . Right ) results)
960
+ writeTVar pchPeerStatus PeerWarm
961
+ SomeErrored _ -> writeTVar pchPeerStatus PeerCooling
962
+
963
+ case pchPromotedHot of
964
+ Just t1 -> do
965
+ dt <- diffTime <$> getMonotonicTime <*> pure t1
966
+ traceWith spsTracer (PeerHotDuration pchConnectionId dt)
967
+ Nothing -> pure ()
968
+
969
+ case res of
970
+ Nothing -> do
971
+ Mux. stop pchMux
972
+ atomically (writeTVar pchPeerStatus PeerCooling )
973
+ traceWith spsTracer (PeerStatusChangeFailure
974
+ (HotToCooling pchConnectionId)
975
+ TimeoutError )
976
+ throwIO (DeactivationTimeout pchConnectionId)
977
+ Just (SomeErrored errs) -> do
978
+ traceWith spsTracer (PeerStatusChangeFailure
979
+ (HotToCooling pchConnectionId)
980
+ (ApplicationFailure errs))
981
+ throwIO (MiniProtocolExceptions errs)
982
+ Just (AllSucceeded {}) -> do
983
+ traceWith spsTracer (PeerStatusChanged (HotToWarm pchConnectionId))
984
+
985
+ -- either the peer monitoring loop or peer selection demotion lost the race
986
+ PeerWarmWait -> do
987
+ peerStatus' <- readTVar pchPeerStatus
988
+ check (peerStatus' /= PeerWarmWait )
989
+ return $ do
990
+ case peerStatus' of
991
+ PeerWarm -> return () -- ^ successful demotion by the winner
992
+ -- in this case the race winner traces the error
993
+ _otherwise -> throwIO $ ColdDeactivationException pchConnectionId
994
+
995
+ _otherwise ->
996
+ return $ do
997
+ traceWith spsTracer (PeerStatusChangeFailure
998
+ (HotToWarm pchConnectionId)
999
+ (ActiveCold peerStatus))
1000
+ throwIO $ ColdDeactivationException pchConnectionId
1032
1001
1033
1002
1034
1003
closePeerConnection :: PeerConnectionHandle muxMode responderCtx peerAddr versionData ByteString m a b
@@ -1041,64 +1010,62 @@ withPeerStateActions PeerStateActionsArguments {
1041
1010
pchMux,
1042
1011
pchPromotedHotVar
1043
1012
} = do
1044
- atomically $ do
1013
+ (peerStatus, pchPromotedHot) <- atomically $ do
1045
1014
writeTVar (getControlVar SingWarm pchAppHandles) Terminate
1046
1015
writeTVar (getControlVar SingEstablished pchAppHandles) Terminate
1047
1016
writeTVar (getControlVar SingHot pchAppHandles) Terminate
1017
+ (,) <$> stateTVar pchPeerStatus (, PeerCooling ) <*> stateTVar pchPromotedHotVar (, Nothing )
1018
+
1019
+ case peerStatus of
1020
+ ps@ PeerCooling -> return ps
1021
+ ps@ PeerCold -> return ps
1022
+ _otherwise -> do
1023
+ res <-
1024
+ timeout spsCloseConnectionTimeout
1025
+ (atomically $
1026
+ (\ a b c -> a <> b <> c)
1027
+ -- note: we use last to finish on hot, warm and
1028
+ -- established mini-protocols since 'closePeerConnection'
1029
+ -- is also used by asynchronous demotions, not just
1030
+ -- /warm → cold/ transition.
1031
+ <$> awaitAllResults SingHot pchAppHandles
1032
+ <*> awaitAllResults SingWarm pchAppHandles
1033
+ <*> awaitAllResults SingEstablished pchAppHandles)
1034
+
1035
+ case pchPromotedHot of
1036
+ Just t1 -> do
1037
+ dt <- diffTime <$> getMonotonicTime <*> pure t1
1038
+ traceWith spsTracer (PeerHotDuration pchConnectionId dt)
1039
+ Nothing -> pure ()
1040
+
1041
+ PeerCooling <$ case res of
1042
+ Nothing -> do
1043
+ -- timeout fired
1044
+ Mux. stop pchMux
1045
+ traceWith spsTracer (PeerStatusChangeFailure
1046
+ (WarmToCooling pchConnectionId)
1047
+ TimeoutError )
1048
+
1049
+ Just (SomeErrored errs) -> do
1050
+ -- some mini-protocol errored
1051
+ --
1052
+ -- we don't need to notify the connection manager, we can instead
1053
+ -- rely on mux property: if any of the mini-protocols errors, mux
1054
+ -- throws an exception as well.
1055
+ traceWith spsTracer (PeerStatusChangeFailure
1056
+ (WarmToCooling pchConnectionId)
1057
+ (ApplicationFailure errs))
1058
+ throwIO (MiniProtocolExceptions errs)
1059
+
1060
+ Just AllSucceeded {} -> do
1061
+ -- all mini-protocols terminated cleanly
1062
+ --
1063
+ -- 'unregisterOutboundConnection' could only fail to demote the peer if
1064
+ -- connection manager would simultaneously promote it, but this is not
1065
+ -- possible.
1066
+ _ <- releaseOutboundConnection spsConnectionManager pchConnectionId
1067
+ traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId))
1048
1068
1049
- res <-
1050
- timeout spsCloseConnectionTimeout
1051
- (atomically $
1052
- (\ a b c -> a <> b <> c)
1053
- -- note: we use last to finish on hot, warm and
1054
- -- established mini-protocols since 'closePeerConnection'
1055
- -- is also used by asynchronous demotions, not just
1056
- -- /warm → cold/ transition.
1057
- <$> awaitAllResults SingHot pchAppHandles
1058
- <*> awaitAllResults SingWarm pchAppHandles
1059
- <*> awaitAllResults SingEstablished pchAppHandles)
1060
-
1061
-
1062
- pchPromotedHot <- atomically . stateTVar pchPromotedHotVar $ (, Nothing )
1063
- case pchPromotedHot of
1064
- Just t1 -> do
1065
- dt <- diffTime <$> getMonotonicTime <*> pure t1
1066
- traceWith spsTracer (PeerHotDuration pchConnectionId dt)
1067
- Nothing -> pure ()
1068
-
1069
- wasWarm <- atomically (updateUnlessCoolingOrCold pchPeerStatus PeerCooling )
1070
- case res of
1071
- Nothing -> do
1072
- -- timeout fired
1073
- Mux. stop pchMux
1074
- when wasWarm $
1075
- traceWith spsTracer (PeerStatusChangeFailure
1076
- (WarmToCooling pchConnectionId)
1077
- TimeoutError )
1078
- readTVarIO pchPeerStatus
1079
-
1080
- Just (SomeErrored errs) -> do
1081
- -- some mini-protocol errored
1082
- --
1083
- -- we don't need to notify the connection manager, we can instead
1084
- -- rely on mux property: if any of the mini-protocols errors, mux
1085
- -- throws an exception as well.
1086
- when wasWarm $
1087
- traceWith spsTracer (PeerStatusChangeFailure
1088
- (WarmToCooling pchConnectionId)
1089
- (ApplicationFailure errs))
1090
- throwIO (MiniProtocolExceptions errs)
1091
-
1092
- Just AllSucceeded {} -> do
1093
- -- all mini-protocols terminated cleanly
1094
- --
1095
- -- 'unregisterOutboundConnection' could only fail to demote the peer if
1096
- -- connection manager would simultaneously promote it, but this is not
1097
- -- possible.
1098
- when wasWarm $ do
1099
- _ <- releaseOutboundConnection spsConnectionManager pchConnectionId
1100
- traceWith spsTracer (PeerStatusChanged (WarmToCooling pchConnectionId))
1101
- readTVarIO pchPeerStatus
1102
1069
1103
1070
--
1104
1071
-- Utilities
@@ -1203,7 +1170,7 @@ data FailureType versionNumber =
1203
1170
| HandleFailure ! SomeException
1204
1171
| MuxStoppedFailure
1205
1172
| TimeoutError
1206
- | ActiveCold
1173
+ | ActiveCold ! PeerStatus
1207
1174
| ApplicationFailure ! [MiniProtocolException ]
1208
1175
deriving Show
1209
1176
0 commit comments