@@ -533,6 +533,62 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
 	}
 }
 
+func SubtestStreamLeftOpen(t *testing.T, tr network.Multiplexer) {
+	a, b := tcpPipe(t)
+
+	const numStreams = 10
+	const dataLen = 50 * 1024
+
+	scopea := &peerScope{}
+	muxa, err := tr.NewConn(a, true, scopea)
+	checkErr(t, err)
+
+	scopeb := &peerScope{}
+	muxb, err := tr.NewConn(b, false, scopeb)
+	checkErr(t, err)
+
+	var wg sync.WaitGroup
+	wg.Add(1 + numStreams)
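+	// Writer side: open numStreams streams and write dataLen bytes to each, deliberately leaving every stream open.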
+	go func() {
+		defer wg.Done()
+		for i := 0; i < numStreams; i++ {
+			stra, err := muxa.OpenStream(context.Background())
+			checkErr(t, err)
+			go func() {
+				defer wg.Done()
+				_, err = stra.Write(randBuf(dataLen))
+				checkErr(t, err)
+				// do NOT close or reset the stream
+			}()
+		}
+	}()
+
+	wg.Add(1 + numStreams)
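+	// Reader side: accept each stream and read the full payload, likewise leaving the streams open.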
+	go func() {
+		defer wg.Done()
+		for i := 0; i < numStreams; i++ {
+			str, err := muxb.AcceptStream()
+			checkErr(t, err)
+			go func() {
+				defer wg.Done()
+				_, err = io.ReadFull(str, make([]byte, dataLen))
+				checkErr(t, err)
+			}()
+		}
+	}()
+
+	// Now we have a bunch of open streams.
+	// Make sure that their memory is returned when we close the connection.
+	wg.Wait()
+
+	muxa.Close()
+	scopea.Check(t)
+	muxb.Close()
+	scopeb.Check(t)
+}
+
 func SubtestStress1Conn1Stream1Msg(t *testing.T, tr network.Multiplexer) {
 	SubtestStress(t, Options{
 		tr: tr,
@@ -611,6 +667,7 @@ var subtests = []TransportTest{
 	SubtestStress1Conn100Stream100Msg10MB,
 	SubtestStreamOpenStress,
 	SubtestStreamReset,
+	SubtestStreamLeftOpen,
 }
 
 // SubtestAll runs all the stream multiplexer tests against the target
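
The new test deliberately leaks its streams and then relies on scopea.Check and scopeb.Check to verify that closing each muxer released every reserved byte. The tcpPipe, checkErr, and randBuf helpers, along with the real peerScope, are defined elsewhere in this file. As a rough illustration of the accounting that Check performs, here is a minimal sketch of such a scope. This is an assumption for illustration, not the suite's actual implementation: the package name is a placeholder, the field names, the panic on over-release, and the Fatalf message are invented, and the remaining methods of the network.PeerScope interface are elided.

package muxtest

import (
	"sync"
	"testing"
)

// peerScope (sketch): a muxer reserves memory for stream buffers via
// ReserveMemory and must release every byte once the streams go away.
type peerScope struct {
	mx     sync.Mutex
	memory int // bytes currently reserved
}

func (s *peerScope) ReserveMemory(size int, _ uint8) error {
	s.mx.Lock()
	defer s.mx.Unlock()
	s.memory += size
	return nil
}

func (s *peerScope) ReleaseMemory(size int) {
	s.mx.Lock()
	defer s.mx.Unlock()
	if s.memory < size {
		panic("releasing more memory than was reserved")
	}
	s.memory -= size
}

// Check fails the test if any reserved memory is still outstanding.
func (s *peerScope) Check(t *testing.T) {
	s.mx.Lock()
	defer s.mx.Unlock()
	if s.memory != 0 {
		t.Fatalf("expected all reserved memory to be released, %d bytes still reserved", s.memory)
	}
}

The property under test is that muxa.Close() and muxb.Close() must reset the streams that neither side ever closed; that reset path is what returns the reserved memory and lets both Check calls pass.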