Skip to content

Commit 3063eb1

Browse files
committed
Make the executor service in ParallelFlow configurable
Resolves #12
1 parent 6d90048 commit 3063eb1

File tree

4 files changed

+42
-26
lines changed

4 files changed

+42
-26
lines changed

src/main/java/org/jeasy/flows/workflow/ParallelFlow.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,15 @@
3030
import java.util.Arrays;
3131
import java.util.List;
3232
import java.util.UUID;
33+
import java.util.concurrent.ExecutorService;
3334

3435
/**
35-
* A parallel flow executes a set of work units in parallel.
36+
* A parallel flow executes a set of work units in parallel. A {@link ParallelFlow}
37+
* requires a {@link ExecutorService} to run work units in parallel using multiple
38+
* threads.
39+
*
40+
* <strong>It is the responsibility of the caller to manage the lifecycle of the
41+
* executor service.</strong>
3642
*
3743
* The status of a parallel flow execution is defined as:
3844
*
@@ -68,14 +74,27 @@ public static class Builder {
6874

6975
private String name;
7076
private List<Work> works;
77+
private ExecutorService executorService;
7178

72-
private Builder() {
79+
private Builder(ExecutorService executorService) {
7380
this.name = UUID.randomUUID().toString();
7481
this.works = new ArrayList<>();
82+
this.executorService = executorService;
7583
}
7684

77-
public static ParallelFlow.Builder aNewParallelFlow() {
78-
return new ParallelFlow.Builder();
85+
/**
86+
* Create a new {@link ParallelFlow} builder. A {@link ParallelFlow}
87+
* requires a {@link ExecutorService} to run work units in parallel
88+
* using multiple threads.
89+
*
90+
* <strong>It is the responsibility of the caller to manage the lifecycle
91+
* of the executor service.</strong>
92+
*
93+
* @param executorService to use to run work units in parallel
94+
* @return a new {@link ParallelFlow} builder
95+
*/
96+
public static ParallelFlow.Builder aNewParallelFlow(ExecutorService executorService) {
97+
return new ParallelFlow.Builder(executorService);
7998
}
8099

81100
public ParallelFlow.Builder named(String name) {
@@ -89,7 +108,7 @@ public ParallelFlow.Builder execute(Work... works) {
89108
}
90109

91110
public ParallelFlow build() {
92-
return new ParallelFlow(name, works, new ParallelFlowExecutor());
111+
return new ParallelFlow(name, works, new ParallelFlowExecutor(executorService));
93112
}
94113
}
95114
}

