Skip to content

Commit 8560046

Browse files
authored
Add tests for ParallelExecutor child execution (#103)
1 parent 98e11db commit 8560046

9 files changed

+374
-39
lines changed

src/ParallelExecutor.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use function implode;
2121
use function mb_strlen;
2222
use function sprintf;
23-
use const STDIN;
2423
use Symfony\Component\Console\Input\Input;
2524
use Symfony\Component\Console\Input\InputDefinition;
2625
use Symfony\Component\Console\Input\InputInterface;
@@ -58,6 +57,11 @@ final class ParallelExecutor
5857

5958
private ItemProcessingErrorHandler $errorHandler;
6059

60+
/**
61+
* @var resource
62+
*/
63+
private $childSourceStream;
64+
6165
/**
6266
* @var positive-int
6367
*/
@@ -107,6 +111,7 @@ final class ParallelExecutor
107111
* @param callable(InputInterface):list<string> $fetchItems
108112
* @param callable(string, InputInterface, OutputInterface):void $runSingleCommand
109113
* @param callable(int):string $getItemName
114+
* @param resource $childSourceStream
110115
* @param positive-int $batchSize
111116
* @param positive-int $segmentSize
112117
* @param callable(InputInterface, OutputInterface):void $runBeforeFirstCommand
@@ -122,6 +127,7 @@ public function __construct(
122127
string $commandName,
123128
InputDefinition $commandDefinition,
124129
ItemProcessingErrorHandler $errorHandler,
130+
$childSourceStream,
125131
int $batchSize,
126132
int $segmentSize,
127133
callable $runBeforeFirstCommand,
@@ -147,6 +153,7 @@ public function __construct(
147153
$this->commandName = $commandName;
148154
$this->commandDefinition = $commandDefinition;
149155
$this->errorHandler = $errorHandler;
156+
$this->childSourceStream = $childSourceStream;
150157
$this->batchSize = $batchSize;
151158
$this->segmentSize = $segmentSize;
152159
$this->runBeforeFirstCommand = $runBeforeFirstCommand;
@@ -278,7 +285,7 @@ private function executeChildProcess(
278285
Logger $logger
279286
): int {
280287
$itemIterator = ChunkedItemsIterator::fromStream(
281-
STDIN,
288+
$this->childSourceStream,
282289
$this->batchSize,
283290
);
284291

src/ParallelExecutorFactory.php

+22
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use function chr;
1717
use const DIRECTORY_SEPARATOR;
1818
use function getcwd;
19+
use const STDIN;
1920
use Symfony\Component\Console\Input\InputDefinition;
2021
use Symfony\Component\Console\Input\InputInterface;
2122
use Symfony\Component\Console\Output\OutputInterface;
@@ -47,6 +48,11 @@ final class ParallelExecutorFactory
4748

4849
private ItemProcessingErrorHandler $errorHandler;
4950

51+
/**
52+
* @var resource
53+
*/
54+
private $childSourceStream;
55+
5056
/**
5157
* @var positive-int
5258
*/
@@ -96,6 +102,7 @@ final class ParallelExecutorFactory
96102
* @param callable(InputInterface):list<string> $fetchItems
97103
* @param callable(string, InputInterface, OutputInterface):void $runSingleCommand
98104
* @param callable(int):string $getItemName
105+
* @param resource $childSourceStream
99106
* @param positive-int $batchSize
100107
* @param positive-int $segmentSize
101108
* @param callable(InputInterface, OutputInterface):void $runBeforeFirstCommand
@@ -111,6 +118,7 @@ private function __construct(
111118
string $commandName,
112119
InputDefinition $commandDefinition,
113120
ItemProcessingErrorHandler $errorHandler,
121+
$childSourceStream,
114122
int $batchSize,
115123
int $segmentSize,
116124
callable $runBeforeFirstCommand,
@@ -130,6 +138,7 @@ private function __construct(
130138
$this->commandName = $commandName;
131139
$this->commandDefinition = $commandDefinition;
132140
$this->errorHandler = $errorHandler;
141+
$this->childSourceStream = $childSourceStream;
133142
$this->batchSize = $batchSize;
134143
$this->segmentSize = $segmentSize;
135144
$this->runBeforeFirstCommand = $runBeforeFirstCommand;
@@ -164,6 +173,7 @@ public static function create(
164173
$commandName,
165174
$commandDefinition,
166175
$errorHandler,
176+
STDIN,
167177
50,
168178
50,
169179
self::getNoopCallable(),
@@ -179,6 +189,17 @@ public static function create(
179189
);
180190
}
181191

192+
/**
193+
* @param resource $childSourceStream
194+
*/
195+
public function withChildSourceStream($childSourceStream): self
196+
{
197+
$clone = clone $this;
198+
$clone->childSourceStream = $childSourceStream;
199+
200+
return $clone;
201+
}
202+
182203
/**
183204
* The number of items to process in a batch. Multiple batches can be
184205
* executed within the main and child processes. This allows to early fetch
@@ -347,6 +368,7 @@ public function build(): ParallelExecutor
347368
$this->commandName,
348369
$this->commandDefinition,
349370
$this->errorHandler,
371+
$this->childSourceStream,
350372
$this->batchSize,
351373
$this->segmentSize,
352374
$this->runBeforeFirstCommand,

tests/ChunkedItemsIteratorTest.php

+9-33
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,9 @@
1313

1414
namespace Webmozarts\Console\Parallelization;
1515

16-
use Closure;
1716
use function fclose;
18-
use function fopen;
19-
use function fwrite;
2017
use InvalidArgumentException;
21-
use LogicException;
2218
use PHPUnit\Framework\TestCase;
23-
use function rewind;
2419
use stdClass;
2520

2621
/**
@@ -164,27 +159,27 @@ public static function valuesProvider(): iterable
164159
public static function streamProvider(): iterable
165160
{
166161
yield 'single item' => [
167-
self::createStream('item0'),
162+
StringStream::fromString('item0'),
168163
['item0'],
169164
];
170165

171166
yield 'single item with space' => [
172-
self::createStream('it em'),
167+
StringStream::fromString('it em'),
173168
['it em'],
174169
];
175170

176171
yield 'empty string' => [
177-
self::createStream(''),
172+
StringStream::fromString(''),
178173
[],
179174
];
180175

181176
yield 'whitespace string' => [
182-
self::createStream(' '),
177+
StringStream::fromString(' '),
183178
[' '],
184179
];
185180

186181
yield 'several items' => [
187-
self::createStream(<<<'STDIN'
182+
StringStream::fromString(<<<'STDIN'
188183
item0
189184
item1
190185
item3
@@ -193,7 +188,7 @@ public static function streamProvider(): iterable
193188
];
194189

195190
yield 'several items with blank values' => [
196-
self::createStream(<<<'STDIN'
191+
StringStream::fromString(<<<'STDIN'
197192
item0
198193
item1
199194
@@ -205,7 +200,7 @@ public static function streamProvider(): iterable
205200
];
206201

207202
yield 'numerical items – items are kept as strings' => [
208-
self::createStream(<<<'STDIN'
203+
StringStream::fromString(<<<'STDIN'
209204
string item
210205
10
211206
.5
@@ -220,7 +215,7 @@ public static function inputProvider(): iterable
220215
{
221216
yield 'one item: the fetch item closure is not evaluated' => [
222217
'item0',
223-
self::createFakeClosure(),
218+
FakeCallable::create(),
224219
['item0'],
225220
];
226221

@@ -240,7 +235,7 @@ public static function invalidValuesProvider(): iterable
240235
];
241236

242237
yield 'closure item' => [
243-
[self::createFakeClosure()],
238+
[FakeCallable::create()],
244239
1,
245240
'The items are potentially passed to the child processes via the STDIN. For this reason they are expected to be string values. Got "Closure" for the item "0".',
246241
];
@@ -252,25 +247,6 @@ public static function invalidValuesProvider(): iterable
252247
];
253248
}
254249

255-
private static function createFakeClosure(): Closure
256-
{
257-
return static function () {
258-
throw new LogicException('Did not expect to be called');
259-
};
260-
}
261-
262-
/**
263-
* @return resource
264-
*/
265-
private static function createStream(string $value)
266-
{
267-
$stream = fopen('php://memory', 'rb+');
268-
fwrite($stream, $value);
269-
rewind($stream);
270-
271-
return $stream;
272-
}
273-
274250
private static function assertStateIs(
275251
ChunkedItemsIterator $iterator,
276252
array $expectedItems,
+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Webmozarts Console Parallelization package.
5+
*
6+
* (c) Webmozarts GmbH <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Webmozarts\Console\Parallelization\ErrorHandler;
15+
16+
use function func_get_args;
17+
use Throwable;
18+
use Webmozarts\Console\Parallelization\Logger\Logger;
19+
20+
final class DummyErrorHandler implements ItemProcessingErrorHandler
21+
{
22+
public array $calls = [];
23+
24+
public function handleError(string $item, Throwable $throwable, Logger $logger): void
25+
{
26+
$this->calls[] = func_get_args();
27+
}
28+
}

tests/CallableUtil.php renamed to tests/FakeCallable.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
use DomainException;
1717

18-
final class CallableUtil
18+
final class FakeCallable
1919
{
2020
private function __construct()
2121
{

tests/ParallelExecutorFactoryTest.php

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public function test_it_can_create_a_configured_executor(): void
4141
$callable5 = self::createCallable(5);
4242
$callable6 = self::createCallable(6);
4343

44+
$childSourceStream = StringStream::fromString('');
4445
$batchSize = 10;
4546
$segmentSize = 20;
4647
$extraEnvironmentVariables = ['CUSTOM_CI' => '0'];
@@ -55,6 +56,7 @@ public function test_it_can_create_a_configured_executor(): void
5556
$definition,
5657
$errorHandler,
5758
)
59+
->withChildSourceStream($childSourceStream)
5860
->withBatchSize($batchSize)
5961
->withSegmentSize($segmentSize)
6062
->withRunBeforeFirstCommand($callable3)
@@ -76,6 +78,7 @@ public function test_it_can_create_a_configured_executor(): void
7678
$commandName,
7779
$definition,
7880
$errorHandler,
81+
$childSourceStream,
7982
$batchSize,
8083
$segmentSize,
8184
$callable3,

0 commit comments

Comments
 (0)