Skip to content

Commit df09d45

Browse files
fix(dot/peerset): remove race conditions from peerset package (#2267)
* chore: remove race conditions from `peerset` package Co-authored-by: Quentin McGaw <[email protected]>
1 parent ea95ffd commit df09d45

19 files changed

+681
-322
lines changed

dot/network/connmgr_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ func TestMaxPeers(t *testing.T) {
8282
continue
8383
}
8484

85-
n.host.h.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL)
85+
n.host.p2pHost.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL)
8686
n.host.cm.peerSetHandler.AddPeer(0, ainfo.ID)
8787
}
8888

8989
time.Sleep(200 * time.Millisecond)
90-
p := nodes[0].host.h.Peerstore().Peers()
90+
p := nodes[0].host.p2pHost.Peerstore().Peers()
9191
require.LessOrEqual(t, max, len(p))
9292
}
9393

@@ -152,15 +152,15 @@ func TestPersistentPeers(t *testing.T) {
152152
time.Sleep(time.Millisecond * 600)
153153

154154
// B should have connected to A during bootstrap
155-
conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
155+
conns := nodeB.host.p2pHost.Network().ConnsToPeer(nodeA.host.id())
156156
require.NotEqual(t, 0, len(conns))
157157

158158
// if A disconnects from B, B should reconnect
159159
nodeA.host.cm.peerSetHandler.DisconnectPeer(0, nodeB.host.id())
160160

161161
time.Sleep(time.Millisecond * 500)
162162

163-
conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
163+
conns = nodeB.host.p2pHost.Network().ConnsToPeer(nodeA.host.id())
164164
require.NotEqual(t, 0, len(conns))
165165
}
166166

@@ -239,7 +239,7 @@ func TestSetReservedPeer(t *testing.T) {
239239

240240
require.Equal(t, 2, node3.host.peerCount())
241241

242-
node3.host.h.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL)
242+
node3.host.p2pHost.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL)
243243
node3.host.cm.peerSetHandler.SetReservedPeer(0, addrC.ID)
244244
time.Sleep(200 * time.Millisecond)
245245

dot/network/discovery_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func newTestDiscovery(t *testing.T, num int) []*discovery {
3535
require.NoError(t, err)
3636
disc := &discovery{
3737
ctx: srvc.ctx,
38-
h: srvc.host.h,
38+
h: srvc.host.p2pHost,
3939
ds: ds,
4040
}
4141

@@ -200,7 +200,7 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
200200
time.Sleep(time.Millisecond * 500)
201201

202202
// assert B and C can discover each other
203-
addrs := nodeB.host.h.Peerstore().Addrs(nodeC.host.id())
203+
addrs := nodeB.host.p2pHost.Peerstore().Addrs(nodeC.host.id())
204204
require.NotEqual(t, 0, len(addrs))
205205

206206
}

