Pipeline is a high-performance, concurrent data processing library for Go that enables you to build flexible pipelines.
It puts into practice the original idea: How to design a pipeline in Go.
- High Concurrency: Process data in parallel with configurable worker pools at each pipeline stage
- Type Safety with Generics: Leverage Go's generics for compile-time type checking between pipeline stages
- Graceful Shutdown: Clean termination that waits for in-progress jobs to complete
- Cycle Detection: Automatic detection and prevention of circular dependencies that could cause deadlocks
- Flexible Error Handling: Customize error handling with your own error handler functions
- Job Tracking: Monitor pipeline progress by retrieving the latest job processed (useful for logging a breakpoint to resume from)
- Directed Acyclic Graph (DAG): Complex pipelines with multiple branches are supported
- Implements a DAG (Directed Acyclic Graph) architecture
- Provides cycle detection to prevent deadlocks
- Supports complex topologies with multiple branches
- Uses Go generics for type safety between pipeline stages
- Each node has:
  - A processing function
  - Configurable worker pool size
  - Job queue with configurable size
  - Error handling capabilities
  - Ability to track the latest processed job
- Uses Go channels for communication between nodes
- Implements worker pools for parallel processing
- Provides graceful shutdown mechanism
- Uses sync.WaitGroup for coordinating workers
- Four log levels: Trace, Debug, Info, Error
- Configurable log level
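To make the concurrency points above concrete, here is a minimal, standard-library-only sketch of a single stage: a buffered channel as the job queue, a fixed pool of workers, and a sync.WaitGroup for graceful shutdown. The names stage, newStage, stopAndWait, and runSquares are hypothetical and are not part of this library's API; the library's internals may well differ.

```go
package main

import (
	"fmt"
	"sync"
)

// stage is a hypothetical stand-in for a pipeline node: a buffered job
// queue drained by a fixed number of worker goroutines.
type stage[T any] struct {
	jobs chan T
	wg   sync.WaitGroup
}

// newStage starts `workers` goroutines that apply work to every queued job.
func newStage[T any](queueSize, workers int, work func(T)) *stage[T] {
	s := &stage[T]{jobs: make(chan T, queueSize)}
	s.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer s.wg.Done()
			// The loop ends once jobs is closed and fully drained.
			for job := range s.jobs {
				work(job)
			}
		}()
	}
	return s
}

// stopAndWait closes the queue and blocks until all queued jobs are done.
func (s *stage[T]) stopAndWait() {
	close(s.jobs)
	s.wg.Wait()
}

// runSquares pushes 1..n through a stage and returns the sum of squares.
func runSquares(n int) int {
	var mu sync.Mutex
	sum := 0
	s := newStage[int](10, 5, func(v int) {
		mu.Lock()
		sum += v * v
		mu.Unlock()
	})
	for i := 1; i <= n; i++ {
		s.jobs <- i
	}
	s.stopAndWait()
	return sum
}

func main() {
	fmt.Println(runSquares(10)) // prints 385
}
```

Closing the channel rather than cancelling the workers is what makes the shutdown graceful in this sketch: jobs already in the queue are still processed before stopAndWait returns.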
go get github.com/huahuayu/pipeline
Create a simple pipeline A ─> B ─> C, then start it, stop it, and wait for a graceful shutdown.
package main

import (
	"fmt"
	"time"

	"github.com/huahuayu/pipeline"
)

func main() {
	// Create processing nodes
	nodeA := pipeline.NewDefaultNode[string]("nodeA", func(input string) (any, error) {
		return input + " processed by A", nil
	})
	nodeB := pipeline.NewDefaultNode[string]("nodeB", func(input string) (any, error) {
		return input + " -> B", nil
	})
	nodeC := pipeline.NewDefaultNode[string]("nodeC", func(input string) (any, error) {
		fmt.Println("Result:", input+" -> C")
		return nil, nil
	})

	// Connect nodes
	nodeA.SetNext(nodeB)
	nodeB.SetNext(nodeC)

	// Create and start the pipeline
	p, err := pipeline.NewPipeline(nodeA)
	if err != nil {
		panic(err)
	}
	p.Start()

	// Send jobs to the pipeline
	p.JobReceiver("Job 1")
	p.JobReceiver("Job 2")
	p.JobReceiver("Job 3")

	// Wait for a while to let jobs process
	time.Sleep(2 * time.Second)

	// Stop the pipeline and wait for in-progress jobs to finish
	p.Stop()
	p.Wait()
	fmt.Println("Pipeline processing complete")
}
The pipeline library supports Directed Acyclic Graph (DAG) structures, allowing you to create advanced processing workflows where data can flow through multiple parallel paths.
For example, you can create a branched topology where:
- Data starts at node A
- Flows through two parallel branches (B-D and C-E-F)
┌── B ──► D
│
A ──┴── C ─── E ───► F
Here's how to construct this topology in code:
// First create all nodes with their processing functions
nodeA := pipeline.NewDefaultNode[InputType]("nodeA", nodeAProcessingFunc)
nodeB := pipeline.NewDefaultNode[TypeFromA]("nodeB", nodeBProcessingFunc)
nodeC := pipeline.NewDefaultNode[TypeFromA]("nodeC", nodeCProcessingFunc)
nodeD := pipeline.NewDefaultNode[TypeFromB]("nodeD", nodeDProcessingFunc)
nodeE := pipeline.NewDefaultNode[TypeFromC]("nodeE", nodeEProcessingFunc)
nodeF := pipeline.NewDefaultNode[TypeFromE]("nodeF", nodeFProcessingFunc)
// Then connect the nodes according to the desired topology
nodeA.SetNext(nodeB) // A outputs to B
nodeA.SetNext(nodeC) // A also outputs to C
nodeB.SetNext(nodeD) // B outputs to D
nodeC.SetNext(nodeE) // C outputs to E
nodeE.SetNext(nodeF) // E outputs to F
Data flows through every connected path: each output of A is sent to both B and C, so every job travels down both branches.
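Because a topology like this is only safe when it contains no cycles, it can help to see what cycle detection involves. The following is a minimal sketch using depth-first search with three visit states. It operates on a plain adjacency map, and hasCycle is a hypothetical helper; it is not the library's actual implementation.

```go
package main

import "fmt"

// hasCycle reports whether the directed graph contains a cycle, using
// depth-first search with three states per node.
func hasCycle(edges map[string][]string) bool {
	const (
		unvisited  = iota // zero value: node not seen yet
		inProgress        // node is on the current DFS path
		done              // node and all its descendants are cleared
	)
	state := make(map[string]int)

	var visit func(n string) bool
	visit = func(n string) bool {
		switch state[n] {
		case inProgress:
			return true // back edge: n is already on the current path
		case done:
			return false
		}
		state[n] = inProgress
		for _, next := range edges[n] {
			if visit(next) {
				return true
			}
		}
		state[n] = done
		return false
	}

	for n := range edges {
		if visit(n) {
			return true
		}
	}
	return false
}

func main() {
	// The branched topology from the diagram above.
	dag := map[string][]string{
		"A": {"B", "C"},
		"B": {"D"},
		"C": {"E"},
		"E": {"F"},
	}
	fmt.Println(hasCycle(dag)) // prints false

	// Adding F -> A closes a loop A -> C -> E -> F -> A.
	dag["F"] = []string{"A"}
	fmt.Println(hasCycle(dag)) // prints true
}
```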
Customize the number of workers and job queue size:
// Create a node with 5 workers and a job queue size of 10.
// onError is variadic, so it can be omitted when no handler is needed.
nodeA := pipeline.NewNode[string]("nodeA", processFunc, 10, 5)
Provide custom error handling logic:
errorHandler := func(err error) {
	// Custom error handling logic
	log.Printf("Error in pipeline: %v", err)
	// metrics is a placeholder for your own metrics package
	metrics.IncrementErrorCounter()
}
nodeA := pipeline.NewNode[string]("nodeA", processFunc, 10, 5, errorHandler)
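Since a node can run several workers, an error handler may be called from multiple goroutines at once. As a sketch of what the placeholder metrics call could look like, the handler below counts errors with sync/atomic. The names errorCount, countingHandler, and simulateFailures are hypothetical and exist only for this example.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
)

// errorCount is a hypothetical stand-in for a metrics counter.
var errorCount int64

// countingHandler has the func(error) shape expected for onError and is
// safe to call concurrently from several workers.
func countingHandler(err error) {
	atomic.AddInt64(&errorCount, 1)
	log.Printf("Error in pipeline: %v", err)
}

// simulateFailures calls the handler from n goroutines and returns how
// many errors were counted during the call.
func simulateFailures(n int) int64 {
	before := atomic.LoadInt64(&errorCount)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			countingHandler(errors.New(fmt.Sprintf("job %d failed", id)))
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&errorCount) - before
}

func main() {
	fmt.Println(simulateFailures(3)) // prints 3
}
```

A function like countingHandler could then be passed as the last argument to NewNode in place of errorHandler.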
// Create a node with default configuration
NewDefaultNode[T any](name string, workFn func(T) (any, error), onError ...func(error)) *Node[T]
// Create a node with custom configuration
NewNode[T any](name string, workFn func(T) (any, error), jobPoolSize int, workerPoolSize int, onError ...func(error)) *Node[T]
// Create a new pipeline with a root node
NewPipeline(root INode) (*Pipeline, error)
// Start the pipeline
Start() chan struct{}
// Stop the pipeline
Stop()
// Wait for all nodes to finish processing
Wait()
// Get the latest job processed by the pipeline
LatestJob() any
// Set the log level
SetLevel(l LogLevel)
// Log levels
TraceLevel
DebugLevel
InfoLevel
ErrorLevel
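The sketch below shows how level-based filtering typically works, assuming the four levels are ordered from most verbose (Trace) to least verbose (Error). That ordering, the level type, and the enabled helper are assumptions made for illustration; check the library source for the actual behavior of SetLevel.

```go
package main

import "fmt"

// level is a hypothetical stand-in for the library's LogLevel type.
type level int

const (
	traceLevel level = iota // most verbose
	debugLevel
	infoLevel
	errorLevel // least verbose
)

// enabled reports whether a message logged at msg would be emitted by a
// logger configured with minimum level min.
func enabled(min, msg level) bool {
	return msg >= min
}

func main() {
	fmt.Println(enabled(infoLevel, debugLevel)) // prints false
	fmt.Println(enabled(infoLevel, errorLevel)) // prints true
}
```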
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Please make sure your code passes all tests.
This project is licensed under the MIT License.