Commit 7c39621

author
YN
committed
The library has been rewritten to work with generics and with simplified semantics.
1 parent 4259fb6 commit 7c39621

8 files changed: +305 -314 lines changed

.golangci.yml

Lines changed: 10 additions & 11 deletions
@@ -8,8 +8,6 @@ linters-settings:
   funlen:
     lines: 100
     statements: 50
-  gci:
-    local-prefixes: github.com/golangci/golangci-lint
   goconst:
     min-len: 2
     min-occurrences: 2
@@ -38,12 +36,10 @@ linters-settings:

 linters:
   enable:
-    - deadcode
     - depguard
     - dogsled
     - dupl
     - errcheck
-    - exportloopref
     - exhaustive
     - funlen
     - gochecknoinits
@@ -52,7 +48,6 @@ linters:
     - gocyclo
     - gofmt
     - goimports
-    - gomnd
     - goprintffuncname
     - gosec
     - gosimple
@@ -64,19 +59,16 @@ linters:
     - nolintlint
     - rowserrcheck
     - staticcheck
-    - structcheck
     - stylecheck
     - typecheck
     - unconvert
     - unparam
     - unused
-    - varcheck
     - whitespace
     - asciicheck
     - gochecknoglobals
     - gocognit
     - godox
-    - goerr113
     - nestif
     - prealloc
     - revive
@@ -91,11 +83,9 @@ linters:
     - predeclared
     - tparallel
   disable:
-    - exhaustivestruct
     - paralleltest
     - bodyclose
     - godot
-    - ifshort
     - noctx
     - sqlclosecheck
     - testpackage
@@ -104,4 +94,13 @@ issues:
   exclude-rules:
     - path: _test\.go
       linters:
-        - gosec
+        - gosec
+
+    - path: _test\.go$
+      linters:
+        - revive
+      text: "empty-block"
+    - path: _test\.go$
+      linters:
+        - depguard
+      text: "not allowed from list 'Main'"

README.md

Lines changed: 45 additions & 81 deletions
@@ -24,127 +24,91 @@ Installation
 ------------

 ```
-go get github.com/nazar256/parapipe
+go get -u github.com/nazar256/parapipe@latest
 ```

 Usage
 -----

-1. Create a pipeline
+1. Create a pipeline with the first step. The processing callback is generic (as is the pipeline).
+It may receive and return any type of data, but the second return value should always be a boolean.

 ```go
-cfg := parapipe.Config{
-	ProcessErrors: false, // messages implementing "error" interface will not be passed to subsequent workers
-}
-pipeline := parapipe.NewPipeline(cfg)
+concurrency := runtime.NumCPU() // how many messages to process concurrently for each pipe
+pipeline := parapipe.NewPipeline(concurrency, func(msg YourInputType) (YourOutputType, bool) {
+	// do something and generate a new value "someValue"
+	shouldProceedWithNextStep := true
+	return someValue, shouldProceedWithNextStep
+})
 ```

-2. Add pipes - call `Pipe()` method one or more times
+2. Add pipes - call `Attach()` function one or more times to add steps to the pipeline
 ```go
-concurrency := 5 // how many messages to process concurrently for each pipe
-pipeline.Pipe(concurrency, func(msg interface{}) interface{} {
-	typedMsg := msg.(YourInputType) // assert your type for the message
-	// do something and generate a new value "someValue"
-	return someValue
+p1 := parapipe.NewPipeline(runtime.NumCPU(), func(msg int) (int, bool) {
+	time.Sleep(30 * time.Millisecond)
+	return msg + 1000, true
 })
-```
+p2 := parapipe.Attach(p1, parapipe.NewPipeline(concurrency, func(msg int) (string, bool) {
+	time.Sleep(30 * time.Millisecond)
+	return strconv.Itoa(msg), true
+}))
+
+// final pipeline you are going to work with (push messages and read output)
+pipeline := parapipe.Attach(p2, parapipe.NewPipeline(concurrency, func(msg string) (string, bool) {
+	time.Sleep(30 * time.Millisecond)
+	return "#" + msg, true
+}))
+```
+
 3. Get "out" channel when all pipes are added and read results from it
 ```go
 for result := range pipeline.Out() {
-	typedResut := result.(YourResultType)
 	// do something with the result
 }
 ```
-It's **important** to read everything from "out" even when the pipeline won't produce any viable result.
-It will be stuck otherwise.
+It's **important** to drain the pipeline (read everything from "out") even when the pipeline won't produce any viable result.
+It could get stuck otherwise.

 4. Push values for processing into the pipeline:
 ```go
 pipeline.Push("something")
 ```

-5. Close pipeline to clean up its resources and close its output channel after the last message.
-All internal channels, goroutines, including `Out()` channel will be closed in a cascade.
+5. Close the pipeline after the last message. This will clean up its resources and close its output channel.
 It's not recommended closing pipeline using `defer` because you may not want to hang output until defer is executed.
 ```go
 pipeline.Close()
 ```

-### Error handling
+### Circuit breaking
+
+In some cases (errors) it may be impossible to process a message, so there is no way to pass it further.
+In such a case just return `false` as the second return value from the step processing callback.
+The first value will be ignored.

-To handle errors just return them as a result then listen to them on Out.
-By default, errors will not be processed by subsequent stages.
 ```go
-pipeline.Pipe(4, func(msg interface{}) interface{} {
-	inputValue := msg.(YourInputType) // assert your type for the message
+pipeline.Pipe(4, func(inputValue InputType) (OutputType, bool) {
 	someValue, err := someOperation(inputValue)
 	if err != nil {
-		return err // error can also be a result and can be returned from a pipeline stage (pipe)
+		// handle the error
+		// slog.Error("error when calling someOperation", "err", err)
+		return someValue, false
 	}
-	return someValue
+	return someValue, true
 })
 // ...
 for result := range pipeline.Out() {
-	err := result.(error)
-	if err != nil {
-		// handle the error
-		// you may want to stop sending new values to the pipeline in your own way and do close(pipeline.In())
-	}
-	typedResut := result.(YourResultType)
 	// do something with the result
 }
 ```

-Optionally you may allow passing errors to subsequent pipes.
-For example, if you do not wish to stop the pipeline on errors, but rather process them in subsequent pipes.
-```go
-cfg := parapipe.Config{
-	ProcessErrors: true, // messages implementing "error" interface will be passed to subsequent workers as any message
-}
-concurrency := 5 // how many messages to process concurrently for each pipe
-
-pipeline := parapipe.NewPipeline(cfg).
-	Pipe(concurrency, func(msg interface{}) interface{} {
-		inputValue := msg.(YourInputType) // assert your type for the message
-		someValue, err := someOperation(inputValue)
-		if err != nil {
-			return err // error can also be a result and can be returned from a pipeline stage (pipe)
-		}
-		return someValue
-	}).
-	Pipe(concurrency, func(msg interface{}) interface{} {
-		switch inputValue := msg.(type) {
-		case error:
-			// process error
-		case YourNormalExpectedType:
-			// process message normally
-		}
-	})
-```
-
-### Limitations
-
-* `Out()` method can be used only once on each pipeline. Any subsequent `Pipe()` call will cause panic.
-Though, when you need to stream values somewhere from the middle of the pipeline - just send them to your own channel.
-* do not try to `Push` to the pipeline before the first `Pipe` is defined - it will panic
-* as at the time of writing Go does not have generics, you have to assert the type for incoming messages in pipes explicitly,
-which means the type of the message can be checked in runtime only.
-
 ### Performance

-As already was mentioned, parapipe makes use of `interface{}` and also executes callbacks in a separate goroutine per each message.
-This can have a great performance impact because of heap allocation and creation of goroutines.
-For instance if you try to stream a slice of integers, each of them will be converted to an interface type and
-will likely be allocated in heap.
-Moreover, if an execution time of each step is relatively small,
-than a goroutine creation may decrease overall performance considerably.
-
-If the performance is the priority, its recommended that you pack such messages in batches (i.e. slices)
-and stream that batches instead.
-Obviously that's your responsibility to process batch in the order you like inside step (pipe) callback.
-
-Basically the overall recommendations for choosing batch size are in general the same as if you have to create a slice of interfaces
-or create a new goroutine.
+Parapipe makes use of generics and channels.
+Overall it should be performant enough for most cases.
+It has zero heap allocations in hot code, thus generates little load for the garbage collector.
+However, it uses channels under the hood and is bottlenecked mostly by the channel operations, which are several
+writes and reads per message.

 Examples
 --------
@@ -159,7 +123,7 @@ See the [working example of using parapipe in AMQP client](http://github.com/naz

 With parapipe you can:

-* respond a JSON-feed as stream, retrieve, enrich and marshal each object concurrently, in maintained order and return them to the client
+* in your API, respond with a long JSON feed as a stream: retrieve, enrich and marshal each object concurrently, in maintained order, and return them to the client
 * fetch and merge entries from different sources as one stream
-* structure your HTTP-controllers
+* structure your API controllers or handlers
 * processing heavy files in effective way

gen_data_test.go

Lines changed: 5 additions & 4 deletions
@@ -2,20 +2,21 @@ package parapipe_test

 import "github.com/nazar256/parapipe"

-func makeRange(min, max int) []int {
-	a := make([]int, max-min+1)
+func makeRange(start, end int) []int {
+	a := make([]int, end-start+1)
 	for i := range a {
-		a[i] = min + i
+		a[i] = start + i
 	}

 	return a
 }

-func feedPipeline(pipeline *parapipe.Pipeline, amount int) {
+func feedPipeline[T any](pipeline *parapipe.Pipeline[int, T], amount int) {
 	go func() {
 		for i := 0; i < amount; i++ {
 			pipeline.Push(i)
 		}
+
 		pipeline.Close()
 	}()
 }
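
Note on `makeRange`: the helper's behavior is unchanged by the rename; it still returns every integer from `start` to `end`, inclusive on both ends. The standalone copy below shows that. The rename likely avoids shadowing the `min` and `max` built-ins added in Go 1.21, but that motive is an assumption, since the commit message does not state it.

```go
package main

import "fmt"

// makeRange is copied from the new side of the diff above:
// it returns the integers start..end, inclusive on both ends.
func makeRange(start, end int) []int {
	a := make([]int, end-start+1)
	for i := range a {
		a[i] = start + i
	}

	return a
}

func main() {
	fmt.Println(makeRange(3, 6)) // prints [3 4 5 6]
}
```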

go.mod

Lines changed: 2 additions & 0 deletions
@@ -1,2 +1,4 @@
 module github.com/nazar256/parapipe

+go 1.18
+