src/main/java/org/jeasy/flows/workflow/ParallelFlowExecutor.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.Map;
3333
import java.util.concurrent.ExecutionException;
3434
import java.util.concurrent.ExecutorService;
35-
import java.util.concurrent.Executors;
3635
import java.util.concurrent.Future;
3736
import java.util.logging.Level;
3837
import java.util.logging.Logger;
@@ -41,26 +40,14 @@ class ParallelFlowExecutor {
4140

4241
private static final Logger LOGGER = Logger.getLogger(ParallelFlowExecutor.class.getName());
4342

44-
/*
45-
* TODO Making the executor configurable requires to answer the following questions first:
46-
*
47-
* 1. If the user provides a custom executor, when should it be shutdown? -> Could be documented so the user shuts it down himself
48-
* 2. If the user provides a custom executor which is shared by multiple parallel flow, shutting it down here (as currently done) may impact other flows
49-
* 3. If it is decided to shut down the executor at the end of the parallel flow, the parallel flow could not be re-run (in a repeat flow for example) since the executor will be in an illegal state
50-
*/
5143
private ExecutorService workExecutor;
5244

53-
ParallelFlowExecutor() {
54-
this.workExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());;
45+
ParallelFlowExecutor(ExecutorService workExecutor) {
46+
this.workExecutor = workExecutor;
5547
}
5648

5749
List<WorkReport> executeInParallel(List<Work> works) {
58-
// re-init in case it has been shut down in a previous run (See question 3)
59-
if(workExecutor.isShutdown()) {
60-
workExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
61-
}
62-
63-
// submit works to be executed in parallel
50+
// submit work units to be executed in parallel
6451
Map<Work, Future<WorkReport>> reportFutures = new HashMap<>();
6552
for (Work work : works) {
6653
Future<WorkReport> reportFuture = workExecutor.submit(work);
@@ -69,7 +56,7 @@ List<WorkReport> executeInParallel(List<Work> works) {
6956

7057
// poll for work completion
7158
int finishedWorks = works.size();
72-
// FIXME polling futures for completion, not sure this is the best way to run callables in parallel and wait them for completion (use CompletionService??)
59+
// FIXME polling futures for completion, not sure this is the best way to run callables in parallel and wait for them to complete (use CompletionService??)
7360
while (finishedWorks > 0) {
7461
for (Future<WorkReport> future : reportFutures.values()) {
7562
if (future != null && future.isDone()) {
@@ -88,7 +75,6 @@ List<WorkReport> executeInParallel(List<Work> works) {
8875
}
8976
}
9077

91-
workExecutor.shutdown(); // because if not, the workflow engine may run forever.. (See question 2).
9278
return workReports;
9379
}
9480
}

src/test/java/org/jeasy/flows/engine/WorkFlowEngineImplTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
*/
2424
package org.jeasy.flows.engine;
2525

26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
2629
import org.jeasy.flows.work.DefaultWorkReport;
2730
import org.jeasy.flows.work.Work;
2831
import org.jeasy.flows.work.WorkReport;
@@ -73,7 +76,8 @@ public void composeWorkFlowFromSeparateFlowsAndExecuteIt() {
7376
.times(3)
7477
.build();
7578

76-
ParallelFlow parallelFlow = aNewParallelFlow()
79+
ExecutorService executorService = Executors.newFixedThreadPool(2);
80+
ParallelFlow parallelFlow = aNewParallelFlow(executorService)
7781
.named("print 'hello' and 'world' in parallel")
7882
.execute(work2, work3)
7983
.build();
@@ -91,6 +95,7 @@ public void composeWorkFlowFromSeparateFlowsAndExecuteIt() {
9195

9296
WorkFlowEngine workFlowEngine = aNewWorkFlowEngine().build();
9397
WorkReport workReport = workFlowEngine.run(sequentialFlow);
98+
executorService.shutdown();
9499
assertThat(workReport.getStatus()).isEqualTo(WorkStatus.COMPLETED);
95100
System.out.println("workflow report = " + workReport);
96101
}
@@ -103,14 +108,15 @@ public void defineWorkFlowInlineAndExecuteIt() {
103108
PrintMessageWork work3 = new PrintMessageWork("world");
104109
PrintMessageWork work4 = new PrintMessageWork("done");
105110

111+
ExecutorService executorService = Executors.newFixedThreadPool(2);
106112
WorkFlow workflow = aNewSequentialFlow()
107113
.execute(aNewRepeatFlow()
108114
.named("print foo 3 times")
109115
.repeat(work1)
110116
.times(3)
111117
.build())
112118
.then(aNewConditionalFlow()
113-
.execute(aNewParallelFlow()
119+
.execute(aNewParallelFlow(executorService)
114120
.named("print 'hello' and 'world' in parallel")
115121
.execute(work2, work3)
116122
.build())
@@ -121,6 +127,7 @@ public void defineWorkFlowInlineAndExecuteIt() {
121127

122128
WorkFlowEngine workFlowEngine = aNewWorkFlowEngine().build();
123129
WorkReport workReport = workFlowEngine.run(workflow);
130+
executorService.shutdown();
124131
assertThat(workReport.getStatus()).isEqualTo(WorkStatus.COMPLETED);
125132
System.out.println("workflow report = " + workReport);
126133
}

src/test/java/org/jeasy/flows/workflow/ParallelFlowExecutorTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,23 @@
3232

3333
import java.util.Arrays;
3434
import java.util.List;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
3537

3638
public class ParallelFlowExecutorTest {
3739

3840
@Test
3941
public void call() {
4042

4143
// given
44+
ExecutorService executorService = Executors.newFixedThreadPool(2);
4245
HelloWorldWork work1 = new HelloWorldWork("work1", WorkStatus.COMPLETED);
4346
HelloWorldWork work2 = new HelloWorldWork("work2", WorkStatus.FAILED);
44-
ParallelFlowExecutor parallelFlowExecutor = new ParallelFlowExecutor();
47+
ParallelFlowExecutor parallelFlowExecutor = new ParallelFlowExecutor(executorService);
4548

4649
// when
4750
List<WorkReport> workReports = parallelFlowExecutor.executeInParallel(Arrays.asList(work1, work2));
51+
executorService.shutdown();
4852

4953
// then
5054
Assertions.assertThat(workReports).hasSize(2);

0 commit comments

Comments
 (0)