Skip to content

Commit 09212eb

Browse files
committed
Better fix for #390
Restored awaiting back-pressure when consuming a response body.
1 parent cf885bf commit 09212eb

File tree

5 files changed

+67
-25
lines changed

5 files changed

+67
-25
lines changed

src/Connection/Http1Connection.php

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,19 @@ private function readResponse(
305305
$trailers = $headers;
306306
};
307307

308-
$parser = new Http1Parser($request, $stream, $bodyEmitter->pushAsync(...), $trailersCallback);
308+
$bodyDeferredCancellation = new DeferredCancellation;
309+
$bodyCancellation = new CompositeCancellation(
310+
$readingCancellation,
311+
$bodyDeferredCancellation->getCancellation(),
312+
);
313+
314+
$parser = new Http1Parser(
315+
$request,
316+
$stream,
317+
$bodyEmitter->pushAsync(...),
318+
$bodyCancellation,
319+
$trailersCallback,
320+
);
309321

310322
$start = now();
311323
$inactivityTimeout = $request->getInactivityTimeout();
@@ -353,7 +365,15 @@ private function readResponse(
353365
}
354366

355367
$chunk = $parser->getBuffer();
356-
$parser = new Http1Parser($request, $stream, $bodyEmitter->pushAsync(...), $trailersCallback);
368+
369+
$parser = new Http1Parser(
370+
$request,
371+
$stream,
372+
$bodyEmitter->pushAsync(...),
373+
$bodyCancellation,
374+
$trailersCallback,
375+
);
376+
357377
goto parseChunk;
358378
}
359379

@@ -363,12 +383,6 @@ private function readResponse(
363383
return $this->handleUpgradeResponse($request, $response, $parser->getBuffer());
364384
}
365385

366-
$bodyDeferredCancellation = new DeferredCancellation;
367-
$bodyCancellation = new CompositeCancellation(
368-
$readingCancellation,
369-
$bodyDeferredCancellation->getCancellation()
370-
);
371-
372386
$response->setTrailers($trailersDeferred->getFuture());
373387
$response->setBody(new ResponseBodyStream(
374388
new ReadableIterableStream($bodyEmitter->pipe()),

src/Connection/Internal/Http1Parser.php

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Amp\Http\Client\Connection\Internal;
44

55
use Amp\ByteStream\ReadableBuffer;
6+
use Amp\Cancellation;
67
use Amp\ForbidCloning;
78
use Amp\ForbidSerialization;
89
use Amp\Future;
@@ -72,6 +73,7 @@ public function __construct(
7273
private readonly Request $request,
7374
private readonly Stream $stream,
7475
private readonly \Closure $bodyDataCallback,
76+
private readonly Cancellation $bodyCancellation,
7577
private readonly \Closure $trailersCallback,
7678
) {
7779
$this->maxHeaderBytes = $request->getHeaderSizeLimit();
@@ -167,8 +169,11 @@ public function parse(?string $data = null): ?Response
167169
}
168170

169171
$requestMethod = $this->request->getMethod();
170-
$skipBody = $statusCode < HttpStatus::OK || $statusCode === HttpStatus::NOT_MODIFIED || $statusCode === HttpStatus::NO_CONTENT
171-
|| $requestMethod === 'HEAD' || $requestMethod === 'CONNECT';
172+
$skipBody = $statusCode < HttpStatus::OK
173+
|| $statusCode === HttpStatus::NOT_MODIFIED
174+
|| $statusCode === HttpStatus::NO_CONTENT
175+
|| $requestMethod === 'HEAD'
176+
|| $requestMethod === 'CONNECT';
172177

173178
if ($skipBody) {
174179
$this->complete = true;
@@ -328,7 +333,11 @@ private function shiftHeadersFromBuffer(): ?string
328333
}
329334

330335
if ($this->maxHeaderBytes > 0 && $headersSize > $this->maxHeaderBytes) {
331-
throw new ParseException("Configured header size exceeded: {$headersSize} bytes received, while the configured limit is {$this->maxHeaderBytes} bytes", HttpStatus::REQUEST_HEADER_FIELDS_TOO_LARGE);
336+
throw new ParseException(
337+
"Configured header size exceeded: {$headersSize} bytes received, while the configured " .
338+
"limit is {$this->maxHeaderBytes} bytes",
339+
HttpStatus::REQUEST_HEADER_FIELDS_TOO_LARGE,
340+
);
332341
}
333342

334343
return $headers;
@@ -357,13 +366,17 @@ private function parseRawHeaders(string $rawHeaders): array
357366
$this->chunkedEncoding = \in_array('chunked', $transferEncodings, true);
358367
} elseif (!empty($headerMap['content-length'])) {
359368
if (\count($headerMap['content-length']) > 1) {
360-
throw new ParseException('Can\'t determine body length, because multiple content-length headers present in the response', HttpStatus::BAD_REQUEST);
369+
throw new ParseException('Can\'t determine body length, because multiple content-length ' .
370+
'headers present in the response', HttpStatus::BAD_REQUEST);
361371
}
362372

363373
$contentLength = $headerMap['content-length'][0];
364374

365375
if (!\preg_match('/^(0|[1-9][0-9]*)$/', $contentLength)) {
366-
throw new ParseException('Can\'t determine body length, because the content-length header value is invalid', HttpStatus::BAD_REQUEST);
376+
throw new ParseException(
377+
'Can\'t determine body length, because the content-length header value is invalid',
378+
HttpStatus::BAD_REQUEST,
379+
);
367380
}
368381

369382
$this->remainingBodyBytes = (int) $contentLength;
@@ -467,9 +480,13 @@ private function addToBody(string $data): void
467480
$this->bodyBytesConsumed += $length;
468481

469482
if ($this->maxBodyBytes > 0 && $this->bodyBytesConsumed > $this->maxBodyBytes) {
470-
throw new ParseException("Configured body size exceeded: {$this->bodyBytesConsumed} bytes received, while the configured limit is {$this->maxBodyBytes} bytes", HttpStatus::PAYLOAD_TOO_LARGE);
483+
throw new ParseException(
484+
"Configured body size exceeded: {$this->bodyBytesConsumed} bytes received," .
485+
" while the configured limit is {$this->maxBodyBytes} bytes",
486+
HttpStatus::PAYLOAD_TOO_LARGE,
487+
);
471488
}
472489

473-
($this->bodyDataCallback)($data)->ignore();
490+
($this->bodyDataCallback)($data)->await($this->bodyCancellation);
474491
}
475492
}

