@@ -105,7 +105,9 @@ public Http2Stream(HttpRequestMessage request, Http2Connection connection)
105
105
106
106
_headerBudgetRemaining = connection . _pool . Settings . MaxResponseHeadersByteLength ;
107
107
108
- if ( _request . Content == null )
108
+ // Extended connect requests will use the response content stream for bidirectional communication.
109
+ // We will ignore any content set for such requests in SendRequestBodyAsync, as it has no defined semantics.
110
+ if ( _request . Content == null || _request . IsExtendedConnectRequest )
109
111
{
110
112
_requestCompletionState = StreamCompletionState . Completed ;
111
113
if ( _request . IsExtendedConnectRequest )
@@ -173,7 +175,9 @@ public HttpResponseMessage GetAndClearResponse()
173
175
174
176
public async Task SendRequestBodyAsync ( CancellationToken cancellationToken )
175
177
{
176
- if ( _request . Content == null )
178
+ // Extended connect requests will use the response content stream for bidirectional communication.
179
+ // Ignore any content set for such requests, as it has no defined semantics.
180
+ if ( _request . Content == null || _request . IsExtendedConnectRequest )
177
181
{
178
182
Debug . Assert ( _requestCompletionState == StreamCompletionState . Completed ) ;
179
183
return ;
@@ -250,6 +254,7 @@ public async Task SendRequestBodyAsync(CancellationToken cancellationToken)
250
254
// and we also don't want to propagate any error to the caller, in particular for non-duplex scenarios.
251
255
Debug . Assert ( _responseCompletionState == StreamCompletionState . Completed ) ;
252
256
_requestCompletionState = StreamCompletionState . Completed ;
257
+ Debug . Assert ( ! ConnectProtocolEstablished ) ;
253
258
Complete ( ) ;
254
259
return ;
255
260
}
@@ -261,6 +266,7 @@ public async Task SendRequestBodyAsync(CancellationToken cancellationToken)
261
266
262
267
_requestCompletionState = StreamCompletionState . Failed ;
263
268
SendReset ( ) ;
269
+ Debug . Assert ( ! ConnectProtocolEstablished ) ;
264
270
Complete ( ) ;
265
271
}
266
272
@@ -313,6 +319,7 @@ public async Task SendRequestBodyAsync(CancellationToken cancellationToken)
313
319
314
320
if ( complete )
315
321
{
322
+ Debug . Assert ( ! ConnectProtocolEstablished ) ;
316
323
Complete ( ) ;
317
324
}
318
325
}
@@ -420,7 +427,17 @@ private void Cancel()
420
427
if ( sendReset )
421
428
{
422
429
SendReset ( ) ;
423
- Complete ( ) ;
430
+
431
+ // Extended CONNECT notes:
432
+ //
433
+ // To prevent from calling it *twice*, Extended CONNECT stream's Complete() is only
434
+ // called from CloseResponseBody(), as CloseResponseBody() is *always* called
435
+ // from Extended CONNECT stream's Dispose().
436
+
437
+ if ( ! ConnectProtocolEstablished )
438
+ {
439
+ Complete ( ) ;
440
+ }
424
441
}
425
442
}
426
443
@@ -810,7 +827,20 @@ public void OnHeadersComplete(bool endStream)
810
827
Debug . Assert ( _responseCompletionState == StreamCompletionState . InProgress , $ "Response already completed with state={ _responseCompletionState } ") ;
811
828
812
829
_responseCompletionState = StreamCompletionState . Completed ;
813
- if ( _requestCompletionState == StreamCompletionState . Completed )
830
+
831
+ // Extended CONNECT notes:
832
+ //
833
+ // To prevent from calling it *prematurely*, Extended CONNECT stream's Complete() is only
834
+ // called from CloseResponseBody(), as CloseResponseBody() is *only* called
835
+ // from Extended CONNECT stream's Dispose().
836
+ //
837
+ // Due to bidirectional streaming nature of the Extended CONNECT request,
838
+ // the *write side* of the stream can only be completed by calling Dispose().
839
+ //
840
+ // The streaming in both ways happens over the single "response" stream instance, which makes
841
+ // _requestCompletionState *not indicative* of the actual state of the write side of the stream.
842
+
843
+ if ( _requestCompletionState == StreamCompletionState . Completed && ! ConnectProtocolEstablished )
814
844
{
815
845
Complete ( ) ;
816
846
}
@@ -871,7 +901,20 @@ public void OnResponseData(ReadOnlySpan<byte> buffer, bool endStream)
871
901
Debug . Assert ( _responseCompletionState == StreamCompletionState . InProgress , $ "Response already completed with state={ _responseCompletionState } ") ;
872
902
873
903
_responseCompletionState = StreamCompletionState . Completed ;
874
- if ( _requestCompletionState == StreamCompletionState . Completed )
904
+
905
+ // Extended CONNECT notes:
906
+ //
907
+ // To prevent from calling it *prematurely*, Extended CONNECT stream's Complete() is only
908
+ // called from CloseResponseBody(), as CloseResponseBody() is *only* called
909
+ // from Extended CONNECT stream's Dispose().
910
+ //
911
+ // Due to bidirectional streaming nature of the Extended CONNECT request,
912
+ // the *write side* of the stream can only be completed by calling Dispose().
913
+ //
914
+ // The streaming in both ways happens over the single "response" stream instance, which makes
915
+ // _requestCompletionState *not indicative* of the actual state of the write side of the stream.
916
+
917
+ if ( _requestCompletionState == StreamCompletionState . Completed && ! ConnectProtocolEstablished )
875
918
{
876
919
Complete ( ) ;
877
920
}
@@ -1036,17 +1079,17 @@ public async Task ReadResponseHeadersAsync(CancellationToken cancellationToken)
1036
1079
Debug . Assert ( _response != null && _response . Content != null ) ;
1037
1080
// Start to process the response body.
1038
1081
var responseContent = ( HttpConnectionResponseContent ) _response . Content ;
1039
- if ( emptyResponse )
1082
+ if ( ConnectProtocolEstablished )
1083
+ {
1084
+ responseContent . SetStream ( new Http2ReadWriteStream ( this , closeResponseBodyOnDispose : true ) ) ;
1085
+ }
1086
+ else if ( emptyResponse )
1040
1087
{
1041
1088
// If there are any trailers, copy them over to the response. Normally this would be handled by
1042
1089
// the response stream hitting EOF, but if there is no response body, we do it here.
1043
1090
MoveTrailersToResponseMessage ( _response ) ;
1044
1091
responseContent . SetStream ( EmptyReadStream . Instance ) ;
1045
1092
}
1046
- else if ( ConnectProtocolEstablished )
1047
- {
1048
- responseContent . SetStream ( new Http2ReadWriteStream ( this ) ) ;
1049
- }
1050
1093
else
1051
1094
{
1052
1095
responseContent . SetStream ( new Http2ReadStream ( this ) ) ;
@@ -1309,8 +1352,25 @@ private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationT
1309
1352
}
1310
1353
}
1311
1354
1355
+ // This method should only be called from Http2ReadWriteStream.Dispose()
1312
1356
private void CloseResponseBody ( )
1313
1357
{
1358
+ // Extended CONNECT notes:
1359
+ //
1360
+ // Due to bidirectional streaming nature of the Extended CONNECT request,
1361
+ // the *write side* of the stream can only be completed by calling Dispose()
1362
+ // (which, for Extended CONNECT case, will in turn call CloseResponseBody())
1363
+ //
1364
+ // Similarly to QuicStream, disposal *gracefully* closes the write side of the stream
1365
+ // (unless we've received RST_STREAM before) and *abortively* closes the read side
1366
+ // of the stream (unless we've received EOS before).
1367
+
1368
+ if ( ConnectProtocolEstablished && _resetException is null )
1369
+ {
1370
+ // Gracefully close the write side of the Extended CONNECT stream
1371
+ _connection . LogExceptions ( _connection . SendEndStreamAsync ( StreamId ) ) ;
1372
+ }
1373
+
1314
1374
// Check if the response body has been fully consumed.
1315
1375
bool fullyConsumed = false ;
1316
1376
Debug . Assert ( ! Monitor . IsEntered ( SyncObject ) ) ;
@@ -1323,6 +1383,7 @@ private void CloseResponseBody()
1323
1383
}
1324
1384
1325
1385
// If the response body isn't completed, cancel it now.
1386
+ // This includes aborting the read side of the Extended CONNECT stream.
1326
1387
if ( ! fullyConsumed )
1327
1388
{
1328
1389
Cancel ( ) ;
@@ -1337,6 +1398,12 @@ private void CloseResponseBody()
1337
1398
1338
1399
lock ( SyncObject )
1339
1400
{
1401
+ if ( ConnectProtocolEstablished )
1402
+ {
1403
+ // This should be the only place where Extended Connect stream is completed
1404
+ Complete ( ) ;
1405
+ }
1406
+
1340
1407
_responseBuffer . Dispose ( ) ;
1341
1408
}
1342
1409
}
@@ -1430,10 +1497,7 @@ private enum StreamCompletionState : byte
1430
1497
1431
1498
private sealed class Http2ReadStream : Http2ReadWriteStream
1432
1499
{
1433
- public Http2ReadStream ( Http2Stream http2Stream ) : base ( http2Stream )
1434
- {
1435
- base . CloseResponseBodyOnDispose = true ;
1436
- }
1500
+ public Http2ReadStream ( Http2Stream http2Stream ) : base ( http2Stream , closeResponseBodyOnDispose : true ) { }
1437
1501
1438
1502
public override bool CanWrite => false ;
1439
1503
@@ -1482,12 +1546,13 @@ public class Http2ReadWriteStream : HttpBaseStream
1482
1546
private Http2Stream ? _http2Stream ;
1483
1547
private readonly HttpResponseMessage _responseMessage ;
1484
1548
1485
- public Http2ReadWriteStream ( Http2Stream http2Stream )
1549
+ public Http2ReadWriteStream ( Http2Stream http2Stream , bool closeResponseBodyOnDispose = false )
1486
1550
{
1487
1551
Debug . Assert ( http2Stream != null ) ;
1488
1552
Debug . Assert ( http2Stream . _response != null ) ;
1489
1553
_http2Stream = http2Stream ;
1490
1554
_responseMessage = _http2Stream . _response ;
1555
+ CloseResponseBodyOnDispose = closeResponseBodyOnDispose ;
1491
1556
}
1492
1557
1493
1558
~ Http2ReadWriteStream ( )
@@ -1503,7 +1568,7 @@ public Http2ReadWriteStream(Http2Stream http2Stream)
1503
1568
}
1504
1569
}
1505
1570
1506
- protected bool CloseResponseBodyOnDispose { get ; set ; }
1571
+ protected bool CloseResponseBodyOnDispose { get ; private init ; }
1507
1572
1508
1573
protected override void Dispose ( bool disposing )
1509
1574
{
0 commit comments