Skip to content

Add tests for ParallelExecutor child execution #103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/ParallelExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use function implode;
use function mb_strlen;
use function sprintf;
use const STDIN;
use Symfony\Component\Console\Input\Input;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -58,6 +57,11 @@ final class ParallelExecutor

private ItemProcessingErrorHandler $errorHandler;

/**
* @var resource
*/
private $childSourceStream;

/**
* @var positive-int
*/
Expand Down Expand Up @@ -107,6 +111,7 @@ final class ParallelExecutor
* @param callable(InputInterface):list<string> $fetchItems
* @param callable(string, InputInterface, OutputInterface):void $runSingleCommand
* @param callable(int):string $getItemName
* @param resource $childSourceStream
* @param positive-int $batchSize
* @param positive-int $segmentSize
* @param callable(InputInterface, OutputInterface):void $runBeforeFirstCommand
Expand All @@ -122,6 +127,7 @@ public function __construct(
string $commandName,
InputDefinition $commandDefinition,
ItemProcessingErrorHandler $errorHandler,
$childSourceStream,
int $batchSize,
int $segmentSize,
callable $runBeforeFirstCommand,
Expand All @@ -147,6 +153,7 @@ public function __construct(
$this->commandName = $commandName;
$this->commandDefinition = $commandDefinition;
$this->errorHandler = $errorHandler;
$this->childSourceStream = $childSourceStream;
$this->batchSize = $batchSize;
$this->segmentSize = $segmentSize;
$this->runBeforeFirstCommand = $runBeforeFirstCommand;
Expand Down Expand Up @@ -278,7 +285,7 @@ private function executeChildProcess(
Logger $logger
): int {
$itemIterator = ChunkedItemsIterator::fromStream(
STDIN,
$this->childSourceStream,
$this->batchSize,
);

Expand Down
22 changes: 22 additions & 0 deletions src/ParallelExecutorFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use function chr;
use const DIRECTORY_SEPARATOR;
use function getcwd;
use const STDIN;
use Symfony\Component\Console\Input\InputDefinition;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
Expand Down Expand Up @@ -47,6 +48,11 @@ final class ParallelExecutorFactory

private ItemProcessingErrorHandler $errorHandler;

/**
* @var resource
*/
private $childSourceStream;

/**
* @var positive-int
*/
Expand Down Expand Up @@ -96,6 +102,7 @@ final class ParallelExecutorFactory
* @param callable(InputInterface):list<string> $fetchItems
* @param callable(string, InputInterface, OutputInterface):void $runSingleCommand
* @param callable(int):string $getItemName
* @param resource $childSourceStream
* @param positive-int $batchSize
* @param positive-int $segmentSize
* @param callable(InputInterface, OutputInterface):void $runBeforeFirstCommand
Expand All @@ -111,6 +118,7 @@ private function __construct(
string $commandName,
InputDefinition $commandDefinition,
ItemProcessingErrorHandler $errorHandler,
$childSourceStream,
int $batchSize,
int $segmentSize,
callable $runBeforeFirstCommand,
Expand All @@ -130,6 +138,7 @@ private function __construct(
$this->commandName = $commandName;
$this->commandDefinition = $commandDefinition;
$this->errorHandler = $errorHandler;
$this->childSourceStream = $childSourceStream;
$this->batchSize = $batchSize;
$this->segmentSize = $segmentSize;
$this->runBeforeFirstCommand = $runBeforeFirstCommand;
Expand Down Expand Up @@ -164,6 +173,7 @@ public static function create(
$commandName,
$commandDefinition,
$errorHandler,
STDIN,
50,
50,
self::getNoopCallable(),
Expand All @@ -179,6 +189,17 @@ public static function create(
);
}

/**
* @param resource $childSourceStream
*/
public function withChildSourceStream($childSourceStream): self
{
$clone = clone $this;
$clone->childSourceStream = $childSourceStream;

return $clone;
}

/**
* The number of items to process in a batch. Multiple batches can be
* executed within the main and child processes. This allows to early fetch
Expand Down Expand Up @@ -347,6 +368,7 @@ public function build(): ParallelExecutor
$this->commandName,
$this->commandDefinition,
$this->errorHandler,
$this->childSourceStream,
$this->batchSize,
$this->segmentSize,
$this->runBeforeFirstCommand,
Expand Down
42 changes: 9 additions & 33 deletions tests/ChunkedItemsIteratorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,9 @@

namespace Webmozarts\Console\Parallelization;

use Closure;
use function fclose;
use function fopen;
use function fwrite;
use InvalidArgumentException;
use LogicException;
use PHPUnit\Framework\TestCase;
use function rewind;
use stdClass;

/**
Expand Down Expand Up @@ -164,27 +159,27 @@ public static function valuesProvider(): iterable
public static function streamProvider(): iterable
{
yield 'single item' => [
self::createStream('item0'),
StringStream::fromString('item0'),
['item0'],
];

yield 'single item with space' => [
self::createStream('it em'),
StringStream::fromString('it em'),
['it em'],
];

yield 'empty string' => [
self::createStream(''),
StringStream::fromString(''),
[],
];

yield 'whitespace string' => [
self::createStream(' '),
StringStream::fromString(' '),
[' '],
];

yield 'several items' => [
self::createStream(<<<'STDIN'
StringStream::fromString(<<<'STDIN'
item0
item1
item3
Expand All @@ -193,7 +188,7 @@ public static function streamProvider(): iterable
];

yield 'several items with blank values' => [
self::createStream(<<<'STDIN'
StringStream::fromString(<<<'STDIN'
item0
item1

Expand All @@ -205,7 +200,7 @@ public static function streamProvider(): iterable
];

yield 'numerical items – items are kept as strings' => [
self::createStream(<<<'STDIN'
StringStream::fromString(<<<'STDIN'
string item
10
.5
Expand All @@ -220,7 +215,7 @@ public static function inputProvider(): iterable
{
yield 'one item: the fetch item closure is not evaluated' => [
'item0',
self::createFakeClosure(),
FakeCallable::create(),
['item0'],
];

Expand All @@ -240,7 +235,7 @@ public static function invalidValuesProvider(): iterable
];

yield 'closure item' => [
[self::createFakeClosure()],
[FakeCallable::create()],
1,
'The items are potentially passed to the child processes via the STDIN. For this reason they are expected to be string values. Got "Closure" for the item "0".',
];
Expand All @@ -252,25 +247,6 @@ public static function invalidValuesProvider(): iterable
];
}

private static function createFakeClosure(): Closure
{
return static function () {
throw new LogicException('Did not expect to be called');
};
}

/**
* @return resource
*/
private static function createStream(string $value)
{
$stream = fopen('php://memory', 'rb+');
fwrite($stream, $value);
rewind($stream);

return $stream;
}

private static function assertStateIs(
ChunkedItemsIterator $iterator,
array $expectedItems,
Expand Down
28 changes: 28 additions & 0 deletions tests/ErrorHandler/DummyErrorHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the Webmozarts Console Parallelization package.
*
* (c) Webmozarts GmbH <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Webmozarts\Console\Parallelization\ErrorHandler;

use function func_get_args;
use Throwable;
use Webmozarts\Console\Parallelization\Logger\Logger;

final class DummyErrorHandler implements ItemProcessingErrorHandler
{
public array $calls = [];

public function handleError(string $item, Throwable $throwable, Logger $logger): void
{
$this->calls[] = func_get_args();
}
}
2 changes: 1 addition & 1 deletion tests/CallableUtil.php → tests/FakeCallable.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use DomainException;

final class CallableUtil
final class FakeCallable
{
private function __construct()
{
Expand Down
3 changes: 3 additions & 0 deletions tests/ParallelExecutorFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public function test_it_can_create_a_configured_executor(): void
$callable5 = self::createCallable(5);
$callable6 = self::createCallable(6);

$childSourceStream = StringStream::fromString('');
$batchSize = 10;
$segmentSize = 20;
$extraEnvironmentVariables = ['CUSTOM_CI' => '0'];
Expand All @@ -55,6 +56,7 @@ public function test_it_can_create_a_configured_executor(): void
$definition,
$errorHandler,
)
->withChildSourceStream($childSourceStream)
->withBatchSize($batchSize)
->withSegmentSize($segmentSize)
->withRunBeforeFirstCommand($callable3)
Expand All @@ -76,6 +78,7 @@ public function test_it_can_create_a_configured_executor(): void
$commandName,
$definition,
$errorHandler,
$childSourceStream,
$batchSize,
$segmentSize,
$callable3,
Expand Down
Loading