Skip to content

Commit cf885bf

Browse files
trowskidanog
andcommitted
Fix #390
Co-authored-by: Daniil Gentili <[email protected]>
1 parent c6a789d commit cf885bf

File tree

4 files changed

+104
-49
lines changed

4 files changed

+104
-49
lines changed

src/Connection/Http1Connection.php

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,9 @@ private function readResponse(
294294
Request $request,
295295
Cancellation $originalCancellation,
296296
Cancellation $readingCancellation,
297-
Stream $stream
297+
Stream $stream,
298298
): Response {
299299
$bodyEmitter = new Queue();
300-
$bodyCallback = static fn (string $data) => $bodyEmitter->push($data);
301-
302300
$trailersDeferred = new DeferredFuture;
303301
$trailersDeferred->getFuture()->ignore();
304302

@@ -307,17 +305,17 @@ private function readResponse(
307305
$trailers = $headers;
308306
};
309307

310-
$parser = new Http1Parser($request, $stream, $bodyCallback, $trailersCallback);
308+
$parser = new Http1Parser($request, $stream, $bodyEmitter->pushAsync(...), $trailersCallback);
311309

312310
$start = now();
313-
$timeout = $request->getInactivityTimeout();
311+
$inactivityTimeout = $request->getInactivityTimeout();
314312

315313
try {
316314
if ($this->socket === null) {
317315
throw new SocketException('Socket closed prior to response completion');
318316
}
319317

320-
while (null !== $chunk = $this->readChunk($timeout)) {
318+
while (null !== $chunk = $this->readChunk($inactivityTimeout)) {
321319
parseChunk:
322320
$response = $parser->parse($chunk);
323321
if ($response === null) {
@@ -355,7 +353,7 @@ private function readResponse(
355353
}
356354

357355
$chunk = $parser->getBuffer();
358-
$parser = new Http1Parser($request, $stream, $bodyCallback, $trailersCallback);
356+
$parser = new Http1Parser($request, $stream, $bodyEmitter->pushAsync(...), $trailersCallback);
359357
goto parseChunk;
360358
}
361359

@@ -374,22 +372,26 @@ private function readResponse(
374372
$response->setTrailers($trailersDeferred->getFuture());
375373
$response->setBody(new ResponseBodyStream(
376374
new ReadableIterableStream($bodyEmitter->pipe()),
377-
$bodyDeferredCancellation
375+
$bodyDeferredCancellation,
378376
));
379377

378+
[$requestTimeout, $explicitTimeout, $priorTimeout] = $this->determineKeepAliveTimeout($response);
379+
380380
// Read body async
381381
EventLoop::queue(function () use (
382382
$parser,
383383
$request,
384-
$response,
384+
$requestTimeout,
385+
$explicitTimeout,
386+
$priorTimeout,
387+
$inactivityTimeout,
385388
$bodyEmitter,
386389
$trailersDeferred,
387390
$originalCancellation,
388391
$readingCancellation,
389392
$bodyCancellation,
390393
$stream,
391-
$timeout,
392-
&$trailers
394+
&$trailers,
393395
) {
394396
$closeId = $bodyCancellation->subscribe($this->close(...));
395397

@@ -401,31 +403,24 @@ private function readResponse(
401403
$chunk = null;
402404

403405
try {
404-
/** @psalm-suppress PossiblyNullReference */
405406
do {
406-
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
407407
$parser->parse($chunk);
408-
/**
409-
* @noinspection NotOptimalIfConditionsInspection
410-
* @psalm-suppress TypeDoesNotContainType
411-
*/
412408
if ($parser->isComplete()) {
413409
break;
414410
}
415411

416-
/** @psalm-suppress TypeDoesNotContainNull */
417412
if ($this->socket === null) {
418413
throw new SocketException('Socket closed prior to response completion');
419414
}
420-
} while (null !== $chunk = $this->socket->read($timeout > 0 ? new TimeoutCancellation($timeout) : null));
415+
} while (null !== $chunk = $this->readChunk($inactivityTimeout));
421416
} catch (CancelledException $e) {
422417
$this->close();
423418
$originalCancellation->throwIfRequested();
424419

425420
throw new TimeoutException(
426-
'Inactivity timeout exceeded, more than ' . $timeout . ' seconds elapsed from last data received',
427-
0,
428-
$e
421+
'Inactivity timeout exceeded, more than ' . $inactivityTimeout
422+
. ' seconds elapsed from last data received',
423+
previous: $e,
429424
);
430425
}
431426

@@ -443,10 +438,11 @@ private function readResponse(
443438
}
444439
}
445440

446-
$timeout = $this->determineKeepAliveTimeout($response);
441+
$this->explicitTimeout = $explicitTimeout ?: $this->explicitTimeout;
442+
$this->priorTimeout = $priorTimeout ?? $this->priorTimeout;
447443

448-
if ($timeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) {
449-
$this->timeoutWatcher = EventLoop::delay($timeout, $this->close(...));
444+
if ($requestTimeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) {
445+
$this->timeoutWatcher = EventLoop::delay($requestTimeout, $this->close(...));
450446
EventLoop::unreference($this->timeoutWatcher);
451447
$this->watchIdleConnection();
452448
} else {
@@ -504,7 +500,7 @@ private function readResponse(
504500
throw new TimeoutException('Allowed transfer timeout exceeded, took longer than ' . $request->getTransferTimeout() . ' s', 0, $e);
505501
}
506502

507-
throw new TimeoutException('Inactivity timeout exceeded, more than ' . $timeout . ' seconds elapsed from last data received', 0, $e);
503+
throw new TimeoutException('Inactivity timeout exceeded, more than ' . $inactivityTimeout . ' seconds elapsed from last data received', 0, $e);
508504
} catch (\Throwable $e) {
509505
$this->close();
510506
throw new SocketException('Receiving the response headers failed: ' . $e->getMessage(), 0, $e);
@@ -546,33 +542,34 @@ private function getRemainingTime(): float
546542
return \max(0, $timestamp - now());
547543
}
548544

549-
private function determineKeepAliveTimeout(Response $response): int
545+
/**
546+
* @return array{int, bool, int|null}
547+
*/
548+
private function determineKeepAliveTimeout(Response $response): array
550549
{
551550
$request = $response->getRequest();
552551

553552
$requestConnHeader = $request->getHeader('connection') ?? '';
554553
$responseConnHeader = $response->getHeader('connection') ?? '';
555554

556555
if (!\strcasecmp($requestConnHeader, 'close')) {
557-
return 0;
556+
return [0, false, null];
558557
}
559558

560559
if ($response->getProtocolVersion() === '1.0') {
561-
return 0;
560+
return [0, false, null];
562561
}
563562

564563
if (!\strcasecmp($responseConnHeader, 'close')) {
565-
return 0;
564+
return [0, false, null];
566565
}
567566

568567
$params = Http\parseMultipleHeaderFields($response, 'keep-alive')[0] ?? null;
569568

570569
$timeout = (int) ($params['timeout'] ?? $this->priorTimeout);
571-
if (isset($params['timeout'])) {
572-
$this->explicitTimeout = true;
573-
}
570+
$timeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
574571

575-
return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
572+
return [$timeout, isset($params['timeout']), $timeout];
576573
}
577574

578575
/**

src/Connection/Internal/Http1Parser.php

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Amp\ByteStream\ReadableBuffer;
66
use Amp\ForbidCloning;
77
use Amp\ForbidSerialization;
8+
use Amp\Future;
89
use Amp\Http\Client\Connection\Stream;
910
use Amp\Http\Client\ParseException;
1011
use Amp\Http\Client\Request;
@@ -39,7 +40,8 @@ final class Http1Parser
3940
public const TRAILERS_START = 4;
4041
public const TRAILERS = 5;
4142

42-
private ?Response $response = null;
43+
/** @var \WeakReference<Response>|null */
44+
private ?\WeakReference $responseRef = null;
4345

4446
private int $state = self::AWAITING_HEADERS;
4547

@@ -63,7 +65,7 @@ final class Http1Parser
6365
private readonly int $maxBodyBytes;
6466

6567
/**
66-
* @param \Closure(string):void $bodyDataCallback
68+
* @param \Closure(string):Future $bodyDataCallback
6769
* @param \Closure(HeaderMapType):void $trailersCallback
6870
*/
6971
public function __construct(
@@ -110,9 +112,11 @@ public function parse(?string $data = null): ?Response
110112

111113
if (!$this->bodyStarted && \in_array($this->state, [self::BODY_CHUNKS, self::BODY_IDENTITY, self::BODY_IDENTITY_EOF], true)) {
112114
$this->bodyStarted = true;
113-
$response = $this->response;
114-
\assert($response !== null);
115-
events()->responseBodyStart($this->request, $this->stream, $response);
115+
$response = $this->responseRef?->get();
116+
if ($response) {
117+
events()->responseBodyStart($this->request, $this->stream, $response);
118+
$response = null;
119+
}
116120
}
117121

118122
switch ($this->state) {
@@ -183,15 +187,21 @@ public function parse(?string $data = null): ?Response
183187
$response->addHeader($key, $value);
184188
}
185189

190+
$this->responseRef = \WeakReference::create($response);
191+
186192
events()->responseHeaderEnd($this->request, $this->stream, $response);
187193

188-
return $this->response = $response;
194+
return $response;
189195
}
190196

191197
body_identity:
192198
{
193199
if ($data !== null && $data !== '') {
194-
events()->responseBodyProgress($this->request, $this->stream, $this->response);
200+
$response = $this->responseRef?->get();
201+
if ($response) {
202+
events()->responseBodyProgress($this->request, $this->stream, $response);
203+
$response = null;
204+
}
195205
}
196206

197207
$bufferDataSize = \strlen($this->buffer);
@@ -220,7 +230,11 @@ public function parse(?string $data = null): ?Response
220230
body_identity_eof:
221231
{
222232
if ($data !== null && $data !== '') {
223-
events()->responseBodyProgress($this->request, $this->stream, $this->response);
233+
$response = $this->responseRef?->get();
234+
if ($response) {
235+
events()->responseBodyProgress($this->request, $this->stream, $response);
236+
$response = null;
237+
}
224238
}
225239

226240
$this->addToBody($this->buffer);
@@ -231,7 +245,11 @@ public function parse(?string $data = null): ?Response
231245
body_chunks:
232246
{
233247
if ($data !== null && $data !== '') {
234-
events()->responseBodyProgress($this->request, $this->stream, $this->response);
248+
$response = $this->responseRef?->get();
249+
if ($response) {
250+
events()->responseBodyProgress($this->request, $this->stream, $response);
251+
$response = null;
252+
}
235253
}
236254

237255
if ($this->parseChunkedBody()) {
@@ -272,7 +290,11 @@ public function parse(?string $data = null): ?Response
272290

273291
complete:
274292
{
275-
events()->responseBodyEnd($this->request, $this->stream, $this->response);
293+
$response = $this->responseRef?->get();
294+
if ($response) {
295+
events()->responseBodyEnd($this->request, $this->stream, $response);
296+
$response = null;
297+
}
276298

277299
$this->complete = true;
278300

@@ -438,7 +460,6 @@ private function parseTrailers(string $trailers): void
438460
private function addToBody(string $data): void
439461
{
440462
$length = \strlen($data);
441-
442463
if (!$length) {
443464
return;
444465
}
@@ -449,8 +470,6 @@ private function addToBody(string $data): void
449470
throw new ParseException("Configured body size exceeded: {$this->bodyBytesConsumed} bytes received, while the configured limit is {$this->maxBodyBytes} bytes", HttpStatus::PAYLOAD_TOO_LARGE);
450471
}
451472

452-
if ($this->bodyDataCallback) {
453-
($this->bodyDataCallback)($data);
454-
}
473+
($this->bodyDataCallback)($data)->ignore();
455474
}
456475
}

test/Connection/ConnectionLimitingPoolTest.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,44 @@ public function testSingleConnection(): void
3939
]);
4040
}
4141

42+
public function testSingleConnectionDoNotUseBody(): void
43+
{
44+
$client = (new HttpClientBuilder)
45+
->usingPool(ConnectionLimitingPool::byAuthority(1))
46+
->build();
47+
48+
$this->setMinimumRuntime(2);
49+
50+
$r1 = new Request('https://httpbin.org/delay/1');
51+
$r1->setProtocolVersions(['1.1']);
52+
$r2 = new Request('https://httpbin.org/delay/1');
53+
$r2->setProtocolVersions(['1.1']);
54+
55+
Future\await([
56+
async($client->request(...), $r1),
57+
async($client->request(...), $r2),
58+
]);
59+
}
60+
61+
public function testSingleConnectionDoNotUseBodyHttp2(): void
62+
{
63+
$client = (new HttpClientBuilder)
64+
->usingPool(ConnectionLimitingPool::byAuthority(1))
65+
->build();
66+
67+
$this->setMinimumRuntime(1);
68+
69+
$r1 = new Request('https://httpbin.org/delay/1');
70+
$r1->setProtocolVersions(['2']);
71+
$r2 = new Request('https://httpbin.org/delay/1');
72+
$r2->setProtocolVersions(['2']);
73+
74+
Future\await([
75+
async($client->request(...), $r1),
76+
async($client->request(...), $r2),
77+
]);
78+
}
79+
4280
public function testTwoConnections(): void
4381
{
4482
$client = (new HttpClientBuilder)

test/ParserTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace Amp\Http\Client;
55

6+
use Amp\Future;
67
use Amp\Http\Client\Connection\Connection;
78
use Amp\Http\Client\Connection\Internal\Http1Parser;
89
use Amp\Http\Client\Connection\Stream;
@@ -55,7 +56,7 @@ public function testResponseWithTrailers(): void
5556
$parser = new Http1Parser(
5657
$request,
5758
$this->createMock(Stream::class),
58-
$this->createCallback(1),
59+
$this->createCallback(1, fn () => Future::complete()),
5960
$callback
6061
);
6162

0 commit comments

Comments
 (0)