Skip to content

Drain HTTP/3 response after trailers #116319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions src/libraries/Common/tests/System/Net/Http/Http3LoopbackStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ public sealed class Http3LoopbackStream : IAsyncDisposable

public const long MaxHeaderListSize = 0x6;

private readonly QuicStream _stream;
public QuicStream Stream { get; }

public bool CanRead => _stream.CanRead;
public bool CanWrite => _stream.CanWrite;
public bool CanRead => Stream.CanRead;
public bool CanWrite => Stream.CanWrite;

public Http3LoopbackStream(QuicStream stream)
{
_stream = stream;
Stream = stream;
}

public ValueTask DisposeAsync() => _stream.DisposeAsync();
public ValueTask DisposeAsync() => Stream.DisposeAsync();

public long StreamId => _stream.Id;
public long StreamId => Stream.Id;

public async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
Expand All @@ -54,7 +54,7 @@ public async Task SendUnidirectionalStreamTypeAsync(long streamType)
{
var buffer = new byte[MaximumVarIntBytes];
int bytesWritten = EncodeHttpInteger(streamType, buffer);
await _stream.WriteAsync(buffer.AsMemory(0, bytesWritten)).ConfigureAwait(false);
await Stream.WriteAsync(buffer.AsMemory(0, bytesWritten)).ConfigureAwait(false);
}

public async Task SendSettingsFrameAsync(SettingsEntry[] settingsEntries)
Expand Down Expand Up @@ -129,7 +129,7 @@ private async Task SendPartialHeadersFrameAsync(HttpStatusCode? statusCode, IEnu
// Slice off final byte so the payload is not complete
payload = payload.Slice(0, payload.Length - 1);

await _stream.WriteAsync(payload).ConfigureAwait(false);
await Stream.WriteAsync(payload).ConfigureAwait(false);
}

public async Task SendDataFrameAsync(ReadOnlyMemory<byte> data)
Expand All @@ -156,13 +156,13 @@ private async Task SendFrameHeaderAsync(long frameType, int payloadLength)
bytesWritten += EncodeHttpInteger(frameType, buffer.AsSpan(bytesWritten));
bytesWritten += EncodeHttpInteger(payloadLength, buffer.AsSpan(bytesWritten));

await _stream.WriteAsync(buffer.AsMemory(0, bytesWritten)).ConfigureAwait(false);
await Stream.WriteAsync(buffer.AsMemory(0, bytesWritten)).ConfigureAwait(false);
}

public async Task SendFrameAsync(long frameType, ReadOnlyMemory<byte> framePayload)
{
await SendFrameHeaderAsync(frameType, framePayload.Length).ConfigureAwait(false);
await _stream.WriteAsync(framePayload).ConfigureAwait(false);
await Stream.WriteAsync(framePayload).ConfigureAwait(false);
}

static int EncodeHttpInteger(long longToEncode, Span<byte> buffer)
Expand Down Expand Up @@ -286,7 +286,7 @@ public async Task SendResponseBodyAsync(byte[] content, bool isFinal = true)

if (isFinal)
{
_stream.CompleteWrites();
Stream.CompleteWrites();
}
}

Expand Down Expand Up @@ -327,7 +327,7 @@ public async Task SendResponseBodyAsync(byte[] content, bool isFinal = true)

