Skip to content

Commit 723c0d2

Browse files
author
calvin
committed
support split table for internal storage
Signed-off-by: calvin <[email protected]>
1 parent 29c0306 commit 723c0d2

File tree

6 files changed

+170
-91
lines changed

6 files changed

+170
-91
lines changed

pkg/storage/internalstorage/config.go

+5-25
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"gopkg.in/natefinch/lumberjack.v2"
1616
"gorm.io/gorm/logger"
1717
"k8s.io/klog/v2"
18-
19-
clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
2018
)
2119

2220
const (
@@ -29,9 +27,9 @@ const (
2927
type DivisionPolicy string
3028

3129
const (
32-
DivisionPolicyNone DivisionPolicy = "None"
33-
DivisionPolicyGroupResource DivisionPolicy = "GroupResource"
34-
DivisionPolicyCustom DivisionPolicy = "Custom"
30+
DivisionPolicyNone DivisionPolicy = "None"
31+
DivisionPolicyGroupVersionResource DivisionPolicy = "GVR"
32+
DivisionPolicyCustom DivisionPolicy = "Custom"
3533
)
3634

3735
type Config struct {
@@ -58,30 +56,12 @@ type Config struct {
5856

5957
Params map[string]string `yaml:"params"`
6058

61-
AutoMigration *bool `yaml:"autoMigration"` // If set to false, no tables will be created
62-
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`
63-
Mapper []ResourceMapper `yaml:"mapper"` // Only DivisionPolicy is DivisionPolicyCustom it need to specify the mapping between resource and table
59+
SkipAutoMigration bool `yaml:"skipAutoMigration"` // If set to false, no tables will be created
60+
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`
6461

6562
Log *LogConfig `yaml:"log"`
6663
}
6764

68-
type ResourceMapper struct {
69-
Table *Table `yaml:"table"`
70-
Resources []clusterv1alpha2.ClusterGroupResources `yaml:"resources"`
71-
}
72-
73-
type Table struct {
74-
Name string `yaml:"name"`
75-
ExtraFields []ExtraField `yaml:"extraFields"`
76-
}
77-
78-
type ExtraField struct {
79-
Name string `yaml:"name"`
80-
PlainPath string `yaml:"plainPath"`
81-
Type string `yaml:"type"`
82-
Index string `yaml:"index"`
83-
}
84-
8565
type LogConfig struct {
8666
Stdout bool `yaml:"stdout"`
8767
Level string `yaml:"level"`

pkg/storage/internalstorage/register.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,19 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
9393
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
9494
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)
9595

96-
return &StorageFactory{db}, nil
96+
if !cfg.SkipAutoMigration && (cfg.DivisionPolicy == DivisionPolicyNone || cfg.DivisionPolicy == "") {
97+
if exist := db.Migrator().HasTable("resources"); !exist {
98+
if err := db.AutoMigrate(&Resource{}); err != nil {
99+
return nil, err
100+
}
101+
}
102+
}
103+
104+
return &StorageFactory{
105+
db: db,
106+
SkipAutoMigration: cfg.SkipAutoMigration,
107+
DivisionPolicy: cfg.DivisionPolicy,
108+
}, nil
97109
}
98110

99111
func newLogger(cfg *Config) (logger.Interface, error) {

pkg/storage/internalstorage/resource_storage.go

+27-23
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ import (
2727
)
2828

2929
type ResourceStorage struct {
30-
db *gorm.DB
31-
codec runtime.Codec
32-
30+
db *gorm.DB
31+
codec runtime.Codec
3332
storageGroupResource schema.GroupResource
3433
storageVersion schema.GroupVersion
3534
memoryVersion schema.GroupVersion
@@ -116,14 +115,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
116115
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
117116
}
118117

119-
result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
120-
"cluster": cluster,
121-
"group": s.storageGroupResource.Group,
122-
"version": s.storageVersion.Version,
123-
"resource": s.storageGroupResource.Resource,
124-
"namespace": metaobj.GetNamespace(),
125-
"name": metaobj.GetName(),
126-
}).Updates(updatedResource)
118+
result := s.db.WithContext(ctx).
119+
Where(map[string]interface{}{
120+
"cluster": cluster,
121+
"group": s.storageGroupResource.Group,
122+
"version": s.storageVersion.Version,
123+
"resource": s.storageGroupResource.Resource,
124+
"namespace": metaobj.GetNamespace(),
125+
"name": metaobj.GetName(),
126+
}).
127+
Updates(updatedResource)
128+
127129
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
128130
}
129131

