Skip to content

Commit 1c4a4e6

Browse files
authored
Add basic flow functions and creation methods (#63)
1 parent 6506992 commit 1c4a4e6

File tree

13 files changed

+1391
-0
lines changed

13 files changed

+1391
-0
lines changed

channels/src/main/java/com/softwaremill/jox/Channel.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ operations on these (previous) segments, and we'll end up wanting to remove such
8787
operations won't use them, so the relinking won't be useful.
8888
*/
8989

90+
/**
91+
* Can be used with {@link Channel#withScopedBufferSize()} to pass buffer size value from scope.
92+
* e.g. `ScopedValues.where(BUFFER_SIZE, 8).run(() -> Channel.withScopedBufferSize())` will create a channel with buffer size = 8
93+
* **/
94+
public static final ScopedValue<Integer> BUFFER_SIZE = ScopedValue.newInstance();
95+
public static final int DEFAULT_BUFFER_SIZE = 16;
96+
9097
// immutable state
9198

9299
private final int capacity;
@@ -202,6 +209,13 @@ public static <T> Channel<T> newUnlimitedChannel() {
202209
return new Channel<>(UNLIMITED_CAPACITY);
203210
}
204211

212+
/**
213+
* Allows for creating Channel with buffer size specified in scope by {@link ScopedValue} {@link Channel#BUFFER_SIZE}
214+
*/
215+
public static <T> Channel<T> withScopedBufferSize() {
216+
return new Channel<>(BUFFER_SIZE.orElse(DEFAULT_BUFFER_SIZE));
217+
}
218+
205219
private static final int UNLIMITED_CAPACITY = -1;
206220

207221
// *******

flows/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>com.softwaremill.jox</groupId>
8+
<artifactId>parent</artifactId>
9+
<version>0.3.1</version>
10+
</parent>
11+
12+
<artifactId>flows</artifactId>
13+
<version>0.3.1</version>
14+
<packaging>jar</packaging>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>org.junit.jupiter</groupId>
19+
<artifactId>junit-jupiter</artifactId>
20+
<scope>test</scope>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.awaitility</groupId>
24+
<artifactId>awaitility</artifactId>
25+
<scope>test</scope>
26+
</dependency>
27+
<dependency>
28+
<groupId>com.softwaremill.jox</groupId>
29+
<artifactId>channels</artifactId>
30+
<version>0.3.1</version>
31+
</dependency>
32+
<dependency>
33+
<groupId>com.softwaremill.jox</groupId>
34+
<artifactId>structured</artifactId>
35+
<version>0.3.1</version>
36+
</dependency>
37+
</dependencies>
38+
</project>
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
package com.softwaremill.jox.flows;
2+
3+
4+
import static com.softwaremill.jox.flows.Flows.usingEmit;
5+
import static com.softwaremill.jox.structured.Scopes.unsupervised;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
import java.util.function.BiFunction;
14+
import java.util.function.Consumer;
15+
import java.util.function.Function;
16+
import java.util.function.Predicate;
17+
18+
import com.softwaremill.jox.Channel;
19+
import com.softwaremill.jox.Source;
20+
import com.softwaremill.jox.structured.UnsupervisedScope;
21+
22+
/**
23+
* Describes an asynchronous transformation pipeline. When run, emits elements of type `T`.
24+
* <p>
25+
* A flow is lazy - evaluation happens only when it's run.
26+
* <p>
27+
* Flows can be created using the {@link Flows#fromValues}, {@link Flows#fromSource}} and other `Flow.from*` methods, {@link Flows#tick} etc.
28+
* <p>
29+
* Transformation stages can be added using the available combinators, such as {@link Flow#map}, {@link Flow#buffer}, {@link Flow#grouped}, etc.
30+
* Each such method returns a new immutable {@link Flow} instance.
31+
* <p>
32+
* Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}.
33+
*/
34+
public class Flow<T> {
35+
protected final FlowStage<T> last;
36+
37+
public Flow(FlowStage<T> last) {
38+
this.last = last;
39+
}
40+
41+
// region Run operations
42+
43+
/** Invokes the given function for each emitted element. Blocks until the flow completes. */
44+
public void runForeach(Consumer<T> sink) throws Exception {
45+
last.run(sink::accept);
46+
}
47+
48+
/** Invokes the provided {@link FlowEmit} for each emitted element. Blocks until the flow completes. */
49+
public void runToEmit(FlowEmit<T> emit) throws Exception {
50+
last.run(emit);
51+
}
52+
53+
/** Accumulates all elements emitted by this flow into a list. Blocks until the flow completes. */
54+
public List<T> runToList() throws Exception {
55+
List<T> result = new ArrayList<>();
56+
runForeach(result::add);
57+
return result;
58+
}
59+
60+
/** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result
61+
* of this method.
62+
* <p>
63+
* Buffer capacity can be set via scoped value {@link Channel#BUFFER_SIZE}. If not specified in scope, {@link Channel#DEFAULT_BUFFER_SIZE} is used.
64+
* <p>
65+
* Method does not block until the flow completes.
66+
*
67+
* @param scope
68+
* Required for creating async forks responsible for writing to channel
69+
*/
70+
public Source<T> runToChannel(UnsupervisedScope scope) {
71+
if (last instanceof SourceBackedFlowStage<T>(Source<T> source)) {
72+
return source;
73+
} else {
74+
Channel<T> channel = Channel.withScopedBufferSize();
75+
runLastToChannelAsync(scope, channel);
76+
return channel;
77+
}
78+
}
79+
80+
/**
81+
* Uses `zero` as the current value and applies function `f` on it and a value emitted by this flow. The returned value is used as the
82+
* next current value and `f` is applied again with the next value emitted by the flow. The operation is repeated until the flow emits
83+
* all elements.
84+
*
85+
* @param zero
86+
* An initial value to be used as the first argument to function `f` call.
87+
* @param f
88+
* A {@link BiFunction} that is applied to the current value and value emitted by the flow.
89+
* @return
90+
* Combined value retrieved from running function `f` on all flow elements in a cumulative manner where result of the previous call is
91+
* used as an input value to the next.
92+
*/
93+
public <U> U runFold(U zero, BiFunction<U, T, U> f) throws Exception {
94+
AtomicReference<U> current = new AtomicReference<>(zero);
95+
last.run(t -> current.set(f.apply(current.get(), t)));
96+
return current.get();
97+
}
98+
99+
/**
100+
* Ignores all elements emitted by the flow. Blocks until the flow completes.
101+
*/
102+
public void runDrain() throws Exception {
103+
runForeach(t -> {});
104+
}
105+
106+
// endregion
107+
108+
// region Flow operations
109+
110+
/** When run, the current pipeline is run asynchronously in the background, emitting elements to a buffer.
111+
* The elements of the buffer are then emitted by the returned flow.
112+
*
113+
* @param bufferCapacity determines size of a buffer.
114+
*
115+
* Any exceptions are propagated by the returned flow.
116+
*/
117+
public Flow<T> buffer(int bufferCapacity) {
118+
return usingEmit(emit -> {
119+
Channel<T> ch = new Channel<>(bufferCapacity);
120+
try {
121+
unsupervised(scope -> {
122+
runLastToChannelAsync(scope, ch);
123+
FlowEmit.channelToEmit(ch, emit);
124+
return null;
125+
});
126+
} catch (ExecutionException e) {
127+
throw (Exception) e.getCause();
128+
}
129+
});
130+
}
131+
132+
/**
133+
* Applies the given `mappingFunction` to each element emitted by this flow. The returned flow then emits the results.
134+
*/
135+
public <U> Flow<U> map(Function<T, U> mappingFunction) {
136+
return usingEmit(emit -> {
137+
last.run(t -> emit.apply(mappingFunction.apply(t)));
138+
});
139+
}
140+
141+
/**
142+
* Emits only those elements emitted by this flow, for which `filteringPredicate` returns `true`.
143+
*/
144+
public Flow<T> filter(Predicate<T> filteringPredicate) {
145+
return usingEmit(emit -> {
146+
last.run(t -> {
147+
if (filteringPredicate.test(t)) {
148+
emit.apply(t);
149+
}
150+
});
151+
});
152+
}
153+
154+
/**
155+
* Applies the given `mappingFunction` to each element emitted by this flow, in sequence.
156+
* The given {@link Consumer<FlowEmit>} can be used to emit an arbitrary number of elements.
157+
* <p>
158+
* The {@link FlowEmit} instance provided to the `mappingFunction` callback should only be used on the calling thread.
159+
* That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of `mappingFunction`.
160+
*/
161+
public <U> Flow<U> mapUsingEmit(Function<T, Consumer<FlowEmit<U>>> mappingFunction) {
162+
return usingEmit(emit -> last.run(t -> mappingFunction.apply(t).accept(emit)));
163+
}
164+
165+
/**
166+
* Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged.
167+
* If `f` throws an exceptions, the flow fails and propagates the exception.
168+
*/
169+
public Flow<T> tap(Consumer<T> f) {
170+
return map(t -> {
171+
f.accept(t);
172+
return t;
173+
});
174+
}
175+
176+
/**
177+
* Applies the given `mappingFunction` to each element emitted by this flow, obtaining a nested flow to run.
178+
* The elements emitted by the nested flow are then emitted by the returned flow.
179+
* <p>
180+
* The nested flows are run in sequence, that is, the next nested flow is started only after the previous one completes.
181+
*/
182+
public <U> Flow<U> flatMap(Function<T, Flow<U>> mappingFunction) {
183+
return usingEmit(emit -> last.run(t -> mappingFunction.apply(t).runToEmit(emit)));
184+
}
185+
186+
/**
187+
* Takes the first `n` elements from this flow and emits them. If the flow completes before emitting `n` elements, the returned flow
188+
* completes as well.
189+
*/
190+
public Flow<T> take(int n) {
191+
return Flows.usingEmit(emit -> {
192+
AtomicInteger taken = new AtomicInteger(0);
193+
try {
194+
last.run(t -> {
195+
if (taken.getAndIncrement() < n) {
196+
emit.apply(t);
197+
} else {
198+
throw new BreakException();
199+
}
200+
});
201+
} catch (ExecutionException e) {
202+
if (!(e.getCause() instanceof BreakException)) {
203+
throw e;
204+
}
205+
// ignore
206+
} catch (BreakException e) {
207+
// ignore
208+
}
209+
});
210+
}
211+
212+
private static class BreakException extends RuntimeException {
213+
}
214+
215+
/**
216+
* Chunks up the elements into groups of the specified size. The last group may be smaller due to the flow being complete.
217+
*
218+
* @param n The number of elements in a group.
219+
*/
220+
public Flow<List<T>> grouped(int n) {
221+
return groupedWeighted(n, t -> 1L);
222+
}
223+
224+
/**
225+
* Chunks up the elements into groups that have a cumulative weight greater or equal to the `minWeight`. The last group may be smaller
226+
* due to the flow being complete.
227+
*
228+
* @param minWeight The minimum cumulative weight of elements in a group.
229+
* @param costFn The function that calculates the weight of an element.
230+
*/
231+
public Flow<List<T>> groupedWeighted(long minWeight, Function<T, Long> costFn) {
232+
if (minWeight <= 0) {
233+
throw new IllegalArgumentException("minWeight must be > 0");
234+
}
235+
236+
return Flows.usingEmit(emit -> {
237+
List<T> buffer = new ArrayList<>();
238+
AtomicLong accumulatedCost = new AtomicLong(0L);
239+
last.run(t -> {
240+
buffer.add(t);
241+
accumulatedCost.addAndGet(costFn.apply(t));
242+
243+
if (accumulatedCost.get() >= minWeight) {
244+
emit.apply(new ArrayList<>(buffer));
245+
buffer.clear();
246+
accumulatedCost.set(0);
247+
}
248+
});
249+
if (!buffer.isEmpty()) {
250+
emit.apply(buffer);
251+
}
252+
});
253+
}
254+
255+
/**
256+
* Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error).
257+
*/
258+
public Flow<Void> drain() {
259+
return Flows.usingEmit(emit -> {
260+
last.run(t -> {});
261+
});
262+
}
263+
264+
/**
265+
* Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
266+
*/
267+
public Flow<T> onComplete(Runnable f) {
268+
return Flows.usingEmit(emit -> {
269+
try {
270+
last.run(emit);
271+
} finally {
272+
f.run();
273+
}
274+
});
275+
}
276+
277+
/**
278+
* Runs `f` after the flow completes successfully, that is when all elements are emitted.
279+
*/
280+
public Flow<T> onDone(Runnable f) {
281+
return Flows.usingEmit(emit -> {
282+
last.run(emit);
283+
f.run();
284+
});
285+
}
286+
287+
/**
288+
* Runs `f` after the flow completes with an error. The error can't be recovered.
289+
*/
290+
public Flow<T> onError(Consumer<Throwable> f) {
291+
return Flows.usingEmit(emit -> {
292+
try {
293+
last.run(emit);
294+
} catch (Throwable e) {
295+
f.accept(e);
296+
throw e;
297+
}
298+
});
299+
}
300+
301+
// endregion
302+
303+
private void runLastToChannelAsync(Channel<T> channel) throws ExecutionException, InterruptedException {
304+
unsupervised(scope -> {
305+
runLastToChannelAsync(scope, channel);
306+
return null;
307+
});
308+
}
309+
310+
private void runLastToChannelAsync(UnsupervisedScope scope, Channel<T> channel) {
311+
scope.forkUnsupervised(() -> {
312+
try {
313+
last.run(channel::send);
314+
channel.done();
315+
} catch (Throwable e) {
316+
channel.error(e);
317+
}
318+
return null;
319+
});
320+
}
321+
}
322+
323+
324+
325+

0 commit comments

Comments
 (0)