Skip to content

Commit 0bb2300

Browse files
committed
record events for resources
Signed-off-by: Iceber Gu <[email protected]>
1 parent dd2f4d7 commit 0bb2300

19 files changed

+332
-55
lines changed

pkg/runtime/informer/event_handler.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package informer
22

3-
import "k8s.io/client-go/tools/cache"
4-
53
type ResourceEventHandler interface {
6-
cache.ResourceEventHandler
4+
OnAdd(obj interface{}, isInInitialList bool)
5+
OnUpdate(oldObj, newObj interface{}, isInInitialList bool)
6+
OnDelete(obj interface{}, isInInitialList bool)
77
OnSync(obj interface{})
88
}
99

@@ -14,13 +14,13 @@ type ResourceEventHandlerFuncs struct {
1414
SyncFunc func(obj interface{})
1515
}
1616

17-
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
17+
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, _ bool) {
1818
if r.AddFunc != nil {
1919
r.AddFunc(obj)
2020
}
2121
}
2222

23-
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
23+
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}, _ bool) {
2424
if r.UpdateFunc != nil {
2525
r.UpdateFunc(oldObj, newObj)
2626
}
@@ -55,21 +55,21 @@ func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}, isIn
5555
older := r.FilterFunc(oldObj)
5656
switch {
5757
case newer && older:
58-
r.Handler.OnUpdate(oldObj, newObj)
58+
r.Handler.OnUpdate(oldObj, newObj, isInInitialList)
5959
case newer && !older:
6060
r.Handler.OnAdd(newObj, isInInitialList)
6161
case !newer && older:
62-
r.Handler.OnDelete(oldObj)
62+
r.Handler.OnDelete(oldObj, isInInitialList)
6363
default:
6464
// do nothing
6565
}
6666
}
6767

68-
func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
68+
func (r FilteringResourceEventHandler) OnDelete(obj interface{}, isInInitialList bool) {
6969
if !r.FilterFunc(obj) {
7070
return
7171
}
72-
r.Handler.OnDelete(obj)
72+
r.Handler.OnDelete(obj, isInInitialList)
7373
}
7474

7575
func (r FilteringResourceEventHandler) OnSync(obj interface{}) {

pkg/runtime/informer/listwatch.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,20 @@ func (f *listerWatcherFactory) ForResourceWithOptions(namespace string, gvr sche
7474
},
7575
}
7676
}
77+
78+
func NewFilteredListerWatcher(lw cache.ListerWatcher, tweakListOptions TweakListOptionsFunc) cache.ListerWatcher {
79+
return &cache.ListWatch{
80+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
81+
if tweakListOptions != nil {
82+
tweakListOptions(&options)
83+
}
84+
return lw.List(options)
85+
},
86+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
87+
if tweakListOptions != nil {
88+
tweakListOptions(&options)
89+
}
90+
return lw.Watch(options)
91+
},
92+
}
93+
}

pkg/runtime/informer/named_controller.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ type controller struct {
6666

6767
reflectorMutex sync.RWMutex
6868
reflector *Reflector
69-
queue cache.Queue
7069
}
7170

