@@ -18,6 +18,7 @@ import (
18
18
"time"
19
19
20
20
"github.com/libp2p/go-libp2p-core/network"
21
+ "github.com/libp2p/go-libp2p-core/peer"
21
22
"github.com/libp2p/go-libp2p-testing/ci"
22
23
23
24
"github.com/stretchr/testify/require"
@@ -43,6 +44,40 @@ func getFunctionName(i interface{}) string {
43
44
return runtime .FuncForPC (reflect .ValueOf (i ).Pointer ()).Name ()
44
45
}
45
46
47
+ type peerScope struct {
48
+ mx sync.Mutex
49
+ memory int
50
+ }
51
+
52
+ func (p * peerScope ) ReserveMemory (size int , _ uint8 ) error {
53
+ p .mx .Lock ()
54
+ p .memory += size
55
+ p .mx .Unlock ()
56
+ return nil
57
+ }
58
+
59
+ func (p * peerScope ) ReleaseMemory (size int ) {
60
+ p .mx .Lock ()
61
+ defer p .mx .Unlock ()
62
+ if p .memory < size {
63
+ panic (fmt .Sprintf ("tried to release too much memory: %d (current: %d)" , size , p .memory ))
64
+ }
65
+ p .memory -= size
66
+ }
67
+
68
+ // Check checks that we don't have any more reserved memory.
69
+ func (p * peerScope ) Check (t * testing.T ) {
70
+ p .mx .Lock ()
71
+ defer p .mx .Unlock ()
72
+ require .Zero (t , p .memory , "expected all reserved memory to have been released" )
73
+ }
74
+
75
+ func (p * peerScope ) Stat () network.ScopeStat { return network.ScopeStat {} }
76
+ func (p * peerScope ) BeginSpan () (network.ResourceScopeSpan , error ) { return nil , nil }
77
+ func (p * peerScope ) Peer () peer.ID { panic ("implement me" ) }
78
+
79
+ var _ network.PeerScope = & peerScope {}
80
+
46
81
type Options struct {
47
82
tr network.Multiplexer
48
83
connNum int
@@ -141,9 +176,13 @@ func SubtestSimpleWrite(t *testing.T, tr network.Multiplexer) {
141
176
defer nc1 .Close ()
142
177
143
178
log ("wrapping conn" )
144
- c1 , err := tr .NewConn (nc1 , false , nil )
179
+ scope := & peerScope {}
180
+ c1 , err := tr .NewConn (nc1 , false , scope )
145
181
checkErr (t , err )
146
- defer c1 .Close ()
182
+ defer func () {
183
+ c1 .Close ()
184
+ scope .Check (t )
185
+ }()
147
186
148
187
// serve the outgoing conn, because some muxers assume
149
188
// that we _always_ call serve. (this is an error?)
@@ -253,7 +292,8 @@ func SubtestStress(t *testing.T, opt Options) {
253
292
return
254
293
}
255
294
256
- c , err := opt .tr .NewConn (nc , false , nil )
295
+ scope := & peerScope {}
296
+ c , err := opt .tr .NewConn (nc , false , scope )
257
297
if err != nil {
258
298
t .Fatal (fmt .Errorf ("a.AddConn(%s <--> %s): %s" , nc .LocalAddr (), nc .RemoteAddr (), err ))
259
299
return
@@ -282,6 +322,7 @@ func SubtestStress(t *testing.T, opt Options) {
282
322
}
283
323
wg .Wait ()
284
324
c .Close ()
325
+ scope .Check (t )
285
326
}
286
327
287
328
openConnsAndRW := func () {
@@ -375,10 +416,12 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
375
416
}
376
417
}()
377
418
378
- muxb , err := tr .NewConn (b , false , nil )
419
+ scope := & peerScope {}
420
+ muxb , err := tr .NewConn (b , false , scope )
379
421
if err != nil {
380
422
t .Fatal (err )
381
423
}
424
+ defer scope .Check (t )
382
425
383
426
time .Sleep (time .Millisecond * 50 )
384
427
@@ -428,7 +471,8 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
428
471
wg .Add (1 )
429
472
go func () {
430
473
defer wg .Done ()
431
- muxa , err := tr .NewConn (a , true , nil )
474
+ scope := & peerScope {}
475
+ muxa , err := tr .NewConn (a , true , scope )
432
476
if err != nil {
433
477
t .Error (err )
434
478
return
@@ -444,18 +488,21 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
444
488
if err != network .ErrReset {
445
489
t .Error ("should have been stream reset" )
446
490
}
447
-
448
491
s .Close ()
492
+ scope .Check (t )
449
493
}()
450
494
451
- muxb , err := tr .NewConn (b , false , nil )
495
+ scope := & peerScope {}
496
+ muxb , err := tr .NewConn (b , false , scope )
452
497
if err != nil {
453
498
t .Fatal (err )
454
499
}
500
+ defer muxb .Close ()
455
501
456
502
str , err := muxb .AcceptStream ()
457
503
checkErr (t , err )
458
504
str .Reset ()
505
+ scope .Check (t )
459
506
460
507
wg .Wait ()
461
508
}
@@ -464,16 +511,18 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
464
511
func SubtestWriteAfterClose (t * testing.T , tr network.Multiplexer ) {
465
512
a , b := tcpPipe (t )
466
513
467
- muxa , err := tr .NewConn (a , true , nil )
514
+ scopea := & peerScope {}
515
+ muxa , err := tr .NewConn (a , true , scopea )
468
516
checkErr (t , err )
469
517
470
- muxb , err := tr .NewConn (b , false , nil )
518
+ scopeb := & peerScope {}
519
+ muxb , err := tr .NewConn (b , false , scopeb )
471
520
checkErr (t , err )
472
521
473
- err = muxa .Close ()
474
- checkErr ( t , err )
475
- err = muxb .Close ()
476
- checkErr ( t , err )
522
+ checkErr ( t , muxa .Close () )
523
+ scopea . Check ( t )
524
+ checkErr ( t , muxb .Close () )
525
+ scopeb . Check ( t )
477
526
478
527
// make sure the underlying net.Conn was closed
479
528
if _ , err := a .Write ([]byte ("foobar" )); err == nil || ! strings .Contains (err .Error (), "use of closed network connection" ) {
0 commit comments