Skip to content

Commit 65aef5d

Browse files
authored
Add more run methods for flow (#72)
1 parent 1c4a4e6 commit 65aef5d

File tree

3 files changed

+480
-57
lines changed

3 files changed

+480
-57
lines changed

flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,24 @@
55
import static com.softwaremill.jox.structured.Scopes.unsupervised;
66

77
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.LinkedList;
810
import java.util.List;
11+
import java.util.NoSuchElementException;
12+
import java.util.Optional;
913
import java.util.concurrent.ExecutionException;
1014
import java.util.concurrent.atomic.AtomicInteger;
1115
import java.util.concurrent.atomic.AtomicLong;
1216
import java.util.concurrent.atomic.AtomicReference;
1317
import java.util.function.BiFunction;
18+
import java.util.function.BinaryOperator;
1419
import java.util.function.Consumer;
1520
import java.util.function.Function;
1621
import java.util.function.Predicate;
22+
import java.util.function.Supplier;
1723

1824
import com.softwaremill.jox.Channel;
25+
import com.softwaremill.jox.Sink;
1926
import com.softwaremill.jox.Source;
2027
import com.softwaremill.jox.structured.UnsupervisedScope;
2128

@@ -68,10 +75,28 @@ public List<T> runToList() throws Exception {
6875
* Required for creating async forks responsible for writing to channel
6976
*/
7077
public Source<T> runToChannel(UnsupervisedScope scope) {
78+
return runToChannelInternal(scope, () -> Channel.withScopedBufferSize());
79+
}
80+
81+
/** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result
82+
* of this method.
83+
* <p>
84+
* Method does not block until the flow completes.
85+
*
86+
* @param scope
87+
* Required for creating async forks responsible for writing to channel
88+
* @param bufferCapacity
89+
* Specifies buffer capacity of created channel
90+
*/
91+
public Source<T> runToChannel(UnsupervisedScope scope, int bufferCapacity) {
92+
return runToChannelInternal(scope, () -> new Channel<>(bufferCapacity));
93+
}
94+
95+
private Source<T> runToChannelInternal(UnsupervisedScope scope, Supplier<Channel<T>> channelProvider) {
7196
if (last instanceof SourceBackedFlowStage<T>(Source<T> source)) {
7297
return source;
7398
} else {
74-
Channel<T> channel = Channel.withScopedBufferSize();
99+
Channel<T> channel = channelProvider.get();
75100
runLastToChannelAsync(scope, channel);
76101
return channel;
77102
}
@@ -103,6 +128,102 @@ public void runDrain() throws Exception {
103128
runForeach(t -> {});
104129
}
105130

131+
/**
132+
* Passes each element emitted by this flow to the given sink. Blocks until the flow completes.
133+
* <p>
134+
* Errors are always propagated to the provided sink. Successful flow completion is propagated when `propagateDone` is set to `true`.
135+
* <p>
136+
* Fatal errors are rethrown.
137+
*/
138+
public void runPipeToSink(Sink<T> sink, boolean propagateDone) {
139+
try {
140+
last.run(sink::send);
141+
if (propagateDone) {
142+
sink.doneOrClosed();
143+
}
144+
} catch (Exception e) {
145+
sink.error(e);
146+
} catch (Throwable t) {
147+
sink.error(t);
148+
throw t;
149+
}
150+
}
151+
152+
/** Returns the last element emitted by this flow, wrapped in {@link Optional#of}, or {@link Optional#empty()} when this source is empty. */
153+
public Optional<T> runLastOptional() throws Exception {
154+
AtomicReference<Optional<T>> value = new AtomicReference<>(Optional.empty());
155+
last.run(t -> value.set(Optional.of(t)));
156+
return value.get();
157+
}
158+
159+
/** Returns the last element emitted by this flow, or throws {@link NoSuchElementException} when the flow emits no elements (is empty).
160+
*
161+
* @throws NoSuchElementException
162+
* When this flow is empty.
163+
*/
164+
public T runLast() throws Exception {
165+
return runLastOptional()
166+
.orElseThrow(() -> new NoSuchElementException("cannot obtain last element from an empty source"));
167+
}
168+
169+
/**
170+
* Applies function `f` on the first and the following (if available) elements emitted by this flow. The returned value is used as the
171+
* next current value and `f` is applied again with the next value emitted by this source. The operation is repeated until this flow
172+
* emits all elements. This is similar operation to {@link Flow#runFold} but it uses the first emitted element as `zero`.
173+
*
174+
* @param f
175+
* A binary function (a function that takes two arguments) that is applied to the current and next values emitted by this flow.
176+
* @return
177+
* Combined value retrieved from running function `f` on all flow elements in a cumulative manner where result of the previous call is
178+
* used as an input value to the next.
179+
* @throws NoSuchElementException
180+
* When this flow is empty.
181+
*/
182+
public T runReduce(BinaryOperator<T> f) throws Exception {
183+
AtomicReference<Optional<T>> current = new AtomicReference<>(Optional.empty());
184+
last.run(t -> {
185+
current.updateAndGet(currentValue -> currentValue
186+
.map(u -> f.apply(u, t))
187+
.or(() -> Optional.of(t)));
188+
});
189+
190+
return current.get().orElseThrow(() -> new NoSuchElementException("cannot reduce an empty flow"));
191+
}
192+
193+
/**
194+
* Returns the list of up to `n` last elements emitted by this flow. Less than `n` elements is returned when this flow emits less
195+
* elements than requested. Empty list is returned when called on an empty flow.
196+
*
197+
* @param n
198+
* Number of elements to be taken from the end of this flow. It is expected that `n >= 0`.
199+
* @return
200+
* A list of up to `n` last elements from this flow.
201+
*/
202+
public List<T> runTakeLast(int n) throws Exception {
203+
if (n < 0) {
204+
throw new IllegalArgumentException("requirement failed: n must be >= 0");
205+
}
206+
if (n == 0) {
207+
runDrain();
208+
return Collections.emptyList();
209+
} else if (n == 1) {
210+
return runLastOptional()
211+
.map(Collections::singletonList)
212+
.orElse(Collections.emptyList());
213+
} else {
214+
List<T> buffer = new LinkedList<>();
215+
216+
last.run(t -> {
217+
if (buffer.size() == n) {
218+
buffer.removeFirst();
219+
}
220+
buffer.add(t);
221+
});
222+
223+
return new ArrayList<>(buffer);
224+
}
225+
}
226+
106227
// endregion
107228

108229
// region Flow operations

0 commit comments

Comments
 (0)