Skip to content

Commit de6e7c9

Browse files
fix(dot/network): close notifications streams (#2093)
close notifications streams for reading/writing when outbound/inbound respectively Closes #2046
1 parent 8bd05d1 commit de6e7c9

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

dot/network/notifications.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,13 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode
152152
}
153153

154154
// createNotificationsMessageHandler returns a function that is called by the handler of *inbound* streams.
155-
func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
156-
messageHandler NotificationsMessageHandler,
157-
batchHandler NotificationsMessageBatchHandler) messageHandler {
155+
func (s *Service) createNotificationsMessageHandler(
156+
info *notificationsProtocol,
157+
notificationsMessageHandler NotificationsMessageHandler,
158+
batchHandler NotificationsMessageBatchHandler,
159+
) messageHandler {
158160
return func(stream libp2pnetwork.Stream, m Message) error {
159-
if m == nil || info == nil || info.handshakeValidator == nil || messageHandler == nil {
161+
if m == nil || info == nil || info.handshakeValidator == nil || notificationsMessageHandler == nil {
160162
return nil
161163
}
162164

@@ -214,6 +216,10 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
214216
}
215217

216218
logger.Tracef("receiver: sent handshake to peer %s using protocol %s", peer, info.protocolID)
219+
220+
if err := stream.CloseWrite(); err != nil {
221+
logger.Tracef("failed to close stream for writing: %s", err)
222+
}
217223
}
218224

219225
return nil
@@ -227,7 +233,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
227233
return nil
228234
}
229235

230-
propagate, err := messageHandler(peer, msg)
236+
propagate, err := notificationsMessageHandler(peer, msg)
231237
if err != nil {
232238
return err
233239
}
@@ -380,6 +386,10 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
380386
hsData.received = true
381387
}
382388

389+
if err := stream.CloseRead(); err != nil {
390+
logger.Tracef("failed to close stream for reading: %s", err)
391+
}
392+
383393
if err = info.handshakeValidator(peer, resp); err != nil {
384394
logger.Tracef("failed to validate handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
385395
hsData.validated = false

0 commit comments

Comments
 (0)