Skip to content

Commit ca60b86

Browse files
committed
Make egress poll interval configurable
Make the egress queue's poll interval configurable.
1 parent b90ae58 commit ca60b86

File tree

17 files changed

+95
-67
lines changed

17 files changed

+95
-67
lines changed

network-mux/bench/socket_read_write/Main.hs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,11 @@ startServerMany sndSizeV ad = forever $ do
201201
-- It will send streams of data over the 41 and 42 miniprotocol.
202202
-- Multiplexing is done with a separate thread running
203203
-- the Egress.muxer function.
204-
startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO ()
205-
startServerEgresss sndSizeV ad = forever $ do
204+
startServerEgresss :: DiffTime -> StrictTMVar IO Int64 -> Socket -> IO ()
205+
startServerEgresss pollInterval sndSizeV ad = forever $ do
206206
(sd, _) <- Socket.accept ad
207207
withReadBufferIO (\buffer -> do
208-
bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer
208+
bearer <- getBearer (makeSocketBearer' pollInterval) sduTimeout activeTracer sd buffer
209209
sndSize <- atomically $ takeTMVar sndSizeV
210210
eq <- atomically $ newTBQueue 100
211211
w42 <- newTVarIO BL.empty
@@ -273,25 +273,28 @@ main = do
273273
ad1 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
274274
ad2 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
275275
ad3 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
276+
ad4 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
276277

277-
return (ad1, ad2, ad3)
278+
return (ad1, ad2, ad3, ad4)
278279
)
279-
(\(ad1, ad2, ad3) -> do
280+
(\(ad1, ad2, ad3, ad4) -> do
280281
Socket.close ad1
281282
Socket.close ad2
282283
Socket.close ad3
284+
Socket.close ad4
283285
)
284-
(\(ad1, ad2, ad3) -> do
286+
(\(ad1, ad2, ad3, ad4) -> do
285287
sndSizeV <- newEmptyTMVarIO
286288
sndSizeMV <- newEmptyTMVarIO
287289
sndSizeEV <- newEmptyTMVarIO
288290
addr <- setupServer ad1
289291
addrM <- setupServer ad2
290292
addrE <- setupServer ad3
293+
addrF <- setupServer ad4
291294

292295
withAsync (startServer sndSizeV ad1) $ \said -> do
293296
withAsync (startServerMany sndSizeMV ad2) $ \saidM -> do
294-
withAsync (startServerEgresss sndSizeEV ad3) $ \saidE -> do
297+
withAsync (startServerEgresss 0.001 sndSizeEV ad3) $ \saidE -> withAsync (startServerEgresss 0 sndSizeEV ad4) $ \saidF -> do
295298
defaultMain [
296299
-- Suggested Max SDU size for Socket bearer
297300
bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr
@@ -305,9 +308,13 @@ main = do
305308
, bench "Read/Write-Many Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 914 addrM
306309
, bench "Read/Write-Many Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 10 addrM
307310

308-
-- Use standard muxer and demuxer
309-
, bench "Read/Write Mux Benchmark 800+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE
310-
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE
311+
-- Use standard muxer and demuxer, 1ms poll
312+
, bench "Read/Write Mux Benchmark 800+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE
313+
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE
314+
315+
-- Use standard muxer and demuxer, 0ms poll
316+
, bench "Read/Write Mux Benchmark 800+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrF
317+
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrF
311318

312319
-- Use standard demuxer
313320
, bench "Read/Write Demuxer Queuing Benchmark 10 byte SDUs" $ nfIO $ readDemuxerQueueBenchmark sndSizeV 10 addr
@@ -316,4 +323,5 @@ main = do
316323
cancel said
317324
cancel saidM
318325
cancel saidE
326+
cancel saidF
319327
)

network-mux/src/Network/Mux/Bearer.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Network.Mux.Bearer
1010
( Bearer (..)
1111
, MakeBearer (..)
1212
, makeSocketBearer
13+
, makeSocketBearer'
1314
, makePipeChannelBearer
1415
, makeQueueChannelBearer
1516
#if defined(mingw32_HOST_OS)
@@ -61,8 +62,11 @@ pureBearer f = \sduTimeout rb tr fd -> pure (f sduTimeout rb tr fd)
6162

6263

6364
makeSocketBearer :: MakeBearer IO Socket
64-
makeSocketBearer = MakeBearer $ (\sduTimeout tr fd rb -> do
65-
return $ socketAsBearer size batch rb sduTimeout tr fd)
65+
makeSocketBearer = makeSocketBearer' 0
66+
67+
makeSocketBearer' :: DiffTime -> MakeBearer IO Socket
68+
makeSocketBearer' pt = MakeBearer $ (\sduTimeout tr fd rb -> do
69+
return $ socketAsBearer size batch rb sduTimeout pt tr fd)
6670
where
6771
size = SDUSize 12_288
6872
batch = 131_072
@@ -89,13 +93,13 @@ makeQueueChannelBearer :: ( MonadSTM m
8993
, MonadThrow m
9094
)
9195
=> MakeBearer m (QueueChannel m)
92-
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _-> queueChannelAsBearer size tr q)
96+
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _ -> queueChannelAsBearer size tr q)
9397
where
9498
size = SDUSize 1_280
9599

96100
#if defined(mingw32_HOST_OS)
97101
makeNamedPipeBearer :: MakeBearer IO HANDLE
98-
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _-> namedPipeAsBearer size tr fd)
102+
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _ -> namedPipeAsBearer size tr fd)
99103
where
100104
size = SDUSize 24_576
101105
#endif

