Skip to content

Make the process factory and process tick extendible #149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/ParallelExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@
use Webmozarts\Console\Parallelization\Logger\Logger;
use Webmozarts\Console\Parallelization\Process\ProcessLauncher;
use Webmozarts\Console\Parallelization\Process\ProcessLauncherFactory;
use Webmozarts\Console\Parallelization\Process\StandardSymfonyProcessFactory;
use function array_filter;
use function array_map;
use function array_merge;
use function array_slice;
use function implode;
use function mb_strlen;
use function sprintf;
use function usleep;

final class ParallelExecutor
{
private const CHILD_POLLING_IN_MICRO_SECONDS = 1000; // 1ms

/**
* @var callable(InputInterface):iterable<string>
*/
Expand Down Expand Up @@ -110,6 +106,11 @@ final class ParallelExecutor

private ProcessLauncherFactory $processLauncherFactory;

/**
* @var callable(): void
*/
private $processTick;

/**
* @param callable(InputInterface):iterable<string> $fetchItems
* @param callable(string, InputInterface, OutputInterface):void $runSingleCommand
Expand All @@ -122,6 +123,7 @@ final class ParallelExecutor
* @param callable(InputInterface, OutputInterface, list<string>):void $runBeforeBatch
* @param callable(InputInterface, OutputInterface, list<string>):void $runAfterBatch
* @param array<string, string> $extraEnvironmentVariables
* @param callable(): void $processTick
*/
public function __construct(
callable $fetchItems,
Expand All @@ -142,7 +144,8 @@ public function __construct(
string $scriptPath,
string $workingDirectory,
?array $extraEnvironmentVariables,
ProcessLauncherFactory $processLauncherFactory
ProcessLauncherFactory $processLauncherFactory,
callable $processTick
) {
self::validateBatchSize($batchSize);
self::validateSegmentSize($segmentSize);
Expand All @@ -168,6 +171,7 @@ public function __construct(
$this->workingDirectory = $workingDirectory;
$this->extraEnvironmentVariables = $extraEnvironmentVariables;
$this->processLauncherFactory = $processLauncherFactory;
$this->processTick = $processTick;
}

public function execute(
Expand Down Expand Up @@ -363,10 +367,7 @@ private function createProcessLauncher(
$segmentSize,
$logger,
fn (string $type, string $buffer) => $this->processChildOutput($buffer, $logger),
// TODO: make this configurable?
static fn () => usleep(self::CHILD_POLLING_IN_MICRO_SECONDS),
// TODO: make this configurable?
new StandardSymfonyProcessFactory(),
$this->processTick,
);
}

Expand Down
30 changes: 28 additions & 2 deletions src/ParallelExecutorFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Webmozarts\Console\Parallelization\ErrorHandler\ErrorHandler;
use Webmozarts\Console\Parallelization\Process\PhpExecutableFinder;
use Webmozarts\Console\Parallelization\Process\ProcessLauncherFactory;
use Webmozarts\Console\Parallelization\Process\StandardSymfonyProcessFactory;
use Webmozarts\Console\Parallelization\Process\SymfonyProcessLauncherFactory;
use function chr;
use function Safe\getcwd;
Expand All @@ -27,6 +28,8 @@

final class ParallelExecutorFactory
{
private const CHILD_POLLING_IN_MICRO_SECONDS = 1000; // 1ms

/**
* @var callable(InputInterface):iterable<string>
*/
Expand Down Expand Up @@ -98,6 +101,11 @@ final class ParallelExecutorFactory

private ProcessLauncherFactory $processLauncherFactory;

/**
* @var callable(): void
*/
private $processTick;

/**
* @param callable(InputInterface):iterable<string> $fetchItems
* @param callable(string, InputInterface, OutputInterface):void $runSingleCommand
Expand All @@ -110,6 +118,7 @@ final class ParallelExecutorFactory
* @param callable(InputInterface, OutputInterface, list<string>):void $runBeforeBatch
* @param callable(InputInterface, OutputInterface, list<string>):void $runAfterBatch
* @param array<string, string> $extraEnvironmentVariables
* @param callable(): void $processTick
*/
private function __construct(
callable $fetchItems,
Expand All @@ -130,7 +139,8 @@ private function __construct(
string $scriptPath,
string $workingDirectory,
?array $extraEnvironmentVariables,
ProcessLauncherFactory $processLauncherFactory
ProcessLauncherFactory $processLauncherFactory,
callable $processTick
) {
$this->fetchItems = $fetchItems;
$this->runSingleCommand = $runSingleCommand;
Expand All @@ -151,6 +161,7 @@ private function __construct(
$this->workingDirectory = $workingDirectory;
$this->extraEnvironmentVariables = $extraEnvironmentVariables;
$this->processLauncherFactory = $processLauncherFactory;
$this->processTick = $processTick;
}

/**
Expand Down Expand Up @@ -185,7 +196,10 @@ public static function create(
self::getScriptPath(),
getcwd(),
null,
new SymfonyProcessLauncherFactory(),
new SymfonyProcessLauncherFactory(
new StandardSymfonyProcessFactory(),
),
static fn () => usleep(self::CHILD_POLLING_IN_MICRO_SECONDS),
);
}

Expand Down Expand Up @@ -362,6 +376,17 @@ public function withProcessLauncherFactory(ProcessLauncherFactory $processLaunch
return $clone;
}

/**
* @param callable(): void $processTick
*/
public function withProcessTick(callable $processTick): self
{
$clone = clone $this;
$clone->processTick = $processTick;

return $clone;
}

public function build(): ParallelExecutor
{
return new ParallelExecutor(
Expand All @@ -384,6 +409,7 @@ public function build(): ParallelExecutor
$this->workingDirectory,
$this->extraEnvironmentVariables,
$this->processLauncherFactory,
$this->processTick,
);
}

Expand Down
3 changes: 1 addition & 2 deletions src/Process/ProcessLauncherFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public function create(
int $segmentSize,
Logger $logger,
callable $callback,
callable $tick,
SymfonyProcessFactory $processFactory
callable $tick
): ProcessLauncher;
}
12 changes: 9 additions & 3 deletions src/Process/SymfonyProcessLauncherFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

final class SymfonyProcessLauncherFactory implements ProcessLauncherFactory
{
private SymfonyProcessFactory $processFactory;

public function __construct(SymfonyProcessFactory $processFactory)
{
$this->processFactory = $processFactory;
}

/**
* @param list<string> $command
* @param array<string, string>|null $extraEnvironmentVariables
Expand All @@ -33,8 +40,7 @@ public function create(
int $segmentSize,
Logger $logger,
callable $callback,
callable $tick,
SymfonyProcessFactory $processFactory
callable $tick
): ProcessLauncher {
return new SymfonyProcessLauncher(
$command,
Expand All @@ -45,7 +51,7 @@ public function create(
$logger,
$callback,
$tick,
$processFactory,
$this->processFactory,
);
}
}
3 changes: 3 additions & 0 deletions tests/ParallelExecutorFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public function test_it_can_create_a_configured_executor(): void
$callable4 = self::createCallable(4);
$callable5 = self::createCallable(5);
$callable6 = self::createCallable(6);
$callable7 = self::createCallable(7);

$childSourceStream = StringStream::fromString('');
$batchSize = 10;
Expand Down Expand Up @@ -78,6 +79,7 @@ public function test_it_can_create_a_configured_executor(): void
->withWorkingDirectory(self::FILE_3)
->withExtraEnvironmentVariables($extraEnvironmentVariables)
->withProcessLauncherFactory($processLauncherFactory)
->withProcessTick($callable7)
->build();

$expected = new ParallelExecutor(
Expand All @@ -100,6 +102,7 @@ public function test_it_can_create_a_configured_executor(): void
self::FILE_3,
$extraEnvironmentVariables,
$processLauncherFactory,
$callable7,
);

self::assertEquals($expected, $executor);
Expand Down
5 changes: 3 additions & 2 deletions tests/ParallelExecutorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
use Webmozarts\Console\Parallelization\Process\FakeProcessLauncherFactory;
use Webmozarts\Console\Parallelization\Process\ProcessLauncher;
use Webmozarts\Console\Parallelization\Process\ProcessLauncherFactory;
use Webmozarts\Console\Parallelization\Process\StandardSymfonyProcessFactory;
use function array_fill;
use function func_get_args;
use function getcwd;
Expand Down Expand Up @@ -427,7 +426,6 @@ public function test_it_can_launch_configured_child_processes(): void
$logger,
Argument::type('callable'),
Argument::type('callable'),
Argument::type(StandardSymfonyProcessFactory::class),
)
->willReturn($processLauncherProphecy->reveal());

Expand All @@ -451,6 +449,7 @@ public function test_it_can_launch_configured_child_processes(): void
$workingDirectory,
$extraEnvironmentVariables,
$processLauncherFactoryProphecy->reveal(),
static function (): void {},
);

$executor->execute(
Expand Down Expand Up @@ -1018,6 +1017,7 @@ private static function createChildProcessExecutor(
__DIR__,
null,
new FakeProcessLauncherFactory(),
static function (): void {},
);
}

Expand Down Expand Up @@ -1069,6 +1069,7 @@ private static function createMainProcessExecutor(
__DIR__,
null,
$processLauncherFactory,
static function (): void {},
);
}

Expand Down
3 changes: 1 addition & 2 deletions tests/Process/FakeProcessLauncherFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public function create(
int $segmentSize,
Logger $logger,
callable $callback,
callable $tick,
SymfonyProcessFactory $processFactory
callable $tick
): ProcessLauncher {
throw new DomainException('Unexpected call.');
}
Expand Down