7271
func NewNamedController(name string, config *Config) cache.Controller {
@@ -128,10 +127,7 @@ func (c *controller) HasSynced() bool {
128127
c.reflectorMutex.RLock()
129128
defer c.reflectorMutex.RUnlock()
130129

131-
if c.queue == nil {
132-
return false
133-
}
134-
return c.queue.HasSynced() && c.reflector.HasInitializedSynced()
130+
return c.config.Queue.HasSynced() && c.reflector.HasInitializedSynced()
135131
}
136132

137133
func (c *controller) LastSyncResourceVersion() string {

pkg/runtime/informer/reflector.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ type WatchErrorHandler func(r *Reflector, err error)
125125

126126
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
127127
func DefaultWatchErrorHandler(r *Reflector, err error) {
128+
if err == nil {
129+
return
130+
}
128131
switch {
129132
case isExpiredError(err):
130133
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already

pkg/runtime/informer/resourceversion_informer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,12 @@ func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas, isInI
116116
if err := informer.storage.Update(d.Object); err != nil {
117117
return err
118118
}
119-
informer.handler.OnUpdate(nil, d.Object)
119+
informer.handler.OnUpdate(nil, d.Object, isInInitialList)
120120
case cache.Deleted:
121121
if err := informer.storage.Delete(d.Object); err != nil {
122122
return err
123123
}
124-
informer.handler.OnDelete(d.Object)
124+
informer.handler.OnDelete(d.Object, isInInitialList)
125125
}
126126
}
127127
return nil

pkg/storage/internalstorage/resource_storage.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"database/sql"
7+
"errors"
78
"fmt"
89
"reflect"
910
"strconv"
@@ -12,6 +13,7 @@ import (
1213
"go.opentelemetry.io/otel/attribute"
1314
"gorm.io/datatypes"
1415
"gorm.io/gorm"
16+
corev1 "k8s.io/api/core/v1"
1517
apierrors "k8s.io/apimachinery/pkg/api/errors"
1618
"k8s.io/apimachinery/pkg/api/meta"
1719
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,6 +28,7 @@ import (
2628
"k8s.io/component-base/tracing"
2729

2830
internal "github.com/clusterpedia-io/api/clusterpedia"
31+
"github.com/clusterpedia-io/clusterpedia/pkg/runtime/scheme"
2932
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
3033
)
3134

@@ -103,6 +106,30 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
103106
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
104107
}
105108

109+
var codec = scheme.LegacyResourceCodecs.LegacyCodec(corev1.SchemeGroupVersion)
110+
111+
func (s *ResourceStorage) RecordEvent(ctx context.Context, cluster string, event *corev1.Event) error {
112+
if event.InvolvedObject.UID == "" {
113+
return errors.New("invalid event: involedObject.UID is empty")
114+
}
115+
116+
var buffer bytes.Buffer
117+
if err := codec.Encode(event, &buffer); err != nil {
118+
return err
119+
}
120+
key, _ := cache.MetaNamespaceKeyFunc(event)
121+
122+
if err := s.db.WithContext(ctx).Model(&Resource{}).Where(
123+
map[string]interface{}{"cluster": cluster, "uid": event.InvolvedObject.UID},
124+
).UpdateColumns(map[string]interface{}{
125+
"events": JSONUpdate("events", string(event.UID), buffer.Bytes()),
126+
"event_resource_versions": JSONUpdate("event_resource_versions", key, []byte(event.ResourceVersion)),
127+
}).Error; err != nil {
128+
return InterpretResourceDBError(cluster, "", err)
129+
}
130+
return nil
131+
}
132+
106133
func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error {
107134
metaobj, err := meta.Accessor(obj)
108135
if err != nil {

pkg/storage/internalstorage/storage.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,35 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
3737
return nil, fmt.Errorf("not support collection resource: %s", cr.Name)
3838
}
3939

40-
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
40+
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]storage.ClusterResourceVersions, error) {
4141
var resources []Resource
42-
result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
42+
result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version", "event_resource_versions").
4343
Where(map[string]interface{}{"cluster": cluster}).
4444
Find(&resources)
4545
if result.Error != nil {
4646
return nil, InterpretDBError(cluster, result.Error)
4747
}
4848

49-
resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
49+
resourceversions := make(map[schema.GroupVersionResource]storage.ClusterResourceVersions)
5050
for _, resource := range resources {
5151
gvr := resource.GroupVersionResource()
52-
versions := resourceversions[gvr]
53-
if versions == nil {
54-
versions = make(map[string]interface{})
52+
versions, ok := resourceversions[gvr]
53+
if !ok {
54+
versions = storage.ClusterResourceVersions{
55+
Resources: make(map[string]interface{}),
56+
Events: make(map[string]interface{}, 0),
57+
}
5558
resourceversions[gvr] = versions
5659
}
5760

5861
key := resource.Name
5962
if resource.Namespace != "" {
6063
key = resource.Namespace + "/" + resource.Name
6164
}
62-
versions[key] = resource.ResourceVersion
65+
versions.Resources[key] = resource.ResourceVersion
66+
for k, v := range resource.EventResourceVersions {
67+
versions.Events[k] = v
68+
}
6369
}
6470
return resourceversions, nil
6571
}

pkg/storage/internalstorage/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ type Resource struct {
6464

6565
Object datatypes.JSON `gorm:"not null"`
6666

67+
Events datatypes.JSONMap `gorm:"not null;default:'{}'"`
68+
EventResourceVersions datatypes.JSONMap `gorm:"not null;default:'{}'"`
69+
6770
CreatedAt time.Time `gorm:"not null"`
6871
SyncedAt time.Time `gorm:"not null;autoUpdateTime"`
6972
DeletedAt sql.NullTime

pkg/storage/memorystorage/memory_resource_storage.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"reflect"
88
"sync"
99

10+
corev1 "k8s.io/api/core/v1"
1011
"k8s.io/apimachinery/pkg/api/meta"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1213
"k8s.io/apimachinery/pkg/conversion"
@@ -57,6 +58,10 @@ func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
5758
return s.storageConfig
5859
}
5960

61+
func (s *ResourceStorage) RecordEvent(ctx context.Context, cluster string, event *corev1.Event) error {
62+
return nil
63+
}
64+
6065
func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtime.Object) error {
6166
resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, cluster)
6267
if err != nil {

pkg/storage/memorystorage/memory_storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
7272
return nil, nil
7373
}
7474

75-
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
75+
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]storage.ClusterResourceVersions, error) {
7676
return nil, nil
7777
}
7878