@@ -143,8 +145,8 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
143145
return &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}, nil
144146
}
145147

146-
func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
147-
return s.db.Model(&Resource{}).Where(map[string]interface{}{
148+
func (s *ResourceStorage) deleteObject(ctx context.Context, cluster, namespace, name string) *gorm.DB {
149+
return s.db.WithContext(ctx).Where(map[string]interface{}{
148150
"cluster": cluster,
149151
"group": s.storageGroupResource.Group,
150152
"version": s.storageVersion.Version,
@@ -160,21 +162,23 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
160162
return err
161163
}
162164

163-
if result := s.deleteObject(cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil {
165+
if result := s.deleteObject(ctx, cluster, metaobj.GetNamespace(), metaobj.GetName()); result.Error != nil {
164166
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
165167
}
166168
return nil
167169
}
168170

169171
func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
170-
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
171-
"cluster": cluster,
172-
"group": s.storageGroupResource.Group,
173-
"version": s.storageVersion.Version,
174-
"resource": s.storageGroupResource.Resource,
175-
"namespace": namespace,
176-
"name": name,
177-
})
172+
return s.db.WithContext(ctx).
173+
Select("object").
174+
Where(map[string]interface{}{
175+
"cluster": cluster,
176+
"group": s.storageGroupResource.Group,
177+
"version": s.storageVersion.Version,
178+
"resource": s.storageGroupResource.Resource,
179+
"namespace": namespace,
180+
"name": name,
181+
})
178182
}
179183

180184
func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
@@ -199,7 +203,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
199203
result = &ResourceMetadataList{}
200204
}
201205

202-
query := s.db.WithContext(ctx).Model(&Resource{})
206+
query := s.db.WithContext(ctx)
203207
query = query.Where(map[string]interface{}{
204208
"group": s.storageGroupResource.Group,
205209
"version": s.storageVersion.Version,

pkg/storage/internalstorage/resource_storage_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ func TestResourceStorage_deleteObject(t *testing.T) {
339339
postgreSQL := postgresDB.Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL(
340340
func(tx *gorm.DB) *gorm.DB {
341341
rs := newTestResourceStorage(tx, test.resource)
342-
return rs.deleteObject(test.cluster, test.namespace, test.resourceName)
342+
return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName)
343343
})
344344

345345
if postgreSQL != test.expected.postgres {
@@ -354,7 +354,7 @@ func TestResourceStorage_deleteObject(t *testing.T) {
354354
mysqlSQL := mysqlDBs[version].Session(&gorm.Session{SkipDefaultTransaction: true}).ToSQL(
355355
func(tx *gorm.DB) *gorm.DB {
356356
rs := newTestResourceStorage(tx, test.resource)
357-
return rs.deleteObject(test.cluster, test.namespace, test.resourceName)
357+
return rs.deleteObject(context.TODO(), test.cluster, test.namespace, test.resourceName)
358358
})
359359

360360
if mysqlSQL != test.expected.mysql {
@@ -465,7 +465,7 @@ func TestResourceStorage_Update(t *testing.T) {
465465

466466
func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage {
467467
return &ResourceStorage{
468-
db: db,
468+
db: db.Table("resources").Model(&Resource{}),
469469
storageGroupResource: storageGVK.GroupResource(),
470470
storageVersion: storageGVK.GroupVersion(),
471471
}

pkg/storage/internalstorage/storage.go

+99-38
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package internalstorage
33
import (
44
"context"
55
"fmt"
6+
"strings"
67

78
"gorm.io/gorm"
89
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -12,30 +13,12 @@ import (
1213
)
1314

1415
type StorageFactory struct {
15-
db *gorm.DB
16-
AutoMigration *bool
17-
DivisionPolicy DivisionPolicy
18-
Mapper []ResourceMapper
16+
db *gorm.DB
17+
SkipAutoMigration bool
18+
DivisionPolicy DivisionPolicy
1919
}
2020

2121
func (s *StorageFactory) AutoMigrate() error {
22-
if s.AutoMigration != nil && *s.AutoMigration {
23-
switch s.DivisionPolicy {
24-
if err := s.db.AutoMigrate(&Resource{}); err != nil {
25-
return err
26-
}
27-
case "", DivisionPolicyNone:
28-
case DivisionPolicyGroupResource:
29-
30-
}
31-
32-
if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone {
33-
if err := s.db.AutoMigrate(&Resource{}); err != nil {
34-
return err
35-
}
36-
}
37-
}
38-
3922
return nil
4023
}
4124

@@ -44,10 +27,37 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string {
4427
}
4528

4629
func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
47-
return &ResourceStorage{
48-
db: s.db,
49-
codec: config.Codec,
30+
gvr := schema.GroupVersionResource{
31+
Group: config.StorageGroupResource.Group,
32+
Version: config.StorageVersion.Version,
33+
Resource: config.StorageGroupResource.Resource,
34+
}
35+
table := s.tableName(gvr)
36+
37+
var model interface{}
38+
switch s.DivisionPolicy {
39+
case DivisionPolicyGroupVersionResource:
40+
model = &GroupVersionResource{}
41+
if !s.SkipAutoMigration {
42+
if exist := s.db.Migrator().HasTable(table); !exist {
43+
if err := s.db.AutoMigrate(&GroupVersionResource{}); err != nil {
44+
return nil, err
45+
}
46+
47+
if err := s.db.Migrator().RenameTable("group_version_resources", table); err != nil {
48+
if !s.db.Migrator().HasTable(table) {
49+
return nil, err
50+
}
51+
}
52+
}
53+
}
54+
default:
55+
model = &Resource{}
56+
}
5057

58+
return &ResourceStorage{
59+
db: s.db.Table(table).Model(model),
60+
codec: config.Codec,
5161
storageGroupResource: config.StorageGroupResource,
5262
storageVersion: config.StorageVersion,
5363
memoryVersion: config.MemoryVersion,
@@ -64,12 +74,21 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
6474
}
6575

6676
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
77+
tables, err := s.db.Migrator().GetTables()
78+
if err != nil {
79+
return nil, err
80+
}
81+
6782
var resources []Resource
68-
result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
69-
Where(map[string]interface{}{"cluster": cluster}).
70-
Find(&resources)
71-
if result.Error != nil {
72-
return nil, InterpretDBError(cluster, result.Error)
83+
for _, table := range tables {
84+
result := s.db.WithContext(ctx).
85+
Table(table).
86+
Select("group", "version", "resource", "namespace", "name", "resource_version").
87+
Where(map[string]interface{}{"cluster": cluster}).
88+
Find(&resources)
89+
if result.Error != nil {
90+
return nil, InterpretDBError(cluster, result.Error)
91+
}
7392
}
7493

7594
resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
@@ -91,18 +110,41 @@ func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
91110
}
92111

93112
func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
94-
result := s.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
95-
return InterpretDBError(cluster, result.Error)
113+
tables, err := s.db.Migrator().GetTables()
114+
if err != nil {
115+
return err
116+
}
117+
118+
for _, table := range tables {
119+
result := s.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
120+
if result.Error != nil {
121+
return InterpretDBError(cluster, result.Error)
122+
}
123+
}
124+
125+
return nil
96126
}
97127

98128
func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
99-
result := s.db.WithContext(ctx).Where(map[string]interface{}{
100-
"cluster": cluster,
101-
"group": gvr.Group,
102-
"version": gvr.Version,
103-
"resource": gvr.Resource,
104-
}).Delete(&Resource{})
105-
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
129+
err := s.db.Transaction(func(db *gorm.DB) error {
130+
result := s.db.WithContext(ctx).
131+
Table(s.tableName(gvr)).
132+
Where(map[string]interface{}{
133+
"cluster": cluster,
134+
"group": gvr.Group,
135+
"version": gvr.Version,
136+
"resource": gvr.Resource,
137+
}).
138+
Delete(&Resource{})
139+
140+
if result.Error != nil {
141+
return result.Error
142+
}
143+
144+
return nil
145+
})
146+
147+
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), err)
106148
}
107149

108150
func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) {
@@ -116,3 +158,22 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna
116158
func (s *StorageFactory) PrepareCluster(cluster string) error {
117159
return nil
118160
}
161+
162+
// GenerateTableFor return table name using gvr string
163+
func GenerateTableFor(gvr schema.GroupVersionResource) string {
164+
if gvr.Group == "" {
165+
return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource)
166+
}
167+
168+
group := strings.ReplaceAll(gvr.Group, ".", "_")
169+
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
170+
}
171+
172+
func (s *StorageFactory) tableName(gvr schema.GroupVersionResource) string {
173+
table := "resources"
174+
if s.DivisionPolicy == DivisionPolicyGroupVersionResource {
175+
table = GenerateTableFor(gvr)
176+
}
177+
178+
return table
179+
}

0 commit comments

Comments
 (0)