-
Notifications
You must be signed in to change notification settings - Fork 203
Add generic buffer.TypedRingGrowing and shrinkable buffer.Ring #323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,31 +16,57 @@ limitations under the License. | |
|
||
package buffer | ||
|
||
// defaultRingSize defines the default ring size if not specified | ||
const defaultRingSize = 16 | ||
|
||
// RingGrowingOptions sets parameters for [RingGrowing] and | ||
// [TypedRingGrowing]. | ||
type RingGrowingOptions struct { | ||
// InitialSize is the number of pre-allocated elements in the | ||
// initial underlying storage buffer. | ||
InitialSize int | ||
} | ||
|
||
// RingGrowing is a growing ring buffer. | ||
// Not thread safe. | ||
type RingGrowing struct { | ||
data []interface{} | ||
// | ||
// Deprecated: Use TypedRingGrowing[any] instead. | ||
type RingGrowing = TypedRingGrowing[any] | ||
|
||
// NewRingGrowing constructs a new RingGrowing instance with provided parameters. | ||
// | ||
// Deprecated: Use NewTypedRingGrowing[any] instead. | ||
func NewRingGrowing(initialSize int) *RingGrowing { | ||
return NewTypedRingGrowing[any](RingGrowingOptions{InitialSize: initialSize}) | ||
} | ||
|
||
// TypedRingGrowing is a growing ring buffer. | ||
// The zero value has an initial size of 0 and is ready to use. | ||
// Not thread safe. | ||
type TypedRingGrowing[T any] struct { | ||
data []T | ||
n int // Size of Data | ||
beg int // First available element | ||
readable int // Number of data items available | ||
} | ||
|
||
// NewRingGrowing constructs a new RingGrowing instance with provided parameters. | ||
func NewRingGrowing(initialSize int) *RingGrowing { | ||
return &RingGrowing{ | ||
data: make([]interface{}, initialSize), | ||
n: initialSize, | ||
// NewTypedRingGrowing constructs a new TypedRingGrowing instance with provided parameters. | ||
func NewTypedRingGrowing[T any](opts RingGrowingOptions) *TypedRingGrowing[T] { | ||
return &TypedRingGrowing[T]{ | ||
data: make([]T, opts.InitialSize), | ||
n: opts.InitialSize, | ||
} | ||
} | ||
|
||
// ReadOne reads (consumes) first item from the buffer if it is available, otherwise returns false. | ||
func (r *RingGrowing) ReadOne() (data interface{}, ok bool) { | ||
func (r *TypedRingGrowing[T]) ReadOne() (data T, ok bool) { | ||
if r.readable == 0 { | ||
return nil, false | ||
return | ||
} | ||
r.readable-- | ||
element := r.data[r.beg] | ||
r.data[r.beg] = nil // Remove reference to the object to help GC | ||
var zero T | ||
r.data[r.beg] = zero // Remove reference to the object to help GC | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is only true if T is a pointer, no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really. e.g. (I'm not the author of the PR but I wrote the original code) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, yeah , I mean , if we have an int this will be 0 , right :) ... maybe pedantic, or nitpicking, just for correctness There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ash2k: perhaps you can help with the review then? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pohly I did have a look earlier today and I have the same questions re. allocations. Overall it looks good. |
||
if r.beg == r.n-1 { | ||
// Was the last element | ||
r.beg = 0 | ||
|
@@ -51,11 +77,14 @@ func (r *RingGrowing) ReadOne() (data interface{}, ok bool) { | |
} | ||
|
||
// WriteOne adds an item to the end of the buffer, growing it if it is full. | ||
func (r *RingGrowing) WriteOne(data interface{}) { | ||
func (r *TypedRingGrowing[T]) WriteOne(data T) { | ||
if r.readable == r.n { | ||
// Time to grow | ||
newN := r.n * 2 | ||
newData := make([]interface{}, newN) | ||
if newN == 0 { | ||
newN = defaultRingSize | ||
} | ||
newData := make([]T, newN) | ||
to := r.beg + r.readable | ||
if to <= r.n { | ||
copy(newData, r.data[r.beg:to]) | ||
|
@@ -72,11 +101,70 @@ func (r *RingGrowing) WriteOne(data interface{}) { | |
} | ||
|
||
// Len returns the number of items in the buffer. | ||
func (r *RingGrowing) Len() int { | ||
func (r *TypedRingGrowing[T]) Len() int { | ||
return r.readable | ||
} | ||
|
||
// Cap returns the capacity of the buffer. | ||
func (r *RingGrowing) Cap() int { | ||
func (r *TypedRingGrowing[T]) Cap() int { | ||
return r.n | ||
} | ||
|
||
// RingOptions sets parameters for [Ring]. | ||
type RingOptions struct { | ||
// InitialSize is the number of pre-allocated elements in the | ||
// initial underlying storage buffer. | ||
InitialSize int | ||
// NormalSize is the number of elements to allocate for new storage | ||
// buffers once the Ring is consumed and | ||
// can shrink again. | ||
NormalSize int | ||
} | ||
|
||
// Ring is a dynamically-sized ring buffer which can grow and shrink as-needed. | ||
// The zero value has an initial size and normal size of 0 and is ready to use. | ||
// Not thread safe. | ||
type Ring[T any] struct { | ||
growing TypedRingGrowing[T] | ||
normalSize int // Limits the size of the buffer that is kept for reuse. Read-only. | ||
nojnhuh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// NewRing constructs a new Ring instance with provided parameters. | ||
func NewRing[T any](opts RingOptions) *Ring[T] { | ||
return &Ring[T]{ | ||
growing: *NewTypedRingGrowing[T](RingGrowingOptions{InitialSize: opts.InitialSize}), | ||
normalSize: opts.NormalSize, | ||
} | ||
} | ||
|
||
// ReadOne reads (consumes) first item from the buffer if it is available, | ||
// otherwise returns false. When the buffer has been totally consumed and has | ||
// grown in size beyond its normal size, it shrinks down to its normal size again. | ||
func (r *Ring[T]) ReadOne() (data T, ok bool) { | ||
element, ok := r.growing.ReadOne() | ||
|
||
if r.growing.readable == 0 && r.growing.n > r.normalSize { | ||
// The buffer is empty. Reallocate a new buffer so the old one can be | ||
// garbage collected. | ||
r.growing.data = make([]T, r.normalSize) | ||
r.growing.n = r.normalSize | ||
r.growing.beg = 0 | ||
} | ||
Comment on lines
+146
to
+152
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this effective? allocating and deallocating memory vs reusing it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way I understood this change is that the idea is to shrink the hugely expanded buffer back to some typical size. E.g. there was a spike of usage (as it happens when e.g. a controller starts and fills up a workqueue but the workers only start when all informers have synced). Waiting for it to get to 0 first allows to eliminate the need for copying the data, which is good. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I.e. it is less effective but the goal is to reduce the amount of ram used so we have to free the "big" buffer and allocate a new one that is "normal". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
A new, smaller array gets allocated here so the old, larger array can be garbage collected. I don't think it's possible to inform the Go runtime that part of an array backing a slice can be garbage collected, only that the entire backing array can be by removing all references to any part of the backing array. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack |
||
|
||
return element, ok | ||
} | ||
|
||
// WriteOne adds an item to the end of the buffer, growing it if it is full. | ||
func (r *Ring[T]) WriteOne(data T) { | ||
r.growing.WriteOne(data) | ||
} | ||
|
||
// Len returns the number of items in the buffer. | ||
func (r *Ring[T]) Len() int { | ||
return r.growing.Len() | ||
} | ||
|
||
// Cap returns the capacity of the buffer. | ||
func (r *Ring[T]) Cap() int { | ||
return r.growing.Cap() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,47 +17,211 @@ limitations under the License. | |
package buffer | ||
|
||
import ( | ||
"reflect" | ||
"testing" | ||
) | ||
|
||
func TestGrowth(t *testing.T) { | ||
func TestGrowthGrowing(t *testing.T) { | ||
t.Parallel() | ||
x := 10 | ||
g := NewRingGrowing(1) | ||
for i := 0; i < x; i++ { | ||
if e, a := i, g.readable; !reflect.DeepEqual(e, a) { | ||
t.Fatalf("expected equal, got %#v, %#v", e, a) | ||
} | ||
g.WriteOne(i) | ||
} | ||
read := 0 | ||
for g.readable > 0 { | ||
v, ok := g.ReadOne() | ||
if !ok { | ||
t.Fatal("expected true") | ||
} | ||
if read != v { | ||
t.Fatalf("expected %#v==%#v", read, v) | ||
} | ||
read++ | ||
tests := map[string]struct { | ||
ring *TypedRingGrowing[int] | ||
initialSize int | ||
}{ | ||
"implicit-zero": { | ||
ring: new(TypedRingGrowing[int]), | ||
}, | ||
"explicit-zero": { | ||
ring: NewTypedRingGrowing[int](RingGrowingOptions{InitialSize: 0}), | ||
initialSize: 0, | ||
}, | ||
"nonzero": { | ||
ring: NewTypedRingGrowing[int](RingGrowingOptions{InitialSize: 1}), | ||
initialSize: 1, | ||
}, | ||
} | ||
if x != read { | ||
t.Fatalf("expected to have read %d items: %d", x, read) | ||
|
||
for name, test := range tests { | ||
t.Run(name, func(t *testing.T) { | ||
initialSize := test.initialSize | ||
g := test.ring | ||
|
||
if expected, actual := 0, g.Len(); expected != actual { | ||
t.Fatalf("expected Len to be %d, got %d", expected, actual) | ||
} | ||
if expected, actual := initialSize, g.Cap(); expected != actual { | ||
t.Fatalf("expected Cap to be %d, got %d", expected, actual) | ||
} | ||
|
||
x := 10 | ||
for i := 0; i < x; i++ { | ||
if e, a := i, g.readable; e != a { | ||
t.Fatalf("expected equal, got %#v, %#v", e, a) | ||
} | ||
g.WriteOne(i) | ||
} | ||
|
||
if expected, actual := x, g.Len(); expected != actual { | ||
t.Fatalf("expected Len to be %d, got %d", expected, actual) | ||
} | ||
if expected, actual := 16, g.Cap(); expected != actual { | ||
t.Fatalf("expected Cap to be %d, got %d", expected, actual) | ||
} | ||
|
||
read := 0 | ||
for g.readable > 0 { | ||
v, ok := g.ReadOne() | ||
if !ok { | ||
t.Fatal("expected true") | ||
} | ||
if read != v { | ||
t.Fatalf("expected %#v==%#v", read, v) | ||
} | ||
read++ | ||
} | ||
if x != read { | ||
t.Fatalf("expected to have read %d items: %d", x, read) | ||
} | ||
if expected, actual := 0, g.Len(); expected != actual { | ||
t.Fatalf("expected Len to be %d, got %d", expected, actual) | ||
} | ||
if expected, actual := 16, g.Cap(); expected != actual { | ||
t.Fatalf("expected Cap to be %d, got %d", expected, actual) | ||
} | ||
}) | ||
} | ||
if g.readable != 0 { | ||
t.Fatalf("expected readable to be zero: %d", g.readable) | ||
|
||
} | ||
|
||
func TestGrowth(t *testing.T) { | ||
t.Parallel() | ||
|
||
tests := map[string]struct { | ||
ring *Ring[int] | ||
initialSize int | ||
normalSize int | ||
}{ | ||
"implicit-zero": { | ||
ring: new(Ring[int]), | ||
}, | ||
"explicit-zero": { | ||
ring: NewRing[int](RingOptions{InitialSize: 0, NormalSize: 0}), | ||
initialSize: 0, | ||
normalSize: 0, | ||
}, | ||
"smaller-initial-size": { | ||
ring: NewRing[int](RingOptions{InitialSize: 1, NormalSize: 2}), | ||
initialSize: 1, | ||
normalSize: 2, | ||
}, | ||
"smaller-normal-size": { | ||
ring: NewRing[int](RingOptions{InitialSize: 2, NormalSize: 1}), | ||
initialSize: 2, | ||
normalSize: 1, | ||
}, | ||
} | ||
if 16 != g.n { | ||
t.Fatalf("expected N to be 16: %d", g.n) | ||
|
||
for name, test := range tests { | ||
t.Run(name, func(t *testing.T) { | ||
initialSize := test.initialSize | ||
normalSize := test.normalSize | ||
g := test.ring | ||
|
||
if expected, actual := 0, g.Len(); expected != actual { | ||
t.Fatalf("expected Len to be %d, got %d", expected, actual) | ||
} | ||
if expected, actual := initialSize, g.Cap(); expected != actual { | ||
t.Fatalf("expected Cap to be %d, got %d", expected, actual) | ||
} | ||
|
||
x := 10 | ||
for i := 0; i < x; i++ { | ||
if e, a := i, g.growing.readable; e != a { | ||
t.Fatalf("expected equal, got %#v, %#v", e, a) | ||
} | ||
g.WriteOne(i) | ||
} | ||
|
||
if expected, actual := x, g.Len(); expected != actual { | ||
t.Fatalf("expected Len to be %d, got %d", expected, actual) | ||
} | ||
if expected, actual := 16, g.Cap(); expected != actual { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note to self, this is 16 because is the next power of 2 that can accomodate 10 elements, x is 10 |
||
t.Fatalf("expected Cap to be %d, got %d", expected, actual) | ||
} | ||
|
||
read := 0 | ||
for g.growing.readable > 0 { | ||
v, ok := g.ReadOne() | ||
if !ok { | ||
t.Fatal("expected true") | ||
} | ||
if read != v { | ||
t.Fatalf("expected %#v==%#v", read, v) | ||
} | ||
read++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note to self, we write the sequence of the index for loop, 1 2 3 ... so we check we read exactly those values |
||
} | ||
if x != read { | ||
t.Fatalf("expected to have read %d items: %d", x, read) | ||
} | ||
if expected, actual := 0, g.Len(); expected != actual { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. buffer should be empty |
||
t.Fatalf("expected Len to be %d, got %d", expected, actual) | ||
} | ||
if expected, actual := normalSize, g.Cap(); expected != actual { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but the capacity should be equal to the configured normal size |
||
t.Fatalf("expected Cap to be %d, got %d", expected, actual) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestEmpty(t *testing.T) { | ||
t.Parallel() | ||
g := NewRingGrowing(1) | ||
g := NewTypedRingGrowing[struct{}](RingGrowingOptions{InitialSize: 1}) | ||
_, ok := g.ReadOne() | ||
if ok != false { | ||
t.Fatal("expected false") | ||
} | ||
} | ||
|
||
const ( | ||
spikeSize = 100 // Number of items to write during a spike | ||
normalSize = 64 // Normal capacity for the Ring type after shrinking | ||
initialSize = 16 // Initial capacity for buffers | ||
) | ||
|
||
func BenchmarkTypedRingGrowing_Spike(b *testing.B) { | ||
b.ReportAllocs() | ||
var item int // ensure item is used | ||
|
||
for i := 0; i < b.N; i++ { | ||
buffer := NewTypedRingGrowing[int](RingGrowingOptions{InitialSize: initialSize}) | ||
|
||
for j := 0; j < spikeSize; j++ { | ||
buffer.WriteOne(j) | ||
} | ||
|
||
for buffer.Len() > 0 { | ||
item, _ = buffer.ReadOne() | ||
} | ||
} | ||
_ = item // use item | ||
} | ||
|
||
func BenchmarkRing_Spike_And_Shrink(b *testing.B) { | ||
b.ReportAllocs() | ||
var item int // ensure item is used | ||
|
||
for i := 0; i < b.N; i++ { | ||
// Create a new buffer for each benchmark iteration | ||
buffer := NewRing[int](RingOptions{ | ||
InitialSize: initialSize, | ||
NormalSize: normalSize, | ||
}) | ||
|
||
for j := 0; j < spikeSize; j++ { | ||
buffer.WriteOne(j) | ||
} | ||
|
||
for buffer.Len() > 0 { | ||
item, _ = buffer.ReadOne() | ||
} | ||
} | ||
_ = item // use item | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seem to break compatibility https://go-review.googlesource.com/c/tools/+/618215/3/internal/apidiff/testdata/tests.go#535
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our case we don't have
~
on the type parameter, so maybe it's not a problem? https://go.dev/play/p/cTywdiHoQp_l this works fine (removed~
).A similar change was made to gRPC and it seems nothing broke: https://github.com/grpc/grpc-go/pull/7057/files#diff-c5003637b707b222097960cf01b1d09d77126e39ff0073bff748bbc84951e6cfR74.
Disclaimer: I'm not an expert on Go generics by any means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
me neither, just why I'm broadcasting :) I feel that since this is only used in one place and I already tested that there is no need to adapt the code after vendor with this change, we can override , @dims what do you think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aojea @ash2k as long as you can confirm that just updating the library does not break existing code, we can land this. looks like you both have tried it already. So we should be good to go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did replace the vendor in k/k with this branch and binaries compiled just fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dims I did one more test with the latest k/k master and with this branch updated to the master branch here and things still compile/relevant tests still pass. Could you PTAL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks!