1
- package stream_test
1
+ package upgrader_test
2
2
3
3
import (
4
4
"context"
@@ -10,10 +10,11 @@ import (
10
10
"testing"
11
11
"time"
12
12
13
+ upgrader "github.com/libp2p/go-libp2p-transport-upgrader"
14
+
13
15
"github.com/libp2p/go-libp2p-core/peer"
14
16
"github.com/libp2p/go-libp2p-core/sec"
15
17
"github.com/libp2p/go-libp2p-core/transport"
16
- st "github.com/libp2p/go-libp2p-transport-upgrader"
17
18
18
19
ma "github.com/multiformats/go-multiaddr"
19
20
manet "github.com/multiformats/go-multiaddr/net"
@@ -37,23 +38,23 @@ func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p
37
38
return sconn , false , err
38
39
}
39
40
40
- func createListener (t * testing.T , upgrader transport.Upgrader ) transport.Listener {
41
+ func createListener (t * testing.T , u transport.Upgrader ) transport.Listener {
41
42
t .Helper ()
42
43
addr , err := ma .NewMultiaddr ("/ip4/127.0.0.1/tcp/0" )
43
44
require .NoError (t , err )
44
45
ln , err := manet .Listen (addr )
45
46
require .NoError (t , err )
46
- return upgrader .UpgradeListener (nil , ln )
47
+ return u .UpgradeListener (nil , ln )
47
48
}
48
49
49
50
func TestAcceptSingleConn (t * testing.T ) {
50
51
require := require .New (t )
51
52
52
- id , upgrader := createUpgrader (t )
53
- ln := createListener (t , upgrader )
53
+ id , u := createUpgrader (t )
54
+ ln := createListener (t , u )
54
55
defer ln .Close ()
55
56
56
- cconn , err := dial (t , upgrader , ln .Multiaddr (), id )
57
+ cconn , err := dial (t , u , ln .Multiaddr (), id )
57
58
require .NoError (err )
58
59
59
60
sconn , err := ln .Accept ()
@@ -65,8 +66,8 @@ func TestAcceptSingleConn(t *testing.T) {
65
66
func TestAcceptMultipleConns (t * testing.T ) {
66
67
require := require .New (t )
67
68
68
- id , upgrader := createUpgrader (t )
69
- ln := createListener (t , upgrader )
69
+ id , u := createUpgrader (t )
70
+ ln := createListener (t , u )
70
71
defer ln .Close ()
71
72
72
73
var toClose []io.Closer
@@ -77,7 +78,7 @@ func TestAcceptMultipleConns(t *testing.T) {
77
78
}()
78
79
79
80
for i := 0 ; i < 10 ; i ++ {
80
- cconn , err := dial (t , upgrader , ln .Multiaddr (), id )
81
+ cconn , err := dial (t , u , ln .Multiaddr (), id )
81
82
require .NoError (err )
82
83
toClose = append (toClose , cconn )
83
84
@@ -97,11 +98,11 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
97
98
timeout = 500 * time .Millisecond
98
99
}
99
100
100
- id , upgrader := createUpgrader (t , st .WithAcceptTimeout (timeout ))
101
- ln := createListener (t , upgrader )
101
+ id , u := createUpgrader (t , upgrader .WithAcceptTimeout (timeout ))
102
+ ln := createListener (t , u )
102
103
defer ln .Close ()
103
104
104
- conn , err := dial (t , upgrader , ln .Multiaddr (), id )
105
+ conn , err := dial (t , u , ln .Multiaddr (), id )
105
106
require .NoError (err )
106
107
107
108
errCh := make (chan error )
@@ -131,16 +132,16 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
131
132
func TestFailedUpgradeOnListen (t * testing.T ) {
132
133
require := require .New (t )
133
134
134
- id , upgrader := createUpgraderWithMuxer (t , & errorMuxer {})
135
- ln := createListener (t , upgrader )
135
+ id , u := createUpgraderWithMuxer (t , & errorMuxer {})
136
+ ln := createListener (t , u )
136
137
137
138
errCh := make (chan error )
138
139
go func () {
139
140
_ , err := ln .Accept ()
140
141
errCh <- err
141
142
}()
142
143
143
- _ , err := dial (t , upgrader , ln .Multiaddr (), id )
144
+ _ , err := dial (t , u , ln .Multiaddr (), id )
144
145
require .Error (err )
145
146
146
147
// close the listener.
@@ -151,8 +152,8 @@ func TestFailedUpgradeOnListen(t *testing.T) {
151
152
func TestListenerClose (t * testing.T ) {
152
153
require := require .New (t )
153
154
154
- _ , upgrader := createUpgrader (t )
155
- ln := createListener (t , upgrader )
155
+ _ , u := createUpgrader (t )
156
+ ln := createListener (t , u )
156
157
157
158
errCh := make (chan error )
158
159
go func () {
@@ -174,7 +175,7 @@ func TestListenerClose(t *testing.T) {
174
175
require .Contains (err .Error (), "use of closed network connection" )
175
176
176
177
// doesn't accept new connections when it is closed
177
- _ , err = dial (t , upgrader , ln .Multiaddr (), peer .ID ("1" ))
178
+ _ , err = dial (t , u , ln .Multiaddr (), peer .ID ("1" ))
178
179
require .Error (err )
179
180
}
180
181
@@ -219,11 +220,11 @@ func TestListenerCloseClosesQueued(t *testing.T) {
219
220
}
220
221
221
222
func TestConcurrentAccept (t * testing.T ) {
222
- var num = 3 * st .AcceptQueueLength
223
+ var num = 3 * upgrader .AcceptQueueLength
223
224
224
225
blockingMuxer := newBlockingMuxer ()
225
- id , upgrader := createUpgraderWithMuxer (t , blockingMuxer )
226
- ln := createListener (t , upgrader )
226
+ id , u := createUpgraderWithMuxer (t , blockingMuxer )
227
+ ln := createListener (t , u )
227
228
defer ln .Close ()
228
229
229
230
accepted := make (chan transport.CapableConn , num )
@@ -246,7 +247,7 @@ func TestConcurrentAccept(t *testing.T) {
246
247
go func () {
247
248
defer wg .Done ()
248
249
249
- conn , err := dial (t , upgrader , ln .Multiaddr (), id )
250
+ conn , err := dial (t , u , ln .Multiaddr (), id )
250
251
if err != nil {
251
252
errCh <- err
252
253
return
@@ -269,79 +270,79 @@ func TestConcurrentAccept(t *testing.T) {
269
270
func TestAcceptQueueBacklogged (t * testing.T ) {
270
271
require := require .New (t )
271
272
272
- id , upgrader := createUpgrader (t )
273
- ln := createListener (t , upgrader )
273
+ id , u := createUpgrader (t )
274
+ ln := createListener (t , u )
274
275
defer ln .Close ()
275
276
276
277
// setup AcceptQueueLength connections, but don't accept any of them
277
278
var counter int32 // to be used atomically
278
279
doDial := func () {
279
- conn , err := dial (t , upgrader , ln .Multiaddr (), id )
280
+ conn , err := dial (t , u , ln .Multiaddr (), id )
280
281
require .NoError (err )
281
282
atomic .AddInt32 (& counter , 1 )
282
283
t .Cleanup (func () { conn .Close () })
283
284
}
284
285
285
- for i := 0 ; i < st .AcceptQueueLength ; i ++ {
286
+ for i := 0 ; i < upgrader .AcceptQueueLength ; i ++ {
286
287
go doDial ()
287
288
}
288
289
289
- require .Eventually (func () bool { return int (atomic .LoadInt32 (& counter )) == st .AcceptQueueLength }, 2 * time .Second , 50 * time .Millisecond )
290
+ require .Eventually (func () bool { return int (atomic .LoadInt32 (& counter )) == upgrader .AcceptQueueLength }, 2 * time .Second , 50 * time .Millisecond )
290
291
291
292
// dial a new connection. This connection should not complete setup, since the queue is full
292
293
go doDial ()
293
294
294
295
time .Sleep (100 * time .Millisecond )
295
- require .Equal (int (atomic .LoadInt32 (& counter )), st .AcceptQueueLength )
296
+ require .Equal (int (atomic .LoadInt32 (& counter )), upgrader .AcceptQueueLength )
296
297
297
298
// accept a single connection. Now the new connection should be set up, and fill the queue again
298
299
conn , err := ln .Accept ()
299
300
require .NoError (err )
300
301
require .NoError (conn .Close ())
301
302
302
- require .Eventually (func () bool { return int (atomic .LoadInt32 (& counter )) == st .AcceptQueueLength + 1 }, 2 * time .Second , 50 * time .Millisecond )
303
+ require .Eventually (func () bool { return int (atomic .LoadInt32 (& counter )) == upgrader .AcceptQueueLength + 1 }, 2 * time .Second , 50 * time .Millisecond )
303
304
}
304
305
305
306
func TestListenerConnectionGater (t * testing.T ) {
306
307
require := require .New (t )
307
308
308
309
testGater := & testGater {}
309
- id , upgrader := createUpgrader (t , st .WithConnectionGater (testGater ))
310
+ id , u := createUpgrader (t , upgrader .WithConnectionGater (testGater ))
310
311
311
- ln := createListener (t , upgrader )
312
+ ln := createListener (t , u )
312
313
defer ln .Close ()
313
314
314
315
// no gating.
315
- conn , err := dial (t , upgrader , ln .Multiaddr (), id )
316
+ conn , err := dial (t , u , ln .Multiaddr (), id )
316
317
require .NoError (err )
317
318
require .False (conn .IsClosed ())
318
319
_ = conn .Close ()
319
320
320
321
// rejecting after handshake.
321
322
testGater .BlockSecured (true )
322
323
testGater .BlockAccept (false )
323
- conn , err = dial (t , upgrader , ln .Multiaddr (), peer .ID ("invalid" ))
324
+ conn , err = dial (t , u , ln .Multiaddr (), peer .ID ("invalid" ))
324
325
require .Error (err )
325
326
require .Nil (conn )
326
327
327
328
// rejecting on accept will trigger first.
328
329
testGater .BlockSecured (true )
329
330
testGater .BlockAccept (true )
330
- conn , err = dial (t , upgrader , ln .Multiaddr (), peer .ID ("invalid" ))
331
+ conn , err = dial (t , u , ln .Multiaddr (), peer .ID ("invalid" ))
331
332
require .Error (err )
332
333
require .Nil (conn )
333
334
334
335
// rejecting only on acceptance.
335
336
testGater .BlockSecured (false )
336
337
testGater .BlockAccept (true )
337
- conn , err = dial (t , upgrader , ln .Multiaddr (), peer .ID ("invalid" ))
338
+ conn , err = dial (t , u , ln .Multiaddr (), peer .ID ("invalid" ))
338
339
require .Error (err )
339
340
require .Nil (conn )
340
341
341
342
// back to normal
342
343
testGater .BlockSecured (false )
343
344
testGater .BlockAccept (false )
344
- conn , err = dial (t , upgrader , ln .Multiaddr (), id )
345
+ conn , err = dial (t , u , ln .Multiaddr (), id )
345
346
require .NoError (err )
346
347
require .False (conn .IsClosed ())
347
348
_ = conn .Close ()
0 commit comments