Skip to content

Commit d1abf47

Browse files
authored
Merge pull request #708 from clusterpedia-io/memory_v2
add memory storage layer v2
2 parents 7143cb0 + 44280d6 commit d1abf47

File tree

8 files changed

+1096
-0
lines changed

8 files changed

+1096
-0
lines changed

.golangci.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,8 @@ linters-settings:
3535
- "-SA1019" # disable the rule SA1019(Using a deprecated function, variable, constant or field)
3636
output:
3737
sort-results: true
38+
39+
issues:
40+
exclude-rules:
41+
- path: pkg/storage/memorystorage/v2/
42+
linters: unused # memory v2 is in alpha and tolerates unused functions or fields
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Memory Storage Layer v2 (Alpha)
2+
Due to significant updates and modifications required for the memory storage layer, version 2 (v2) has been introduced. Unless otherwise necessary, memory v1 will only be supported in Clusterpedia 0.x.
3+
4+
⚠️ The current memory v2 is in the alpha stage, has not undergone rigorous testing, and the foundational storage layer functionalities will be gradually improved and implemented.
5+
6+
This version draws inspiration from the [apiserver/storage/cacher](https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/apiserver/pkg/storage/cacher) library but does not necessarily follow all its design principles.
7+
8+
### Major Changes Compared to v1
9+
#### ResourceVersion Format Changes
10+
In v1, the resource version used a base64 encoded JSON format to merge the resource versions of each cluster into the resource's resource version field.
11+
12+
In v2, the resource version format is `<prefix>.<increase int>.<original resource version>`:
13+
* An incrementing integer is used to represent the sequential order of resources, for version operations during List and Watch.
14+
* The original resource version is retained.
15+
* The prefix is used to identify the validity of the incrementing integer when requests switch between instances.
16+
17+
For Watch requests, a JSON formatted `ListOptions.ResourceVersion = {"<cluster>": "<resource version>"}` is supported to maintain continuity of Watch requests when switching instances.
18+
> The clusterpedia version of Informer is required to replace the k8s.io/client-go Informer.
19+
20+
#### Using a Dedicated Resource Synchro
21+
Due to the unique nature of the memory storage layer, it is unnecessary to use the default resource synchronizer of ClusterSynchro to maintain the informer store.
22+
23+
Memory v2 will directly use the native k8s informer as the resource synchronizer. Memory v2 will act as the Store for the k8s Informer, saving data directly into storage, avoiding intermediate operations and memory usage.
24+
25+
#### Supporting Dual Data Source Updates
26+
In addition to the resource synchronizer saving resources in memory v2, external active additions, deletions, and modifications of memory v2 resources are also supported.
27+
28+
This ensures consistency of requests when supporting write operations at the apiserver layer through dual-write operations.
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package memorystorage
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"k8s.io/apimachinery/pkg/fields"
8+
"k8s.io/apimachinery/pkg/labels"
9+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
10+
"k8s.io/apimachinery/pkg/watch"
11+
)
12+
13+
// cacheWatcher implements watch.Interface
14+
type cacheWatcher struct {
15+
input chan *watchCacheEvent
16+
result chan watch.Event
17+
18+
filter filterWithAttrsFunc
19+
20+
stopped bool
21+
done chan struct{}
22+
forget func()
23+
}
24+
25+
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
26+
27+
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc) *cacheWatcher {
28+
return &cacheWatcher{
29+
input: make(chan *watchCacheEvent, chanSize),
30+
result: make(chan watch.Event, chanSize),
31+
done: make(chan struct{}),
32+
filter: filter,
33+
forget: func() {},
34+
stopped: false,
35+
}
36+
}
37+
38+
// ResultChan implements watch.Interface.
39+
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
40+
return c.result
41+
}
42+
43+
// Stop implements watch.Interface.
44+
func (c *cacheWatcher) Stop() {
45+
c.forget()
46+
}
47+
48+
func (c *cacheWatcher) stopLocked() {
49+
if !c.stopped {
50+
c.stopped = true
51+
close(c.done)
52+
close(c.input)
53+
}
54+
}
55+
56+
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
57+
select {
58+
case c.input <- event:
59+
return true
60+
default:
61+
return false
62+
}
63+
}
64+
65+
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
66+
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
67+
// Try to send the event immediately, without blocking.
68+
if c.nonblockingAdd(event) {
69+
return true
70+
}
71+
72+
closeFunc := func() {
73+
c.forget()
74+
}
75+
76+
if timer == nil {
77+
closeFunc()
78+
return false
79+
}
80+
81+
select {
82+
case c.input <- event:
83+
return true
84+
case <-timer.C:
85+
closeFunc()
86+
return false
87+
}
88+
}
89+
90+
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
91+
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
92+
var oldObjPasses bool
93+
if event.PrevObject != nil {
94+
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
95+
}
96+
if !curObjPasses && !oldObjPasses {
97+
return nil
98+
}
99+
100+
switch {
101+
case curObjPasses && !oldObjPasses:
102+
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
103+
case curObjPasses && oldObjPasses:
104+
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
105+
106+
case !curObjPasses && oldObjPasses:
107+
oldObj := event.PrevObject.DeepCopyObject()
108+
return &watch.Event{Type: watch.Deleted, Object: oldObj}
109+
}
110+
return nil
111+
}
112+
113+
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
114+
watchEvent := c.convertToWatchEvent(event)
115+
if watchEvent == nil {
116+
// Watcher is not interested in that object.
117+
return
118+
}
119+
120+
// We need to ensure that if we put event X to the c.result, all
121+
// previous events were already put into it before, no matter whether
122+
// c.done is close or not.
123+
// Thus we cannot simply select from c.done and c.result and this
124+
// would give us non-determinism.
125+
// At the same time, we don't want to block infinitely on putting
126+
// to c.result, when c.done is already closed.
127+
128+
// This ensures that with c.done already close, we at most once go
129+
// into the next select after this. With that, no matter which
130+
// statement we choose there, we will deliver only consecutive
131+
// events.
132+
select {
133+
case <-c.done:
134+
return
135+
default:
136+
}
137+
138+
select {
139+
case c.result <- *watchEvent:
140+
case <-c.done:
141+
}
142+
}
143+
144+
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, indexRV uint64) {
145+
defer utilruntime.HandleCrash()
146+
147+
defer close(c.result)
148+
defer c.Stop()
149+
150+
for {
151+
event, err := cacheInterval.Next()
152+
if err != nil {
153+
return
154+
}
155+
if event == nil {
156+
break
157+
}
158+
c.sendWatchCacheEvent(event)
159+
160+
if event.IndexRV > indexRV {
161+
indexRV = event.IndexRV
162+
}
163+
}
164+
165+
for {
166+
select {
167+
case event, ok := <-c.input:
168+
if !ok {
169+
return
170+
}
171+
if event.IndexRV > indexRV {
172+
c.sendWatchCacheEvent(event)
173+
}
174+
case <-ctx.Done():
175+
return
176+
}
177+
}
178+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package memorystorage
2+
3+
import (
4+
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
5+
)
6+
7+
const (
8+
StorageName = "memory.v2/alpha"
9+
)
10+
11+
func init() {
12+
storage.RegisterStorageFactoryFunc(StorageName, NewStorageFactory)
13+
}
14+
15+
func NewStorageFactory(_ string) (storage.StorageFactory, error) {
16+
storageFactory := &StorageFactory{}
17+
return storageFactory, nil
18+
}

0 commit comments

Comments
 (0)