diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 19d1e4a..7e3ab4b 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -87,6 +87,13 @@ operations on these (previous) segments, and we'll end up wanting to remove such operations won't use them, so the relinking won't be useful. */ + /** + * Can be used with {@link Channel#withScopedBufferSize()} to pass buffer size value from scope. + * e.g. `ScopedValues.where(BUFFER_SIZE, 8).run(() -> Channel.withScopedBufferSize())` will create a channel with buffer size = 8 + * **/ + public static final ScopedValue BUFFER_SIZE = ScopedValue.newInstance(); + public static final int DEFAULT_BUFFER_SIZE = 16; + // immutable state private final int capacity; @@ -202,6 +209,13 @@ public static Channel newUnlimitedChannel() { return new Channel<>(UNLIMITED_CAPACITY); } + /** + * Allows for creating Channel with buffer size specified in scope by {@link ScopedValue} {@link Channel#BUFFER_SIZE} + */ + public static Channel withScopedBufferSize() { + return new Channel<>(BUFFER_SIZE.orElse(DEFAULT_BUFFER_SIZE)); + } + private static final int UNLIMITED_CAPACITY = -1; // ******* diff --git a/flows/pom.xml b/flows/pom.xml new file mode 100644 index 0000000..b9c4b9c --- /dev/null +++ b/flows/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + + com.softwaremill.jox + parent + 0.3.1 + + + flows + 0.3.1 + jar + + + + org.junit.jupiter + junit-jupiter + test + + + org.awaitility + awaitility + test + + + com.softwaremill.jox + channels + 0.3.1 + + + com.softwaremill.jox + structured + 0.3.1 + + + diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java new file mode 100644 index 0000000..d3c75b7 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -0,0 +1,325 @@ +package com.softwaremill.jox.flows; + + +import static com.softwaremill.jox.flows.Flows.usingEmit; +import static com.softwaremill.jox.structured.Scopes.unsupervised; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.Source; +import com.softwaremill.jox.structured.UnsupervisedScope; + +/** + * Describes an asynchronous transformation pipeline. When run, emits elements of type `T`. + *

+ * A flow is lazy - evaluation happens only when it's run. + *