pkg/storage/memorystorage/v2/resource_storage.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010
"time"
1111

12+
corev1 "k8s.io/api/core/v1"
1213
"k8s.io/apimachinery/pkg/api/meta"
1314
"k8s.io/apimachinery/pkg/conversion"
1415
"k8s.io/apimachinery/pkg/fields"
@@ -102,6 +103,10 @@ func (s *ResourceStorage) cleanCluster(cluster string) {
102103
// If a cluster removal event is sent, then the client informer needs to be adapted.
103104
}
104105

106+
func (s *ResourceStorage) RecordEvent(ctx context.Context, cluster string, event *corev1.Event) error {
107+
return nil
108+
}
109+
105110
func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
106111
return s.storageConfig
107112
}

pkg/storage/memorystorage/v2/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
2727
return nil, nil
2828
}
2929

30-
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
30+
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]storage.ClusterResourceVersions, error) {
3131
return nil, nil
3232
}
3333

pkg/storage/storage.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package storage
33
import (
44
"context"
55

6+
corev1 "k8s.io/api/core/v1"
67
"k8s.io/apimachinery/pkg/runtime"
78
"k8s.io/apimachinery/pkg/runtime/schema"
89
"k8s.io/apimachinery/pkg/watch"
@@ -11,14 +12,19 @@ import (
1112
"github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig"
1213
)
1314

15+
type ClusterResourceVersions struct {
16+
Resources map[string]interface{}
17+
Events map[string]interface{}
18+
}
19+
1420
type StorageFactory interface {
1521
// Currently only supports returning a union of verbs for all resources,
1622
// in the future it may be necessary to return verbs depending on different resources.
1723
GetSupportedRequestVerbs() []string
1824

1925
PrepareCluster(cluster string) error
2026

21-
GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error)
27+
GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]ClusterResourceVersions, error)
2228
GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error)
2329

2430
NewResourceStorage(config *ResourceStorageConfig) (ResourceStorage, error)
@@ -42,6 +48,8 @@ type ResourceStorage interface {
4248

4349
ConvertDeletedObject(obj interface{}) (runtime.Object, error)
4450
Delete(ctx context.Context, cluster string, obj runtime.Object) error
51+
52+
RecordEvent(ctx context.Context, cluster string, event *corev1.Event) error
4553
}
4654

4755
type CollectionResourceStorage interface {

0 commit comments

Comments
 (0)