private HttpRequestData ParseHeaders(ReadOnlySpan<byte> buffer)
{
HttpRequestData request = new HttpRequestData { RequestId = Http3LoopbackConnection.GetRequestId(_stream) };
HttpRequestData request = new HttpRequestData { RequestId = Http3LoopbackConnection.GetRequestId(Stream) };

(int prefixLength, int requiredInsertCount, int deltaBase) = QPackTestDecoder.DecodePrefix(buffer);
if (requiredInsertCount != 0 || deltaBase != 0) throw new Exception("QPack dynamic table not yet supported.");
Expand Down Expand Up @@ -371,7 +371,7 @@ async Task WaitForReadCancellation()
}
else
{
int bytesRead = await _stream.ReadAsync(new byte[1]).ConfigureAwait(false);
int bytesRead = await Stream.ReadAsync(new byte[1]).ConfigureAwait(false);
if (bytesRead != 0)
{
throw new Exception($"Unexpected data received while waiting for client cancllation.");
Expand All @@ -388,7 +388,7 @@ async Task WaitForWriteCancellation()
{
try
{
await _stream.WritesClosed.ConfigureAwait(false);
await Stream.WritesClosed.ConfigureAwait(false);
}
catch (QuicException ex) when (ex.QuicError == QuicError.StreamAborted && ex.ApplicationErrorCode == Http3LoopbackConnection.H3_REQUEST_CANCELLED)
{
Expand Down Expand Up @@ -425,7 +425,7 @@ private async Task DrainResponseData()

public void Abort(long errorCode, QuicAbortDirection direction = QuicAbortDirection.Both)
{
_stream.Abort(direction, errorCode);
Stream.Abort(direction, errorCode);
}

public async Task<(long? frameType, byte[] payload)> ReadFrameAsync()
Expand All @@ -441,7 +441,7 @@ public void Abort(long errorCode, QuicAbortDirection direction = QuicAbortDirect

while (totalBytesRead != payloadLength)
{
int bytesRead = await _stream.ReadAsync(payload.AsMemory(totalBytesRead)).ConfigureAwait(false);
int bytesRead = await Stream.ReadAsync(payload.AsMemory(totalBytesRead)).ConfigureAwait(false);
if (bytesRead == 0) throw new Exception("Unable to read frame; unexpected end of stream.");

totalBytesRead += bytesRead;
Expand All @@ -460,7 +460,7 @@ public void Abort(long errorCode, QuicAbortDirection direction = QuicAbortDirect

do
{
bytesRead = await _stream.ReadAsync(buffer.AsMemory(bufferActiveLength++, 1)).ConfigureAwait(false);
bytesRead = await Stream.ReadAsync(buffer.AsMemory(bufferActiveLength++, 1)).ConfigureAwait(false);
if (bytesRead == 0)
{
return bufferActiveLength == 1 ? (long?)null : throw new Exception("Unable to read varint; unexpected end of stream.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
Expand Down Expand Up @@ -46,6 +47,9 @@ internal sealed class Http3RequestStream : IHttpStreamHeadersHandler, IAsyncDisp
/// <summary>Any trailing headers.</summary>
private List<(HeaderDescriptor name, string value)>? _trailingHeaders;

/// <summary>Response drain task after receiving trailers.</summary>
private Task? _responseDrainTask;

// When reading response content, keep track of the number of bytes left in the current data frame.
private long _responseDataPayloadRemaining;

Expand Down Expand Up @@ -82,56 +86,64 @@ public Http3RequestStream(HttpRequestMessage request, Http3Connection connection
_responseRecvCompleted = false;
}

public void Dispose()
// Synchronous QuicStream.Dispose() is implemented in a sync-over-async manner,
// there is no benefit from maintaining a separate synchronous implementation in this type.
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (!_disposed)
{
_disposed = true;
AbortStream();
if (_stream.WritesClosed.IsCompleted)
Task? disposeTask = null;
if (_responseDrainTask is not null)
{
_connection.LogExceptions(_stream.DisposeAsync().AsTask());
disposeTask = WaitForDrainCompletionAndDisposeAsync();
}
else
{
_stream.Dispose();
AbortStream();
if (_stream.WritesClosed.IsCompleted)
{
disposeTask = _stream.DisposeAsync().AsTask();
}
}
DisposeSyncHelper();
}
}

private void RemoveFromConnectionIfDone()
{
if (_responseRecvCompleted && _requestSendCompleted)
{
_connection.RemoveStream(_stream);
}
}

public async ValueTask DisposeAsync()
{
if (!_disposed)
{
_disposed = true;
AbortStream();
if (_stream.WritesClosed.IsCompleted)
if (disposeTask is not null)
{
_connection.LogExceptions(_stream.DisposeAsync().AsTask());
_connection.LogExceptions(disposeTask);
}
else
{
await _stream.DisposeAsync().ConfigureAwait(false);
}
DisposeSyncHelper();

_connection.RemoveStream(_stream);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we could hit the issue with GC eating the H3 stream while inside WaitForDrainCompletionAndDisposeAsync. For a reference: https://devblogs.microsoft.com/dotnet/keeping-async-methods-alive/

Copy link
Member Author

@antonfirsov antonfirsov Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see us doing the same in HttpContentReadStream (which can get unrooted quickly after returning from an HttpRequest.Dispose):

I must admit that I'm confused. According to my understanding of the article, the problem is that the state machine is unrooted (thus all its' references to locals and this), however this doesn't make us worry and run GC.KeepAlive when we fire-and-forget tasks like at the quoted line.

Edit: changed the comment to keep it on point

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephentoub can you please help and give some pointers what should the right approach here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the question?

Copy link
Member Author

@antonfirsov antonfirsov Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm introducing a method here that is usually called in a fire-and-forget manner:

async ValueTask WaitForDrainCompletionAndDisposeAsync()
{
Debug.Assert(_responseDrainTask is not null);
await _responseDrainTask.ConfigureAwait(false);
AbortStream();
await _stream.DisposeAsync().ConfigureAwait(false);
_recvBuffer.Dispose();
}

The owning Http3RequestStream may become unrooted while WaitForDrainCompletionAndDisposeAsync is running. Can this result in the GC killing the objects under Http3RequestStream thus breaking WaitForDrainCompletionAndDisposeAsync? If the answer is yes, why don't we worry about the same problem in the quoted HttpContentReadStream code where we fire-and-forget a draining Task in a very similar manner?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted in the linked blog post, async method state machine objects are kept alive by the thing they're awaiting. The question then isn't whether Http3RequestStream is rooted, but whether _responseDrainTask will eventually complete and is thus itself rooted (since the only way it could be completed is if something rooted was referencing it in order to complete it).

_sendBuffer.Dispose();

// If response drain is in progress it might be still using _recvBuffer; let WaitForDrainCompletionAndDisposeAsync() dispose it.
if (_responseDrainTask is null)
{
_recvBuffer.Dispose();
}
}

async Task WaitForDrainCompletionAndDisposeAsync()
{
Debug.Assert(_responseDrainTask is not null);
await _responseDrainTask.ConfigureAwait(false);
AbortStream();
await _stream.DisposeAsync().ConfigureAwait(false);
_recvBuffer.Dispose();
}
}

private void DisposeSyncHelper()
private void RemoveFromConnectionIfDone()
{
_connection.RemoveStream(_stream);

_sendBuffer.Dispose();
_recvBuffer.Dispose();
if (_responseRecvCompleted && _requestSendCompleted)
{
_connection.RemoveStream(_stream);
}
}

public void GoAway()
Expand Down Expand Up @@ -564,10 +576,9 @@ private async ValueTask DrainContentLength0Frames(CancellationToken cancellation
_trailingHeaders = new List<(HeaderDescriptor name, string value)>();
await ReadHeadersAsync(payloadLength, cancellationToken).ConfigureAwait(false);

// Stop looping after a trailing header.
// There may be extra frames after this one, but they would all be unknown extension
// frames that can be safely ignored. Just stop reading here.
// Note: this does leave us open to a bad server sending us an out of order DATA frame.
// We do not expect more DATA frames after the trailers.
// Start draining the response to avoid aborting reads during disposal.
_responseDrainTask = DrainResponseAsync();
goto case null;
case null:
// Done receiving: copy over trailing headers.
Expand Down Expand Up @@ -1335,6 +1346,54 @@ private void HandleReadResponseContentException(Exception ex, CancellationToken
throw new HttpIOException(HttpRequestError.Unknown, SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
}


/// <summary>
/// Drain the underlying QuicStream without attempting to interpret the data.
/// Note: this does leave us open to a bad server sending us out of order frames.
/// </summary>
private async Task DrainResponseAsync()
{
HttpConnectionSettings settings = _connection.Pool.Settings;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could short-circuit this with checking ReadsClosed. Also I'd prefer this to finish synchronously in the most common scenario.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadsClosed.IsCompleted is never true at the moment we finish reading the trailers. See my #116319 (comment) above.

TimeSpan drainTime = settings._maxResponseDrainTime;
int remaining = settings._maxResponseDrainSize;
Debug.Assert(remaining >= 0);
if (drainTime == TimeSpan.Zero || remaining == 0)
{
return;
}

using CancellationTokenSource cts = new CancellationTokenSource(settings._maxResponseDrainTime);
try
{
// If there is more data than MaxResponseDrainSize, we will silently stop draining and let Dispose(Async) abort the reads.
while (remaining > 0)
{
_recvBuffer.EnsureAvailableSpace(1);
Memory<byte> buffer = remaining >= _recvBuffer.AvailableMemory.Length ? _recvBuffer.AvailableMemory : _recvBuffer.AvailableMemory.Slice(0, remaining);
int bytesRead = await _stream.ReadAsync(buffer, cts.Token).ConfigureAwait(false);
if (bytesRead == 0)
{
// Reached EOS.
return;
}
remaining -= bytesRead;
_recvBuffer.Commit(bytesRead);
_recvBuffer.Discard(bytesRead);
}
}
catch (Exception ex)
{
// Eat exceptions and stop draining to unblock QuicStream disposal waiting for response drain.
if (NetEventSource.Log.IsEnabled())
{
string message = ex is OperationCanceledException oce && oce.CancellationToken == cts.Token ? "Response drain timed out." : $"Response drain failed with exception: {ex}";
Trace(message);
}

return;
}
}

private async ValueTask<bool> ReadNextDataFrameAsync(HttpResponseMessage response, CancellationToken cancellationToken)
{
if (_responseDataPayloadRemaining == -1)
Expand Down Expand Up @@ -1365,11 +1424,10 @@ private async ValueTask<bool> ReadNextDataFrameAsync(HttpResponseMessage respons
_trailingHeaders = new List<(HeaderDescriptor name, string value)>();
await ReadHeadersAsync(payloadLength, cancellationToken).ConfigureAwait(false);

// There may be more frames after this one, but they would all be unknown extension
// frames that we are allowed to skip. Just close the stream early.
// We do not expect more DATA frames after the trailers.
// Start draining the response to avoid aborting reads during disposal.
_responseDrainTask = DrainResponseAsync();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this change will only drain the stream if there are trailers, right?
If the message doesn't have the trailers (ends with DATA frame or is just HEADERS without body and trailers), we still could mistakenly abort the reading side. Or am I overlooking something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this change will only drain the stream if there are trailers, right?

Yes. Since #60118 talked about only this specific case, and I didn't consider others. Should we do it? If yes should it happen in this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, what I'm thinking is that if it means just to just plug-in DrainResponseAsync to few other places then let's do it. On the other hand, if it's more complicated we should probably at least think it through before merging this. Just to make sure we're not doing here something that would have to be undone.

Copy link
Member Author

@antonfirsov antonfirsov Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the message doesn't have the trailers (ends with DATA frame or is just HEADERS without body and trailers), we still could mistakenly abort the reading side

I spent more time examining Http3RequestStream code, and I don't think this could happen. If the message ends with a DATA frame (or an uknown frame we skipped), Http3ReadStream.ReadAsync will return with the data read into its' buffer. There will be a subsequent read issued against the Http3ReadStream which will make ReadNextDataFrameAsync hit an EOS and stop processing data by setting _responseDataPayloadRemaining = -1 in case null (and then return 0 from Http3ReadStream.Read(Async)):

case null:
// End of stream.
CopyTrailersToResponseMessage(response);
_responseDataPayloadRemaining = -1; // Set to -1 to indicate EOS.
return false;

The primary issue with the trailer handling logic on main is that we execute the code in case null without actually reading the EOS.

This will short circuit subsequent calls to Http3ReadStream.ReadAsync to return 0 while in fact never reading the EOS from the underlying QuicStream:

if (_responseDataPayloadRemaining == -1)
{
// EOS -- this branch will only be taken if user calls Read again after EOS.
return false;
}


// Note: if a server sends additional HEADERS or DATA frames at this point, it
// would be a connection error -- not draining the stream means we won't catch this.
goto case null;
case null:
// End of stream.
Expand Down
Loading
Loading