From e0b4bdfd27ec41b9e7a97d9300b57801ec0e99b8 Mon Sep 17 00:00:00 2001 From: emilb Date: Tue, 10 Dec 2024 17:28:05 +0100 Subject: [PATCH 1/6] Add basic flow functions and creation methods --- flows/pom.xml | 38 +++ .../main/java/com/softwaremill/jox/Flow.java | 218 ++++++++++++++++++ .../java/com/softwaremill/jox/FlowEmit.java | 26 +++ .../java/com/softwaremill/jox/FlowStage.java | 5 + .../main/java/com/softwaremill/jox/Flows.java | 210 +++++++++++++++++ .../jox/SourceBackedFlowStage.java | 9 + .../softwaremill/jox/ThrowingConsumer.java | 5 + .../java/com/softwaremill/jox/FlowTest.java | 193 ++++++++++++++++ 8 files changed, 704 insertions(+) create mode 100644 flows/pom.xml create mode 100644 flows/src/main/java/com/softwaremill/jox/Flow.java create mode 100644 flows/src/main/java/com/softwaremill/jox/FlowEmit.java create mode 100644 flows/src/main/java/com/softwaremill/jox/FlowStage.java create mode 100644 flows/src/main/java/com/softwaremill/jox/Flows.java create mode 100644 flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java create mode 100644 flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java create mode 100644 flows/src/test/java/com/softwaremill/jox/FlowTest.java diff --git a/flows/pom.xml b/flows/pom.xml new file mode 100644 index 0000000..b9c4b9c --- /dev/null +++ b/flows/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + + com.softwaremill.jox + parent + 0.3.1 + + + flows + 0.3.1 + jar + + + + org.junit.jupiter + junit-jupiter + test + + + org.awaitility + awaitility + test + + + com.softwaremill.jox + channels + 0.3.1 + + + com.softwaremill.jox + structured + 0.3.1 + + + diff --git a/flows/src/main/java/com/softwaremill/jox/Flow.java b/flows/src/main/java/com/softwaremill/jox/Flow.java new file mode 100644 index 0000000..3eacd6f --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/Flow.java @@ -0,0 +1,218 @@ +package com.softwaremill.jox; + + +import com.softwaremill.jox.structured.UnsupervisedScope; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +import static com.softwaremill.jox.Flows.usingEmit; +import static com.softwaremill.jox.structured.Scopes.unsupervised; + +/** + * Describes an asynchronous transformation pipeline. When run, emits elements of type `T`. + *

+ * A flow is lazy - evaluation happens only when it's run. + *

+ * Flows can be created using the {@link Flows#fromValues}, {@link Flows#fromSource}} and other `Flow.from*` methods, {@link Flows#tick} etc. + *

+ * Transformation stages can be added using the available combinators, such as {@link Flow#map}, {@link Flow#buffer}, {@link Flow#grouped}, etc. + * Each such method returns a new immutable {@link Flow} instance. + *

+ * Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}. + */ +public class Flow { + protected final FlowStage last; + + public Flow(FlowStage last) { + this.last = last; + } + + // region Run operations + + /** Invokes the given function for each emitted element. Blocks until the flow completes. */ + public void runForeach(Consumer sink) throws Throwable { + last.run(sink::accept); + } + + /** Invokes the provided {@link FlowEmit} for each emitted element. Blocks until the flow completes. */ + public void runToEmit(FlowEmit emit) throws Throwable { + last.run(emit); + } + + /** Accumulates all elements emitted by this flow into a list. Blocks until the flow completes. */ + public List runToList() throws Throwable { + List result = new ArrayList<>(); + runForeach(result::add); + return result; + } + + /** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result + * of this method. + *

+ * By default, buffer capacity is unlimited. + *

+ * Blocks until the flow completes. + */ + public Source runToChannel() throws Throwable { + if (last instanceof SourceBackedFlowStage(Source source)) { + return source; + } else { + Channel channel = Channel.newUnlimitedChannel(); + runLastToChannelAsync(channel); + return channel; + } + } + + // endregion + + // region Flow operations + + /** When run, the current pipeline is run asynchronously in the background, emitting elements to a buffer. + * The elements of the buffer are then emitted by the returned flow. + * + * @param bufferCapacity determines size of a buffer. + * + * Any exceptions are propagated by the returned flow. + */ + public Flow buffer(int bufferCapacity) { + return usingEmit(emit -> { + Channel ch = new Channel<>(bufferCapacity); + unsupervised(scope -> { + runLastToChannelAsync(scope, ch); + FlowEmit.channelToEmit(ch, emit); + return null; + }); + }); + } + + /** + * Applies the given `mappingFunction` to each element emitted by this flow. The returned flow then emits the results. + */ + public Flow map(Function mappingFunction) { + return usingEmit(emit -> { + last.run(((ThrowingConsumer) t -> emit.apply(mappingFunction.apply(t)))::accept); + }); + } + + /** + * Emits only those elements emitted by this flow, for which `filteringPredicate` returns `true`. + */ + public Flow filter(Predicate filteringPredicate) { + return usingEmit(emit -> { + last.run(((ThrowingConsumer) t -> { + if (filteringPredicate.test(t)) { + emit.apply(t); + } + })::accept); + }); + } + + /** + * Applies the given `mappingFunction` to each element emitted by this flow, in sequence. + * The given {@link Consumer} can be used to emit an arbitrary number of elements. + *

+ * The {@link FlowEmit} instance provided to the `mappingFunction` callback should only be used on the calling thread. + * That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of `mappingFunction`. + */ + public Flow mapUsingEmit(Function>> mappingFunction) { + return usingEmit(emit -> { + last.run(t -> mappingFunction.apply(t).accept(emit)); + }); + } + + /** + * Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged. + * If `f` throws an exceptions, the flow fails and propagates the exception. + */ + public Flow tap(Consumer f) { + return map(t -> { + f.accept(t); + return t; + }); + } + + /** + * Applies the given `mappingFunction` to each element emitted by this flow, obtaining a nested flow to run. + * The elements emitted by the nested flow are then emitted by the returned flow. + *

+ * The nested flows are run in sequence, that is, the next nested flow is started only after the previous one completes. + */ + public Flow flatMap(Function> mappingFunction) { + return usingEmit(emit -> { + last.run(((ThrowingConsumer) t -> mappingFunction.apply(t).runToEmit(emit))::accept); + }); + } + + /** + * Chunks up the elements into groups of the specified size. The last group may be smaller due to the flow being complete. + * + * @param n The number of elements in a group. + */ + public Flow> grouped(int n) { + return groupedWeighted(n, t -> 1L); + } + + /** + * Chunks up the elements into groups that have a cumulative weight greater or equal to the `minWeight`. The last group may be smaller + * due to the flow being complete. + * + * @param minWeight The minimum cumulative weight of elements in a group. + * @param costFn The function that calculates the weight of an element. + */ + public Flow> groupedWeighted(long minWeight, Function costFn) { + if (minWeight <= 0) { + throw new IllegalArgumentException("minWeight must be > 0"); + } + + return Flows.usingEmit(emit -> { + List buffer = new ArrayList<>(); + AtomicLong accumulatedCost = new AtomicLong(0L); + last.run(t -> { + buffer.add(t); + accumulatedCost.addAndGet(costFn.apply(t)); + + if (accumulatedCost.get() >= minWeight) { + emit.apply(buffer); + buffer.clear(); + accumulatedCost.set(0); + } + }); + if (!buffer.isEmpty()) { + emit.apply(buffer); + } + }); + } + + // endregion + + private void runLastToChannelAsync(Channel channel) throws ExecutionException, InterruptedException { + unsupervised(scope -> { + runLastToChannelAsync(scope, channel); + return null; + }); + } + + private void runLastToChannelAsync(UnsupervisedScope scope, Channel channel) { + scope.forkUnsupervised(() -> { + try { + last.run(((ThrowingConsumer) channel::send)::accept); + } catch (Throwable e) { + channel.error(e); + throw new RuntimeException(e); + } finally { + channel.done(); + } + return null; + }); + } +} + + + + diff --git a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java b/flows/src/main/java/com/softwaremill/jox/FlowEmit.java new file mode 100644 index 0000000..4ced00c --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/FlowEmit.java @@ -0,0 +1,26 @@ +package com.softwaremill.jox; + +public interface FlowEmit { + void apply(T t) throws Throwable; + + static void channelToEmit(Source source, FlowEmit emit) throws Throwable { + boolean shouldRun = true; + while (shouldRun) { + Object t = null; + try { + t = source.receiveOrClosed(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + shouldRun = !switch (t) { + case ChannelDone done -> false; + case ChannelError error -> throw error.toException(); + default -> { + //noinspection unchecked + emit.apply((T) t); + yield true; + } + }; + } + } +} diff --git a/flows/src/main/java/com/softwaremill/jox/FlowStage.java b/flows/src/main/java/com/softwaremill/jox/FlowStage.java new file mode 100644 index 0000000..49e263b --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/FlowStage.java @@ -0,0 +1,5 @@ +package com.softwaremill.jox; + +public interface FlowStage { + void run(FlowEmit emit) throws Throwable; +} diff --git a/flows/src/main/java/com/softwaremill/jox/Flows.java b/flows/src/main/java/com/softwaremill/jox/Flows.java new file mode 100644 index 0000000..21c30c1 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/Flows.java @@ -0,0 +1,210 @@ +package com.softwaremill.jox; + +import com.softwaremill.jox.structured.Fork; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +interface Flows { + + /** + * Creates a flow, which when run, provides a {@link FlowEmit} instance to the given `withEmit` function. Elements can be emitted to be + * processed by downstream stages by calling {@link FlowEmit#apply}. + *

+ * The {@link FlowEmit} instance provided to the {@param withEmit} callback should only be used on the calling thread. + * That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of {@param withEmit}. + */ + static Flow usingEmit(ThrowingConsumer> withEmit) { + return new Flow<>(withEmit::accept); + } + + /** + * Creates a flow using the given {@param source}. An element is emitted for each value received from the source. + * If the source is completed with an error, is it propagated by throwing. + */ + static Flow fromSource(Source source) { + return new Flow<>(new SourceBackedFlowStage<>(source)); + } + + /** + * Creates a flow from the given `iterable`. Each element of the iterable is emitted in order. + */ + static Flow fromIterable(Iterable iterable) { + return fromIterator(iterable.iterator()); + } + + /** + * Creates a flow from the given values. Each value is emitted in order. + */ + static Flow fromValues(T... ts) { + return fromIterator(Arrays.asList(ts).iterator()); + } + + /** + * Creates a flow from the given (lazily evaluated) `iterator`. Each element of the iterator is emitted in order. + */ + static Flow fromIterator(Iterator it) { + return usingEmit(emit -> { + while (it.hasNext()) { + emit.apply(it.next()); + } + }); + } + + /** + * Creates a flow from the given fork. The flow will emit up to one element, or complete by throwing an exception if the fork fails. + */ + static Flow fromFork(Fork f) { + return usingEmit(emit -> emit.apply(f.join())); + } + + /** + * Creates a flow which emits elements starting with `zero`, and then applying `mappingFunction` to the previous element to get the next one. + */ + static Flow iterate(T zero, Function mappingFunction) { + return usingEmit(emit -> { + T t = zero; + while (true) { + emit.apply(t); + t = mappingFunction.apply(t); + } + }); + } + + /** + * Creates a flow which emits a range of numbers, from `from`, to `to` (inclusive), stepped by `step`. + */ + static Flow range(int from, int to, int step) { + // do nothing + return usingEmit(emit -> { + for (int i = from; i <= to; i += step) { + emit.apply(i); + } + }); + } + + /** + * Creates a flow which emits the given `value` repeatedly, at least {@param interval} apart between each two elements. + * The first value is emitted immediately. + *

+ * The interval is measured between subsequent emissions. Hence, if the following transformation pipeline is slow, the next emission can + * occur immediately after the previous one is fully processed (if processing took more than the inter-emission interval duration). + * However, ticks do not accumulate; for example, if processing is slow enough that multiple intervals pass between send invocations, + * only one tick will be sent. + * + * @param interval + * The temporal spacing between subsequent ticks. + * @param value + * The element to emitted on every tick. + */ + static Flow tick(Duration interval, T value) { + return usingEmit(emit -> { + while (true) { + long start = System.nanoTime(); + emit.apply(value); + long end = System.nanoTime(); + long sleep = interval.toNanos() - (end - start); + if (sleep > 0) { + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(sleep), (int) (sleep % 1_000_000)); + } + } + }); + } + + /** Creates a flow, which emits the given `element` repeatedly. */ + static Flow repeat(T element) { + return repeatEval(() -> element); + } + + /** Creates a flow, which emits the result of evaluating `supplierFunction` repeatedly. As the parameter is passed by-name, the evaluation is deferred + * until the element is emitted, and happens multiple times. + * + * @param supplierFunction + * The code block, computing the element to emit. + */ + static Flow repeatEval(Supplier supplierFunction) { + return usingEmit(emit -> { + while (true) { + emit.apply(supplierFunction.get()); + } + }); + } + + /** Creates a flow, which emits the value contained in the result of evaluating `supplierFunction` repeatedly. + * When the evaluation of `supplierFunction` returns a {@link Optional#empty()}, the flow is completed as "done", and no more values are evaluated or emitted. + *

+ * As the `supplierFunction` parameter is passed by-name, the evaluation is deferred until the element is emitted, and happens multiple times. + * + * @param supplierFunction + * The code block, computing the optional element to emit. + */ + static Flow repeatEvalWhileDefined(Supplier> supplierFunction) { + // do nothing + return usingEmit(emit -> { + boolean shouldRun = true; + while (shouldRun) { + Optional result = supplierFunction.get(); + if (result.isPresent()) { + emit.apply(result.get()); + } else { + shouldRun = false; + } + } + }); + } + + /** Create a flow which sleeps for the given `timeout` and then completes as done. */ + static Flow timeout(Duration timeout) { + return usingEmit(emit -> Thread.sleep(timeout.toMillis())); + } + + /** + * Creates a flow which concatenates the given `flows` in order. First elements from the first flow are emitted, then from the second etc. + * If any of the flows completes with an error, it is propagated. + */ + static Flow concat(List> flows) { + return new Flow<>(((ThrowingConsumer>) emit -> { + for (Flow currentFlow : flows) { + currentFlow.runToEmit(((ThrowingConsumer) emit::apply)::accept); + } + })::accept); + } + + /** Creates an empty flow, which emits no elements and completes immediately. */ + static Flow empty() { + return usingEmit(emit -> {}); + } + + /** Creates a flow that emits a single element when `from` completes, or throws an exception when `from` fails. */ + static Flow fromCompletableFuture(CompletableFuture from) { + return usingEmit(emit -> emit.apply(from.get())); + } + + /** + * Creates a flow that emits all elements from the given future {@link Source} when `from` completes. + * + * @param from the future source to emit elements from + */ + static Flow fromFutureSource(CompletableFuture> from) { + return fromSource(from.join()); + } + + /** + * Creates a flow that fails immediately with the given {@link java.lang.Throwable} + * + * @param t + * The {@link java.lang.Throwable} to fail with + */ + static Flow failed(Throwable t) { + return new Flow<>(emit -> { + throw t; + }); + } +} \ No newline at end of file diff --git a/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java b/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java new file mode 100644 index 0000000..a572bfe --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java @@ -0,0 +1,9 @@ +package com.softwaremill.jox; + +record SourceBackedFlowStage(Source source) implements FlowStage { + + @Override + public void run(FlowEmit emit) throws Throwable { + FlowEmit.channelToEmit(source, emit); + } +} diff --git a/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java b/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java new file mode 100644 index 0000000..4affdfc --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java @@ -0,0 +1,5 @@ +package com.softwaremill.jox; + +public interface ThrowingConsumer { + void accept(T t) throws Throwable; +} diff --git a/flows/src/test/java/com/softwaremill/jox/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/FlowTest.java new file mode 100644 index 0000000..015e359 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/FlowTest.java @@ -0,0 +1,193 @@ +package com.softwaremill.jox; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class FlowTest { + + @Test + void shouldRunForEach() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + flow.runForeach(results::add); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void shouldRunToList() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + List results = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void shouldRunToChannel() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + } + + @Test + void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { + // given + Channel channel = Channel.newUnlimitedChannel(); + Flow flow = Flows.fromSource(channel); + + // when + Source receivedChannel = flow.runToChannel(); + + // then + assertEquals(channel, receivedChannel); + } + + @Test + void shouldWorkWithASingleAsyncBoundary() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5) + .buffer(3); + + // when + List integers = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3, 4, 5), integers); + } + + @Test + void shouldWorkWithMultipleAsyncBoundaries() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5) + .buffer(3) + .map(i -> i * 2) + .buffer(2) + .map(i -> i + 1) + .buffer(5); + + // when + List integers = flow.runToList(); + + // then + assertEquals(List.of(3, 5, 7, 9, 11), integers); + } + + @Test + void shouldMap() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + Flow mapped = flow.map(Object::toString); + mapped.runForeach(results::add); + + // then + assertEquals(List.of("1", "2", "3"), results); + } + + @Test + void shouldMapUsingEmit() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + Flow mapped = flow.mapUsingEmit(i -> emit -> { + for (int j = 0; j < 2; j++) { + try { + emit.apply(i + j); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }); + + mapped.runForeach(results::add); + + // then + assertEquals(List.of(1, 2, 2, 3, 3, 4), results); + } + + @Test + void shouldFilter() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5); + List results = new ArrayList<>(); + + // when + flow.filter(i -> i % 2 == 0) + .runForeach(results::add); + + // then + assertEquals(List.of(2, 4), results); + } + + @Test + void shouldTap() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + flow + .tap(results::add) + .map(i -> i * 2) + .runForeach(j -> { + }); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void shouldFlatMap() throws Throwable { + // given + Flow flow = Flows.fromValues(10, 20, 30); + List results = new ArrayList<>(); + + // when + flow + .flatMap(i -> Flows.fromValues(i + 1, i + 2)) + .runForeach(results::add); + + // then + assertEquals(List.of(11, 12, 21, 22, 31, 32), results); + } + + @Test + void shouldPropagateErrorFromFlatMap() { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Flow mapped = flow.flatMap(i -> { + if (i == 2) { + throw new RuntimeException("error"); + } + return Flows.fromValues(i * 2); + }); + + // then + assertThrows(RuntimeException.class, () -> mapped.runForeach(i -> {})); + } +} \ No newline at end of file From b51faa124b2d01ee327a6de8a1ec5774544cf2a4 Mon Sep 17 00:00:00 2001 From: emilb Date: Wed, 11 Dec 2024 17:54:40 +0100 Subject: [PATCH 2/6] code review & fixes: * move from Throwable to Exception * add `runFold`, `runDrain`, `onComplete`, `onDone`, `onError` * fix error propagation with `buffer` method * simplify redundant casting and lambdas * fix `groupedWeighted` * fix `runLastToChannelAsync` * fix `com.softwaremill.jox.FlowEmit.channelToEmit` * fix `com.softwaremill.jox.Flows.concat` and `com.softwaremill.jox.Flows.fromCompletableFuture` * migrated more tests --- .../main/java/com/softwaremill/jox/Flow.java | 160 ++++++++++--- .../java/com/softwaremill/jox/FlowEmit.java | 23 +- .../java/com/softwaremill/jox/FlowStage.java | 2 +- .../main/java/com/softwaremill/jox/Flows.java | 62 +++-- .../jox/SourceBackedFlowStage.java | 2 +- .../softwaremill/jox/ThrowingConsumer.java | 2 +- .../jox/FlowCompleteCallbacksTest.java | 101 ++++++++ .../com/softwaremill/jox/FlowGroupedTest.java | 93 +++++++ .../java/com/softwaremill/jox/FlowTest.java | 71 +++++- .../java/com/softwaremill/jox/FlowsTest.java | 226 ++++++++++++++++++ .../jox/UnsupervisedScopeExtension.java | 39 +++ .../jox/WithUnsupervisedScope.java | 14 ++ 12 files changed, 722 insertions(+), 73 deletions(-) create mode 100644 flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java create mode 100644 flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java create mode 100644 flows/src/test/java/com/softwaremill/jox/FlowsTest.java create mode 100644 flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java create mode 100644 flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java diff --git a/flows/src/main/java/com/softwaremill/jox/Flow.java b/flows/src/main/java/com/softwaremill/jox/Flow.java index 3eacd6f..440ff18 100644 --- a/flows/src/main/java/com/softwaremill/jox/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/Flow.java @@ -6,7 +6,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -36,17 +39,17 @@ public Flow(FlowStage last) { // region Run operations /** Invokes the given function for each emitted element. Blocks until the flow completes. */ - public void runForeach(Consumer sink) throws Throwable { + public void runForeach(Consumer sink) throws Exception { last.run(sink::accept); } /** Invokes the provided {@link FlowEmit} for each emitted element. Blocks until the flow completes. */ - public void runToEmit(FlowEmit emit) throws Throwable { + public void runToEmit(FlowEmit emit) throws Exception { last.run(emit); } /** Accumulates all elements emitted by this flow into a list. Blocks until the flow completes. */ - public List runToList() throws Throwable { + public List runToList() throws Exception { List result = new ArrayList<>(); runForeach(result::add); return result; @@ -59,16 +62,42 @@ public List runToList() throws Throwable { *

* Blocks until the flow completes. */ - public Source runToChannel() throws Throwable { + public Source runToChannel(UnsupervisedScope scope) { if (last instanceof SourceBackedFlowStage(Source source)) { return source; } else { - Channel channel = Channel.newUnlimitedChannel(); - runLastToChannelAsync(channel); + Channel channel = new Channel<>(); + runLastToChannelAsync(scope, channel); return channel; } } + /** + * Uses `zero` as the current value and applies function `f` on it and a value emitted by this flow. The returned value is used as the + * next current value and `f` is applied again with the next value emitted by the flow. The operation is repeated until the flow emits + * all elements. + * + * @param zero + * An initial value to be used as the first argument to function `f` call. + * @param f + * A {@link BiFunction} that is applied to the current value and value emitted by the flow. + * @return + * Combined value retrieved from running function `f` on all flow elements in a cumulative manner where result of the previous call is + * used as an input value to the next. + */ + public U runFold(U zero, BiFunction f) throws Exception { + AtomicReference current = new AtomicReference<>(zero); + last.run(t -> current.set(f.apply(current.get(), t))); + return current.get(); + } + + /** + * Ignores all elements emitted by the flow. Blocks until the flow completes. + */ + public void runDrain() throws Exception { + runForeach(t -> {}); + } + // endregion // region Flow operations @@ -83,11 +112,15 @@ public Source runToChannel() throws Throwable { public Flow buffer(int bufferCapacity) { return usingEmit(emit -> { Channel ch = new Channel<>(bufferCapacity); - unsupervised(scope -> { - runLastToChannelAsync(scope, ch); - FlowEmit.channelToEmit(ch, emit); - return null; - }); + try { + unsupervised(scope -> { + runLastToChannelAsync(scope, ch); + FlowEmit.channelToEmit(ch, emit); + return null; + }); + } catch (ExecutionException e) { + throw (Exception) e.getCause(); + } }); } @@ -96,37 +129,35 @@ public Flow buffer(int bufferCapacity) { */ public Flow map(Function mappingFunction) { return usingEmit(emit -> { - last.run(((ThrowingConsumer) t -> emit.apply(mappingFunction.apply(t)))::accept); + last.run(t -> emit.apply(mappingFunction.apply(t))); }); } - /** + /** * Emits only those elements emitted by this flow, for which `filteringPredicate` returns `true`. */ public Flow filter(Predicate filteringPredicate) { return usingEmit(emit -> { - last.run(((ThrowingConsumer) t -> { + last.run(t -> { if (filteringPredicate.test(t)) { emit.apply(t); } - })::accept); + }); }); } - /** - * Applies the given `mappingFunction` to each element emitted by this flow, in sequence. + /** + * Applies the given `mappingFunction` to each element emitted by this flow, in sequence. * The given {@link Consumer} can be used to emit an arbitrary number of elements. *

* The {@link FlowEmit} instance provided to the `mappingFunction` callback should only be used on the calling thread. * That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of `mappingFunction`. */ public Flow mapUsingEmit(Function>> mappingFunction) { - return usingEmit(emit -> { - last.run(t -> mappingFunction.apply(t).accept(emit)); - }); + return usingEmit(emit -> last.run(t -> mappingFunction.apply(t).accept(emit))); } - /** + /** * Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged. * If `f` throws an exceptions, the flow fails and propagates the exception. */ @@ -137,18 +168,45 @@ public Flow tap(Consumer f) { }); } - /** + /** * Applies the given `mappingFunction` to each element emitted by this flow, obtaining a nested flow to run. * The elements emitted by the nested flow are then emitted by the returned flow. *

* The nested flows are run in sequence, that is, the next nested flow is started only after the previous one completes. */ public Flow flatMap(Function> mappingFunction) { - return usingEmit(emit -> { - last.run(((ThrowingConsumer) t -> mappingFunction.apply(t).runToEmit(emit))::accept); + return usingEmit(emit -> last.run(t -> mappingFunction.apply(t).runToEmit(emit))); + } + + /** + * Takes the first `n` elements from this flow and emits them. If the flow completes before emitting `n` elements, the returned flow + * completes as well. + */ + public Flow take(int n) { + return Flows.usingEmit(emit -> { + AtomicInteger taken = new AtomicInteger(0); + try { + last.run(t -> { + if (taken.getAndIncrement() < n) { + emit.apply(t); + } else { + throw new BreakException(); + } + }); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof BreakException)) { + throw e; + } + // ignore + } catch (BreakException e) { + // ignore + } }); } + private static class BreakException extends RuntimeException { + } + /** * Chunks up the elements into groups of the specified size. The last group may be smaller due to the flow being complete. * @@ -178,7 +236,7 @@ public Flow> groupedWeighted(long minWeight, Function costFn) { accumulatedCost.addAndGet(costFn.apply(t)); if (accumulatedCost.get() >= minWeight) { - emit.apply(buffer); + emit.apply(new ArrayList<>(buffer)); buffer.clear(); accumulatedCost.set(0); } @@ -189,6 +247,52 @@ public Flow> groupedWeighted(long minWeight, Function costFn) { }); } + /** + * Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error). + */ + public Flow drain() { + return Flows.usingEmit(emit -> { + last.run(t -> {}); + }); + } + + /** + * Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error. + */ + public Flow onComplete(Runnable f) { + return Flows.usingEmit(emit -> { + try { + last.run(emit); + } finally { + f.run(); + } + }); + } + + /** + * Runs `f` after the flow completes successfully, that is when all elements are emitted. + */ + public Flow onDone(Runnable f) { + return Flows.usingEmit(emit -> { + last.run(emit); + f.run(); + }); + } + + /** + * Runs `f` after the flow completes with an error. The error can't be recovered. + */ + public Flow onError(Consumer f) { + return Flows.usingEmit(emit -> { + try { + last.run(emit); + } catch (Throwable e) { + f.accept(e); + throw e; + } + }); + } + // endregion private void runLastToChannelAsync(Channel channel) throws ExecutionException, InterruptedException { @@ -201,12 +305,10 @@ private void runLastToChannelAsync(Channel channel) throws ExecutionException private void runLastToChannelAsync(UnsupervisedScope scope, Channel channel) { scope.forkUnsupervised(() -> { try { - last.run(((ThrowingConsumer) channel::send)::accept); + last.run(channel::send); + channel.done(); } catch (Throwable e) { channel.error(e); - throw new RuntimeException(e); - } finally { - channel.done(); } return null; }); diff --git a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java b/flows/src/main/java/com/softwaremill/jox/FlowEmit.java index 4ced00c..a967e73 100644 --- a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java +++ b/flows/src/main/java/com/softwaremill/jox/FlowEmit.java @@ -1,23 +1,24 @@ package com.softwaremill.jox; +import java.util.concurrent.ExecutionException; + public interface FlowEmit { - void apply(T t) throws Throwable; + void apply(T t) throws Exception; - static void channelToEmit(Source source, FlowEmit emit) throws Throwable { + static void channelToEmit(Source source, FlowEmit emit) throws Exception { boolean shouldRun = true; while (shouldRun) { - Object t = null; - try { - t = source.receiveOrClosed(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - shouldRun = !switch (t) { + Object t = source.receiveOrClosed(); + shouldRun = switch (t) { case ChannelDone done -> false; case ChannelError error -> throw error.toException(); default -> { - //noinspection unchecked - emit.apply((T) t); + try { + //noinspection unchecked + emit.apply((T) t); + } catch (Throwable e) { + throw new ExecutionException(e); + } yield true; } }; diff --git a/flows/src/main/java/com/softwaremill/jox/FlowStage.java b/flows/src/main/java/com/softwaremill/jox/FlowStage.java index 49e263b..17d98b0 100644 --- a/flows/src/main/java/com/softwaremill/jox/FlowStage.java +++ b/flows/src/main/java/com/softwaremill/jox/FlowStage.java @@ -1,5 +1,5 @@ package com.softwaremill.jox; public interface FlowStage { - void run(FlowEmit emit) throws Throwable; + void run(FlowEmit emit) throws Exception; } diff --git a/flows/src/main/java/com/softwaremill/jox/Flows.java b/flows/src/main/java/com/softwaremill/jox/Flows.java index 21c30c1..4bc353b 100644 --- a/flows/src/main/java/com/softwaremill/jox/Flows.java +++ b/flows/src/main/java/com/softwaremill/jox/Flows.java @@ -5,14 +5,16 @@ import java.time.Duration; import java.util.Arrays; import java.util.Iterator; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; -interface Flows { +public final class Flows { + + private Flows() {} /** * Creates a flow, which when run, provides a {@link FlowEmit} instance to the given `withEmit` function. Elements can be emitted to be @@ -21,7 +23,7 @@ interface Flows { * The {@link FlowEmit} instance provided to the {@param withEmit} callback should only be used on the calling thread. * That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of {@param withEmit}. */ - static Flow usingEmit(ThrowingConsumer> withEmit) { + public static Flow usingEmit(ThrowingConsumer> withEmit) { return new Flow<>(withEmit::accept); } @@ -29,28 +31,29 @@ static Flow usingEmit(ThrowingConsumer> withEmit) { * Creates a flow using the given {@param source}. An element is emitted for each value received from the source. * If the source is completed with an error, is it propagated by throwing. */ - static Flow fromSource(Source source) { + public static Flow fromSource(Source source) { return new Flow<>(new SourceBackedFlowStage<>(source)); } /** * Creates a flow from the given `iterable`. Each element of the iterable is emitted in order. */ - static Flow fromIterable(Iterable iterable) { + public static Flow fromIterable(Iterable iterable) { return fromIterator(iterable.iterator()); } /** * Creates a flow from the given values. Each value is emitted in order. */ - static Flow fromValues(T... ts) { + @SafeVarargs + public static Flow fromValues(T... ts) { return fromIterator(Arrays.asList(ts).iterator()); } /** * Creates a flow from the given (lazily evaluated) `iterator`. Each element of the iterator is emitted in order. */ - static Flow fromIterator(Iterator it) { + public static Flow fromIterator(Iterator it) { return usingEmit(emit -> { while (it.hasNext()) { emit.apply(it.next()); @@ -61,14 +64,14 @@ static Flow fromIterator(Iterator it) { /** * Creates a flow from the given fork. The flow will emit up to one element, or complete by throwing an exception if the fork fails. */ - static Flow fromFork(Fork f) { + public static Flow fromFork(Fork f) { return usingEmit(emit -> emit.apply(f.join())); } /** * Creates a flow which emits elements starting with `zero`, and then applying `mappingFunction` to the previous element to get the next one. */ - static Flow iterate(T zero, Function mappingFunction) { + public static Flow iterate(T zero, Function mappingFunction) { return usingEmit(emit -> { T t = zero; while (true) { @@ -81,7 +84,7 @@ static Flow iterate(T zero, Function mappingFunction) { /** * Creates a flow which emits a range of numbers, from `from`, to `to` (inclusive), stepped by `step`. */ - static Flow range(int from, int to, int step) { + public static Flow range(int from, int to, int step) { // do nothing return usingEmit(emit -> { for (int i = from; i <= to; i += step) { @@ -104,7 +107,7 @@ static Flow range(int from, int to, int step) { * @param value * The element to emitted on every tick. */ - static Flow tick(Duration interval, T value) { + public static Flow tick(Duration interval, T value) { return usingEmit(emit -> { while (true) { long start = System.nanoTime(); @@ -119,7 +122,7 @@ static Flow tick(Duration interval, T value) { } /** Creates a flow, which emits the given `element` repeatedly. */ - static Flow repeat(T element) { + public static Flow repeat(T element) { return repeatEval(() -> element); } @@ -129,7 +132,7 @@ static Flow repeat(T element) { * @param supplierFunction * The code block, computing the element to emit. */ - static Flow repeatEval(Supplier supplierFunction) { + public static Flow repeatEval(Supplier supplierFunction) { return usingEmit(emit -> { while (true) { emit.apply(supplierFunction.get()); @@ -145,7 +148,7 @@ static Flow repeatEval(Supplier supplierFunction) { * @param supplierFunction * The code block, computing the optional element to emit. */ - static Flow repeatEvalWhileDefined(Supplier> supplierFunction) { + public static Flow repeatEvalWhileDefined(Supplier> supplierFunction) { // do nothing return usingEmit(emit -> { boolean shouldRun = true; @@ -161,7 +164,7 @@ static Flow repeatEvalWhileDefined(Supplier> supplierFunction } /** Create a flow which sleeps for the given `timeout` and then completes as done. */ - static Flow timeout(Duration timeout) { + public static Flow timeout(Duration timeout) { return usingEmit(emit -> Thread.sleep(timeout.toMillis())); } @@ -169,22 +172,29 @@ static Flow timeout(Duration timeout) { * Creates a flow which concatenates the given `flows` in order. First elements from the first flow are emitted, then from the second etc. * If any of the flows completes with an error, it is propagated. */ - static Flow concat(List> flows) { - return new Flow<>(((ThrowingConsumer>) emit -> { + @SafeVarargs + public static Flow concat(Flow... flows) { + return usingEmit(emit -> { for (Flow currentFlow : flows) { - currentFlow.runToEmit(((ThrowingConsumer) emit::apply)::accept); + currentFlow.runToEmit(emit); } - })::accept); + }); } /** Creates an empty flow, which emits no elements and completes immediately. */ - static Flow empty() { + public static Flow empty() { return usingEmit(emit -> {}); } /** Creates a flow that emits a single element when `from` completes, or throws an exception when `from` fails. */ - static Flow fromCompletableFuture(CompletableFuture from) { - return usingEmit(emit -> emit.apply(from.get())); + public static Flow fromCompletableFuture(CompletableFuture from) { + return usingEmit(emit -> { + try { + emit.apply(from.get()); + } catch (ExecutionException e) { + throw (Exception) e.getCause(); + } + }); } /** @@ -192,17 +202,17 @@ static Flow fromCompletableFuture(CompletableFuture from) { * * @param from the future source to emit elements from */ - static Flow fromFutureSource(CompletableFuture> from) { + public static Flow fromFutureSource(CompletableFuture> from) { return fromSource(from.join()); } /** - * Creates a flow that fails immediately with the given {@link java.lang.Throwable} + * Creates a flow that fails immediately with the given {@link java.lang.Exception} * * @param t - * The {@link java.lang.Throwable} to fail with + * The {@link java.lang.Exception} to fail with */ - static Flow failed(Throwable t) { + public static Flow failed(Exception t) { return new Flow<>(emit -> { throw t; }); diff --git a/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java b/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java index a572bfe..cba40d9 100644 --- a/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java +++ b/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java @@ -3,7 +3,7 @@ record SourceBackedFlowStage(Source source) implements FlowStage { @Override - public void run(FlowEmit emit) throws Throwable { + public void run(FlowEmit emit) throws Exception { FlowEmit.channelToEmit(source, emit); } } diff --git a/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java b/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java index 4affdfc..5afda6b 100644 --- a/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java +++ b/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java @@ -1,5 +1,5 @@ package com.softwaremill.jox; public interface ThrowingConsumer { - void accept(T t) throws Throwable; + void accept(T t) throws Exception; } diff --git a/flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java b/flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java new file mode 100644 index 0000000..8fd5209 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java @@ -0,0 +1,101 @@ +package com.softwaremill.jox; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.*; + +public class FlowCompleteCallbacksTest { + @Test + void ensureOnCompleteRunsInCaseOfSuccess() throws Exception { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .onComplete(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + f.runDrain(); + + // then + assertTrue(didRun.get()); + } + + @Test + void ensureOnCompleteRunsInCaseOfError() { + //given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .tap(i -> {throw new RuntimeException();}) + .onComplete(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + assertThrows(RuntimeException.class, f::runDrain); + + // then + assertTrue(didRun.get()); + } + + @Test + void ensureOnDoneRunsInCaseOfSuccess() throws Exception { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3).onDone(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + f.runDrain(); + + // then + assertTrue(didRun.get()); + } + + @Test + void ensureOnDoneDoesNotRunInCaseOfError() { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .tap(i -> {throw new RuntimeException();}) + .onDone(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + assertThrows(RuntimeException.class, f::runDrain); + + // then + assertFalse(didRun.get()); + } + + @Test + void ensureOnErrorDoesNotRunInCaseOfSuccess() throws Exception { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .onError(e -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + f.runDrain(); + + // then + assertFalse(didRun.get()); + } + + @Test + void ensureOnErrorRunsInCaseOfError() { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .tap(i -> {throw new RuntimeException();}) + .onError(e -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + assertThrows(RuntimeException.class, f::runDrain); + + // then + assertTrue(didRun.get()); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java b/flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java new file mode 100644 index 0000000..5f3108d --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java @@ -0,0 +1,93 @@ +package com.softwaremill.jox; + +import com.softwaremill.jox.structured.UnsupervisedScope; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +public class FlowGroupedTest { + + @Test + void shouldEmitGroupedElements() throws Exception { + // when + List> result = Flows.fromValues(1, 2, 3, 4, 5, 6) + .grouped(3) + .runToList(); + + // then + assertEquals(List.of(List.of(1, 2, 3), List.of(4, 5, 6)), result); + } + + @Test + void shouldEmitGroupedElementsAndIncludeRemainingValuesWhenFlowCloses() throws Exception { + // given + List> result = Flows.fromValues(1, 2, 3, 4, 5, 6, 7) + .grouped(3) + .runToList(); + + // then + assertEquals(List.of(List.of(1, 2, 3), List.of(4, 5, 6), List.of(7)), result); + } + + @Test + @WithUnsupervisedScope + void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed(UnsupervisedScope scope) throws InterruptedException { + // given + RuntimeException failure = new RuntimeException(); + + // when + Object result = Flows.failed(failure) + .grouped(3) + .runToChannel(scope) + .receiveOrClosed(); + + // then + assertInstanceOf(ChannelError.class, result); + assertEquals(failure, ((ChannelError) result).cause()); + } + + @Test + void shouldEmitGroupedElementsWithCustomCostFunction() throws Exception { + // when + List> result = Flows.fromValues(1, 2, 3, 4, 5, 6, 5, 3, 1) + .groupedWeighted(10, n -> (long) (n * 2)) + .runToList(); + + // then + assertEquals(List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6), List.of(5), List.of(3, 1)), result); + } + + @Test + @WithUnsupervisedScope + void shouldReturnFailedFlowWhenCostFunctionThrowsException(UnsupervisedScope scope) { + // when + ChannelClosedException exception = assertThrows(ChannelClosedException.class, () -> + Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7) + .groupedWeighted(150, n -> (long) (100 / n)) + .runToChannel(scope) + .forEach(i -> { + })); + + // then + assertInstanceOf(ArithmeticException.class, exception.getCause()); + } + + @Test + @WithUnsupervisedScope + void shouldReturnFailedSourceWhenTheOriginalSourceIsFailed(UnsupervisedScope scope) throws InterruptedException { + // given + RuntimeException failure = new RuntimeException(); + + // when + Object result = Flows.failed(failure) + .groupedWeighted(10, n -> Long.parseLong(n.toString()) * 2) + .runToChannel(scope) + .receiveOrClosed(); + + // then + assertInstanceOf(ChannelError.class, result); + assertEquals(failure, ((ChannelError) result).cause()); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/FlowTest.java index 015e359..0909d9b 100644 --- a/flows/src/test/java/com/softwaremill/jox/FlowTest.java +++ b/flows/src/test/java/com/softwaremill/jox/FlowTest.java @@ -1,5 +1,6 @@ package com.softwaremill.jox; +import com.softwaremill.jox.structured.UnsupervisedScope; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -35,12 +36,13 @@ void shouldRunToList() throws Throwable { } @Test - void shouldRunToChannel() throws Throwable { + @WithUnsupervisedScope + void shouldRunToChannel(UnsupervisedScope scope) throws Throwable { // given Flow flow = Flows.fromValues(1, 2, 3); // when - Source source = flow.runToChannel(); + Source source = flow.runToChannel(scope); // then assertEquals(1, source.receive()); @@ -49,18 +51,68 @@ void shouldRunToChannel() throws Throwable { } @Test - void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { + @WithUnsupervisedScope + void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow(UnsupervisedScope scope) throws Throwable { // given Channel channel = Channel.newUnlimitedChannel(); Flow flow = Flows.fromSource(channel); // when - Source receivedChannel = flow.runToChannel(); + Source receivedChannel = flow.runToChannel(scope); // then assertEquals(channel, receivedChannel); } + @Test + void shouldThrowExceptionForFailedFlow() { + assertThrows(IllegalStateException.class, () -> { + Flows.failed(new IllegalStateException()) + .runFold(0, (acc, n) -> Integer.valueOf(acc.toString() + n)); + }); + } + + @Test + void shouldThrowExceptionThrownInFunctionF() { + RuntimeException thrown = assertThrows(RuntimeException.class, () -> { + Flows.fromValues(1) + .runFold(0, (acc, n) -> { throw new RuntimeException("Function `f` is broken"); }); + }); + assertEquals("Function `f` is broken", thrown.getMessage()); + } + + @Test + void shouldReturnZeroValueFromFoldOnEmptySource() throws Exception { + assertEquals(0, Flows.empty().runFold(0, (acc, n) -> Integer.valueOf(acc.toString() + n))); + } + + @Test + void shouldReturnFoldOnNonEmptyFold() throws Exception { + assertEquals(3, Flows.fromValues(1, 2).runFold(0, Integer::sum)); + } + + @Test + void shouldTakeFromSimpleFlow() throws Exception { + Flow f = Flows.fromValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = f.take(5).runToList(); + assertEquals(List.of(1, 2, 3, 4, 5), result); + } + + @Test + void shouldTakeFromAsyncFlow() throws Exception { + Flow f = Flows.fromValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .buffer(16); + List result = f.take(5).runToList(); + assertEquals(List.of(1, 2, 3, 4, 5), result); + } + + @Test + void shouldTakeAllIfFlowEndsSooner() throws Exception { + Flow f = Flows.fromValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = f.take(50).runToList(); + assertEquals(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result); + } + @Test void shouldWorkWithASingleAsyncBoundary() throws Throwable { // given @@ -91,6 +143,17 @@ void shouldWorkWithMultipleAsyncBoundaries() throws Throwable { assertEquals(List.of(3, 5, 7, 9, 11), integers); } + @Test + void shouldPropagateErrorsWhenUsingBuffer() { + Exception exception = assertThrows(ChannelClosedException.class, () -> { + Flows.fromValues(1, 2, 3) + .map(value -> { throw new IllegalStateException(); }) + .buffer(5) + .runToList(); + }); + assertInstanceOf(IllegalStateException.class, exception.getCause()); + } + @Test void shouldMap() throws Throwable { // given diff --git a/flows/src/test/java/com/softwaremill/jox/FlowsTest.java b/flows/src/test/java/com/softwaremill/jox/FlowsTest.java new file mode 100644 index 0000000..58b91c8 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/FlowsTest.java @@ -0,0 +1,226 @@ +package com.softwaremill.jox; + +import com.softwaremill.jox.structured.Fork; +import com.softwaremill.jox.structured.UnsupervisedScope; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +class FlowsTest { + + @Test + void shouldBeEmpty() throws Exception { + assertTrue(Flows.empty().runToList().isEmpty()); + } + + @Test + @WithUnsupervisedScope + void shouldCreateFlowFromFork(UnsupervisedScope scope) throws Exception { + Fork f = scope.forkUnsupervised(() -> 1); + Flow c = Flows.fromFork(f); + assertEquals(List.of(1), c.runToList()); + } + + @Test + void shouldCreateIteratingFlow() throws Exception { + // given + Flow c = Flows.iterate(1, i -> i + 1); + + // when & then + assertEquals(List.of(1, 2, 3), c.take(3).runToList()); + } + + @Test + void shouldProduceRange() throws Exception { + assertEquals(List.of(1, 2, 3, 4, 5), Flows.range(1, 5, 1).runToList()); + assertEquals(List.of(1, 3, 5), Flows.range(1, 5, 2).runToList()); + assertEquals(List.of(1, 4, 7, 10), Flows.range(1, 11, 3).runToList()); + } + + @Test + @WithUnsupervisedScope + public void shouldFailOnReceive(UnsupervisedScope scope) throws Exception { + // when + Flow s = Flows.failed(new RuntimeException("boom")); + + // then + Object received = s.runToChannel(scope).receiveOrClosed(); + assertInstanceOf(ChannelError.class, received); + assertEquals("boom", ((ChannelError) received).cause().getMessage()); + } + + @Test + void shouldReturnOriginalFutureFailureWhenFutureFails() { + // given + RuntimeException failure = new RuntimeException("future failed"); + + // when & then + assertThrows(RuntimeException.class, + () -> Flows.fromCompletableFuture(CompletableFuture.failedFuture(failure)).runToList(), + failure.getMessage()); + } + + @Test + void shouldReturnFutureValue() throws Exception { + // given + CompletableFuture future = CompletableFuture.completedFuture(1); + + // when + List result = Flows.fromCompletableFuture(future).runToList(); + + // then + assertEquals(List.of(1), result); + } + + @Test + @WithUnsupervisedScope + void shouldReturnFuturesSourceValues(UnsupervisedScope scope) throws Exception { + // given + CompletableFuture> completableFuture = CompletableFuture + .completedFuture(Flows.fromValues(1, 2).runToChannel(scope)); + + // when + List result = Flows.fromFutureSource(completableFuture).runToList(); + + // then + assertEquals(List.of(1, 2), result); + } + + @Test + void shouldReturnOriginalFutureFailureWhenSourceFutureFails() { + // given + RuntimeException failure = new RuntimeException("future failed"); + + // when & then + assertThrows(RuntimeException.class, + () -> Flows.fromFutureSource(CompletableFuture.failedFuture(failure)).runToList(), + failure.getMessage()); + } + + @Test + void shouldEvaluateElementBeforeEachSend() throws Exception { + // given + AtomicInteger i = new AtomicInteger(0); + + // when + List actual = Flows.repeatEval(i::incrementAndGet) + .take(3) + .runToList(); + + // then + assertEquals(List.of(1, 2, 3), actual); + } + + @Test + void shouldEvaluateElementBeforeEachSendAsLongAsDefined() throws Exception { + // given + AtomicInteger i = new AtomicInteger(0); + Set evaluated = new HashSet<>(); + + // when + List result = Flows.repeatEvalWhileDefined(() -> { + int value = i.incrementAndGet(); + evaluated.add(value); + return value < 5 ? Optional.of(value) : Optional.empty(); + }) + .runToList(); + + // then + assertEquals(List.of(1, 2, 3, 4), result); + assertEquals(Set.of(1, 2, 3, 4, 5), evaluated); + } + + @Test + void shouldRepeatTheSameElement() throws Exception { + // when + List result = Flows.repeat(2137) + .take(3) + .runToList(); + + // then + assertEquals(List.of(2137, 2137, 2137), result); + } + + @Test + @Timeout(value = 1, unit = TimeUnit.SECONDS) + @WithUnsupervisedScope + void shouldTickRegularly(UnsupervisedScope scope) throws InterruptedException { + var c = Flows.tick(Duration.ofMillis(100), 1L) + .runToChannel(scope); + var start = System.currentTimeMillis(); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 0); + assertTrue(System.currentTimeMillis() - start <= 50); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 100); + assertTrue(System.currentTimeMillis() - start <= 150); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + } + + @Test + @Timeout(value = 1, unit = TimeUnit.SECONDS) + @WithUnsupervisedScope + void shouldTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal(UnsupervisedScope scope) throws InterruptedException { + var c = Flows.tick(Duration.ofMillis(100), 1L) + .runToChannel(scope); + var start = System.currentTimeMillis(); + + Thread.sleep(200); + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + } + + @Test + void shouldTimeout() throws Exception { + // given + long start = System.currentTimeMillis(); + Flow c = Flows.concat( + Flows.timeout(Duration.ofMillis(100)), + Flows.fromValues(1) + ); + + // when + List result = c.runToList(); + + // then + assertEquals(List.of(1), result); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed >= 100L); + assertTrue(elapsed <= 150L); + } + + @Test + void shouldConcatFlows() throws Exception { + // given + Flow flow = Flows.concat( + Flows.fromValues(1, 2, 3), + Flows.fromValues(4, 5, 6) + ); + + // when + List result = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3, 4, 5, 6), result); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java b/flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java new file mode 100644 index 0000000..7981731 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java @@ -0,0 +1,39 @@ +package com.softwaremill.jox; + +import com.softwaremill.jox.structured.Scopes; +import com.softwaremill.jox.structured.UnsupervisedScope; +import org.junit.jupiter.api.extension.*; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Objects; + +public class UnsupervisedScopeExtension implements ParameterResolver, InvocationInterceptor { + + @Override + public void interceptTestMethod(Invocation invocation, ReflectiveInvocationContext invocationContext, ExtensionContext extensionContext) throws Throwable { + Scopes.unsupervised(scope -> { + try { + extensionContext.getRequiredTestMethod().invoke(extensionContext.getRequiredTestInstance(), scope); + } catch (InvocationTargetException e) { + if (Objects.requireNonNull(e.getTargetException()) instanceof Error) { + throw (Error) e.getTargetException(); + } + throw new IllegalStateException("Unexpected value: " + e.getTargetException()); + } + invocation.skip(); + return null; + }); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) { + return parameterContext.getParameter().getType().isAssignableFrom(UnsupervisedScope.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) { +// required by ParameterResolver but actual value is passed in interceptTestMethod + return null; + } +} \ No newline at end of file diff --git a/flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java b/flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java new file mode 100644 index 0000000..ce35206 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java @@ -0,0 +1,14 @@ +package com.softwaremill.jox; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@ExtendWith(UnsupervisedScopeExtension.class) +public @interface WithUnsupervisedScope { +} \ No newline at end of file From 3df33d2915bf54026b80d38bfaf1c4d575783bed Mon Sep 17 00:00:00 2001 From: emilb Date: Thu, 12 Dec 2024 11:33:15 +0100 Subject: [PATCH 3/6] * add missing documentation * suppress warnings * removed redundant exception wrapping in FlowEmit --- .../java/com/softwaremill/jox/FlowEmit.java | 18 ++++++++++++------ .../java/com/softwaremill/jox/FlowStage.java | 4 ++++ .../main/java/com/softwaremill/jox/Flows.java | 6 +++++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java b/flows/src/main/java/com/softwaremill/jox/FlowEmit.java index a967e73..c75b632 100644 --- a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java +++ b/flows/src/main/java/com/softwaremill/jox/FlowEmit.java @@ -2,9 +2,19 @@ import java.util.concurrent.ExecutionException; +/** + * Instances of this interface should be considered thread-unsafe, and only used within the scope in which they have been obtained, e.g. as + * part of {@link Flows#usingEmit} or {@link Flow#mapUsingEmit}. + */ public interface FlowEmit { + + /** Emit a value to be processed downstream. Blocks until the value is fully processed, or throws an exception if an error occurred. */ void apply(T t) throws Exception; + /** + * Propagates all elements to the given emit. Completes once the channel completes as done. Throws an exception if the channel transits + * to an error state. + */ static void channelToEmit(Source source, FlowEmit emit) throws Exception { boolean shouldRun = true; while (shouldRun) { @@ -13,12 +23,8 @@ static void channelToEmit(Source source, FlowEmit emit) throws Excepti case ChannelDone done -> false; case ChannelError error -> throw error.toException(); default -> { - try { - //noinspection unchecked - emit.apply((T) t); - } catch (Throwable e) { - throw new ExecutionException(e); - } + //noinspection unchecked + emit.apply((T) t); yield true; } }; diff --git a/flows/src/main/java/com/softwaremill/jox/FlowStage.java b/flows/src/main/java/com/softwaremill/jox/FlowStage.java index 17d98b0..392cc12 100644 --- a/flows/src/main/java/com/softwaremill/jox/FlowStage.java +++ b/flows/src/main/java/com/softwaremill/jox/FlowStage.java @@ -1,5 +1,9 @@ package com.softwaremill.jox; +/** + * Contains the logic for running a single flow stage. As part of `run`s implementation, previous flow stages might be run, either + * synchronously or asynchronously. + */ public interface FlowStage { void run(FlowEmit emit) throws Exception; } diff --git a/flows/src/main/java/com/softwaremill/jox/Flows.java b/flows/src/main/java/com/softwaremill/jox/Flows.java index 4bc353b..cdf1b9e 100644 --- a/flows/src/main/java/com/softwaremill/jox/Flows.java +++ b/flows/src/main/java/com/softwaremill/jox/Flows.java @@ -74,6 +74,7 @@ public static Flow fromFork(Fork f) { public static Flow iterate(T zero, Function mappingFunction) { return usingEmit(emit -> { T t = zero; + //noinspection InfiniteLoopStatement while (true) { emit.apply(t); t = mappingFunction.apply(t); @@ -109,12 +110,14 @@ public static Flow range(int from, int to, int step) { */ public static Flow tick(Duration interval, T value) { return usingEmit(emit -> { + //noinspection InfiniteLoopStatement while (true) { long start = System.nanoTime(); emit.apply(value); long end = System.nanoTime(); long sleep = interval.toNanos() - (end - start); if (sleep > 0) { + //noinspection BusyWait Thread.sleep(TimeUnit.NANOSECONDS.toMillis(sleep), (int) (sleep % 1_000_000)); } } @@ -134,6 +137,7 @@ public static Flow repeat(T element) { */ public static Flow repeatEval(Supplier supplierFunction) { return usingEmit(emit -> { + //noinspection InfiniteLoopStatement while (true) { emit.apply(supplierFunction.get()); } @@ -213,7 +217,7 @@ public static Flow fromFutureSource(CompletableFuture> from) { * The {@link java.lang.Exception} to fail with */ public static Flow failed(Exception t) { - return new Flow<>(emit -> { + return usingEmit(emit -> { throw t; }); } From 182806537bb2bb00e3a3e2473aa6b02cf5f69407 Mon Sep 17 00:00:00 2001 From: emilb Date: Mon, 16 Dec 2024 12:42:40 +0100 Subject: [PATCH 4/6] Review comments: * added module in parent pom * moved classes into dedicated package --- .../main/java/com/softwaremill/jox/{ => flows}/Flow.java | 6 ++++-- .../java/com/softwaremill/jox/{ => flows}/FlowEmit.java | 6 ++++-- .../java/com/softwaremill/jox/{ => flows}/FlowStage.java | 2 +- .../main/java/com/softwaremill/jox/{ => flows}/Flows.java | 3 ++- .../softwaremill/jox/{ => flows}/SourceBackedFlowStage.java | 4 +++- .../com/softwaremill/jox/{ => flows}/ThrowingConsumer.java | 2 +- .../jox/{ => flows}/FlowCompleteCallbacksTest.java | 2 +- .../com/softwaremill/jox/{ => flows}/FlowGroupedTest.java | 4 +++- .../java/com/softwaremill/jox/{ => flows}/FlowTest.java | 5 ++++- .../java/com/softwaremill/jox/{ => flows}/FlowsTest.java | 4 +++- .../jox/{ => flows}/UnsupervisedScopeExtension.java | 2 +- .../softwaremill/jox/{ => flows}/WithUnsupervisedScope.java | 2 +- pom.xml | 1 + 13 files changed, 29 insertions(+), 14 deletions(-) rename flows/src/main/java/com/softwaremill/jox/{ => flows}/Flow.java (98%) rename flows/src/main/java/com/softwaremill/jox/{ => flows}/FlowEmit.java (88%) rename flows/src/main/java/com/softwaremill/jox/{ => flows}/FlowStage.java (87%) rename flows/src/main/java/com/softwaremill/jox/{ => flows}/Flows.java (99%) rename flows/src/main/java/com/softwaremill/jox/{ => flows}/SourceBackedFlowStage.java (73%) rename flows/src/main/java/com/softwaremill/jox/{ => flows}/ThrowingConsumer.java (69%) rename flows/src/test/java/com/softwaremill/jox/{ => flows}/FlowCompleteCallbacksTest.java (98%) rename flows/src/test/java/com/softwaremill/jox/{ => flows}/FlowGroupedTest.java (95%) rename flows/src/test/java/com/softwaremill/jox/{ => flows}/FlowTest.java (97%) rename flows/src/test/java/com/softwaremill/jox/{ => flows}/FlowsTest.java (98%) rename flows/src/test/java/com/softwaremill/jox/{ => flows}/UnsupervisedScopeExtension.java (97%) rename flows/src/test/java/com/softwaremill/jox/{ => flows}/WithUnsupervisedScope.java (91%) diff --git a/flows/src/main/java/com/softwaremill/jox/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java similarity index 98% rename from flows/src/main/java/com/softwaremill/jox/Flow.java rename to flows/src/main/java/com/softwaremill/jox/flows/Flow.java index 440ff18..65628ad 100644 --- a/flows/src/main/java/com/softwaremill/jox/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -1,6 +1,8 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.UnsupervisedScope; import java.util.ArrayList; @@ -14,7 +16,7 @@ import java.util.function.Function; import java.util.function.Predicate; -import static com.softwaremill.jox.Flows.usingEmit; +import static com.softwaremill.jox.flows.Flows.usingEmit; import static com.softwaremill.jox.structured.Scopes.unsupervised; /** diff --git a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java b/flows/src/main/java/com/softwaremill/jox/flows/FlowEmit.java similarity index 88% rename from flows/src/main/java/com/softwaremill/jox/FlowEmit.java rename to flows/src/main/java/com/softwaremill/jox/flows/FlowEmit.java index c75b632..c003715 100644 --- a/flows/src/main/java/com/softwaremill/jox/FlowEmit.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/FlowEmit.java @@ -1,6 +1,8 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; -import java.util.concurrent.ExecutionException; +import com.softwaremill.jox.ChannelDone; +import com.softwaremill.jox.ChannelError; +import com.softwaremill.jox.Source; /** * Instances of this interface should be considered thread-unsafe, and only used within the scope in which they have been obtained, e.g. as diff --git a/flows/src/main/java/com/softwaremill/jox/FlowStage.java b/flows/src/main/java/com/softwaremill/jox/flows/FlowStage.java similarity index 87% rename from flows/src/main/java/com/softwaremill/jox/FlowStage.java rename to flows/src/main/java/com/softwaremill/jox/flows/FlowStage.java index 392cc12..e8d7099 100644 --- a/flows/src/main/java/com/softwaremill/jox/FlowStage.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/FlowStage.java @@ -1,4 +1,4 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; /** * Contains the logic for running a single flow stage. As part of `run`s implementation, previous flow stages might be run, either diff --git a/flows/src/main/java/com/softwaremill/jox/Flows.java b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java similarity index 99% rename from flows/src/main/java/com/softwaremill/jox/Flows.java rename to flows/src/main/java/com/softwaremill/jox/flows/Flows.java index cdf1b9e..40e9762 100644 --- a/flows/src/main/java/com/softwaremill/jox/Flows.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java @@ -1,5 +1,6 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; +import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Fork; import java.time.Duration; diff --git a/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java b/flows/src/main/java/com/softwaremill/jox/flows/SourceBackedFlowStage.java similarity index 73% rename from flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java rename to flows/src/main/java/com/softwaremill/jox/flows/SourceBackedFlowStage.java index cba40d9..0f43101 100644 --- a/flows/src/main/java/com/softwaremill/jox/SourceBackedFlowStage.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/SourceBackedFlowStage.java @@ -1,4 +1,6 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.Source; record SourceBackedFlowStage(Source source) implements FlowStage { diff --git a/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java b/flows/src/main/java/com/softwaremill/jox/flows/ThrowingConsumer.java similarity index 69% rename from flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java rename to flows/src/main/java/com/softwaremill/jox/flows/ThrowingConsumer.java index 5afda6b..aae5d0c 100644 --- a/flows/src/main/java/com/softwaremill/jox/ThrowingConsumer.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/ThrowingConsumer.java @@ -1,4 +1,4 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; public interface ThrowingConsumer { void accept(T t) throws Exception; diff --git a/flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowCompleteCallbacksTest.java similarity index 98% rename from flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java rename to flows/src/test/java/com/softwaremill/jox/flows/FlowCompleteCallbacksTest.java index 8fd5209..17bc956 100644 --- a/flows/src/test/java/com/softwaremill/jox/FlowCompleteCallbacksTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowCompleteCallbacksTest.java @@ -1,4 +1,4 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; import org.junit.jupiter.api.Test; diff --git a/flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java similarity index 95% rename from flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java rename to flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java index 5f3108d..77de001 100644 --- a/flows/src/test/java/com/softwaremill/jox/FlowGroupedTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java @@ -1,5 +1,7 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; +import com.softwaremill.jox.ChannelClosedException; +import com.softwaremill.jox.ChannelError; import com.softwaremill.jox.structured.UnsupervisedScope; import org.junit.jupiter.api.Test; diff --git a/flows/src/test/java/com/softwaremill/jox/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java similarity index 97% rename from flows/src/test/java/com/softwaremill/jox/FlowTest.java rename to flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java index 0909d9b..116ca09 100644 --- a/flows/src/test/java/com/softwaremill/jox/FlowTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java @@ -1,5 +1,8 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.ChannelClosedException; +import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.UnsupervisedScope; import org.junit.jupiter.api.Test; diff --git a/flows/src/test/java/com/softwaremill/jox/FlowsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java similarity index 98% rename from flows/src/test/java/com/softwaremill/jox/FlowsTest.java rename to flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java index 58b91c8..e3ff9b4 100644 --- a/flows/src/test/java/com/softwaremill/jox/FlowsTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java @@ -1,5 +1,7 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; +import com.softwaremill.jox.ChannelError; +import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Fork; import com.softwaremill.jox.structured.UnsupervisedScope; import org.junit.jupiter.api.Test; diff --git a/flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java b/flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java similarity index 97% rename from flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java rename to flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java index 7981731..5bf3926 100644 --- a/flows/src/test/java/com/softwaremill/jox/UnsupervisedScopeExtension.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java @@ -1,4 +1,4 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; import com.softwaremill.jox.structured.Scopes; import com.softwaremill.jox.structured.UnsupervisedScope; diff --git a/flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java b/flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java similarity index 91% rename from flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java rename to flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java index ce35206..ccafe2b 100644 --- a/flows/src/test/java/com/softwaremill/jox/WithUnsupervisedScope.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java @@ -1,4 +1,4 @@ -package com.softwaremill.jox; +package com.softwaremill.jox.flows; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/pom.xml b/pom.xml index d3af42d..56a7aa7 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ structured bench channel-ops + flows From 489ed16fc32a3e3620afdf30023a4e9cc959a32d Mon Sep 17 00:00:00 2001 From: emilb Date: Mon, 16 Dec 2024 14:08:05 +0100 Subject: [PATCH 5/6] Review comments: * removed @WithUnsupervisedScope --- .../jox/flows/FlowGroupedTest.java | 91 +++++++------- .../com/softwaremill/jox/flows/FlowTest.java | 52 ++++---- .../com/softwaremill/jox/flows/FlowsTest.java | 113 ++++++++++-------- .../jox/flows/UnsupervisedScopeExtension.java | 39 ------ .../jox/flows/WithUnsupervisedScope.java | 14 --- 5 files changed, 139 insertions(+), 170 deletions(-) delete mode 100644 flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java delete mode 100644 flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java index 77de001..f6b49b2 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java @@ -2,10 +2,11 @@ import com.softwaremill.jox.ChannelClosedException; import com.softwaremill.jox.ChannelError; -import com.softwaremill.jox.structured.UnsupervisedScope; +import com.softwaremill.jox.structured.Scopes; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.*; @@ -34,20 +35,22 @@ void shouldEmitGroupedElementsAndIncludeRemainingValuesWhenFlowCloses() throws E } @Test - @WithUnsupervisedScope - void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed(UnsupervisedScope scope) throws InterruptedException { - // given - RuntimeException failure = new RuntimeException(); - - // when - Object result = Flows.failed(failure) - .grouped(3) - .runToChannel(scope) - .receiveOrClosed(); - - // then - assertInstanceOf(ChannelError.class, result); - assertEquals(failure, ((ChannelError) result).cause()); + void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + // given + RuntimeException failure = new RuntimeException(); + + // when + Object result = Flows.failed(failure) + .grouped(3) + .runToChannel(scope) + .receiveOrClosed(); + + // then + assertInstanceOf(ChannelError.class, result); + assertEquals(failure, ((ChannelError) result).cause()); + return null; + }); } @Test @@ -62,34 +65,38 @@ void shouldEmitGroupedElementsWithCustomCostFunction() throws Exception { } @Test - @WithUnsupervisedScope - void shouldReturnFailedFlowWhenCostFunctionThrowsException(UnsupervisedScope scope) { - // when - ChannelClosedException exception = assertThrows(ChannelClosedException.class, () -> - Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7) - .groupedWeighted(150, n -> (long) (100 / n)) - .runToChannel(scope) - .forEach(i -> { - })); - - // then - assertInstanceOf(ArithmeticException.class, exception.getCause()); + void shouldReturnFailedFlowWhenCostFunctionThrowsException() throws ExecutionException, InterruptedException { + Scopes.unsupervised(scope -> { + // when + ChannelClosedException exception = assertThrows(ChannelClosedException.class, () -> + Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7) + .groupedWeighted(150, n -> (long) (100 / n)) + .runToChannel(scope) + .forEach(i -> { + })); + + // then + assertInstanceOf(ArithmeticException.class, exception.getCause()); + return null; + }); } @Test - @WithUnsupervisedScope - void shouldReturnFailedSourceWhenTheOriginalSourceIsFailed(UnsupervisedScope scope) throws InterruptedException { - // given - RuntimeException failure = new RuntimeException(); - - // when - Object result = Flows.failed(failure) - .groupedWeighted(10, n -> Long.parseLong(n.toString()) * 2) - .runToChannel(scope) - .receiveOrClosed(); - - // then - assertInstanceOf(ChannelError.class, result); - assertEquals(failure, ((ChannelError) result).cause()); + void shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + // given + RuntimeException failure = new RuntimeException(); + + // when + Object result = Flows.failed(failure) + .groupedWeighted(10, n -> Long.parseLong(n.toString()) * 2) + .runToChannel(scope) + .receiveOrClosed(); + + // then + assertInstanceOf(ChannelError.class, result); + assertEquals(failure, ((ChannelError) result).cause()); + return null; + }); } -} +} \ No newline at end of file diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java index 116ca09..545ddf4 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java @@ -3,7 +3,7 @@ import com.softwaremill.jox.Channel; import com.softwaremill.jox.ChannelClosedException; import com.softwaremill.jox.Source; -import com.softwaremill.jox.structured.UnsupervisedScope; +import com.softwaremill.jox.structured.Scopes; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -39,32 +39,36 @@ void shouldRunToList() throws Throwable { } @Test - @WithUnsupervisedScope - void shouldRunToChannel(UnsupervisedScope scope) throws Throwable { - // given - Flow flow = Flows.fromValues(1, 2, 3); - - // when - Source source = flow.runToChannel(scope); - - // then - assertEquals(1, source.receive()); - assertEquals(2, source.receive()); - assertEquals(3, source.receive()); + void shouldRunToChannel() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); } @Test - @WithUnsupervisedScope - void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow(UnsupervisedScope scope) throws Throwable { - // given - Channel channel = Channel.newUnlimitedChannel(); - Flow flow = Flows.fromSource(channel); - - // when - Source receivedChannel = flow.runToChannel(scope); - - // then - assertEquals(channel, receivedChannel); + void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Channel channel = Channel.newUnlimitedChannel(); + Flow flow = Flows.fromSource(channel); + + // when + Source receivedChannel = flow.runToChannel(scope); + + // then + assertEquals(channel, receivedChannel); + return null; + }); } @Test diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java index e3ff9b4..a30e07e 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java @@ -3,7 +3,7 @@ import com.softwaremill.jox.ChannelError; import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Fork; -import com.softwaremill.jox.structured.UnsupervisedScope; +import com.softwaremill.jox.structured.Scopes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -13,6 +13,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -26,11 +27,13 @@ void shouldBeEmpty() throws Exception { } @Test - @WithUnsupervisedScope - void shouldCreateFlowFromFork(UnsupervisedScope scope) throws Exception { - Fork f = scope.forkUnsupervised(() -> 1); - Flow c = Flows.fromFork(f); - assertEquals(List.of(1), c.runToList()); + void shouldCreateFlowFromFork() throws Exception { + Scopes.unsupervised(scope -> { + Fork f = scope.forkUnsupervised(() -> 1); + Flow c = Flows.fromFork(f); + assertEquals(List.of(1), c.runToList()); + return null; + }); } @Test @@ -50,15 +53,17 @@ void shouldProduceRange() throws Exception { } @Test - @WithUnsupervisedScope - public void shouldFailOnReceive(UnsupervisedScope scope) throws Exception { - // when - Flow s = Flows.failed(new RuntimeException("boom")); + public void shouldFailOnReceive() throws Exception { + Scopes.unsupervised(scope -> { + // when + Flow s = Flows.failed(new RuntimeException("boom")); - // then - Object received = s.runToChannel(scope).receiveOrClosed(); - assertInstanceOf(ChannelError.class, received); - assertEquals("boom", ((ChannelError) received).cause().getMessage()); + // then + Object received = s.runToChannel(scope).receiveOrClosed(); + assertInstanceOf(ChannelError.class, received); + assertEquals("boom", ((ChannelError) received).cause().getMessage()); + return null; + }); } @Test @@ -85,17 +90,19 @@ void shouldReturnFutureValue() throws Exception { } @Test - @WithUnsupervisedScope - void shouldReturnFuturesSourceValues(UnsupervisedScope scope) throws Exception { - // given - CompletableFuture> completableFuture = CompletableFuture - .completedFuture(Flows.fromValues(1, 2).runToChannel(scope)); + void shouldReturnFuturesSourceValues() throws Exception { + Scopes.unsupervised(scope -> { + // given + CompletableFuture> completableFuture = CompletableFuture + .completedFuture(Flows.fromValues(1, 2).runToChannel(scope)); - // when - List result = Flows.fromFutureSource(completableFuture).runToList(); + // when + List result = Flows.fromFutureSource(completableFuture).runToList(); - // then - assertEquals(List.of(1, 2), result); + // then + assertEquals(List.of(1, 2), result); + return null; + }); } @Test @@ -155,41 +162,45 @@ void shouldRepeatTheSameElement() throws Exception { @Test @Timeout(value = 1, unit = TimeUnit.SECONDS) - @WithUnsupervisedScope - void shouldTickRegularly(UnsupervisedScope scope) throws InterruptedException { - var c = Flows.tick(Duration.ofMillis(100), 1L) - .runToChannel(scope); - var start = System.currentTimeMillis(); + void shouldTickRegularly() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + var c = Flows.tick(Duration.ofMillis(100), 1L) + .runToChannel(scope); + var start = System.currentTimeMillis(); - c.receive(); - assertTrue(System.currentTimeMillis() - start >= 0); - assertTrue(System.currentTimeMillis() - start <= 50); + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 0); + assertTrue(System.currentTimeMillis() - start <= 50); - c.receive(); - assertTrue(System.currentTimeMillis() - start >= 100); - assertTrue(System.currentTimeMillis() - start <= 150); + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 100); + assertTrue(System.currentTimeMillis() - start <= 150); - c.receive(); - assertTrue(System.currentTimeMillis() - start >= 200); - assertTrue(System.currentTimeMillis() - start <= 250); + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + return null; + }); } @Test @Timeout(value = 1, unit = TimeUnit.SECONDS) - @WithUnsupervisedScope - void shouldTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal(UnsupervisedScope scope) throws InterruptedException { - var c = Flows.tick(Duration.ofMillis(100), 1L) - .runToChannel(scope); - var start = System.currentTimeMillis(); - - Thread.sleep(200); - c.receive(); - assertTrue(System.currentTimeMillis() - start >= 200); - assertTrue(System.currentTimeMillis() - start <= 250); - - c.receive(); - assertTrue(System.currentTimeMillis() - start >= 200); - assertTrue(System.currentTimeMillis() - start <= 250); + void shouldTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + var c = Flows.tick(Duration.ofMillis(100), 1L) + .runToChannel(scope); + var start = System.currentTimeMillis(); + + Thread.sleep(200); + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + return null; + }); } @Test diff --git a/flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java b/flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java deleted file mode 100644 index 5bf3926..0000000 --- a/flows/src/test/java/com/softwaremill/jox/flows/UnsupervisedScopeExtension.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.softwaremill.jox.flows; - -import com.softwaremill.jox.structured.Scopes; -import com.softwaremill.jox.structured.UnsupervisedScope; -import org.junit.jupiter.api.extension.*; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Objects; - -public class UnsupervisedScopeExtension implements ParameterResolver, InvocationInterceptor { - - @Override - public void interceptTestMethod(Invocation invocation, ReflectiveInvocationContext invocationContext, ExtensionContext extensionContext) throws Throwable { - Scopes.unsupervised(scope -> { - try { - extensionContext.getRequiredTestMethod().invoke(extensionContext.getRequiredTestInstance(), scope); - } catch (InvocationTargetException e) { - if (Objects.requireNonNull(e.getTargetException()) instanceof Error) { - throw (Error) e.getTargetException(); - } - throw new IllegalStateException("Unexpected value: " + e.getTargetException()); - } - invocation.skip(); - return null; - }); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) { - return parameterContext.getParameter().getType().isAssignableFrom(UnsupervisedScope.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) { -// required by ParameterResolver but actual value is passed in interceptTestMethod - return null; - } -} \ No newline at end of file diff --git a/flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java b/flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java deleted file mode 100644 index ccafe2b..0000000 --- a/flows/src/test/java/com/softwaremill/jox/flows/WithUnsupervisedScope.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.softwaremill.jox.flows; - -import org.junit.jupiter.api.extension.ExtendWith; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -@ExtendWith(UnsupervisedScopeExtension.class) -public @interface WithUnsupervisedScope { -} \ No newline at end of file From d21c53d7b426851a312b54a169cc7c6c0312cd22 Mon Sep 17 00:00:00 2001 From: emilb Date: Wed, 18 Dec 2024 10:31:00 +0100 Subject: [PATCH 6/6] review fixes * fixed documentation * added ScopedValue support --- .../java/com/softwaremill/jox/Channel.java | 14 ++++++++ .../java/com/softwaremill/jox/flows/Flow.java | 19 ++++++----- .../com/softwaremill/jox/flows/FlowTest.java | 33 ++++++++++++++++--- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 19d1e4a..7e3ab4b 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -87,6 +87,13 @@ operations on these (previous) segments, and we'll end up wanting to remove such operations won't use them, so the relinking won't be useful. */ + /** + * Can be used with {@link Channel#withScopedBufferSize()} to pass buffer size value from scope. + * e.g. `ScopedValues.where(BUFFER_SIZE, 8).run(() -> Channel.withScopedBufferSize())` will create a channel with buffer size = 8 + * **/ + public static final ScopedValue BUFFER_SIZE = ScopedValue.newInstance(); + public static final int DEFAULT_BUFFER_SIZE = 16; + // immutable state private final int capacity; @@ -202,6 +209,13 @@ public static Channel newUnlimitedChannel() { return new Channel<>(UNLIMITED_CAPACITY); } + /** + * Allows for creating Channel with buffer size specified in scope by {@link ScopedValue} {@link Channel#BUFFER_SIZE} + */ + public static Channel withScopedBufferSize() { + return new Channel<>(BUFFER_SIZE.orElse(DEFAULT_BUFFER_SIZE)); + } + private static final int UNLIMITED_CAPACITY = -1; // ******* diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java index 65628ad..d3c75b7 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -1,9 +1,8 @@ package com.softwaremill.jox.flows; -import com.softwaremill.jox.Channel; -import com.softwaremill.jox.Source; -import com.softwaremill.jox.structured.UnsupervisedScope; +import static com.softwaremill.jox.flows.Flows.usingEmit; +import static com.softwaremill.jox.structured.Scopes.unsupervised; import java.util.ArrayList; import java.util.List; @@ -16,8 +15,9 @@ import java.util.function.Function; import java.util.function.Predicate; -import static com.softwaremill.jox.flows.Flows.usingEmit; -import static com.softwaremill.jox.structured.Scopes.unsupervised; +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.Source; +import com.softwaremill.jox.structured.UnsupervisedScope; /** * Describes an asynchronous transformation pipeline. When run, emits elements of type `T`. @@ -60,15 +60,18 @@ public List runToList() throws Exception { /** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result * of this method. *

- * By default, buffer capacity is unlimited. + * Buffer capacity can be set via scoped value {@link Channel#BUFFER_SIZE}. If not specified in scope, {@link Channel#DEFAULT_BUFFER_SIZE} is used. *

- * Blocks until the flow completes. + * Method does not block until the flow completes. + * + * @param scope + * Required for creating async forks responsible for writing to channel */ public Source runToChannel(UnsupervisedScope scope) { if (last instanceof SourceBackedFlowStage(Source source)) { return source; } else { - Channel channel = new Channel<>(); + Channel channel = Channel.withScopedBufferSize(); runLastToChannelAsync(scope, channel); return channel; } diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java index 545ddf4..25cdea9 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java @@ -1,16 +1,18 @@ package com.softwaremill.jox.flows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.List; + import com.softwaremill.jox.Channel; import com.softwaremill.jox.ChannelClosedException; import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.Scopes; import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.*; - class FlowTest { @Test @@ -55,6 +57,27 @@ void shouldRunToChannel() throws Throwable { }); } + @Test + void shouldRunToChannelWithBufferSizeDefinedInScope() throws Throwable { + ScopedValue.where(Channel.BUFFER_SIZE, 2).call(() -> { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); + return null; + }); + } + + @Test void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { Scopes.unsupervised(scope -> {