Skip to content

Make egress poll interval configurable #5113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions decentralized-message-queue/src/DMQ/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ mkDiffusionConfiguration
, Diffusion.dcBulkChurnInterval = dmqcChurnInterval
, Diffusion.dcMuxForkPolicy = Diffusion.noBindForkPolicy -- TODO: Make option flag for responderForkPolicy
, Diffusion.dcLocalMuxForkPolicy = Diffusion.noBindForkPolicy -- TODO: Make option flag for responderForkPolicy
, Diffusion.dcEgressPollInterval = 0 -- TODO: Make option flag for egress poll interval
}
where
hints = defaultHints {
Expand Down
28 changes: 18 additions & 10 deletions network-mux/bench/socket_read_write/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ startServerMany sndSizeV ad = forever $ do
-- It will send streams of data over the 41 and 42 miniprotocol.
-- Multiplexing is done with a separate thread running
-- the Egress.muxer function.
startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO ()
startServerEgresss sndSizeV ad = forever $ do
startServerEgresss :: DiffTime -> StrictTMVar IO Int64 -> Socket -> IO ()
startServerEgresss pollInterval sndSizeV ad = forever $ do
(sd, _) <- Socket.accept ad
withReadBufferIO (\buffer -> do
bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <- getBearer (makeSocketBearer' pollInterval) sduTimeout activeTracer sd buffer
sndSize <- atomically $ takeTMVar sndSizeV
eq <- atomically $ newTBQueue 100
w42 <- newTVarIO BL.empty
Expand Down Expand Up @@ -273,25 +273,28 @@ main = do
ad1 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
ad2 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
ad3 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
ad4 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol

return (ad1, ad2, ad3)
return (ad1, ad2, ad3, ad4)
)
(\(ad1, ad2, ad3) -> do
(\(ad1, ad2, ad3, ad4) -> do
Socket.close ad1
Socket.close ad2
Socket.close ad3
Socket.close ad4
)
(\(ad1, ad2, ad3) -> do
(\(ad1, ad2, ad3, ad4) -> do
sndSizeV <- newEmptyTMVarIO
sndSizeMV <- newEmptyTMVarIO
sndSizeEV <- newEmptyTMVarIO
addr <- setupServer ad1
addrM <- setupServer ad2
addrE <- setupServer ad3
addrF <- setupServer ad4

withAsync (startServer sndSizeV ad1) $ \said -> do
withAsync (startServerMany sndSizeMV ad2) $ \saidM -> do
withAsync (startServerEgresss sndSizeEV ad3) $ \saidE -> do
withAsync (startServerEgresss 0.001 sndSizeEV ad3) $ \saidE -> withAsync (startServerEgresss 0 sndSizeEV ad4) $ \saidF -> do
defaultMain [
-- Suggested Max SDU size for Socket bearer
bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr
Expand All @@ -305,9 +308,13 @@ main = do
, bench "Read/Write-Many Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 914 addrM
, bench "Read/Write-Many Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 10 addrM

-- Use standard muxer and demuxer
, bench "Read/Write Mux Benchmark 800+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE
-- Use standard muxer and demuxer, 1ms poll
, bench "Read/Write Mux Benchmark 800+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE

-- Use standard muxer and demuxer, 0ms poll
, bench "Read/Write Mux Benchmark 800+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrF
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrF

-- Use standard demuxer
, bench "Read/Write Demuxer Queuing Benchmark 10 byte SDUs" $ nfIO $ readDemuxerQueueBenchmark sndSizeV 10 addr
Expand All @@ -316,4 +323,5 @@ main = do
cancel said
cancel saidM
cancel saidE
cancel saidF
)
12 changes: 8 additions & 4 deletions network-mux/src/Network/Mux/Bearer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Network.Mux.Bearer
( Bearer (..)
, MakeBearer (..)
, makeSocketBearer
, makeSocketBearer'
, makePipeChannelBearer
, makeQueueChannelBearer
#if defined(mingw32_HOST_OS)
Expand Down Expand Up @@ -61,8 +62,11 @@ pureBearer f = \sduTimeout rb tr fd -> pure (f sduTimeout rb tr fd)


makeSocketBearer :: MakeBearer IO Socket
makeSocketBearer = MakeBearer $ (\sduTimeout tr fd rb -> do
return $ socketAsBearer size batch rb sduTimeout tr fd)
makeSocketBearer = makeSocketBearer' 0

makeSocketBearer' :: DiffTime -> MakeBearer IO Socket
makeSocketBearer' pt = MakeBearer $ (\sduTimeout tr fd rb -> do
return $ socketAsBearer size batch rb sduTimeout pt tr fd)
where
size = SDUSize 12_288
batch = 131_072
Expand All @@ -89,13 +93,13 @@ makeQueueChannelBearer :: ( MonadSTM m
, MonadThrow m
)
=> MakeBearer m (QueueChannel m)
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _-> queueChannelAsBearer size tr q)
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _ -> queueChannelAsBearer size tr q)
where
size = SDUSize 1_280

#if defined(mingw32_HOST_OS)
makeNamedPipeBearer :: MakeBearer IO HANDLE
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _-> namedPipeAsBearer size tr fd)
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _ -> namedPipeAsBearer size tr fd)
where
size = SDUSize 24_576
#endif
11 changes: 6 additions & 5 deletions network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,13 @@ attenuationChannelAsBearer :: forall m.
-> Bearer m
attenuationChannelAsBearer sduSize sduTimeout muxTracer chan =
Bearer {
read = readMux,
write = writeMux,
writeMany = writeMuxMany,
read = readMux,
write = writeMux,
writeMany = writeMuxMany,
sduSize,
batchSize = fromIntegral $ getSDUSize sduSize,
name = "attenuation-channel"
batchSize = fromIntegral $ getSDUSize sduSize,
name = "attenuation-channel",
egressInterval = 0
}
where
readMux :: TimeoutFn m -> m (SDU, Time)
Expand Down
13 changes: 7 additions & 6 deletions network-mux/src/Network/Mux/Bearer/NamedPipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ namedPipeAsBearer :: Mx.SDUSize
-> Mx.Bearer IO
namedPipeAsBearer sduSize tracer h =
Mx.Bearer {
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.writeMany = writeNamedPipeMany,
Mx.sduSize = sduSize,
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
Mx.name = "named-pipe"
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.writeMany = writeNamedPipeMany,
Mx.sduSize = sduSize,
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
Mx.name = "named-pipe",
Mx.egressInterval = 0
}
where
readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)
Expand Down
13 changes: 7 additions & 6 deletions network-mux/src/Network/Mux/Bearer/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ pipeAsBearer
-> Bearer IO
pipeAsBearer sduSize tracer channel =
Mx.Bearer {
Mx.read = readPipe,
Mx.write = writePipe,
Mx.writeMany = writePipeMany,
Mx.sduSize = sduSize,
Mx.name = "pipe",
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize
Mx.read = readPipe,
Mx.write = writePipe,
Mx.writeMany = writePipeMany,
Mx.sduSize = sduSize,
Mx.name = "pipe",
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
Mx.egressInterval = 0
}
where
readPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)
Expand Down
13 changes: 7 additions & 6 deletions network-mux/src/Network/Mux/Bearer/Queues.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ queueChannelAsBearer
-> Bearer m
queueChannelAsBearer sduSize tracer QueueChannel { writeQueue, readQueue } = do
Mx.Bearer {
Mx.read = readMux,
Mx.write = writeMux,
Mx.writeMany = writeMuxMany,
Mx.sduSize = sduSize,
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize),
Mx.name = "queue-channel"
Mx.read = readMux,
Mx.write = writeMux,
Mx.writeMany = writeMuxMany,
Mx.sduSize = sduSize,
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize),
Mx.name = "queue-channel",
Mx.egressInterval = 0
}
where
readMux :: Mx.TimeoutFn m -> m (Mx.SDU, Time)
Expand Down
16 changes: 9 additions & 7 deletions network-mux/src/Network/Mux/Bearer/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,19 @@ socketAsBearer
-> Int
-> Maybe (Mx.ReadBuffer IO)
-> DiffTime
-> DiffTime
-> Tracer IO Mx.Trace
-> Socket.Socket
-> Bearer IO
socketAsBearer sduSize batchSize readBuffer_m sduTimeout tracer sd =
socketAsBearer sduSize batchSize readBuffer_m sduTimeout pollInterval tracer sd =
Mx.Bearer {
Mx.read = readSocket,
Mx.write = writeSocket,
Mx.writeMany = writeSocketMany,
Mx.sduSize = sduSize,
Mx.batchSize = batchSize,
Mx.name = "socket-bearer"
Mx.read = readSocket,
Mx.write = writeSocket,
Mx.writeMany = writeSocketMany,
Mx.sduSize = sduSize,
Mx.batchSize = batchSize,
Mx.name = "socket-bearer",
Mx.egressInterval = pollInterval
}
where
readSocket :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)
Expand Down
7 changes: 2 additions & 5 deletions network-mux/src/Network/Mux/Egress.hs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ muxer
=> EgressQueue m
-> Bearer m
-> m void
muxer egressQueue Bearer { writeMany, sduSize, batchSize } =
muxer egressQueue Bearer { writeMany, sduSize, batchSize, egressInterval } =
withTimeoutSerial $ \timeout ->
forever $ do
start <- getMonotonicTime
Expand All @@ -156,12 +156,9 @@ muxer egressQueue Bearer { writeMany, sduSize, batchSize } =
empty <- atomically $ isEmptyTBQueue egressQueue
when (empty) $ do
let delta = diffTime end start
threadDelay (loopInterval - delta)
threadDelay (egressInterval - delta)