dot/network/host.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ const (
6868
// host wraps libp2p host with network host configuration and services
6969
type host struct {
7070
ctx context.Context
71-
h libp2phost.Host
71+
p2pHost libp2phost.Host
7272
discovery *discovery
7373
bootnodes []peer.AddrInfo
7474
persistentPeers []peer.AddrInfo
@@ -211,7 +211,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
211211

212212
host := &host{
213213
ctx: ctx,
214-
h: h,
214+
p2pHost: h,
215215
discovery: discovery,
216216
bootnodes: bns,
217217
protocolID: pid,
@@ -236,14 +236,14 @@ func (h *host) close() error {
236236
}
237237

238238
// close libp2p host
239-
err = h.h.Close()
239+
err = h.p2pHost.Close()
240240
if err != nil {
241241
logger.Errorf("Failed to close libp2p host: %s", err)
242242
return err
243243
}
244244

245245
h.closeSync.Do(func() {
246-
err = h.h.Peerstore().Close()
246+
err = h.p2pHost.Peerstore().Close()
247247
if err != nil {
248248
logger.Errorf("Failed to close libp2p peerstore: %s", err)
249249
return
@@ -260,28 +260,28 @@ func (h *host) close() error {
260260

261261
// registerStreamHandler registers the stream handler for the given protocol id.
262262
func (h *host) registerStreamHandler(pid protocol.ID, handler func(libp2pnetwork.Stream)) {
263-
h.h.SetStreamHandler(pid, handler)
263+
h.p2pHost.SetStreamHandler(pid, handler)
264264
}
265265

266266
// connect connects the host to a specific peer address
267267
func (h *host) connect(p peer.AddrInfo) (err error) {
268-
h.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
268+
h.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
269269
ctx, cancel := context.WithTimeout(h.ctx, connectTimeout)
270270
defer cancel()
271-
err = h.h.Connect(ctx, p)
271+
err = h.p2pHost.Connect(ctx, p)
272272
return err
273273
}
274274

275275
// bootstrap connects the host to the configured bootnodes
276276
func (h *host) bootstrap() {
277277
for _, info := range h.persistentPeers {
278-
h.h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
278+
h.p2pHost.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
279279
h.cm.peerSetHandler.AddReservedPeer(0, info.ID)
280280
}
281281

282282
for _, addrInfo := range h.bootnodes {
283283
logger.Debugf("bootstrapping to peer %s", addrInfo.ID)
284-
h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
284+
h.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
285285
h.cm.peerSetHandler.AddPeer(0, addrInfo.ID)
286286
}
287287
}
@@ -290,7 +290,7 @@ func (h *host) bootstrap() {
290290
// the newly created stream.
291291
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (libp2pnetwork.Stream, error) {
292292
// open outbound stream with host protocol id
293-
stream, err := h.h.NewStream(h.ctx, p, pid)
293+
stream, err := h.p2pHost.NewStream(h.ctx, p, pid)
294294
if err != nil {
295295
logger.Tracef("failed to open new stream with peer %s using protocol %s: %s", p, pid, err)
296296
return nil, err
@@ -334,12 +334,12 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
334334

335335
// id returns the host id
336336
func (h *host) id() peer.ID {
337-
return h.h.ID()
337+
return h.p2pHost.ID()
338338
}
339339

340340
// Peers returns connected peers
341341
func (h *host) peers() []peer.ID {
342-
return h.h.Network().Peers()
342+
return h.p2pHost.Network().Peers()
343343
}
344344

345345
// addReservedPeers adds the peers `addrs` to the protected peers list and connects to them
@@ -354,7 +354,7 @@ func (h *host) addReservedPeers(addrs ...string) error {
354354
if err != nil {
355355
return err
356356
}
357-
h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
357+
h.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
358358
h.cm.peerSetHandler.AddReservedPeer(0, addrInfo.ID)
359359
}
360360

@@ -369,7 +369,7 @@ func (h *host) removeReservedPeers(ids ...string) error {
369369
return err
370370
}
371371
h.cm.peerSetHandler.RemoveReservedPeer(0, peerID)
372-
h.h.ConnManager().Unprotect(peerID, "")
372+
h.p2pHost.ConnManager().Unprotect(peerID, "")
373373
}
374374

375375
return nil
@@ -378,7 +378,7 @@ func (h *host) removeReservedPeers(ids ...string) error {
378378
// supportsProtocol checks if the protocol is supported by peerID
379379
// returns an error if could not get peer protocols
380380
func (h *host) supportsProtocol(peerID peer.ID, protocol protocol.ID) (bool, error) {
381-
peerProtocols, err := h.h.Peerstore().SupportsProtocols(peerID, string(protocol))
381+
peerProtocols, err := h.p2pHost.Peerstore().SupportsProtocols(peerID, string(protocol))
382382
if err != nil {
383383
return false, err
384384
}
@@ -388,21 +388,21 @@ func (h *host) supportsProtocol(peerID peer.ID, protocol protocol.ID) (bool, err
388388

389389
// peerCount returns the number of connected peers
390390
func (h *host) peerCount() int {
391-
peers := h.h.Network().Peers()
391+
peers := h.p2pHost.Network().Peers()
392392
return len(peers)
393393
}
394394

395395
// addrInfo returns the libp2p peer.AddrInfo of the host
396396
func (h *host) addrInfo() peer.AddrInfo {
397397
return peer.AddrInfo{
398-
ID: h.h.ID(),
399-
Addrs: h.h.Addrs(),
398+
ID: h.p2pHost.ID(),
399+
Addrs: h.p2pHost.Addrs(),
400400
}
401401
}
402402

403403
// multiaddrs returns the multiaddresses of the host
404404
func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) {
405-
addrs := h.h.Addrs()
405+
addrs := h.p2pHost.Addrs()
406406
for _, addr := range addrs {
407407
multiaddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, h.id()))
408408
if err != nil {
@@ -415,16 +415,16 @@ func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) {
415415

416416
// protocols returns all protocols currently supported by the node
417417
func (h *host) protocols() []string {
418-
return h.h.Mux().Protocols()
418+
return h.p2pHost.Mux().Protocols()
419419
}
420420

421421
// closePeer closes connection with peer.
422422
func (h *host) closePeer(peer peer.ID) error {
423-
return h.h.Network().ClosePeer(peer)
423+
return h.p2pHost.Network().ClosePeer(peer)
424424
}
425425

426426
func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
427-
connToPeer := h.h.Network().ConnsToPeer(p)
427+
connToPeer := h.p2pHost.Network().ConnsToPeer(p)
428428
for _, c := range connToPeer {
429429
for _, st := range c.GetStreams() {
430430
if st.Protocol() != pID {

dot/network/host_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,13 @@ func TestBootstrap(t *testing.T) {
170170

171171
peerCountA := nodeA.host.peerCount()
172172
if peerCountA == 0 {
173-
peerCountA := len(nodeA.host.h.Peerstore().Peers())
173+
peerCountA := len(nodeA.host.p2pHost.Peerstore().Peers())
174174
require.NotZero(t, peerCountA)
175175
}
176176

177177
peerCountB := nodeB.host.peerCount()
178178
if peerCountB == 0 {
179-
peerCountB := len(nodeB.host.h.Peerstore().Peers())
179+
peerCountB := len(nodeB.host.p2pHost.Peerstore().Peers())
180180
require.NotZero(t, peerCountB)
181181
}
182182
}
@@ -498,7 +498,7 @@ func Test_RemoveReservedPeers(t *testing.T) {
498498
time.Sleep(100 * time.Millisecond)
499499

500500
require.Equal(t, 1, nodeA.host.peerCount())
501-
isProtected := nodeA.host.h.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "")
501+
isProtected := nodeA.host.p2pHost.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "")
502502
require.False(t, isProtected)
503503

504504
err = nodeA.host.removeReservedPeers("unknown_perr_id")
@@ -583,7 +583,7 @@ func TestPeerConnect(t *testing.T) {
583583
nodeB.noGossip = true
584584

585585
addrInfoB := nodeB.host.addrInfo()
586-
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
586+
nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
587587
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)
588588

589589
time.Sleep(100 * time.Millisecond)
@@ -621,7 +621,7 @@ func TestBannedPeer(t *testing.T) {
621621
nodeB.noGossip = true
622622

623623
addrInfoB := nodeB.host.addrInfo()
624-
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
624+
nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
625625
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)
626626

627627
time.Sleep(100 * time.Millisecond)
@@ -674,7 +674,7 @@ func TestPeerReputation(t *testing.T) {
674674
nodeB.noGossip = true
675675

676676
addrInfoB := nodeB.host.addrInfo()
677-
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
677+
nodeA.host.p2pHost.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
678678
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)
679679

680680
time.Sleep(100 * time.Millisecond)

dot/network/light_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestHandleLightMessage_Response(t *testing.T) {
113113
}
114114
require.NoError(t, err)
115115

116-
stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+lightID)
116+
stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+lightID)
117117
require.NoError(t, err)
118118

119119
// Testing empty request

dot/network/mdns.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (m *mdns) start() {
4747
// create and start service
4848
mdns, err := libp2pdiscovery.NewMdnsService(
4949
m.host.ctx,
50-
m.host.h,
50+
m.host.p2pHost,
5151
MDNSPeriod,
5252
string(m.host.protocolID),
5353
)
@@ -89,7 +89,7 @@ func (n Notifee) HandlePeerFound(p peer.AddrInfo) {
8989
"Peer %s found using mDNS discovery, with host %s",
9090
p.ID, n.host.id())
9191

92-
n.host.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
92+
n.host.p2pHost.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
9393
// connect to found peer
9494
n.host.cm.peerSetHandler.AddPeer(0, p.ID)
9595
}

dot/network/mdns_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ func TestMDNS(t *testing.T) {
4242

4343
if peerCountA == 0 {
4444
// check peerstore for disconnected peers
45-
peerCountA := len(nodeA.host.h.Peerstore().Peers())
45+
peerCountA := len(nodeA.host.p2pHost.Peerstore().Peers())
4646
require.NotZero(t, peerCountA)
4747
}
4848

4949
if peerCountB == 0 {
5050
// check peerstore for disconnected peers
51-
peerCountB := len(nodeB.host.h.Peerstore().Peers())
51+
peerCountB := len(nodeB.host.p2pHost.Peerstore().Peers())
5252
require.NotZero(t, peerCountB)
5353
}
5454
}

dot/network/notifications_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) {
112112
}
113113
require.NoError(t, err)
114114

115-
stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
115+
stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
116116
require.NoError(t, err)
117117

118118
// create info and handler
@@ -181,7 +181,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
181181
}
182182
require.NoError(t, err)
183183

184-
stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
184+
stream, err := s.host.p2pHost.NewStream(s.ctx, b.host.id(), s.host.protocolID+blockAnnounceID)
185185
require.NoError(t, err)
186186

187187
// try invalid handshake
@@ -250,7 +250,7 @@ func Test_HandshakeTimeout(t *testing.T) {
250250
info := newNotificationsProtocol(nodeA.host.protocolID+blockAnnounceID,
251251
nodeA.getBlockAnnounceHandshake, testHandshakeDecoder, nodeA.validateBlockAnnounceHandshake)
252252

253-
nodeB.host.h.SetStreamHandler(info.protocolID, func(stream libp2pnetwork.Stream) {
253+
nodeB.host.p2pHost.SetStreamHandler(info.protocolID, func(stream libp2pnetwork.Stream) {
254254
// should not respond to a handshake message
255255
})
256256

@@ -267,7 +267,7 @@ func Test_HandshakeTimeout(t *testing.T) {
267267
// clear handshake data from connection handler
268268
time.Sleep(time.Millisecond * 100)
269269
info.peersData.deleteOutboundHandshakeData(nodeB.host.id())
270-
connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
270+
connAToB := nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id())
271271
for _, stream := range connAToB[0].GetStreams() {
272272
_ = stream.Close()
273273
}
@@ -289,7 +289,7 @@ func Test_HandshakeTimeout(t *testing.T) {
289289
require.Nil(t, data)
290290

291291
// a stream should be open until timeout
292-
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
292+
connAToB = nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id())
293293
require.Len(t, connAToB, 1)
294294
require.Len(t, connAToB[0].GetStreams(), 1)
295295

@@ -301,7 +301,7 @@ func Test_HandshakeTimeout(t *testing.T) {
301301
require.Nil(t, data)
302302

303303
// stream should be closed
304-
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
304+
connAToB = nodeA.host.p2pHost.Network().ConnsToPeer(nodeB.host.id())
305305
require.Len(t, connAToB, 1)
306306
require.Len(t, connAToB[0].GetStreams(), 0)
307307
}
@@ -343,7 +343,7 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
343343
require.NoError(t, err)
344344

345345
txnProtocolID := srvc1.host.protocolID + transactionsID
346-
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
346+
stream, err := srvc1.host.p2pHost.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
347347
require.NoError(t, err)
348348

349349
// create info and handler

0 commit comments

Comments
 (0)