network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,13 @@ attenuationChannelAsBearer :: forall m.
273273
-> Bearer m
274274
attenuationChannelAsBearer sduSize sduTimeout muxTracer chan =
275275
Bearer {
276-
read = readMux,
277-
write = writeMux,
278-
writeMany = writeMuxMany,
276+
read = readMux,
277+
write = writeMux,
278+
writeMany = writeMuxMany,
279279
sduSize,
280-
batchSize = fromIntegral $ getSDUSize sduSize,
281-
name = "attenuation-channel"
280+
batchSize = fromIntegral $ getSDUSize sduSize,
281+
name = "attenuation-channel",
282+
egressInterval = 0
282283
}
283284
where
284285
readMux :: TimeoutFn m -> m (SDU, Time)

network-mux/src/Network/Mux/Bearer/NamedPipe.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ namedPipeAsBearer :: Mx.SDUSize
3333
-> Mx.Bearer IO
3434
namedPipeAsBearer sduSize tracer h =
3535
Mx.Bearer {
36-
Mx.read = readNamedPipe,
37-
Mx.write = writeNamedPipe,
38-
Mx.writeMany = writeNamedPipeMany,
39-
Mx.sduSize = sduSize,
40-
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
41-
Mx.name = "named-pipe"
36+
Mx.read = readNamedPipe,
37+
Mx.write = writeNamedPipe,
38+
Mx.writeMany = writeNamedPipeMany,
39+
Mx.sduSize = sduSize,
40+
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
41+
Mx.name = "named-pipe",
42+
Mx.egressInterval = 0
4243
}
4344
where
4445
readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)

network-mux/src/Network/Mux/Bearer/Pipe.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ pipeAsBearer
7575
-> Bearer IO
7676
pipeAsBearer sduSize tracer channel =
7777
Mx.Bearer {
78-
Mx.read = readPipe,
79-
Mx.write = writePipe,
80-
Mx.writeMany = writePipeMany,
81-
Mx.sduSize = sduSize,
82-
Mx.name = "pipe",
83-
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize
78+
Mx.read = readPipe,
79+
Mx.write = writePipe,
80+
Mx.writeMany = writePipeMany,
81+
Mx.sduSize = sduSize,
82+
Mx.name = "pipe",
83+
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
84+
Mx.egressInterval = 0
8485
}
8586
where
8687
readPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)

network-mux/src/Network/Mux/Bearer/Queues.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ queueChannelAsBearer
4040
-> Bearer m
4141
queueChannelAsBearer sduSize tracer QueueChannel { writeQueue, readQueue } = do
4242
Mx.Bearer {
43-
Mx.read = readMux,
44-
Mx.write = writeMux,
45-
Mx.writeMany = writeMuxMany,
46-
Mx.sduSize = sduSize,
47-
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize),
48-
Mx.name = "queue-channel"
43+
Mx.read = readMux,
44+
Mx.write = writeMux,
45+
Mx.writeMany = writeMuxMany,
46+
Mx.sduSize = sduSize,
47+
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize),
48+
Mx.name = "queue-channel",
49+
Mx.egressInterval = 0
4950
}
5051
where
5152
readMux :: Mx.TimeoutFn m -> m (Mx.SDU, Time)