src/functions.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,15 @@ function processRequest(Request $request, array $eventListeners, \Closure $reque
3535
}
3636

3737
$trailers = $response->getTrailers();
38-
$trailers->map(fn () => events()->requestEnd($request, $response))->ignore();
38+
39+
$responseRef = \WeakReference::create($response);
40+
$trailers->map(function () use ($request, $responseRef): void {
41+
$response = $responseRef->get();
42+
if ($response) {
43+
events()->requestEnd($request, $response);
44+
}
45+
})->ignore();
46+
3947
$trailers->catch(fn (\Throwable $exception) => events()->requestFailed($request, $exception))->ignore();
4048

4149
return $response;

test/Connection/ConnectionLimitingPoolTest.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ public function testSingleConnectionDoNotUseBody(): void
5252
$r2 = new Request('https://httpbin.org/delay/1');
5353
$r2->setProtocolVersions(['1.1']);
5454

55-
Future\await([
56-
async($client->request(...), $r1),
57-
async($client->request(...), $r2),
58-
]);
55+
$future = async($client->request(...), $r1);
56+
async($client->request(...), $r2);
57+
58+
$future->await();
5959
}
6060

6161
public function testSingleConnectionDoNotUseBodyHttp2(): void
@@ -71,10 +71,10 @@ public function testSingleConnectionDoNotUseBodyHttp2(): void
7171
$r2 = new Request('https://httpbin.org/delay/1');
7272
$r2->setProtocolVersions(['2']);
7373

74-
Future\await([
75-
async($client->request(...), $r1),
76-
async($client->request(...), $r2),
77-
]);
74+
$future = async($client->request(...), $r1);
75+
async($client->request(...), $r2);
76+
77+
$future->await();
7878
}
7979

8080
public function testTwoConnections(): void

test/ParserTest.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Amp\Http\Client\Connection\Connection;
88
use Amp\Http\Client\Connection\Internal\Http1Parser;
99
use Amp\Http\Client\Connection\Stream;
10+
use Amp\NullCancellation;
1011
use Amp\PHPUnit\AsyncTestCase;
1112

1213
class ParserTest extends AsyncTestCase
@@ -27,7 +28,8 @@ public function testKeepAliveHeadResponseParse(): void
2728
$parser = new Http1Parser(
2829
$request,
2930
$this->createMock(Stream::class),
30-
$this->createCallback(0),
31+
$this->createCallback(0, fn () => Future::complete()),
32+
new NullCancellation(),
3133
$this->createCallback(0)
3234
);
3335

@@ -57,6 +59,7 @@ public function testResponseWithTrailers(): void
5759
$request,
5860
$this->createMock(Stream::class),
5961
$this->createCallback(1, fn () => Future::complete()),
62+
new NullCancellation(),
6063
$callback
6164
);
6265

0 commit comments

Comments
 (0)