You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: README.md
+73-39Lines changed: 73 additions & 39 deletions
Original file line number
Diff line number
Diff line change
@@ -120,8 +120,8 @@ class Demo2 {
120
120
}
121
121
```
122
122
123
-
There is also a possibility to pass `Channel`'s buffer size via `ScopedValue`.
124
-
The channel must be created via `Channel.withScopedBufferSize()` to get the value.
123
+
There is also a possibility to pass `Channel`'s buffer size via `ScopedValue`.
124
+
The channel must be created via `Channel.withScopedBufferSize()` to get the value.
125
125
126
126
If no value is in the scope, the default buffer size `Channel.DEFAULT_BUFFER_SIZE` is used
127
127
@@ -432,8 +432,10 @@ Jox implements the "let it crash" model. When an error occurs, the entire scope
432
432
so that it can be properly handled. Moreover, no detail is lost: all exceptions are preserved, either as causes, or
433
433
suppressed exceptions.
434
434
435
-
As `JoxScopeExecutionException` is unchecked, we introduced utility method called `JoxScopeExecutionException#unwrapAndThrow`.
436
-
If the wrapped exception is instance of any of passed classes, this method unwraps original exception and throws it as checked exception, `throws` signature forces exception handling.
435
+
As `JoxScopeExecutionException` is unchecked, we introduced utility method called
436
+
`JoxScopeExecutionException#unwrapAndThrow`.
437
+
If the wrapped exception is instance of any of passed classes, this method unwraps original exception and throws it as
438
+
checked exception, `throws` signature forces exception handling.
437
439
If the wrapped exception is not instance of any of passed classes, **nothing happens**.
438
440
All suppressed exceptions are rewritten from `JoxScopeExecutionException`
439
441
@@ -442,6 +444,7 @@ Method does **not** rethrow `JoxScopeExecutionException` by default.
442
444
So it is advised to manually rethrow it after calling `unwrapAndThrow` method.
A `Flow<T>` describes an asynchronous data transformation pipeline. When run, it emits elements of type `T`.
653
656
654
-
Flows are lazy, evaluation (and any effects) happen only when the flow is run. Flows might be finite or infinite; in the latter case running a flow never ends normally; it might be interrupted, though. Finally, any exceptions that occur when evaluating the flow's logic will be thrown when running the flow, after any cleanup logic completes.
657
+
Flows are lazy, evaluation (and any effects) happen only when the flow is run. Flows might be finite or infinite; in the
658
+
latter case running a flow never ends normally; it might be interrupted, though. Finally, any exceptions that occur when
659
+
evaluating the flow's logic will be thrown when running the flow, after any cleanup logic completes.
655
660
656
661
### Creating Flows
657
662
658
663
There are number of methods in the `Flows` class which allows to create a `Flow`.
659
664
660
665
```java
661
666
importjava.time.Duration;
667
+
662
668
importcom.softwaremill.jox.flows.Flows;
663
669
664
670
publicclassDemo {
665
671
666
-
publicstaticvoidmain(String[] args) {
667
-
Flows.fromValues(1, 2, 3); // a finite flow
668
-
Flows.tick(Duration.ofSeconds(1), "x"); // an infinite flow emitting "x" every second
669
-
Flows.iterate(0, i -> i +1); // an infinite flow iterating from 0
670
-
}
672
+
publicstaticvoidmain(String[] args) {
673
+
Flows.fromValues(1, 2, 3); // a finite flow
674
+
Flows.tick(Duration.ofSeconds(1), "x"); // an infinite flow emitting "x" every second
675
+
Flows.iterate(0, i -> i +1); // an infinite flow iterating from 0
676
+
}
671
677
}
672
678
```
673
-
Note that creating a flow as above doesn't emit any elements, or execute any of the flow's logic. Only when run, the elements are emitted and any effects that are part of the flow's stages happen.
674
679
680
+
Note that creating a flow as above doesn't emit any elements, or execute any of the flow's logic. Only when run, the
681
+
elements are emitted and any effects that are part of the flow's stages happen.
675
682
676
683
Flows can also be created using `Channel``Source`s:
677
684
@@ -721,19 +728,27 @@ public class Demo {
721
728
}
722
729
```
723
730
724
-
The `FlowEmit` instance is used to emit elements by the flow, that is process them further, as defined by the downstream pipeline. This method only completes once the element is fully processed, and it might throw exceptions in case there's a processing error.
731
+
The `FlowEmit` instance is used to emit elements by the flow, that is process them further, as defined by the downstream
732
+
pipeline. This method only completes once the element is fully processed, and it might throw exceptions in case there's
733
+
a processing error.
725
734
726
-
As part of the callback, you can create `Scope`, fork background computations or run other flows asynchronously. However, take care **not** to share the `FlowEmit` instance across threads. That is, instances of `FlowEmit` are thread-unsafe and should only be used on the calling thread.
735
+
As part of the callback, you can create `Scope`, fork background computations or run other flows asynchronously.
736
+
However, take care **not** to share the `FlowEmit` instance across threads. That is, instances of `FlowEmit` are
737
+
thread-unsafe and should only be used on the calling thread.
727
738
The lifetime of `FlowEmit` should not extend over the duration of the invocation of `usingEmit`.
728
739
729
-
Any asynchronous communication should be best done with `Channel`s. You can then manually forward any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.
740
+
Any asynchronous communication should be best done with `Channel`s. You can then manually forward any elements received
741
+
from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.
730
742
731
743
### Transforming flows: basics
732
744
733
-
Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages:
745
+
Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the
746
+
extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a
747
+
number of pre-defined transformation stages:
734
748
735
749
```java
736
750
importjava.util.Map;
751
+
737
752
importcom.softwaremill.jox.flows.Flows;
738
753
739
754
publicclassDemo {
@@ -749,7 +764,8 @@ public class Demo {
749
764
}
750
765
```
751
766
752
-
You can also define arbitrary element-emitting logic, using each incoming element using `.mapUsingEmit`, similarly to `Flows.usingEmit` above.
767
+
You can also define arbitrary element-emitting logic, using each incoming element using `.mapUsingEmit`, similarly to
Flows.tick(Duration.ofSeconds(1), "x").runDrain(); // never finishes
785
+
}
770
786
}
771
787
```
772
788
773
-
Running a flow is a blocking operation. Unless asynchronous boundaries are present (explicit or implicit, more on this below), the entire processing happens on the calling thread. For example such a pipeline:
789
+
Running a flow is a blocking operation. Unless asynchronous boundaries are present (explicit or implicit, more on this
790
+
below), the entire processing happens on the calling thread. For example such a pipeline:
774
791
775
792
```java
776
793
importcom.softwaremill.jox.flows.Flows;
@@ -785,19 +802,28 @@ public class Demo {
785
802
}
786
803
}
787
804
```
788
-
Processes the elements one-by-one on the thread that is invoking the run method.
789
805
806
+
Processes the elements one-by-one on the thread that is invoking the run method.
790
807
791
808
### Transforming flows: concurrency
792
809
793
-
A number of flow transformations introduces asynchronous boundaries. For example, `.mapPar(int parallelism, Function<T,U> mappingFunction)` describes a flow,
794
-
which runs the pipeline defined so far in the background, emitting elements to a `channel`. Another `fork` reads these elements and runs up to `parallelism` invocations of `mappingFunction` concurrently. Mapped elements are then emitted by the returned flow.
810
+
A number of flow transformations introduces asynchronous boundaries. For example,
811
+
`.mapPar(int parallelism, Function<T,U> mappingFunction)` describes a flow,
812
+
which runs the pipeline defined so far in the background, emitting elements to a `channel`. Another `fork` reads these
813
+
elements and runs up to `parallelism` invocations of `mappingFunction` concurrently. Mapped elements are then emitted by
814
+
the returned flow.
795
815
796
-
Behind the scenes, a new concurrency `Scope` is created along with a number of forks. In case of any exceptions, everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from the preceding pipeline are propagated through the channel.
816
+
Behind the scenes, a new concurrency `Scope` is created along with a number of forks. In case of any exceptions,
817
+
everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from
818
+
the preceding pipeline are propagated through the channel.
797
819
798
-
Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and `I/O` stages. The created channels serve as buffers between the pipeline stages, and their capacity is defined by the `ScopedValue``Channel.BUFFER_SIZE` in the scope, or default `Channel.DEFAULT_BUFFER_SIZE` is used.
820
+
Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and `I/O` stages. The
821
+
created channels serve as buffers between the pipeline stages, and their capacity is defined by the `ScopedValue`
822
+
`Channel.BUFFER_SIZE` in the scope, or default `Channel.DEFAULT_BUFFER_SIZE` is used.
799
823
800
-
Explicit asynchronous boundaries can be inserted using `.buffer()`. This might be useful if producing the next element to emit, and consuming the previous should run concurrently; or if the processing times of the consumer varies, and the producer should buffer up elements.
824
+
Explicit asynchronous boundaries can be inserted using `.buffer()`. This might be useful if producing the next element
825
+
to emit, and consuming the previous should run concurrently; or if the processing times of the consumer varies, and the
826
+
producer should buffer up elements.
801
827
802
828
### Interoperability with channels
803
829
@@ -825,20 +851,28 @@ public class Demo {
825
851
}
826
852
```
827
853
828
-
The method above needs to be run within a concurrency scope, as `.runToChannel()` creates a background fork which runs the pipeline described by the flow, and emits its elements onto the returned channel.
854
+
The method above needs to be run within a concurrency scope, as `.runToChannel()` creates a background fork which runs
855
+
the pipeline described by the flow, and emits its elements onto the returned channel.
829
856
830
857
### Text transformations and I/O operations
831
858
832
-
For smooth operations on `byte[]`, we've created a wrapper class `ByteChunk`. And for smooth type handling we created a dedicated `ByteFlow`, a subtype of `Flow<ByteChunk>`.
833
-
To be able to utilize text and I/O operations, you need to create or transform into `ByteFlow`. It can be created via `Flows.fromByteArray` or `Flows.fromByteChunk`.
834
-
`Flow` containing `byte[]` or `ByteChunk` can be transformed by using `toByteFlow()` method. Any other flow can be transformed by using `toByteFlow()` with mapping function.
859
+
For smooth operations on `byte[]`, we've created a wrapper class `ByteChunk`. And for smooth type handling we created a
860
+
dedicated `ByteFlow`, a subtype of `Flow<ByteChunk>`.
861
+
To be able to utilize text and I/O operations, you need to create or transform into `ByteFlow`. It can be created via
862
+
`Flows.fromByteArray` or `Flows.fromByteChunk`.
863
+
`Flow` containing `byte[]` or `ByteChunk` can be transformed by using `toByteFlow()` method. Any other flow can be
864
+
transformed by using `toByteFlow()` with mapping function.
835
865
836
866
#### Text operations
867
+
837
868
*`encodeUtf8` encodes a `Flow<String>` into a `ByteFlow`
838
-
*`linesUtf8` decodes a `ByteFlow` into a `Flow<String>`. Assumes that the input represents text with line breaks. The `String` elements emitted by resulting `Flow<String>` represent text lines.
839
-
*`decodeStringUtf8` to decode a `ByteFlow` into a `Flow<String>`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.
869
+
*`linesUtf8` decodes a `ByteFlow` into a `Flow<String>`. Assumes that the input represents text with line breaks. The
870
+
`String` elements emitted by resulting `Flow<String>` represent text lines.
871
+
*`decodeStringUtf8` to decode a `ByteFlow` into a `Flow<String>`, without handling line breaks, just processing input
872
+
bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.
840
873
841
874
#### I/O Operations
875
+
842
876
*`runToInputStream(UnsupervisedScope scope)` runs given flow asynchronously into returned `InputStream`
843
877
*`runToOutputStream(OutputStream outputStream)` runs given flow into provided `OutputStream`
844
878
*`runToFile(Path path)` runs given flow into file. If file does not exist, it's created.
@@ -847,7 +881,8 @@ It is also possible to create Flow from `inputStream` or `path` using `Flows` fa
847
881
848
882
### Logging
849
883
850
-
Jox does not have any integrations with logging libraries, but it provides a simple way to log elements emitted by flows using the `.tap` method:
884
+
Jox does not have any integrations with logging libraries, but it provides a simple way to log elements emitted by flows
885
+
using the `.tap` method:
851
886
852
887
```java
853
888
importcom.softwaremill.jox.flows.Flows;
@@ -874,19 +909,18 @@ process. Hence, the scope should remain active as long as the publisher is used.
874
909
Internally, elements emitted by the flow are buffered, using a buffer of capacity given by the `Channel.BUFFER_SIZE` in
875
910
scope.
876
911
877
-
To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the `reactive-streams` dependency and
912
+
To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the `reactive-streams` dependency and
878
913
use `org.reactivestreams.FlowAdapters`.
879
914
880
-
881
915
#### Publisher -> Flow
882
916
883
917
A `java.util.concurrent.Flow.Publisher` can be converted to a `Flow` using `Flow.fromPublisher`.
884
918
885
919
Internally, elements published to the subscription are buffered, using a buffer of capacity given by the
886
920
`Channel.BUFFER_SIZE` in scope. That's also how many elements will be at most requested from the publisher at a time.
887
921
888
-
To convert a `org.reactivestreams.Publisher` instance, you'll need the same dependency as above and use`org.reactivestreams.FlowAdapters`.
889
-
922
+
To convert a `org.reactivestreams.Publisher` instance, you'll need the same dependency as above and use
0 commit comments