network-mux/src/Network/Mux/Bearer/Socket.hs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,19 @@ socketAsBearer
5252
-> Int
5353
-> Maybe (Mx.ReadBuffer IO)
5454
-> DiffTime
55+
-> DiffTime
5556
-> Tracer IO Mx.Trace
5657
-> Socket.Socket
5758
-> Bearer IO
58-
socketAsBearer sduSize batchSize readBuffer_m sduTimeout tracer sd =
59+
socketAsBearer sduSize batchSize readBuffer_m sduTimeout pollInterval tracer sd =
5960
Mx.Bearer {
60-
Mx.read = readSocket,
61-
Mx.write = writeSocket,
62-
Mx.writeMany = writeSocketMany,
63-
Mx.sduSize = sduSize,
64-
Mx.batchSize = batchSize,
65-
Mx.name = "socket-bearer"
61+
Mx.read = readSocket,
62+
Mx.write = writeSocket,
63+
Mx.writeMany = writeSocketMany,
64+
Mx.sduSize = sduSize,
65+
Mx.batchSize = batchSize,
66+
Mx.name = "socket-bearer",
67+
Mx.egressInterval = pollInterval
6668
}
6769
where
6870
readSocket :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)

network-mux/src/Network/Mux/Egress.hs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ muxer
144144
=> EgressQueue m
145145
-> Bearer m
146146
-> m void
147-
muxer egressQueue Bearer { writeMany, sduSize, batchSize } =
147+
muxer egressQueue Bearer { writeMany, sduSize, batchSize, egressInterval } =
148148
withTimeoutSerial $ \timeout ->
149149
forever $ do
150150
start <- getMonotonicTime
@@ -156,12 +156,9 @@ muxer egressQueue Bearer { writeMany, sduSize, batchSize } =
156156
empty <- atomically $ isEmptyTBQueue egressQueue
157157
when (empty) $ do
158158
let delta = diffTime end start
159-
threadDelay (loopInterval - delta)
159+
threadDelay (egressInterval - delta)
160160

161161
where
162-
loopInterval :: DiffTime
163-
loopInterval = 0.001
164-
165162
maxSDUsPerBatch :: Int
166163
maxSDUsPerBatch = 100
167164

network-mux/src/Network/Mux/Types.hs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,17 +244,19 @@ msHeaderLength = 8
244244
--
245245
data Bearer m = Bearer {
246246
-- | Timestamp and send SDU.
247-
write :: TimeoutFn m -> SDU -> m Time
247+
write :: TimeoutFn m -> SDU -> m Time
248248
-- | Timestamp and send many SDUs.
249-
, writeMany :: TimeoutFn m -> [SDU] -> m Time
249+
, writeMany :: TimeoutFn m -> [SDU] -> m Time
250250
-- | Read a SDU
251-
, read :: TimeoutFn m -> m (SDU, Time)
251+
, read :: TimeoutFn m -> m (SDU, Time)
252252
-- | Return a suitable SDU payload size.
253-
, sduSize :: SDUSize
253+
, sduSize :: SDUSize
254254
-- | Return a suitable batch size
255-
, batchSize :: Int
255+
, batchSize :: Int
256256
-- | Name of the bearer
257-
, name :: String
257+
, name :: String
258+
-- | Egress poll interval
259+
, egressInterval :: DiffTime
258260
}
259261

260262
newtype SDUSize = SDUSize { getSDUSize :: Word16 }

network-mux/test/Test/Mux.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,7 @@ runWithSocket cap clientBuf_m serverBuf_m initApps respApps = withIOManager (\io
919919
)
920920
)
921921
where
922-
mkBearer buf_m sock tr = getBearer makeSocketBearer (-1) tr sock buf_m
922+
mkBearer buf_m sock tr = getBearer (makeSocketBearer' 0.001) (-1) tr sock buf_m
923923
clientTracer = contramap (Mx.WithBearer "client") activeTracer
924924
serverTracer = contramap (Mx.WithBearer "server") activeTracer
925925

0 commit comments

Comments
 (0)