Skip to content

Commit 1b50d4b

Browse files
committed
clustersynchro: add resource synchro metrics
Signed-off-by: Iceber Gu <[email protected]>
1 parent fc6817d commit 1b50d4b

File tree

6 files changed

+492
-7
lines changed

6 files changed

+492
-7
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package options
2+
3+
import (
4+
"github.com/spf13/pflag"
5+
6+
metricsserver "github.com/clusterpedia-io/clusterpedia/pkg/metrics/server"
7+
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
8+
)
9+
10+
type MetricsOptions struct {
11+
Metrics *metricsserver.Options
12+
13+
ResourceSynchroMetricsLabels string
14+
}
15+
16+
func NewMetricsOptions() *MetricsOptions {
17+
return &MetricsOptions{
18+
Metrics: metricsserver.NewOptions(),
19+
ResourceSynchroMetricsLabels: resourcesynchro.DefaultMetricsWrapperFactory.Config().ScopeLabels(),
20+
}
21+
}
22+
23+
func (o *MetricsOptions) AddFlags(fs *pflag.FlagSet) {
24+
o.Metrics.AddFlags(fs)
25+
26+
fs.StringVar(&o.ResourceSynchroMetricsLabels, "resource-synchro-metrics-labels", o.ResourceSynchroMetricsLabels, "The resource synchronizer's metrics aggregation scope, which supports 'empty', 'cluster', 'gv','gvr', 'cluster,gv', 'cluster,gvr', etc.")
27+
}
28+
29+
func (o *MetricsOptions) Validate() []error {
30+
errs := o.Metrics.Validate()
31+
32+
if _, err := resourcesynchro.ParseToMetricsWrapperConfig(o.ResourceSynchroMetricsLabels); err != nil {
33+
errs = append(errs, err)
34+
}
35+
return errs
36+
}
37+
38+
func (o *MetricsOptions) ServerConfig() metricsserver.Config {
39+
return o.Metrics.Config()
40+
}
41+
42+
func (o *MetricsOptions) ResourceSynchroConfig() resourcesynchro.MetricsWrapperConfig {
43+
config, err := resourcesynchro.ParseToMetricsWrapperConfig(o.ResourceSynchroMetricsLabels)
44+
if err != nil {
45+
panic(err)
46+
}
47+
return config
48+
}

cmd/clustersynchro-manager/app/options/options.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323
"github.com/clusterpedia-io/clusterpedia/cmd/clustersynchro-manager/app/config"
2424
crdclientset "github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
2525
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
26-
metricsserver "github.com/clusterpedia-io/clusterpedia/pkg/metrics/server"
2726
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
2827
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2928
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
3030
)
3131

