Skip to content

Commit fd40b61

Browse files
authored
Make plugin metric registration thread safe (#6532)
* Make plugin metric registration thread safe

  Signed-off-by: Jason Parraga <[email protected]>

* Address feedback

  Signed-off-by: Jason Parraga <[email protected]>

---------

Signed-off-by: Jason Parraga <[email protected]>
1 parent a840927 commit fd40b61

File tree

1 file changed

+51
-26
lines changed
  • flytepropeller/pkg/controller/nodes/task

1 file changed

+51
-26
lines changed

flytepropeller/pkg/controller/nodes/task/handler.go

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"runtime/debug"
7+
"sync"
78
"time"
89

910
regErrors "github.com/pkg/errors"
@@ -234,24 +235,25 @@ type taskType = string
234235
type pluginID = string
235236

236237
type Handler struct {
237-
catalog catalog.Client
238-
asyncCatalog catalog.AsyncClient
239-
defaultPlugins map[pluginCore.TaskType]pluginCore.Plugin
240-
pluginsForType map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin
241-
taskMetricsMap map[MetricKey]*taskMetrics
242-
defaultPlugin pluginCore.Plugin
243-
metrics *metrics
244-
pluginRegistry PluginRegistryIface
245-
kubeClient pluginCore.KubeClient
246-
kubeClientset kubernetes.Interface
247-
secretManager pluginCore.SecretManager
248-
resourceManager resourcemanager.BaseResourceManager
249-
cfg *config.Config
250-
pluginScope promutils.Scope
251-
eventConfig *controllerConfig.EventConfig
252-
clusterID string
253-
agentService *agent.AgentService
254-
connectorService *connector.ConnectorService
238+
catalog catalog.Client
239+
asyncCatalog catalog.AsyncClient
240+
defaultPlugins map[pluginCore.TaskType]pluginCore.Plugin
241+
pluginsForType map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin
242+
taskMetricsMap map[MetricKey]*taskMetrics
243+
taskMetricsMapMutex sync.RWMutex
244+
defaultPlugin pluginCore.Plugin
245+
metrics *metrics
246+
pluginRegistry PluginRegistryIface
247+
kubeClient pluginCore.KubeClient
248+
kubeClientset kubernetes.Interface
249+
secretManager pluginCore.SecretManager
250+
resourceManager resourcemanager.BaseResourceManager
251+
cfg *config.Config
252+
pluginScope promutils.Scope
253+
eventConfig *controllerConfig.EventConfig
254+
clusterID string
255+
agentService *agent.AgentService
256+
connectorService *connector.ConnectorService
255257
}
256258

257259
func (t *Handler) FinalizeRequired() bool {
@@ -435,15 +437,38 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics
435437
if err != nil {
436438
return nil, err
437439
}
438-
if _, ok := t.taskMetricsMap[metricNameKey]; !ok {
439-
t.taskMetricsMap[metricNameKey] = &taskMetrics{
440-
taskSucceeded: labeled.NewCounter(metricNameKey+"_success",
441-
"Task "+metricNameKey+" finished successfully", t.pluginScope, labeled.EmitUnlabeledMetric),
442-
taskFailed: labeled.NewCounter(metricNameKey+"_failure",
443-
"Task "+metricNameKey+" failed", t.pluginScope, labeled.EmitUnlabeledMetric),
444-
}
440+
441+
// Acquire read lock for fast read, this is the happy case
442+
t.taskMetricsMapMutex.RLock()
443+
existingTaskMetrics, ok := t.taskMetricsMap[metricNameKey]
444+
t.taskMetricsMapMutex.RUnlock()
445+
446+
if ok {
447+
return existingTaskMetrics, nil
445448
}
446-
return t.taskMetricsMap[metricNameKey], nil
449+
450+
// Acquire write lock since we may need to populate the map. We use a lock to avoid panics for concurrent writes
451+
// and duplicate prometheus metrics
452+
t.taskMetricsMapMutex.Lock()
453+
defer t.taskMetricsMapMutex.Unlock()
454+
455+
// check condition again
456+
existingTaskMetrics, ok = t.taskMetricsMap[metricNameKey]
457+
458+
if ok {
459+
return existingTaskMetrics, nil
460+
}
461+
462+
newTaskMetrics := &taskMetrics{
463+
taskSucceeded: labeled.NewCounter(metricNameKey+"_success",
464+
"Task "+metricNameKey+" finished successfully", t.pluginScope, labeled.EmitUnlabeledMetric),
465+
taskFailed: labeled.NewCounter(metricNameKey+"_failure",
466+
"Task "+metricNameKey+" failed", t.pluginScope, labeled.EmitUnlabeledMetric),
467+
}
468+
469+
t.taskMetricsMap[metricNameKey] = newTaskMetrics
470+
471+
return newTaskMetrics, nil
447472
}
448473

449474
func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) {

0 commit comments

Comments
 (0)