where
loopInterval :: DiffTime
loopInterval = 0.001

maxSDUsPerBatch :: Int
maxSDUsPerBatch = 100

Expand Down
14 changes: 8 additions & 6 deletions network-mux/src/Network/Mux/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,19 @@ msHeaderLength = 8
--
data Bearer m = Bearer {
-- | Timestamp and send SDU.
write :: TimeoutFn m -> SDU -> m Time
write :: TimeoutFn m -> SDU -> m Time
-- | Timestamp and send many SDUs.
, writeMany :: TimeoutFn m -> [SDU] -> m Time
, writeMany :: TimeoutFn m -> [SDU] -> m Time
-- | Read a SDU
, read :: TimeoutFn m -> m (SDU, Time)
, read :: TimeoutFn m -> m (SDU, Time)
-- | Return a suitable SDU payload size.
, sduSize :: SDUSize
, sduSize :: SDUSize
-- | Return a suitable batch size
, batchSize :: Int
, batchSize :: Int
-- | Name of the bearer
, name :: String
, name :: String
-- | Egress poll interval
, egressInterval :: DiffTime
}

newtype SDUSize = SDUSize { getSDUSize :: Word16 }
Expand Down
2 changes: 1 addition & 1 deletion network-mux/test/Test/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ runWithSocket cap clientBuf_m serverBuf_m initApps respApps = withIOManager (\io
)
)
where
mkBearer buf_m sock tr = getBearer makeSocketBearer (-1) tr sock buf_m
mkBearer buf_m sock tr = getBearer (makeSocketBearer' 0.001) (-1) tr sock buf_m
clientTracer = contramap (Mx.WithBearer "client") activeTracer
serverTracer = contramap (Mx.WithBearer "server") activeTracer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,13 @@ makeFDBearer :: MonadDelay m
=> MakeBearer m (FD m)
makeFDBearer = MakeBearer $ \_ _ _ _ ->
return Mx.Bearer {
Mx.write = \_ _ -> getMonotonicTime,
Mx.writeMany = \_ _ -> getMonotonicTime,
Mx.read = \_ -> forever (threadDelay 3600),
Mx.sduSize = Mx.SDUSize 1500,
Mx.batchSize = 1500,
Mx.name = "FD"
Mx.write = \_ _ -> getMonotonicTime,
Mx.writeMany = \_ _ -> getMonotonicTime,
Mx.read = \_ -> forever (threadDelay 3600),
Mx.sduSize = Mx.SDUSize 1500,
Mx.batchSize = 1500,
Mx.name = "FD",
Mx.egressInterval = 0
}

-- | We only keep exceptions here which should not be handled by the test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Ouroboros.Network.Snocket
, AddressFamily (..)
, Snocket (..)
, makeSocketBearer
, makeSocketBearer'
, makeLocalRawBearer
-- ** Socket based Snockets
, SocketSnocket
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
- Renamed `Arguments` to `DiffusionConfiguration`
- Renamed `Applications` to `DiffusionApplications`
- `runM` function now receives `ExtraParameters` as an argument
- Configurable Mux Egress Poll Interval

## 0.20.1.0 -- 2025-03-13

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ run lpci tracerChurnMode localConfig metrics tracers args apps = do
(\e -> traceWith tracer (Diffusion.DiffusionErrored e)
>> throwIO (Diffusion.DiffusionError e))
$ withIOManager $ \iocp -> do
interfaces <- Diffusion.mkInterfaces iocp tracer
interfaces <- Diffusion.mkInterfaces iocp tracer (Diffusion.dcEgressPollInterval args)
Diffusion.runM
interfaces
tracers
Expand Down
12 changes: 8 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.RethrowPolicy
import Ouroboros.Network.Server qualified as Server
import Ouroboros.Network.Snocket (LocalAddress, LocalSocket (..),
localSocketFileDescriptor, makeLocalBearer, makeSocketBearer)
localSocketFileDescriptor, makeLocalBearer, makeSocketBearer')
import Ouroboros.Network.Snocket qualified as Snocket
import Ouroboros.Network.Socket (configureSocket, configureSystemdSocket)

Expand Down Expand Up @@ -870,7 +870,7 @@ run extraParams tracers args apps = do
>> throwIO (DiffusionError e))
$ withIOManager $ \iocp -> do