+ * Flows can be created using the {@link Flows#fromValues}, {@link Flows#fromSource}} and other `Flow.from*` methods, {@link Flows#tick} etc. + *

+ * Transformation stages can be added using the available combinators, such as {@link Flow#map}, {@link Flow#buffer}, {@link Flow#grouped}, etc. + * Each such method returns a new immutable {@link Flow} instance. + *

+ * Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}. + */ +public class Flow { + protected final FlowStage last; + + public Flow(FlowStage last) { + this.last = last; + } + + // region Run operations + + /** Invokes the given function for each emitted element. Blocks until the flow completes. */ + public void runForeach(Consumer sink) throws Exception { + last.run(sink::accept); + } + + /** Invokes the provided {@link FlowEmit} for each emitted element. Blocks until the flow completes. */ + public void runToEmit(FlowEmit emit) throws Exception { + last.run(emit); + } + + /** Accumulates all elements emitted by this flow into a list. Blocks until the flow completes. */ + public List runToList() throws Exception { + List result = new ArrayList<>(); + runForeach(result::add); + return result; + } + + /** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result + * of this method. + *

+ * Buffer capacity can be set via scoped value {@link Channel#BUFFER_SIZE}. If not specified in scope, {@link Channel#DEFAULT_BUFFER_SIZE} is used. + *

+ * Method does not block until the flow completes. + * + * @param scope + * Required for creating async forks responsible for writing to channel + */ + public Source runToChannel(UnsupervisedScope scope) { + if (last instanceof SourceBackedFlowStage(Source source)) { + return source; + } else { + Channel channel = Channel.withScopedBufferSize(); + runLastToChannelAsync(scope, channel); + return channel; + } + } + + /** + * Uses `zero` as the current value and applies function `f` on it and a value emitted by this flow. The returned value is used as the + * next current value and `f` is applied again with the next value emitted by the flow. The operation is repeated until the flow emits + * all elements. + * + * @param zero + * An initial value to be used as the first argument to function `f` call. + * @param f + * A {@link BiFunction} that is applied to the current value and value emitted by the flow. + * @return + * Combined value retrieved from running function `f` on all flow elements in a cumulative manner where result of the previous call is + * used as an input value to the next. + */ + public U runFold(U zero, BiFunction f) throws Exception { + AtomicReference current = new AtomicReference<>(zero); + last.run(t -> current.set(f.apply(current.get(), t))); + return current.get(); + } + + /** + * Ignores all elements emitted by the flow. Blocks until the flow completes. + */ + public void runDrain() throws Exception { + runForeach(t -> {}); + } + + // endregion + + // region Flow operations + + /** When run, the current pipeline is run asynchronously in the background, emitting elements to a buffer. + * The elements of the buffer are then emitted by the returned flow. + * + * @param bufferCapacity determines size of a buffer. + * + * Any exceptions are propagated by the returned flow. + */ + public Flow buffer(int bufferCapacity) { + return usingEmit(emit -> { + Channel ch = new Channel<>(bufferCapacity); + try { + unsupervised(scope -> { + runLastToChannelAsync(scope, ch); + FlowEmit.channelToEmit(ch, emit); + return null; + }); + } catch (ExecutionException e) { + throw (Exception) e.getCause(); + } + }); + } + + /** + * Applies the given `mappingFunction` to each element emitted by this flow. The returned flow then emits the results. + */ + public Flow map(Function mappingFunction) { + return usingEmit(emit -> { + last.run(t -> emit.apply(mappingFunction.apply(t))); + }); + } + + /** + * Emits only those elements emitted by this flow, for which `filteringPredicate` returns `true`. + */ + public Flow filter(Predicate filteringPredicate) { + return usingEmit(emit -> { + last.run(t -> { + if (filteringPredicate.test(t)) { + emit.apply(t); + } + }); + }); + } + + /** + * Applies the given `mappingFunction` to each element emitted by this flow, in sequence. + * The given {@link Consumer} can be used to emit an arbitrary number of elements. + *

+ * The {@link FlowEmit} instance provided to the `mappingFunction` callback should only be used on the calling thread. + * That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of `mappingFunction`. + */ + public Flow mapUsingEmit(Function>> mappingFunction) { + return usingEmit(emit -> last.run(t -> mappingFunction.apply(t).accept(emit))); + } + + /** + * Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged. + * If `f` throws an exceptions, the flow fails and propagates the exception. + */ + public Flow tap(Consumer f) { + return map(t -> { + f.accept(t); + return t; + }); + } + + /** + * Applies the given `mappingFunction` to each element emitted by this flow, obtaining a nested flow to run. + * The elements emitted by the nested flow are then emitted by the returned flow. + *

+ * The nested flows are run in sequence, that is, the next nested flow is started only after the previous one completes. + */ + public Flow flatMap(Function> mappingFunction) { + return usingEmit(emit -> last.run(t -> mappingFunction.apply(t).runToEmit(emit))); + } + + /** + * Takes the first `n` elements from this flow and emits them. If the flow completes before emitting `n` elements, the returned flow + * completes as well. + */ + public Flow take(int n) { + return Flows.usingEmit(emit -> { + AtomicInteger taken = new AtomicInteger(0); + try { + last.run(t -> { + if (taken.getAndIncrement() < n) { + emit.apply(t); + } else { + throw new BreakException(); + } + }); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof BreakException)) { + throw e; + } + // ignore + } catch (BreakException e) { + // ignore + } + }); + } + + private static class BreakException extends RuntimeException { + } + + /** + * Chunks up the elements into groups of the specified size. The last group may be smaller due to the flow being complete. + * + * @param n The number of elements in a group. + */ + public Flow> grouped(int n) { + return groupedWeighted(n, t -> 1L); + } + + /** + * Chunks up the elements into groups that have a cumulative weight greater or equal to the `minWeight`. The last group may be smaller + * due to the flow being complete. + * + * @param minWeight The minimum cumulative weight of elements in a group. + * @param costFn The function that calculates the weight of an element. + */ + public Flow> groupedWeighted(long minWeight, Function costFn) { + if (minWeight <= 0) { + throw new IllegalArgumentException("minWeight must be > 0"); + } + + return Flows.usingEmit(emit -> { + List buffer = new ArrayList<>(); + AtomicLong accumulatedCost = new AtomicLong(0L); + last.run(t -> { + buffer.add(t); + accumulatedCost.addAndGet(costFn.apply(t)); + + if (accumulatedCost.get() >= minWeight) { + emit.apply(new ArrayList<>(buffer)); + buffer.clear(); + accumulatedCost.set(0); + } + }); + if (!buffer.isEmpty()) { + emit.apply(buffer); + } + }); + } + + /** + * Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error). + */ + public Flow drain() { + return Flows.usingEmit(emit -> { + last.run(t -> {}); + }); + } + + /** + * Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error. + */ + public Flow onComplete(Runnable f) { + return Flows.usingEmit(emit -> { + try { + last.run(emit); + } finally { + f.run(); + } + }); + } + + /** + * Runs `f` after the flow completes successfully, that is when all elements are emitted. + */ + public Flow onDone(Runnable f) { + return Flows.usingEmit(emit -> { + last.run(emit); + f.run(); + }); + } + + /** + * Runs `f` after the flow completes with an error. The error can't be recovered. + */ + public Flow onError(Consumer f) { + return Flows.usingEmit(emit -> { + try { + last.run(emit); + } catch (Throwable e) { + f.accept(e); + throw e; + } + }); + } + + // endregion + + private void runLastToChannelAsync(Channel channel) throws ExecutionException, InterruptedException { + unsupervised(scope -> { + runLastToChannelAsync(scope, channel); + return null; + }); + } + + private void runLastToChannelAsync(UnsupervisedScope scope, Channel channel) { + scope.forkUnsupervised(() -> { + try { + last.run(channel::send); + channel.done(); + } catch (Throwable e) { + channel.error(e); + } + return null; + }); + } +} + + + + diff --git a/flows/src/main/java/com/softwaremill/jox/flows/FlowEmit.java b/flows/src/main/java/com/softwaremill/jox/flows/FlowEmit.java new file mode 100644 index 0000000..c003715 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/FlowEmit.java @@ -0,0 +1,35 @@ +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.ChannelDone; +import com.softwaremill.jox.ChannelError; +import com.softwaremill.jox.Source; + +/** + * Instances of this interface should be considered thread-unsafe, and only used within the scope in which they have been obtained, e.g. as + * part of {@link Flows#usingEmit} or {@link Flow#mapUsingEmit}. + */ +public interface FlowEmit { + + /** Emit a value to be processed downstream. Blocks until the value is fully processed, or throws an exception if an error occurred. */ + void apply(T t) throws Exception; + + /** + * Propagates all elements to the given emit. Completes once the channel completes as done. Throws an exception if the channel transits + * to an error state. + */ + static void channelToEmit(Source source, FlowEmit emit) throws Exception { + boolean shouldRun = true; + while (shouldRun) { + Object t = source.receiveOrClosed(); + shouldRun = switch (t) { + case ChannelDone done -> false; + case ChannelError error -> throw error.toException(); + default -> { + //noinspection unchecked + emit.apply((T) t); + yield true; + } + }; + } + } +} diff --git a/flows/src/main/java/com/softwaremill/jox/flows/FlowStage.java b/flows/src/main/java/com/softwaremill/jox/flows/FlowStage.java new file mode 100644 index 0000000..e8d7099 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/FlowStage.java @@ -0,0 +1,9 @@ +package com.softwaremill.jox.flows; + +/** + * Contains the logic for running a single flow stage. As part of `run`s implementation, previous flow stages might be run, either + * synchronously or asynchronously. + */ +public interface FlowStage { + void run(FlowEmit emit) throws Exception; +} diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flows.java b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java new file mode 100644 index 0000000..40e9762 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flows.java @@ -0,0 +1,225 @@ +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.Source; +import com.softwaremill.jox.structured.Fork; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +public final class Flows { + + private Flows() {} + + /** + * Creates a flow, which when run, provides a {@link FlowEmit} instance to the given `withEmit` function. Elements can be emitted to be + * processed by downstream stages by calling {@link FlowEmit#apply}. + *

+ * The {@link FlowEmit} instance provided to the {@param withEmit} callback should only be used on the calling thread. + * That is, {@link FlowEmit} is thread-unsafe. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of {@param withEmit}. + */ + public static Flow usingEmit(ThrowingConsumer> withEmit) { + return new Flow<>(withEmit::accept); + } + + /** + * Creates a flow using the given {@param source}. An element is emitted for each value received from the source. + * If the source is completed with an error, is it propagated by throwing. + */ + public static Flow fromSource(Source source) { + return new Flow<>(new SourceBackedFlowStage<>(source)); + } + + /** + * Creates a flow from the given `iterable`. Each element of the iterable is emitted in order. + */ + public static Flow fromIterable(Iterable iterable) { + return fromIterator(iterable.iterator()); + } + + /** + * Creates a flow from the given values. Each value is emitted in order. + */ + @SafeVarargs + public static Flow fromValues(T... ts) { + return fromIterator(Arrays.asList(ts).iterator()); + } + + /** + * Creates a flow from the given (lazily evaluated) `iterator`. Each element of the iterator is emitted in order. + */ + public static Flow fromIterator(Iterator it) { + return usingEmit(emit -> { + while (it.hasNext()) { + emit.apply(it.next()); + } + }); + } + + /** + * Creates a flow from the given fork. The flow will emit up to one element, or complete by throwing an exception if the fork fails. + */ + public static Flow fromFork(Fork f) { + return usingEmit(emit -> emit.apply(f.join())); + } + + /** + * Creates a flow which emits elements starting with `zero`, and then applying `mappingFunction` to the previous element to get the next one. + */ + public static Flow iterate(T zero, Function mappingFunction) { + return usingEmit(emit -> { + T t = zero; + //noinspection InfiniteLoopStatement + while (true) { + emit.apply(t); + t = mappingFunction.apply(t); + } + }); + } + + /** + * Creates a flow which emits a range of numbers, from `from`, to `to` (inclusive), stepped by `step`. + */ + public static Flow range(int from, int to, int step) { + // do nothing + return usingEmit(emit -> { + for (int i = from; i <= to; i += step) { + emit.apply(i); + } + }); + } + + /** + * Creates a flow which emits the given `value` repeatedly, at least {@param interval} apart between each two elements. + * The first value is emitted immediately. + *

+ * The interval is measured between subsequent emissions. Hence, if the following transformation pipeline is slow, the next emission can + * occur immediately after the previous one is fully processed (if processing took more than the inter-emission interval duration). + * However, ticks do not accumulate; for example, if processing is slow enough that multiple intervals pass between send invocations, + * only one tick will be sent. + * + * @param interval + * The temporal spacing between subsequent ticks. + * @param value + * The element to emitted on every tick. + */ + public static Flow tick(Duration interval, T value) { + return usingEmit(emit -> { + //noinspection InfiniteLoopStatement + while (true) { + long start = System.nanoTime(); + emit.apply(value); + long end = System.nanoTime(); + long sleep = interval.toNanos() - (end - start); + if (sleep > 0) { + //noinspection BusyWait + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(sleep), (int) (sleep % 1_000_000)); + } + } + }); + } + + /** Creates a flow, which emits the given `element` repeatedly. */ + public static Flow repeat(T element) { + return repeatEval(() -> element); + } + + /** Creates a flow, which emits the result of evaluating `supplierFunction` repeatedly. As the parameter is passed by-name, the evaluation is deferred + * until the element is emitted, and happens multiple times. + * + * @param supplierFunction + * The code block, computing the element to emit. + */ + public static Flow repeatEval(Supplier supplierFunction) { + return usingEmit(emit -> { + //noinspection InfiniteLoopStatement + while (true) { + emit.apply(supplierFunction.get()); + } + }); + } + + /** Creates a flow, which emits the value contained in the result of evaluating `supplierFunction` repeatedly. + * When the evaluation of `supplierFunction` returns a {@link Optional#empty()}, the flow is completed as "done", and no more values are evaluated or emitted. + *

+ * As the `supplierFunction` parameter is passed by-name, the evaluation is deferred until the element is emitted, and happens multiple times. + * + * @param supplierFunction + * The code block, computing the optional element to emit. + */ + public static Flow repeatEvalWhileDefined(Supplier> supplierFunction) { + // do nothing + return usingEmit(emit -> { + boolean shouldRun = true; + while (shouldRun) { + Optional result = supplierFunction.get(); + if (result.isPresent()) { + emit.apply(result.get()); + } else { + shouldRun = false; + } + } + }); + } + + /** Create a flow which sleeps for the given `timeout` and then completes as done. */ + public static Flow timeout(Duration timeout) { + return usingEmit(emit -> Thread.sleep(timeout.toMillis())); + } + + /** + * Creates a flow which concatenates the given `flows` in order. First elements from the first flow are emitted, then from the second etc. + * If any of the flows completes with an error, it is propagated. + */ + @SafeVarargs + public static Flow concat(Flow... flows) { + return usingEmit(emit -> { + for (Flow currentFlow : flows) { + currentFlow.runToEmit(emit); + } + }); + } + + /** Creates an empty flow, which emits no elements and completes immediately. */ + public static Flow empty() { + return usingEmit(emit -> {}); + } + + /** Creates a flow that emits a single element when `from` completes, or throws an exception when `from` fails. */ + public static Flow fromCompletableFuture(CompletableFuture from) { + return usingEmit(emit -> { + try { + emit.apply(from.get()); + } catch (ExecutionException e) { + throw (Exception) e.getCause(); + } + }); + } + + /** + * Creates a flow that emits all elements from the given future {@link Source} when `from` completes. + * + * @param from the future source to emit elements from + */ + public static Flow fromFutureSource(CompletableFuture> from) { + return fromSource(from.join()); + } + + /** + * Creates a flow that fails immediately with the given {@link java.lang.Exception} + * + * @param t + * The {@link java.lang.Exception} to fail with + */ + public static Flow failed(Exception t) { + return usingEmit(emit -> { + throw t; + }); + } +} \ No newline at end of file diff --git a/flows/src/main/java/com/softwaremill/jox/flows/SourceBackedFlowStage.java b/flows/src/main/java/com/softwaremill/jox/flows/SourceBackedFlowStage.java new file mode 100644 index 0000000..0f43101 --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/SourceBackedFlowStage.java @@ -0,0 +1,11 @@ +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.Source; + +record SourceBackedFlowStage(Source source) implements FlowStage { + + @Override + public void run(FlowEmit emit) throws Exception { + FlowEmit.channelToEmit(source, emit); + } +} diff --git a/flows/src/main/java/com/softwaremill/jox/flows/ThrowingConsumer.java b/flows/src/main/java/com/softwaremill/jox/flows/ThrowingConsumer.java new file mode 100644 index 0000000..aae5d0c --- /dev/null +++ b/flows/src/main/java/com/softwaremill/jox/flows/ThrowingConsumer.java @@ -0,0 +1,5 @@ +package com.softwaremill.jox.flows; + +public interface ThrowingConsumer { + void accept(T t) throws Exception; +} diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowCompleteCallbacksTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowCompleteCallbacksTest.java new file mode 100644 index 0000000..17bc956 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowCompleteCallbacksTest.java @@ -0,0 +1,101 @@ +package com.softwaremill.jox.flows; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.*; + +public class FlowCompleteCallbacksTest { + @Test + void ensureOnCompleteRunsInCaseOfSuccess() throws Exception { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .onComplete(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + f.runDrain(); + + // then + assertTrue(didRun.get()); + } + + @Test + void ensureOnCompleteRunsInCaseOfError() { + //given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .tap(i -> {throw new RuntimeException();}) + .onComplete(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + assertThrows(RuntimeException.class, f::runDrain); + + // then + assertTrue(didRun.get()); + } + + @Test + void ensureOnDoneRunsInCaseOfSuccess() throws Exception { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3).onDone(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + f.runDrain(); + + // then + assertTrue(didRun.get()); + } + + @Test + void ensureOnDoneDoesNotRunInCaseOfError() { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .tap(i -> {throw new RuntimeException();}) + .onDone(() -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + assertThrows(RuntimeException.class, f::runDrain); + + // then + assertFalse(didRun.get()); + } + + @Test + void ensureOnErrorDoesNotRunInCaseOfSuccess() throws Exception { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .onError(e -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + f.runDrain(); + + // then + assertFalse(didRun.get()); + } + + @Test + void ensureOnErrorRunsInCaseOfError() { + // given + AtomicBoolean didRun = new AtomicBoolean(false); + Flow f = Flows.fromValues(1, 2, 3) + .tap(i -> {throw new RuntimeException();}) + .onError(e -> didRun.set(true)); + assertFalse(didRun.get()); + + // when + assertThrows(RuntimeException.class, f::runDrain); + + // then + assertTrue(didRun.get()); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java new file mode 100644 index 0000000..f6b49b2 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java @@ -0,0 +1,102 @@ +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.ChannelClosedException; +import com.softwaremill.jox.ChannelError; +import com.softwaremill.jox.structured.Scopes; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.*; + +public class FlowGroupedTest { + + @Test + void shouldEmitGroupedElements() throws Exception { + // when + List> result = Flows.fromValues(1, 2, 3, 4, 5, 6) + .grouped(3) + .runToList(); + + // then + assertEquals(List.of(List.of(1, 2, 3), List.of(4, 5, 6)), result); + } + + @Test + void shouldEmitGroupedElementsAndIncludeRemainingValuesWhenFlowCloses() throws Exception { + // given + List> result = Flows.fromValues(1, 2, 3, 4, 5, 6, 7) + .grouped(3) + .runToList(); + + // then + assertEquals(List.of(List.of(1, 2, 3), List.of(4, 5, 6), List.of(7)), result); + } + + @Test + void shouldReturnFailedFlowWhenTheOriginalFlowIsFailed() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + // given + RuntimeException failure = new RuntimeException(); + + // when + Object result = Flows.failed(failure) + .grouped(3) + .runToChannel(scope) + .receiveOrClosed(); + + // then + assertInstanceOf(ChannelError.class, result); + assertEquals(failure, ((ChannelError) result).cause()); + return null; + }); + } + + @Test + void shouldEmitGroupedElementsWithCustomCostFunction() throws Exception { + // when + List> result = Flows.fromValues(1, 2, 3, 4, 5, 6, 5, 3, 1) + .groupedWeighted(10, n -> (long) (n * 2)) + .runToList(); + + // then + assertEquals(List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6), List.of(5), List.of(3, 1)), result); + } + + @Test + void shouldReturnFailedFlowWhenCostFunctionThrowsException() throws ExecutionException, InterruptedException { + Scopes.unsupervised(scope -> { + // when + ChannelClosedException exception = assertThrows(ChannelClosedException.class, () -> + Flows.fromValues(1, 2, 3, 0, 4, 5, 6, 7) + .groupedWeighted(150, n -> (long) (100 / n)) + .runToChannel(scope) + .forEach(i -> { + })); + + // then + assertInstanceOf(ArithmeticException.class, exception.getCause()); + return null; + }); + } + + @Test + void shouldReturnFailedSourceWhenTheOriginalSourceIsFailed() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + // given + RuntimeException failure = new RuntimeException(); + + // when + Object result = Flows.failed(failure) + .groupedWeighted(10, n -> Long.parseLong(n.toString()) * 2) + .runToChannel(scope) + .receiveOrClosed(); + + // then + assertInstanceOf(ChannelError.class, result); + assertEquals(failure, ((ChannelError) result).cause()); + return null; + }); + } +} \ No newline at end of file diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java new file mode 100644 index 0000000..25cdea9 --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java @@ -0,0 +1,286 @@ +package com.softwaremill.jox.flows; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.List; + +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.ChannelClosedException; +import com.softwaremill.jox.Source; +import com.softwaremill.jox.structured.Scopes; +import org.junit.jupiter.api.Test; + +class FlowTest { + + @Test + void shouldRunForEach() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + flow.runForeach(results::add); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void shouldRunToList() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + List results = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void shouldRunToChannel() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); + } + + @Test + void shouldRunToChannelWithBufferSizeDefinedInScope() throws Throwable { + ScopedValue.where(Channel.BUFFER_SIZE, 2).call(() -> { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); + return null; + }); + } + + + @Test + void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Channel channel = Channel.newUnlimitedChannel(); + Flow flow = Flows.fromSource(channel); + + // when + Source receivedChannel = flow.runToChannel(scope); + + // then + assertEquals(channel, receivedChannel); + return null; + }); + } + + @Test + void shouldThrowExceptionForFailedFlow() { + assertThrows(IllegalStateException.class, () -> { + Flows.failed(new IllegalStateException()) + .runFold(0, (acc, n) -> Integer.valueOf(acc.toString() + n)); + }); + } + + @Test + void shouldThrowExceptionThrownInFunctionF() { + RuntimeException thrown = assertThrows(RuntimeException.class, () -> { + Flows.fromValues(1) + .runFold(0, (acc, n) -> { throw new RuntimeException("Function `f` is broken"); }); + }); + assertEquals("Function `f` is broken", thrown.getMessage()); + } + + @Test + void shouldReturnZeroValueFromFoldOnEmptySource() throws Exception { + assertEquals(0, Flows.empty().runFold(0, (acc, n) -> Integer.valueOf(acc.toString() + n))); + } + + @Test + void shouldReturnFoldOnNonEmptyFold() throws Exception { + assertEquals(3, Flows.fromValues(1, 2).runFold(0, Integer::sum)); + } + + @Test + void shouldTakeFromSimpleFlow() throws Exception { + Flow f = Flows.fromValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = f.take(5).runToList(); + assertEquals(List.of(1, 2, 3, 4, 5), result); + } + + @Test + void shouldTakeFromAsyncFlow() throws Exception { + Flow f = Flows.fromValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .buffer(16); + List result = f.take(5).runToList(); + assertEquals(List.of(1, 2, 3, 4, 5), result); + } + + @Test + void shouldTakeAllIfFlowEndsSooner() throws Exception { + Flow f = Flows.fromValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = f.take(50).runToList(); + assertEquals(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result); + } + + @Test + void shouldWorkWithASingleAsyncBoundary() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5) + .buffer(3); + + // when + List integers = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3, 4, 5), integers); + } + + @Test + void shouldWorkWithMultipleAsyncBoundaries() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5) + .buffer(3) + .map(i -> i * 2) + .buffer(2) + .map(i -> i + 1) + .buffer(5); + + // when + List integers = flow.runToList(); + + // then + assertEquals(List.of(3, 5, 7, 9, 11), integers); + } + + @Test + void shouldPropagateErrorsWhenUsingBuffer() { + Exception exception = assertThrows(ChannelClosedException.class, () -> { + Flows.fromValues(1, 2, 3) + .map(value -> { throw new IllegalStateException(); }) + .buffer(5) + .runToList(); + }); + assertInstanceOf(IllegalStateException.class, exception.getCause()); + } + + @Test + void shouldMap() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + Flow mapped = flow.map(Object::toString); + mapped.runForeach(results::add); + + // then + assertEquals(List.of("1", "2", "3"), results); + } + + @Test + void shouldMapUsingEmit() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + Flow mapped = flow.mapUsingEmit(i -> emit -> { + for (int j = 0; j < 2; j++) { + try { + emit.apply(i + j); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }); + + mapped.runForeach(results::add); + + // then + assertEquals(List.of(1, 2, 2, 3, 3, 4), results); + } + + @Test + void shouldFilter() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5); + List results = new ArrayList<>(); + + // when + flow.filter(i -> i % 2 == 0) + .runForeach(results::add); + + // then + assertEquals(List.of(2, 4), results); + } + + @Test + void shouldTap() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + flow + .tap(results::add) + .map(i -> i * 2) + .runForeach(j -> { + }); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void shouldFlatMap() throws Throwable { + // given + Flow flow = Flows.fromValues(10, 20, 30); + List results = new ArrayList<>(); + + // when + flow + .flatMap(i -> Flows.fromValues(i + 1, i + 2)) + .runForeach(results::add); + + // then + assertEquals(List.of(11, 12, 21, 22, 31, 32), results); + } + + @Test + void shouldPropagateErrorFromFlatMap() { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Flow mapped = flow.flatMap(i -> { + if (i == 2) { + throw new RuntimeException("error"); + } + return Flows.fromValues(i * 2); + }); + + // then + assertThrows(RuntimeException.class, () -> mapped.runForeach(i -> {})); + } +} \ No newline at end of file diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java new file mode 100644 index 0000000..a30e07e --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java @@ -0,0 +1,239 @@ +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.ChannelError; +import com.softwaremill.jox.Source; +import com.softwaremill.jox.structured.Fork; +import com.softwaremill.jox.structured.Scopes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +class FlowsTest { + + @Test + void shouldBeEmpty() throws Exception { + assertTrue(Flows.empty().runToList().isEmpty()); + } + + @Test + void shouldCreateFlowFromFork() throws Exception { + Scopes.unsupervised(scope -> { + Fork f = scope.forkUnsupervised(() -> 1); + Flow c = Flows.fromFork(f); + assertEquals(List.of(1), c.runToList()); + return null; + }); + } + + @Test + void shouldCreateIteratingFlow() throws Exception { + // given + Flow c = Flows.iterate(1, i -> i + 1); + + // when & then + assertEquals(List.of(1, 2, 3), c.take(3).runToList()); + } + + @Test + void shouldProduceRange() throws Exception { + assertEquals(List.of(1, 2, 3, 4, 5), Flows.range(1, 5, 1).runToList()); + assertEquals(List.of(1, 3, 5), Flows.range(1, 5, 2).runToList()); + assertEquals(List.of(1, 4, 7, 10), Flows.range(1, 11, 3).runToList()); + } + + @Test + public void shouldFailOnReceive() throws Exception { + Scopes.unsupervised(scope -> { + // when + Flow s = Flows.failed(new RuntimeException("boom")); + + // then + Object received = s.runToChannel(scope).receiveOrClosed(); + assertInstanceOf(ChannelError.class, received); + assertEquals("boom", ((ChannelError) received).cause().getMessage()); + return null; + }); + } + + @Test + void shouldReturnOriginalFutureFailureWhenFutureFails() { + // given + RuntimeException failure = new RuntimeException("future failed"); + + // when & then + assertThrows(RuntimeException.class, + () -> Flows.fromCompletableFuture(CompletableFuture.failedFuture(failure)).runToList(), + failure.getMessage()); + } + + @Test + void shouldReturnFutureValue() throws Exception { + // given + CompletableFuture future = CompletableFuture.completedFuture(1); + + // when + List result = Flows.fromCompletableFuture(future).runToList(); + + // then + assertEquals(List.of(1), result); + } + + @Test + void shouldReturnFuturesSourceValues() throws Exception { + Scopes.unsupervised(scope -> { + // given + CompletableFuture> completableFuture = CompletableFuture + .completedFuture(Flows.fromValues(1, 2).runToChannel(scope)); + + // when + List result = Flows.fromFutureSource(completableFuture).runToList(); + + // then + assertEquals(List.of(1, 2), result); + return null; + }); + } + + @Test + void shouldReturnOriginalFutureFailureWhenSourceFutureFails() { + // given + RuntimeException failure = new RuntimeException("future failed"); + + // when & then + assertThrows(RuntimeException.class, + () -> Flows.fromFutureSource(CompletableFuture.failedFuture(failure)).runToList(), + failure.getMessage()); + } + + @Test + void shouldEvaluateElementBeforeEachSend() throws Exception { + // given + AtomicInteger i = new AtomicInteger(0); + + // when + List actual = Flows.repeatEval(i::incrementAndGet) + .take(3) + .runToList(); + + // then + assertEquals(List.of(1, 2, 3), actual); + } + + @Test + void shouldEvaluateElementBeforeEachSendAsLongAsDefined() throws Exception { + // given + AtomicInteger i = new AtomicInteger(0); + Set evaluated = new HashSet<>(); + + // when + List result = Flows.repeatEvalWhileDefined(() -> { + int value = i.incrementAndGet(); + evaluated.add(value); + return value < 5 ? Optional.of(value) : Optional.empty(); + }) + .runToList(); + + // then + assertEquals(List.of(1, 2, 3, 4), result); + assertEquals(Set.of(1, 2, 3, 4, 5), evaluated); + } + + @Test + void shouldRepeatTheSameElement() throws Exception { + // when + List result = Flows.repeat(2137) + .take(3) + .runToList(); + + // then + assertEquals(List.of(2137, 2137, 2137), result); + } + + @Test + @Timeout(value = 1, unit = TimeUnit.SECONDS) + void shouldTickRegularly() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + var c = Flows.tick(Duration.ofMillis(100), 1L) + .runToChannel(scope); + var start = System.currentTimeMillis(); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 0); + assertTrue(System.currentTimeMillis() - start <= 50); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 100); + assertTrue(System.currentTimeMillis() - start <= 150); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + return null; + }); + } + + @Test + @Timeout(value = 1, unit = TimeUnit.SECONDS) + void shouldTickImmediatelyInCaseOfSlowConsumerAndThenResumeNormal() throws InterruptedException, ExecutionException { + Scopes.unsupervised(scope -> { + var c = Flows.tick(Duration.ofMillis(100), 1L) + .runToChannel(scope); + var start = System.currentTimeMillis(); + + Thread.sleep(200); + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + + c.receive(); + assertTrue(System.currentTimeMillis() - start >= 200); + assertTrue(System.currentTimeMillis() - start <= 250); + return null; + }); + } + + @Test + void shouldTimeout() throws Exception { + // given + long start = System.currentTimeMillis(); + Flow c = Flows.concat( + Flows.timeout(Duration.ofMillis(100)), + Flows.fromValues(1) + ); + + // when + List result = c.runToList(); + + // then + assertEquals(List.of(1), result); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed >= 100L); + assertTrue(elapsed <= 150L); + } + + @Test + void shouldConcatFlows() throws Exception { + // given + Flow flow = Flows.concat( + Flows.fromValues(1, 2, 3), + Flows.fromValues(4, 5, 6) + ); + + // when + List result = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3, 4, 5, 6), result); + } +} diff --git a/pom.xml b/pom.xml index d3af42d..56a7aa7 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ structured bench channel-ops + flows