Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use adaptive set in named port index #10027

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func New[ItemID comparable, Item Labeled](nameOfTrackedItems string) *LabelNameV
}

type values[ItemID comparable] struct {
m map[string]set.Set[ItemID]
m map[string]*set.Adaptive[ItemID]
count int
}

Expand All @@ -75,13 +75,13 @@ func (idx *LabelNameValueIndex[ItemID, Item]) Add(id ItemID, item Item) {
vals, ok := idx.labelNameToValueToIDs[k]
if !ok {
vals = values[ItemID]{
m: map[string]set.Set[ItemID]{},
m: map[string]*set.Adaptive[ItemID]{},
}
idx.labelNameToValueToIDs[k] = vals
}
setOfIDs := vals.m[v]
if setOfIDs == nil {
setOfIDs = set.New[ItemID]()
setOfIDs = set.NewAdaptive[ItemID]()
vals.m[v] = setOfIDs
}
setOfIDs.Add(id)
Expand Down
19 changes: 6 additions & 13 deletions felix/labelindex/named_port_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,15 @@ type endpointData struct {
ports []model.EndpointPort
parents []*npParentData

cachedMatchingIPSetIDs set.Set[string] /* or, as an optimization, nil if there are none */
cachedMatchingIPSetIDs set.Adaptive[string]
}

func (d *endpointData) AddMatchingIPSetID(id string) {
if d.cachedMatchingIPSetIDs == nil {
d.cachedMatchingIPSetIDs = set.New[string]()
}
d.cachedMatchingIPSetIDs.Add(id)
}

func (d *endpointData) RemoveMatchingIPSetID(id string) {
if d.cachedMatchingIPSetIDs == nil {
return
}
d.cachedMatchingIPSetIDs.Discard(id)
if d.cachedMatchingIPSetIDs.Len() == 0 {
d.cachedMatchingIPSetIDs = nil
}
}

func (d *endpointData) HasParent(parent *npParentData) bool {
Expand Down Expand Up @@ -717,7 +708,7 @@ func (idx *SelectorAndNamedPortIndex) scanEndpointAgainstIPSets(
) {
// Remove any previous match from the endpoint's cache. We'll re-add it
// below if the match is still correct.
epData.cachedMatchingIPSetIDs = nil
epData.cachedMatchingIPSetIDs.Clear()

// Iterate over potential new matches and incref any members that
// that produces. (This may temporarily over count.)
Expand Down Expand Up @@ -777,7 +768,9 @@ func (idx *SelectorAndNamedPortIndex) DeleteEndpoint(id any) {
return
}

log.WithField("oldContrib", oldEndpointData.cachedMatchingIPSetIDs).Debug("Old matching IP sets")
if log.IsLevelEnabled(log.DebugLevel) {
log.WithField("oldContrib", oldEndpointData.cachedMatchingIPSetIDs.String()).Debug("Old matching IP sets")
}
oldIPSetContributions := idx.RecalcCachedContributions(oldEndpointData)
for ipSetID, contributions := range oldIPSetContributions {
// Decref all the old members. If they hit 0 references, then the member has been
Expand Down Expand Up @@ -887,7 +880,7 @@ func (idx *SelectorAndNamedPortIndex) CalculateEndpointContribution(d *endpointD
// RecalcCachedContributions uses the cached set of matching IP set IDs in the endpoint
// struct to quickly recalculate the endpoint's contribution to all IP sets.
func (idx *SelectorAndNamedPortIndex) RecalcCachedContributions(epData *endpointData) map[string][]IPSetMember {
if epData.cachedMatchingIPSetIDs == nil {
if epData.cachedMatchingIPSetIDs.Len() == 0 {
return nil
}
contrib := map[string][]IPSetMember{}
Expand Down
282 changes: 282 additions & 0 deletions libcalico-go/lib/set/adaptive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
package set

import (
"unsafe"
)

const (
adaptiveSetArrayLimit = 16
sizeInMap = 0xff
)

// Adaptive is a set implementation that uses different underlying data
// structures depending on the size of the set. For sets that usually empty
// or have only one or two elements, it is more than twice as fast and it uses
// ~10x less memory. It gets progressively slower as the number of elements
// increases, but at adaptiveSetArrayLimit it switches to a map-based
// implementation like set.Typed (with slight overhead relative to set.Typed).
//
// The zero value of Adaptive is an empty set; it should not be copied after
// first use.
type Adaptive[T comparable] struct {
_ noCopy // Prevent copying of the set.

// p holds different types depending on the size of the set.
// if size == 0, p is nil.
// if size == 1, p is a pointer to the single element of the set.
// if size is in the range [2, adaptiveSetArrayLimit], p is a pointer to an array of size elements.
// if size > adaptiveSetArrayLimit, p is a pointer to a map[T]v
p unsafe.Pointer

// size is either the number of elements in the set, or sizeInMap if the set is backed by a map.
size uint8
}

type noCopy struct{}

func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

func NewAdaptive[T comparable]() *Adaptive[T] {
return &Adaptive[T]{}
}

func (a *Adaptive[T]) Len() int {
if a.size == sizeInMap {
return len(*(*map[T]v)(a.p))
}
return int(a.size)
}

func (a *Adaptive[T]) Add(item T) {
switch a.size {
case 0:
a.p = unsafe.Pointer(&item)
a.size = 1
case 1:
theOne := (*T)(a.p)
if *theOne == item {
// The element is already in the set.
return
}
// Element is different from the one already in the set.
// Need to upgrade to an array.
arr := [2]T{*theOne, item}
a.p = unsafe.Pointer(&arr[0])
a.size = 2
case sizeInMap:
m := *(*map[T]v)(a.p)
m[item] = emptyValue
default:
tPtr := (*T)(a.p)
tSlice := unsafe.Slice(tPtr, a.size)
for _, t := range tSlice {
if t == item {
// The element is already in the set.
return
}
}
if a.size < adaptiveSetArrayLimit {
// Still allowed to grow the slice.
s2 := make([]T, a.size+1)
copy(s2, tSlice)
s2[a.size] = item
a.p = unsafe.Pointer(&s2[0])
a.size++
return
}
// Need to upgrade to a map.
m := make(map[T]v, a.size+1)
for _, t := range tSlice {
m[t] = emptyValue
}
m[item] = emptyValue
a.p = unsafe.Pointer(&m)
a.size = sizeInMap
}
}

func (a *Adaptive[T]) AddAll(itemArray []T) {
for _, v := range itemArray {
a.Add(v)
}
}

func (a *Adaptive[T]) AddSet(other Set[T]) {
other.Iter(func(item T) error {
a.Add(item)
return nil
})
}

func (a *Adaptive[T]) Discard(item T) {
switch a.size {
case 0:
return
case 1:
theOne := (*T)(a.p)
if *theOne == item {
a.p = nil
a.size = 0
}
case 2:
tSlice := (*[2]T)(a.p)[:]
if tSlice[0] == item {
a.p = unsafe.Pointer(&tSlice[1])
a.size = 1
return
}
if tSlice[1] == item {
a.p = unsafe.Pointer(&tSlice[0])
a.size = 1
return
}
case sizeInMap:
m := *(*map[T]v)(a.p)
delete(m, item)
if len(m) <= adaptiveSetArrayLimit {
// Downgrade to an array.
s := make([]T, 0, len(m))
for t := range m {
s = append(s, t)
}
a.p = unsafe.Pointer(&s[0])
a.size = uint8(len(m))
}
default:
tPtr := (*T)(a.p)
tSlice := unsafe.Slice(tPtr, a.size)
updated := make([]T, 0, a.size-1)
for _, t := range tSlice {
if t == item {
continue
}
if len(updated) == int(a.size-1) {
return
}
updated = append(updated, t)
}
a.size--
a.p = unsafe.Pointer(&updated[0])
}
}

func (a *Adaptive[T]) Clear() {
a.size = 0
a.p = nil
}

func (a *Adaptive[T]) Contains(t T) bool {
if a.size == 0 {
return false
}
if a.size == 1 {
return *(*T)(a.p) == t
}
if a.size <= adaptiveSetArrayLimit {
tSlice := unsafe.Slice((*T)(a.p), a.size)
for _, v := range tSlice {
if v == t {
return true
}
}
return false
}
m := *(*map[T]v)(a.p)
_, present := m[t]
return present
}

func (a *Adaptive[T]) Iter(f func(item T) error) {
if a.size == 0 {
return
}
if a.size == 1 {
err := f(*(*T)(a.p))
if err == StopIteration {
return
}
if err == RemoveItem {
a.size = 0
a.p = nil
return
}
return
}
if a.size <= adaptiveSetArrayLimit {
tSlice := unsafe.Slice((*T)(a.p), a.size)
for _, v := range tSlice {
err := f(v)
if err == StopIteration {
return
}
if err == RemoveItem {
a.Discard(v)
}
}
return
}
m := *(*map[T]v)(a.p)
for v := range m {
err := f(v)
if err == StopIteration {
return
}
if err == RemoveItem {
a.Discard(v)
}
}
}

func (a *Adaptive[T]) Copy() Set[T] {
other := NewAdaptive[T]()
a.Iter(func(item T) error {
other.Add(item)
return nil
})
return other
}

func (a *Adaptive[T]) Equals(s Set[T]) bool {
if a.Len() != s.Len() {
return false
}
equal := true
a.Iter(func(item T) error {
if !s.Contains(item) {
equal = false
return StopIteration
}
return nil
})
return equal
}

func (a *Adaptive[T]) ContainsAll(s Set[T]) bool {
seenAll := true
s.Iter(func(item T) error {
if !a.Contains(item) {
seenAll = false
return StopIteration
}
return nil
})
return seenAll
}

func (a *Adaptive[T]) Slice() []T {
s := make([]T, 0, a.size)
a.Iter(func(item T) error {
s = append(s, item)
return nil
})
return s
}

func (a *Adaptive[T]) String() string {
s := New[T]()
s.AddSet(a)
return s.String()
}

var _ Set[any] = &Adaptive[any]{}
Loading