interfaces <- mkInterfaces iocp tracer
interfaces <- mkInterfaces iocp tracer (dcEgressPollInterval args)

runM interfaces
tracers
Expand All @@ -884,21 +884,25 @@ run extraParams tracers args apps = do

mkInterfaces :: IOManager
-> Tracer IO (DiffusionTracer ntnAddr ntcAddr)
-> DiffTime
-> IO (Interfaces Socket
RemoteAddress
LocalSocket
LocalAddress
Resolver
IOException
IO)
mkInterfaces iocp tracer = do
mkInterfaces iocp tracer egressPollInterval = do

diRng <- newStdGen
diConnStateIdSupply <- atomically $ CM.newConnStateIdSupply Proxy

-- Clamp the mux egress poll interval to sane values.
let egressInterval = max 0 $ min 0.200 egressPollInterval

return $ Interfaces {
diNtnSnocket = Snocket.socketSnocket iocp,
diNtnBearer = makeSocketBearer,
diNtnBearer = makeSocketBearer' egressInterval,
diWithBuffer = withReadBufferIO,
diNtnConfigureSocket = configureSocket,
diNtnConfigureSystemdSocket =
Expand Down
3 changes: 3 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ data Configuration extraFlags m ntnFd ntnAddr ntcFd ntcAddr = Configuration {
--
, dcLocalMuxForkPolicy :: Mx.ForkPolicy ntcAddr

-- | Mux egress queue's poll interval
, dcEgressPollInterval :: DiffTime

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ run blockGeneratorArgs limits ni na
, Diffusion.dcReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently
, Diffusion.dcMuxForkPolicy = noBindForkPolicy
, Diffusion.dcLocalMuxForkPolicy = noBindForkPolicy
, Diffusion.dcEgressPollInterval = 0.001
}

appArgs :: PeerMetrics m NtNAddr
Expand Down