3232
const (
@@ -42,7 +42,7 @@ type Options struct {
4242

4343
Logs *logs.Options
4444
Storage *storageoptions.StorageOptions
45-
Metrics *metricsserver.Options
45+
Metrics *MetricsOptions
4646
KubeStateMetrics *kubestatemetrics.Options
4747

4848
WorkerNumber int // WorkerNumber is the number of worker goroutines
@@ -76,7 +76,7 @@ func NewClusterSynchroManagerOptions() (*Options, error) {
7676

7777
options.Logs = logs.NewOptions()
7878
options.Storage = storageoptions.NewStorageOptions()
79-
options.Metrics = metricsserver.NewOptions()
79+
options.Metrics = NewMetricsOptions()
8080
options.KubeStateMetrics = kubestatemetrics.NewOptions()
8181

8282
options.WorkerNumber = 5
@@ -155,13 +155,17 @@ func (o *Options) Config() (*config.Config, error) {
155155
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
156156
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: ClusterSynchroManagerUserAgent})
157157

158-
metricsConfig := o.Metrics.Config()
158+
metricsConfig := o.Metrics.ServerConfig()
159159
metricsStoreBuilder, err := o.KubeStateMetrics.MetricsStoreBuilderConfig().New()
160160
if err != nil {
161161
return nil, err
162162
}
163163
kubeStateMetricsServerConfig := o.KubeStateMetrics.ServerConfig(metricsConfig)
164164

165+
if config := o.Metrics.ResourceSynchroConfig(); config != resourcesynchro.DefaultMetricsWrapperFactory.Config() {
166+
resourcesynchro.DefaultMetricsWrapperFactory = resourcesynchro.NewMetricsWrapperFactory(config)
167+
}
168+
165169
if o.ShardingName != "" {
166170
o.LeaderElection.ResourceName = fmt.Sprintf("%s-%s", o.LeaderElection.ResourceName, o.ShardingName)
167171
}

pkg/synchromanager/clustersynchro/cluster_synchro.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat
129129
synchro.resourceSynchroFactory = factory
130130
} else {
131131
synchro.resourceSynchroFactory = DefaultResourceSynchroFactory{}
132+
registerResourceSynchroMetrics()
132133
}
133134

134135
var refresherOnce sync.Once

pkg/synchromanager/clustersynchro/default_resource_synchro.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"k8s.io/apimachinery/pkg/util/wait"
1616
genericstorage "k8s.io/apiserver/pkg/storage"
1717
"k8s.io/client-go/tools/cache"
18+
compbasemetrics "k8s.io/component-base/metrics"
1819
"k8s.io/klog/v2"
1920
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
2021

@@ -39,6 +40,7 @@ type resourceSynchro struct {
3940
listerWatcher cache.ListerWatcher
4041
metricsExtraStore informer.ExtraStore
4142
metricsWriter *metricsstore.MetricsWriter
43+
metricsWrapper resourcesynchro.MetricsWrapper
4244

4345
queue queue.EventQueue
4446
cache *informer.ResourceVersionStorage
@@ -68,6 +70,8 @@ type resourceSynchro struct {
6870

6971
// for debug
7072
runningStage string
73+
74+
storageMaxRetry int
7175
}
7276

7377
type DefaultResourceSynchroFactory struct{}
@@ -88,9 +92,10 @@ func (factory DefaultResourceSynchroFactory) NewResourceSynchro(cluster string,
8892
// all resources saved to the queue are `runtime.Object`
8993
queue: queue.NewPressureQueue(cache.MetaNamespaceKeyFunc),
9094

91-
storage: config.ResourceStorage,
92-
convertor: config.ObjectConvertor,
93-
memoryVersion: storageConfig.MemoryResource.GroupVersion(),
95+
storage: config.ResourceStorage,
96+
convertor: config.ObjectConvertor,
97+
memoryVersion: storageConfig.MemoryResource.GroupVersion(),
98+
metricsWrapper: resourcesynchro.DefaultMetricsWrapperFactory.NewWrapper(cluster, config.GroupVersionResource),
9499

95100
stopped: make(chan struct{}),
96101
isRunnableForStorage: atomic.NewBool(true),
@@ -158,6 +163,10 @@ func (synchro *resourceSynchro) Run(shutdown <-chan struct{}) {
158163

159164
synchro.setStatus(clusterv1alpha2.ResourceSyncStatusStop, "", "")
160165
synchro.runningStage = "shutdown"
166+
167+
for _, m := range resourceSynchroMetrics {
168+
synchro.metricsWrapper.Delete(m.(resourcesynchro.DeletableVecMetrics))
169+
}
161170
}
162171

163172
func (synchro *resourceSynchro) Close() <-chan struct{} {
@@ -256,6 +265,7 @@ func (synchro *resourceSynchro) Start(stopCh <-chan struct{}) {
256265
for r, v := range synchro.rvs {
257266
rvs[r] = v
258267
}
268+
synchro.metricsWrapper.Sum(storagedResourcesTotal, float64(len(rvs)))
259269
synchro.cache = informer.NewResourceVersionStorage()
260270
synchro.rvsLock.Unlock()
261271

@@ -403,45 +413,63 @@ func (synchro *resourceSynchro) handleResourceEvent(event *queue.Event) {
403413
}
404414
utils.InjectClusterName(obj, synchro.cluster)
405415

416+
var metric compbasemetrics.CounterMetric
406417
switch event.Action {
407418
case queue.Added:
408419
handler = synchro.createOrUpdateResource
420+
metric = synchro.metricsWrapper.Counter(resourceAddedCounter)
409421
case queue.Updated:
410422
handler = synchro.updateOrCreateResource
423+
metric = synchro.metricsWrapper.Counter(resourceUpdatedCounter)
411424
}
412425
callback = func(obj runtime.Object) {
426+
metric.Inc()
413427
metaobj, _ := meta.Accessor(obj)
414428
synchro.rvsLock.Lock()
415429
synchro.rvs[key] = metaobj.GetResourceVersion()
430+
431+
synchro.metricsWrapper.Sum(storagedResourcesTotal, float64(len(synchro.rvs)))
416432
synchro.rvsLock.Unlock()
417433
}
418434
} else {
419435
handler, callback = synchro.deleteResource, func(_ runtime.Object) {
420436
synchro.rvsLock.Lock()
421437
delete(synchro.rvs, key)
438+
synchro.metricsWrapper.Sum(storagedResourcesTotal, float64(len(synchro.rvs)))
422439
synchro.rvsLock.Unlock()
440+
synchro.metricsWrapper.Counter(resourceDeletedCounter).Inc()
423441
}
424442
}
425443

426444
// TODO(Iceber): put the event back into the queue to retry?
427445
for i := 0; ; i++ {
446+
now := time.Now()
428447
ctx, cancel := context.WithTimeout(synchro.ctx, 30*time.Second)
429448
err := handler(ctx, obj)
430449
cancel()
431450
if err == nil {
432451
callback(obj)
433452

453+
if i != 0 && i > synchro.storageMaxRetry {
454+
synchro.storageMaxRetry = i
455+
synchro.metricsWrapper.Max(resourceMaxRetryGauge, float64(i))
456+
}
457+
434458
if !synchro.isRunnableForStorage.Load() && synchro.queue.Len() == 0 {
435459
// Start the informer after processing the data in the queue to ensure that storage is up and running for a period of time.
436460
synchro.setRunnableForStorage()
437461
}
462+
synchro.metricsWrapper.Historgram(resourceStorageDuration).Observe(time.Since(now).Seconds())
438463
return
439464
}
440465

441466
if errors.Is(err, context.Canceled) {
442467
return
443468
}
469+
470+
synchro.metricsWrapper.Counter(resourceFailedCounter).Inc()
444471
if !storage.IsRecoverableException(err) {
472+
synchro.metricsWrapper.Counter(resourceDroppedCounter).Inc()
445473
klog.ErrorS(err, "Failed to storage resource", "cluster", synchro.cluster,
446474
"action", event.Action, "resource", synchro.storageResource, "key", key)
447475

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package clustersynchro
2+
3+
import (
4+
"sync"
5+
6+
compbasemetrics "k8s.io/component-base/metrics"
7+
"k8s.io/component-base/metrics/legacyregistry"
8+
9+
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
10+
)
11+
12+
// Note: The current design pattern does not account for the presence of
// multiple different types of resource synchronizers simultaneously.
// Future updates can be made based on requirements.

// Prometheus namespace and subsystem shared by every resource synchro
// metric, yielding names like "clustersynchro_resourcesynchro_<metric>".
const (
	namespace = "clustersynchro"
	subsystem = "resourcesynchro"
)
19+
20+
var (
21+
// storagedResourcesTotal records the total number of resources stored in the storage layer.
22+
storagedResourcesTotal *compbasemetrics.GaugeVec
23+
24+
// resourceAddedCounter records the number of times resources are added to the storage layer.
25+
resourceAddedCounter *compbasemetrics.CounterVec
26+
27+
// resourceUpdatedCounter records the number of times resources are updated in the storage layer.
28+
resourceUpdatedCounter *compbasemetrics.CounterVec
29+
30+
// resourceDeletedCounter records the number of times resources are deleted from the storage layer.
31+
resourceDeletedCounter *compbasemetrics.CounterVec
32+
33+
// resourceFailedCounter records the number of times resource operations fail.
34+
resourceFailedCounter *compbasemetrics.CounterVec
35+
36+
// resourceDroppedCounter records the number of times resources are dropped.
37+
resourceDroppedCounter *compbasemetrics.CounterVec
38+
39+
// resourceMaxRetryGauge provides the maximum number of retries during resource operations.
40+
resourceMaxRetryGauge *compbasemetrics.GaugeVec
41+
42+
// resourceStorageDuration records the time interval from when a resource is fetched from the queue to when it is processed.
43+
resourceStorageDuration *compbasemetrics.HistogramVec
44+
)
45+
46+
// resourceSynchroMetrics lists every resource synchro metric vector so
// they can be iterated for registration and for deleting a synchro's
// label values on shutdown.
//
// It is populated by registerResourceSynchroMetrics. The original code
// initialized it with a package-level composite literal, which captured
// the metric vars while they were still nil (they are only created
// inside registerResourceSynchroMetrics), producing eight typed-nil
// entries until registration ran. Leaving the slice empty until
// registration avoids ever exposing those nil entries.
var resourceSynchroMetrics []interface{}

// registerOnce guards the one-time creation and registration of the
// resource synchro metrics.
var registerOnce sync.Once
58+
59+
func registerResourceSynchroMetrics() {
60+
registerOnce.Do(func() {
61+
storagedResourcesTotal = resourcesynchro.DefaultMetricsWrapperFactory.NewGaugeVec(
62+
&compbasemetrics.GaugeOpts{
63+
Namespace: namespace,
64+
Subsystem: subsystem,
65+
Name: "storaged_resource_total",
66+
Help: "Number of resources stored in the storage layer.",
67+
StabilityLevel: compbasemetrics.ALPHA,
68+
},
69+
)
70+
71+
resourceAddedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
72+
&compbasemetrics.CounterOpts{
73+
Namespace: namespace,
74+
Subsystem: subsystem,
75+
Name: "resource_added_total",
76+
Help: "Number of times resources are deleted from the storage layer.",
77+
StabilityLevel: compbasemetrics.ALPHA,
78+
},
79+
)
80+
81+
resourceUpdatedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
82+
&compbasemetrics.CounterOpts{
83+
Namespace: namespace,
84+
Subsystem: subsystem,
85+
Name: "resource_updated_total",
86+
Help: "Number of times resources are updated in the storage layer.",
87+
StabilityLevel: compbasemetrics.ALPHA,
88+
},
89+
)
90+
91+
resourceDeletedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
92+
&compbasemetrics.CounterOpts{
93+
Namespace: namespace,
94+
Subsystem: subsystem,
95+
Name: "resource_deleted_total",
96+
Help: "Number of times resources are deleted from the storage layer.",
97+
StabilityLevel: compbasemetrics.ALPHA,
98+
},
99+
)
100+
101+
resourceFailedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
102+
&compbasemetrics.CounterOpts{
103+
Namespace: namespace,
104+
Subsystem: subsystem,
105+
Name: "resource_failed_total",
106+
Help: "Number of times resource operations fail.",
107+
StabilityLevel: compbasemetrics.ALPHA,
108+
},
109+
)
110+
111+
resourceMaxRetryGauge = resourcesynchro.DefaultMetricsWrapperFactory.NewGaugeVec(
112+
&compbasemetrics.GaugeOpts{
113+
Namespace: namespace,
114+
Subsystem: subsystem,
115+
Name: "resource_max_retry_total",
116+
Help: "The maximum number of retries during resource operations.",
117+
StabilityLevel: compbasemetrics.ALPHA,
118+
},
119+
)
120+
121+
resourceDroppedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
122+
&compbasemetrics.CounterOpts{
123+
Namespace: namespace,
124+
Subsystem: subsystem,
125+
Name: "failed_resource_total",
126+
Help: "Number of times resources are dropped.",
127+
StabilityLevel: compbasemetrics.ALPHA,
128+
},
129+
)
130+
131+
resourceStorageDuration = resourcesynchro.DefaultMetricsWrapperFactory.NewHistogramVec(
132+
&compbasemetrics.HistogramOpts{
133+
Namespace: namespace,
134+
Subsystem: subsystem,
135+
Name: "storage_duration_seconds",
136+
Help: "The time interval from when a resource is fetched from the queue to when it is processed.",
137+
StabilityLevel: compbasemetrics.ALPHA,
138+
Buckets: []float64{0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 1.0, 1.5, 3, 5, 8, 15},
139+
},
140+
)
141+
142+
resourceSynchroMetrics = []interface{}{
143+
storagedResourcesTotal,
144+
resourceAddedCounter,
145+
resourceUpdatedCounter,
146+
resourceDeletedCounter,
147+
resourceFailedCounter,
148+
resourceMaxRetryGauge,
149+
resourceDroppedCounter,
150+
resourceStorageDuration,
151+
}
152+
for _, m := range resourceSynchroMetrics {
153+
legacyregistry.MustRegister(m.(compbasemetrics.Registerable))
154+
}
155+
})
156+
}

0 commit comments

Comments
 (0)