Skip to content

Commit f819cd9

Browse files
committed
internalstorage: add methods for getting resource events
Signed-off-by: Iceber Gu <[email protected]>
1 parent 899b3a2 commit f819cd9

File tree

3 files changed

+194
-27
lines changed

3 files changed

+194
-27
lines changed

pkg/storage/internalstorage/resource_storage.go

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"database/sql"
7+
"encoding/json"
78
"errors"
89
"fmt"
910
"reflect"
@@ -106,30 +107,6 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
106107
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
107108
}
108109

109-
var codec = scheme.LegacyResourceCodecs.LegacyCodec(corev1.SchemeGroupVersion)
110-
111-
func (s *ResourceStorage) RecordEvent(ctx context.Context, cluster string, event *corev1.Event) error {
112-
if event.InvolvedObject.UID == "" {
113-
return errors.New("invalid event: involedObject.UID is empty")
114-
}
115-
116-
var buffer bytes.Buffer
117-
if err := codec.Encode(event, &buffer); err != nil {
118-
return err
119-
}
120-
key, _ := cache.MetaNamespaceKeyFunc(event)
121-
122-
if err := s.db.WithContext(ctx).Model(&Resource{}).Where(
123-
map[string]interface{}{"cluster": cluster, "uid": event.InvolvedObject.UID},
124-
).UpdateColumns(map[string]interface{}{
125-
"events": JSONUpdate("events", string(event.UID), buffer.Bytes()),
126-
"event_resource_versions": JSONUpdate("event_resource_versions", key, []byte(event.ResourceVersion)),
127-
}).Error; err != nil {
128-
return InterpretResourceDBError(cluster, "", err)
129-
}
130-
return nil
131-
}
132-
133110
func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error {
134111
metaobj, err := meta.Accessor(obj)
135112
if err != nil {
@@ -224,9 +201,16 @@ func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name stri
224201
}
225202

226203
func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *internal.ListOptions) (int64, *int64, *gorm.DB, ObjectList, error) {
227-
var result ObjectList = &BytesList{}
228-
if opts.OnlyMetadata {
204+
var result ObjectList
205+
switch {
206+
case opts.OnlyMetadata && opts.WithEvents:
207+
result = &ResourceMetadataWithEventsList{}
208+
case opts.OnlyMetadata:
229209
result = &ResourceMetadataList{}
210+
case opts.WithEvents:
211+
result = &BytesWithEventsList{}
212+
default:
213+
result = &BytesList{}
230214
}
231215

232216
db := s.db.WithContext(ctx)
@@ -321,6 +305,19 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
321305
if err != nil {
322306
return err
323307
}
308+
if events := object.GetEvents(); events != nil {
309+
if m, err := meta.Accessor(obj); err == nil {
310+
annos := m.GetAnnotations()
311+
if annos == nil {
312+
annos = make(map[string]string, 1)
313+
}
314+
data, err := json.Marshal(events)
315+
if err == nil {
316+
annos[internal.ShadowAnnotationEvents] = string(data)
317+
m.SetAnnotations(annos)
318+
}
319+
}
320+
}
324321
slice.Index(i).Set(reflect.ValueOf(obj).Elem())
325322
}
326323
v.Set(slice)
@@ -331,6 +328,43 @@ func (s *ResourceStorage) Watch(_ context.Context, _ *internal.ListOptions) (wat
331328
return nil, apierrors.NewMethodNotSupported(s.groupResource, "watch")
332329
}
333330

331+
var codec = scheme.LegacyResourceCodecs.LegacyCodec(corev1.SchemeGroupVersion)
332+
333+
func (s *ResourceStorage) RecordEvent(ctx context.Context, cluster string, event *corev1.Event) error {
334+
if event.InvolvedObject.UID == "" {
335+
return errors.New("invalid event: involedObject.UID is empty")
336+
}
337+
338+
var buffer bytes.Buffer
339+
if err := codec.Encode(event, &buffer); err != nil {
340+
return err
341+
}
342+
key, _ := cache.MetaNamespaceKeyFunc(event)
343+
344+
if err := s.db.WithContext(ctx).Model(&Resource{}).Where(
345+
map[string]interface{}{"cluster": cluster, "uid": event.InvolvedObject.UID},
346+
).UpdateColumns(map[string]interface{}{
347+
"events": JSONUpdate("events", string(event.UID), buffer.Bytes()),
348+
"event_resource_versions": JSONUpdate("event_resource_versions", key, []byte(event.ResourceVersion)),
349+
}).Error; err != nil {
350+
return InterpretResourceDBError(cluster, "", err)
351+
}
352+
return nil
353+
}
354+
355+
func (s *ResourceStorage) GetResourceEvents(ctx context.Context, cluster, namespace, name string) ([]*corev1.Event, error) {
356+
var data []EventsBytes
357+
358+
result := s.db.WithContext(ctx).Model(&Resource{}).Select("events").Where(s.resourceKeyMap(cluster, namespace, name)).First(&data)
359+
if result.Error != nil {
360+
return nil, InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
361+
}
362+
if len(data) == 0 {
363+
return nil, nil
364+
}
365+
return data[0].Decode()
366+
}
367+
334368
func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) {
335369
applyFn := func(query *gorm.DB, opts *internal.ListOptions) (*gorm.DB, error) {
336370
query, err := applyOwnerToResourceQuery(db, query, opts)

pkg/storage/internalstorage/types.go

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,26 @@ package internalstorage
33
import (
44
"database/sql"
55
"database/sql/driver"
6+
"encoding/json"
67
"errors"
78
"reflect"
89
"time"
910

1011
"gorm.io/datatypes"
1112
"gorm.io/gorm"
13+
corev1 "k8s.io/api/core/v1"
1214
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1315
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1416
"k8s.io/apimachinery/pkg/runtime"
1517
"k8s.io/apimachinery/pkg/runtime/schema"
1618
"k8s.io/apimachinery/pkg/types"
17-
"k8s.io/apimachinery/pkg/util/json"
1819
)
1920

2021
type Object interface {
2122
GetResourceType() ResourceType
2223
ConvertToUnstructured() (*unstructured.Unstructured, error)
2324
ConvertTo(codec runtime.Codec, object runtime.Object) (runtime.Object, error)
25+
GetEvents() []*corev1.Event
2426
}
2527

2628
type ObjectList interface {
@@ -102,6 +104,10 @@ func (res Resource) ConvertTo(codec runtime.Codec, object runtime.Object) (runti
102104
return obj, err
103105
}
104106

107+
func (res Resource) GetEvents() []*corev1.Event {
108+
panic("no implemented")
109+
}
110+
105111
type ResourceMetadata struct {
106112
ResourceType `gorm:"embedded"`
107113

@@ -159,6 +165,10 @@ func (data ResourceMetadata) GetResourceType() ResourceType {
159165
return data.ResourceType
160166
}
161167

168+
func (data ResourceMetadata) GetEvents() []*corev1.Event {
169+
return nil
170+
}
171+
162172
type Bytes datatypes.JSON
163173

164174
func (bytes *Bytes) Scan(data any) error {
@@ -186,6 +196,10 @@ func (bytes Bytes) GetResourceType() ResourceType {
186196
return ResourceType{}
187197
}
188198

199+
func (bytes Bytes) GetEvents() []*corev1.Event {
200+
return nil
201+
}
202+
189203
type ResourceList []Resource
190204

191205
func (list *ResourceList) From(db *gorm.DB) error {
@@ -248,3 +262,120 @@ func (list BytesList) Items() []Object {
248262
}
249263
return objects
250264
}
265+
266+
type EventsBytes Bytes
267+
268+
func (bytes *EventsBytes) Scan(data any) error {
269+
return (*datatypes.JSON)(bytes).Scan(data)
270+
}
271+
272+
func (bytes EventsBytes) Value() (driver.Value, error) {
273+
return (datatypes.JSON)(bytes).Value()
274+
}
275+
276+
func (bytes EventsBytes) Decode() ([]*corev1.Event, error) {
277+
var objects map[string]json.RawMessage
278+
if err := json.Unmarshal(bytes, &objects); err != nil {
279+
return nil, err
280+
}
281+
282+
events := make([]*corev1.Event, 0, len(objects))
283+
for _, obj := range objects {
284+
event := &corev1.Event{}
285+
if _, _, err := codec.Decode([]byte(obj), nil, event); err != nil {
286+
return nil, err
287+
}
288+
events = append(events, event)
289+
}
290+
return events, nil
291+
}
292+
293+
type ResourceMetadataWithEvents struct {
294+
ResourceMetadata `gorm:"embedded"`
295+
296+
Events EventsBytes
297+
}
298+
299+
func (bytes ResourceMetadataWithEvents) ConvertToUnstructured() (*unstructured.Unstructured, error) {
300+
return bytes.ResourceMetadata.ConvertToUnstructured()
301+
}
302+
303+
func (bytes ResourceMetadataWithEvents) ConvertTo(codec runtime.Codec, object runtime.Object) (runtime.Object, error) {
304+
return bytes.ResourceMetadata.ConvertTo(codec, object)
305+
}
306+
307+
func (bytes ResourceMetadataWithEvents) GetResourceType() ResourceType {
308+
return bytes.ResourceMetadata.GetResourceType()
309+
}
310+
311+
func (bytes ResourceMetadataWithEvents) GetEvents() []*corev1.Event {
312+
events, _ := bytes.Events.Decode()
313+
return events
314+
}
315+
316+
type ResourceMetadataWithEventsList []ResourceMetadata
317+
318+
func (list *ResourceMetadataWithEventsList) From(db *gorm.DB) error {
319+
switch db.Dialector.Name() {
320+
case "sqlite", "sqlite3", "mysql":
321+
db = db.Select("`group`, version, resource, kind, object->>'$.metadata' as metadata, events")
322+
case "postgres":
323+
db = db.Select(`"group", version, resource, kind, object->>'metadata' as metadata, events`)
324+
default:
325+
panic("storage: only support sqlite3, mysql or postgres")
326+
}
327+
metadatas := []ResourceMetadata{}
328+
if result := db.Find(&metadatas); result.Error != nil {
329+
return result.Error
330+
}
331+
*list = metadatas
332+
return nil
333+
}
334+
335+
func (list ResourceMetadataWithEventsList) Items() []Object {
336+
objects := make([]Object, 0, len(list))
337+
for _, object := range list {
338+
objects = append(objects, object)
339+
}
340+
return objects
341+
}
342+
343+
type BytesWithEvents struct {
344+
Object Bytes
345+
Events EventsBytes
346+
}
347+
348+
func (bytes BytesWithEvents) ConvertToUnstructured() (*unstructured.Unstructured, error) {
349+
return bytes.Object.ConvertToUnstructured()
350+
}
351+
352+
func (bytes BytesWithEvents) ConvertTo(codec runtime.Codec, object runtime.Object) (runtime.Object, error) {
353+
obj, _, err := codec.Decode(bytes.Object, nil, object)
354+
return obj, err
355+
}
356+
357+
func (bytes BytesWithEvents) GetResourceType() ResourceType {
358+
return bytes.Object.GetResourceType()
359+
}
360+
361+
func (bytes BytesWithEvents) GetEvents() []*corev1.Event {
362+
events, _ := bytes.Events.Decode()
363+
return events
364+
}
365+
366+
type BytesWithEventsList []BytesWithEvents
367+
368+
func (list *BytesWithEventsList) From(db *gorm.DB) error {
369+
if result := db.Select("object", "events").Find(list); result.Error != nil {
370+
return result.Error
371+
}
372+
return nil
373+
}
374+
375+
func (list BytesWithEventsList) Items() []Object {
376+
objects := make([]Object, 0, len(list))
377+
for _, object := range list {
378+
objects = append(objects, object)
379+
}
380+
return objects
381+
}

staging/src/github.com/clusterpedia-io/api/clusterpedia/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636

3737
ShadowAnnotationClusterName = "shadow.clusterpedia.io/cluster-name"
3838
ShadowAnnotationGroupVersionResource = "shadow.clusterpedia.io/gvr"
39+
ShadowAnnotationEvents = "shadow.clusterpedia.io/events"
3940
)
4041

4142
type OrderBy struct {
@@ -61,6 +62,7 @@ type ListOptions struct {
6162
Since *metav1.Time
6263
Before *metav1.Time
6364

65+
WithEvents bool
6466
WithContinue *bool
6567
WithRemainingCount *bool
6668

0 commit comments

Comments
 (0)