Skip to content

Commit f378dcc

Browse files
committed
add Stream.Peek
1 parent ed0fd99 commit f378dcc

File tree

2 files changed

+101
-6
lines changed

2 files changed

+101
-6
lines changed

stream.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,6 @@ func (s Stream[T]) Concurrency() int {
7878
//
7979
// This is used for concurrent methods such as Stream.Map.
8080
//
81-
// Consumption is ordered by the stream's channel but output
82-
// may be unordered (a slow consumer will be "out-raced" by faster
83-
// consumers). Ordering is dependent on the implementation of
84-
// concurrency. For instance Stream.Map() is orderly but
85-
// Stream.ForEachC is not.
86-
//
8781
// Note that to switch off concurrency, you should provide n = 0.
8882
// With n = 1, concurrency is internal whereby the Stream writer
8983
// will not block on writing a single element (i.e. buffered
@@ -307,6 +301,26 @@ func (s Stream[T]) ForEach(c Consumer[T]) {
307301
}
308302
}
309303

304+
// Peek is akin to ForEach but returns the Stream.
305+
//
306+
// This is useful e.g. for debugging.
307+
//
308+
// This function streams continuously until the in-stream is closed at
309+
// which point the out-stream will be closed too.
310+
func (s Stream[T]) Peek(consumer Consumer[T]) Stream[T] {
311+
outstream := make(chan T, cap(s.stream))
312+
313+
go func() {
314+
defer close(outstream)
315+
s.ForEach(func(e T) {
316+
consumer(e)
317+
outstream <- e
318+
})
319+
}()
320+
321+
return NewConcurrentStream(outstream, s.concurrency)
322+
}
323+
310324
// ToSlice extracts the elements of the stream into a []T.
311325
//
312326
// This is a special case of a reduction.

stream_test.go

+81
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,87 @@ func TestStream_Distinct(t *testing.T) {
595595
}
596596
}
597597

598+
func TestStream_Peek(t *testing.T) {
599+
computeSumTotal := func(callCount, total *int) Consumer[int] {
600+
return func(value int) {
601+
*callCount++
602+
*total += value
603+
}
604+
}
605+
606+
tt := []struct {
607+
name string
608+
stream chan int
609+
consumer func(callCount, total *int) Consumer[int]
610+
want []int
611+
wantTotal int
612+
wantCallCount int
613+
}{
614+
{
615+
name: "Should peek and return empty stream when nil in-stream",
616+
stream: nil,
617+
consumer: computeSumTotal,
618+
want: []int{},
619+
wantTotal: 0,
620+
wantCallCount: 0,
621+
},
622+
{
623+
name: "Should peek and return empty stream when empty in-stream",
624+
stream: func() chan int {
625+
c := make(chan int)
626+
go func() {
627+
defer close(c)
628+
}()
629+
return c
630+
}(),
631+
consumer: computeSumTotal,
632+
want: []int{},
633+
wantTotal: 0,
634+
wantCallCount: 0,
635+
},
636+
{
637+
name: "Should peek and return stream when populated in-stream",
638+
stream: func() chan int {
639+
c := make(chan int)
640+
go func() {
641+
defer close(c)
642+
c <- 1
643+
c <- 2
644+
c <- 3
645+
c <- 5
646+
c <- 8
647+
}()
648+
return c
649+
}(),
650+
consumer: computeSumTotal,
651+
want: []int{
652+
1,
653+
2,
654+
3,
655+
5,
656+
8,
657+
},
658+
wantTotal: 19,
659+
wantCallCount: 5,
660+
},
661+
}
662+
663+
for _, tc := range tt {
664+
t.Run(tc.name, func(t *testing.T) {
665+
callCount, total := 0, 0
666+
667+
s := Stream[int]{
668+
stream: tc.stream,
669+
}
670+
671+
got := s.Peek(tc.consumer(&callCount, &total))
672+
assert.EqualValues(t, tc.want, got.ToSlice())
673+
assert.Equal(t, tc.wantTotal, total)
674+
assert.Equal(t, tc.wantCallCount, callCount)
675+
})
676+
}
677+
}
678+
598679
var float2int = func() Function[float32, Any] {
599680
return func(f float32) Any {
600681
return int(f)

0 commit comments

Comments
 (0)