Skip to content

Commit ead917e

Browse files
author
calvin
committed
support split table for internal storage
Signed-off-by: calvin <[email protected]>
1 parent 29c0306 commit ead917e

File tree

6 files changed

+185
-92
lines changed

6 files changed

+185
-92
lines changed

pkg/storage/internalstorage/config.go

+3-23
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"gopkg.in/natefinch/lumberjack.v2"
1616
"gorm.io/gorm/logger"
1717
"k8s.io/klog/v2"
18-
19-
clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
2018
)
2119

2220
const (
@@ -30,7 +28,7 @@ type DivisionPolicy string
3028

3129
const (
3230
DivisionPolicyNone DivisionPolicy = "None"
33-
DivisionPolicyGroupResource DivisionPolicy = "GroupResource"
31+
DivisionPolicyGroupResource DivisionPolicy = "GVR"
3432
DivisionPolicyCustom DivisionPolicy = "Custom"
3533
)
3634

@@ -58,30 +56,12 @@ type Config struct {
5856

5957
Params map[string]string `yaml:"params"`
6058

61-
AutoMigration *bool `yaml:"autoMigration"` // If set to false, no tables will be created
62-
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`
63-
Mapper []ResourceMapper `yaml:"mapper"` // Only DivisionPolicy is DivisionPolicyCustom it need to specify the mapping between resource and table
59+
SkipAutoMigration bool `yaml:"autoMigration"` // If set to false, no tables will be created
60+
DivisionPolicy DivisionPolicy `yaml:"divisionPolicy"`
6461

6562
Log *LogConfig `yaml:"log"`
6663
}
6764

68-
type ResourceMapper struct {
69-
Table *Table `yaml:"table"`
70-
Resources []clusterv1alpha2.ClusterGroupResources `yaml:"resources"`
71-
}
72-
73-
type Table struct {
74-
Name string `yaml:"name"`
75-
ExtraFields []ExtraField `yaml:"extraFields"`
76-
}
77-
78-
type ExtraField struct {
79-
Name string `yaml:"name"`
80-
PlainPath string `yaml:"plainPath"`
81-
Type string `yaml:"type"`
82-
Index string `yaml:"index"`
83-
}
84-
8565
type LogConfig struct {
8666
Stdout bool `yaml:"stdout"`
8767
Level string `yaml:"level"`

pkg/storage/internalstorage/register.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
9393
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
9494
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)
9595

96-
return &StorageFactory{db}, nil
96+
return &StorageFactory{
97+
db: db,
98+
SkipAutoMigration: cfg.SkipAutoMigration,
99+
DivisionPolicy: cfg.DivisionPolicy,
100+
}, nil
97101
}
98102

99103
func newLogger(cfg *Config) (logger.Interface, error) {

pkg/storage/internalstorage/resource_storage.go

+34-28
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ import (
2727
)
2828

2929
type ResourceStorage struct {
30-
db *gorm.DB
31-
codec runtime.Codec
32-
30+
db *gorm.DB
31+
codec runtime.Codec
32+
model interface{}
3333
storageGroupResource schema.GroupResource
3434
storageVersion schema.GroupVersion
3535
memoryVersion schema.GroupVersion
@@ -116,14 +116,18 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
116116
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
117117
}
118118

119-
result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
120-
"cluster": cluster,
121-
"group": s.storageGroupResource.Group,
122-
"version": s.storageVersion.Version,
123-
"resource": s.storageGroupResource.Resource,
124-
"namespace": metaobj.GetNamespace(),
125-
"name": metaobj.GetName(),
126-
}).Updates(updatedResource)
119+
result := s.db.WithContext(ctx).
120+
Model(s.model).
121+
Where(map[string]interface{}{
122+
"cluster": cluster,
123+
"group": s.storageGroupResource.Group,
124+
"version": s.storageVersion.Version,
125+
"resource": s.storageGroupResource.Resource,
126+
"namespace": metaobj.GetNamespace(),
127+
"name": metaobj.GetName(),
128+
}).
129+
Updates(updatedResource)
130+
127131
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
128132
}
129133

@@ -144,14 +148,15 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
144148
}
145149

146150
func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
147-
return s.db.Model(&Resource{}).Where(map[string]interface{}{
148-
"cluster": cluster,
149-
"group": s.storageGroupResource.Group,
150-
"version": s.storageVersion.Version,
151-
"resource": s.storageGroupResource.Resource,
152-
"namespace": namespace,
153-
"name": name,
154-
}).Delete(&Resource{})
151+
return s.db.Where(map[string]interface{}{
152+
"cluster": cluster,
153+
"group": s.storageGroupResource.Group,
154+
"version": s.storageVersion.Version,
155+
"resource": s.storageGroupResource.Resource,
156+
"namespace": namespace,
157+
"name": name,
158+
}).
159+
Delete(&Resource{})
155160
}
156161

157162
func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error {
@@ -167,14 +172,15 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
167172
}
168173

169174
func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
170-
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
171-
"cluster": cluster,
172-
"group": s.storageGroupResource.Group,
173-
"version": s.storageVersion.Version,
174-
"resource": s.storageGroupResource.Resource,
175-
"namespace": namespace,
176-
"name": name,
177-
})
175+
return s.db.WithContext(ctx).Select("object").
176+
Where(map[string]interface{}{
177+
"cluster": cluster,
178+
"group": s.storageGroupResource.Group,
179+
"version": s.storageVersion.Version,
180+
"resource": s.storageGroupResource.Resource,
181+
"namespace": namespace,
182+
"name": name,
183+
})
178184
}
179185

180186
func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
@@ -199,7 +205,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
199205
result = &ResourceMetadataList{}
200206
}
201207

202-
query := s.db.WithContext(ctx).Model(&Resource{})
208+
query := s.db.WithContext(ctx)
203209
query = query.Where(map[string]interface{}{
204210
"group": s.storageGroupResource.Group,
205211
"version": s.storageVersion.Version,

pkg/storage/internalstorage/resource_storage_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,8 @@ func TestResourceStorage_Update(t *testing.T) {
465465

466466
func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage {
467467
return &ResourceStorage{
468-
db: db,
468+
db: db.Table("resources"),
469+
model: &Resource{},
469470
storageGroupResource: storageGVK.GroupResource(),
470471
storageVersion: storageGVK.GroupVersion(),
471472
}

pkg/storage/internalstorage/storage.go

+118-38
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package internalstorage
33
import (
44
"context"
55
"fmt"
6+
"strings"
7+
"sync"
68

79
"gorm.io/gorm"
810
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -12,30 +14,14 @@ import (
1214
)
1315

1416
type StorageFactory struct {
15-
db *gorm.DB
16-
AutoMigration *bool
17-
DivisionPolicy DivisionPolicy
18-
Mapper []ResourceMapper
17+
sync.Mutex
18+
19+
db *gorm.DB
20+
SkipAutoMigration bool
21+
DivisionPolicy DivisionPolicy
1922
}
2023

2124
func (s *StorageFactory) AutoMigrate() error {
22-
if s.AutoMigration != nil && *s.AutoMigration {
23-
switch s.DivisionPolicy {
24-
if err := s.db.AutoMigrate(&Resource{}); err != nil {
25-
return err
26-
}
27-
case "", DivisionPolicyNone:
28-
case DivisionPolicyGroupResource:
29-
30-
}
31-
32-
if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone {
33-
if err := s.db.AutoMigrate(&Resource{}); err != nil {
34-
return err
35-
}
36-
}
37-
}
38-
3925
return nil
4026
}
4127

@@ -44,10 +30,47 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string {
4430
}
4531

4632
func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
47-
return &ResourceStorage{
48-
db: s.db,
49-
codec: config.Codec,
33+
s.Lock()
34+
defer s.Unlock()
5035

36+
gvr := schema.GroupVersionResource{
37+
Group: config.StorageGroupResource.Group,
38+
Version: config.StorageVersion.Version,
39+
Resource: config.StorageGroupResource.Resource,
40+
}
41+
table := s.tableName(gvr)
42+
43+
var model interface{}
44+
switch s.DivisionPolicy {
45+
case DivisionPolicyGroupResource:
46+
model = &GroupVersionResource{}
47+
if !s.SkipAutoMigration {
48+
if exist := s.db.Migrator().HasTable(table); !exist {
49+
if err := s.db.AutoMigrate(&GroupVersionResource{}); err != nil {
50+
return nil, err
51+
}
52+
53+
err := s.db.Migrator().RenameTable("groupversionresources", table)
54+
if err != nil {
55+
return nil, err
56+
}
57+
}
58+
}
59+
case "", DivisionPolicyNone:
60+
model = &Resource{}
61+
if !s.SkipAutoMigration {
62+
if exist := s.db.Migrator().HasTable(table); !exist {
63+
if err := s.db.AutoMigrate(&Resource{}); err != nil {
64+
return nil, err
65+
}
66+
}
67+
}
68+
}
69+
70+
return &ResourceStorage{
71+
db: s.db.Table(table).Model(model),
72+
model: model,
73+
codec: config.Codec,
5174
storageGroupResource: config.StorageGroupResource,
5275
storageVersion: config.StorageVersion,
5376
memoryVersion: config.MemoryVersion,
@@ -64,12 +87,24 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
6487
}
6588

6689
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
90+
s.Lock()
91+
tables, err := s.db.Migrator().GetTables()
92+
if err != nil {
93+
s.Unlock()
94+
return nil, err
95+
}
96+
s.Unlock()
97+
6798
var resources []Resource
68-
result := s.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
69-
Where(map[string]interface{}{"cluster": cluster}).
70-
Find(&resources)
71-
if result.Error != nil {
72-
return nil, InterpretDBError(cluster, result.Error)
99+
for _, table := range tables {
100+
result := s.db.WithContext(ctx).
101+
Table(table).
102+
Select("group", "version", "resource", "namespace", "name", "resource_version").
103+
Where(map[string]interface{}{"cluster": cluster}).
104+
Find(&resources)
105+
if result.Error != nil {
106+
return nil, InterpretDBError(cluster, result.Error)
107+
}
73108
}
74109

75110
resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
@@ -91,18 +126,44 @@ func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
91126
}
92127

93128
func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
94-
result := s.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
95-
return InterpretDBError(cluster, result.Error)
129+
s.Lock()
130+
tables, err := s.db.Migrator().GetTables()
131+
if err != nil {
132+
s.Unlock()
133+
return err
134+
}
135+
s.Unlock()
136+
137+
for _, table := range tables {
138+
result := s.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
139+
if result.Error != nil {
140+
return InterpretDBError(cluster, result.Error)
141+
}
142+
}
143+
144+
return nil
96145
}
97146

98147
func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
99-
result := s.db.WithContext(ctx).Where(map[string]interface{}{
100-
"cluster": cluster,
101-
"group": gvr.Group,
102-
"version": gvr.Version,
103-
"resource": gvr.Resource,
104-
}).Delete(&Resource{})
105-
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
148+
err := s.db.Transaction(func(db *gorm.DB) error {
149+
result := s.db.WithContext(ctx).
150+
Table(s.tableName(gvr)).
151+
Where(map[string]interface{}{
152+
"cluster": cluster,
153+
"group": gvr.Group,
154+
"version": gvr.Version,
155+
"resource": gvr.Resource,
156+
}).
157+
Delete(&Resource{})
158+
159+
if result.Error != nil {
160+
return result.Error
161+
}
162+
163+
return nil
164+
})
165+
166+
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), err)
106167
}
107168

108169
func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) {
@@ -116,3 +177,22 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna
116177
func (s *StorageFactory) PrepareCluster(cluster string) error {
117178
return nil
118179
}
180+
181+
// GenerateTableFor return table name using gvr string
182+
func GenerateTableFor(gvr schema.GroupVersionResource) string {
183+
if gvr.Group == "" {
184+
return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource)
185+
}
186+
187+
group := strings.ReplaceAll(gvr.Group, ".", "_")
188+
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
189+
}
190+
191+
func (s *StorageFactory) tableName(gvr schema.GroupVersionResource) string {
192+
table := "resources"
193+
if s.DivisionPolicy == DivisionPolicyGroupResource {
194+
table = GenerateTableFor(gvr)
195+
}
196+
197+
return table
198+
}

0 commit comments

Comments
 (0)