Skip to content

Commit ee45eac

Browse files
authored
feat: Cache semaphore limit lookup (#14205)
Signed-off-by: Dmitri Rabinowitz <[email protected]>
1 parent 1f738e6 commit ee45eac

12 files changed

+295
-58
lines changed

config/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ type Config struct {
108108
// Workflow retention by number of workflows
109109
RetentionPolicy *RetentionPolicy `json:"retentionPolicy,omitempty"`
110110

111+
// SemaphoreLimitCacheSeconds specifies the duration in seconds before the workflow controller will re-fetch the limit
112+
// for a semaphore from its associated ConfigMap(s). Defaults to 0 seconds (re-fetch every time the semaphore is checked).
113+
SemaphoreLimitCacheSeconds *int64 `json:"semaphoreLimitCacheSeconds,omitempty"`
114+
111115
// NavColor is an ui navigation bar background color
112116
NavColor string `json:"navColor,omitempty"`
113117

docs/scaling.md

+6
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ In order to protect users against infinite recursion, the controller has a defau
137137

138138
This protection can be disabled with the [environment variable](environment-variables.md#controller) `DISABLE_MAX_RECURSION=true`
139139

140+
### Caching Semaphore Limit ConfigMap Requests
141+
142+
By default the controller will reload the ConfigMap(s) referenced by a semaphore from kube every time that workflow is queued. If you notice high latency from queuing workflows leveraging semaphores you can cache semaphore limits by editing the `semaphoreLimitCacheSeconds` parameter in [`workflow-controller-configmap.yaml`](workflow-controller-configmap.yaml).
143+
144+
Note that this will mean that Argo will not immediately pick up changes to your config map limits.
145+
140146
## Miscellaneous
141147

142148
See also [Running At Massive Scale](running-at-massive-scale.md).

docs/workflow-controller-configmap.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,10 @@ data:
335335
# failed: 3
336336
# errored: 3
337337

338+
# SemaphoreLimitCacheSeconds specifies the duration in seconds before the workflow controller will re-fetch the limit
339+
# for a semaphore from its associated ConfigMap(s). Defaults to 0 seconds (re-fetch every time the semaphore is checked).
340+
semaphoreLimitCacheSeconds: "0"
341+
338342
# Default values that will apply to all Workflows from this controller, unless overridden on the Workflow-level
339343
# See more: docs/default-workflow-specs.md
340344
workflowDefaults: |

workflow/controller/controller.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,12 @@ func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context)
411411
return exists
412412
}
413413

414-
wfc.syncManager = sync.NewLockManager(getSyncLimit, nextWorkflow, isWFDeleted)
414+
syncLimitCacheTTL := time.Duration(0)
415+
if wfc.Config.SemaphoreLimitCacheSeconds != nil {
416+
syncLimitCacheTTL = time.Duration(*wfc.Config.SemaphoreLimitCacheSeconds) * time.Second
417+
}
418+
419+
wfc.syncManager = sync.NewLockManager(getSyncLimit, syncLimitCacheTTL, nextWorkflow, isWFDeleted)
415420
}
416421

417422
// list all running workflows to initialize throttler and syncManager

workflow/controller/operator_concurrency_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func TestSemaphoreTmplLevel(t *testing.T) {
163163
cancel, controller := newController()
164164
defer cancel()
165165
ctx := context.Background()
166-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
166+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
167167
}, workflowExistenceFunc)
168168
var cm apiv1.ConfigMap
169169
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -224,7 +224,7 @@ func TestSemaphoreScriptTmplLevel(t *testing.T) {
224224
cancel, controller := newController()
225225
defer cancel()
226226
ctx := context.Background()
227-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
227+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
228228
}, workflowExistenceFunc)
229229
var cm apiv1.ConfigMap
230230
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -284,7 +284,7 @@ func TestSemaphoreScriptConfigMapInDifferentNamespace(t *testing.T) {
284284
cancel, controller := newController()
285285
defer cancel()
286286
ctx := context.Background()
287-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
287+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
288288
}, workflowExistenceFunc)
289289
var cm apiv1.ConfigMap
290290
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -346,7 +346,7 @@ func TestSemaphoreResourceTmplLevel(t *testing.T) {
346346
cancel, controller := newController()
347347
defer cancel()
348348
ctx := context.Background()
349-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
349+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
350350
}, workflowExistenceFunc)
351351
var cm apiv1.ConfigMap
352352
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -408,7 +408,7 @@ func TestSemaphoreWithOutConfigMap(t *testing.T) {
408408
defer cancel()
409409

410410
ctx := context.Background()
411-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
411+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
412412
}, workflowExistenceFunc)
413413

414414
t.Run("SemaphoreRefWithOutConfigMap", func(t *testing.T) {
@@ -464,7 +464,7 @@ func TestMutexInDAG(t *testing.T) {
464464
cancel, controller := newController()
465465
defer cancel()
466466
ctx := context.Background()
467-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
467+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
468468
}, workflowExistenceFunc)
469469
t.Run("MutexWithDAG", func(t *testing.T) {
470470
wf := wfv1.MustUnmarshalWorkflow(DAGWithMutex)
@@ -536,7 +536,7 @@ func TestMutexInDAGWithInterpolation(t *testing.T) {
536536
cancel, controller := newController()
537537
defer cancel()
538538
ctx := context.Background()
539-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
539+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
540540
}, workflowExistenceFunc)
541541
t.Run("InterpolatedMutexWithDAG", func(t *testing.T) {
542542
wf := wfv1.MustUnmarshalWorkflow(DAGWithInterpolatedMutex)
@@ -601,7 +601,7 @@ func TestSynchronizationWithRetry(t *testing.T) {
601601
cancel, controller := newController()
602602
defer cancel()
603603
ctx := context.Background()
604-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
604+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
605605
}, workflowExistenceFunc)
606606
var cm apiv1.ConfigMap
607607
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -809,7 +809,7 @@ func TestSynchronizationWithStep(t *testing.T) {
809809
cancel, controller := newController()
810810
defer cancel()
811811
ctx := context.Background()
812-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
812+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
813813
}, workflowExistenceFunc)
814814
var cm apiv1.ConfigMap
815815
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -886,7 +886,7 @@ func TestSynchronizationWithStepRetry(t *testing.T) {
886886
cancel, controller := newController()
887887
defer cancel()
888888
ctx := context.Background()
889-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
889+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
890890
}, workflowExistenceFunc)
891891
var cm apiv1.ConfigMap
892892
wfv1.MustUnmarshal([]byte(configMap), &cm)
@@ -950,7 +950,7 @@ func TestSynchronizationForPendingShuttingdownWfs(t *testing.T) {
950950
cancel, controller := newController()
951951
defer cancel()
952952
ctx := context.Background()
953-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
953+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
954954
}, workflowExistenceFunc)
955955

956956
t.Run("PendingShuttingdownTerminatingWf", func(t *testing.T) {

workflow/controller/operator_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8581,7 +8581,7 @@ func TestMutexWfPendingWithNoPod(t *testing.T) {
85818581
cancel, controller := newController(wf)
85828582
defer cancel()
85838583
ctx := context.Background()
8584-
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
8584+
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), 0, func(key string) {
85858585
}, workflowExistenceFunc)
85868586

85878587
// preempt lock

workflow/sync/common.go

+5
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,10 @@ type semaphore interface {
1313
getCurrentPending() []string
1414
getName() string
1515
getLimit() int
16+
getLimitTimestamp() time.Time
17+
resetLimitTimestamp()
1618
resize(n int) bool
1719
}
20+
21+
// expose for overriding in tests
22+
var nowFn = time.Now

workflow/sync/multiple_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestMultipleMutexLock(t *testing.T) {
4242
kube := fake.NewSimpleClientset()
4343
syncLimitFunc := GetSyncLimitFunc(kube)
4444
t.Run("MultipleMutex", func(t *testing.T) {
45-
syncManager := NewLockManager(syncLimitFunc, func(key string) {},
45+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {},
4646
WorkflowExistenceFunc)
4747
wfall := templatedWorkflow("all",
4848
` mutexes:
@@ -122,7 +122,7 @@ func TestMultipleMutexLock(t *testing.T) {
122122
assert.True(t, wfUpdate)
123123
})
124124
t.Run("MultipleMutexOrdering", func(t *testing.T) {
125-
syncManager := NewLockManager(syncLimitFunc, func(key string) {},
125+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {},
126126
WorkflowExistenceFunc)
127127
wfall := templatedWorkflow("all",
128128
` mutexes:
@@ -204,7 +204,7 @@ func TestMutexAndSemaphore(t *testing.T) {
204204

205205
syncLimitFunc := GetSyncLimitFunc(kube)
206206
t.Run("MutexSemaphore", func(t *testing.T) {
207-
syncManager := NewLockManager(syncLimitFunc, func(key string) {},
207+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {},
208208
WorkflowExistenceFunc)
209209
wfmands1 := templatedWorkflow("mands1",
210210
` mutexes:
@@ -322,7 +322,7 @@ func TestPriority(t *testing.T) {
322322
kube := fake.NewSimpleClientset()
323323
syncLimitFunc := GetSyncLimitFunc(kube)
324324
t.Run("Priority", func(t *testing.T) {
325-
syncManager := NewLockManager(syncLimitFunc, func(key string) {},
325+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {},
326326
WorkflowExistenceFunc)
327327
wflow := templatedWorkflow("prioritylow",
328328
` mutexes:
@@ -399,7 +399,7 @@ func TestDuplicates(t *testing.T) {
399399
kube := fake.NewSimpleClientset()
400400
syncLimitFunc := GetSyncLimitFunc(kube)
401401
t.Run("Mutex", func(t *testing.T) {
402-
syncManager := NewLockManager(syncLimitFunc, func(key string) {},
402+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {},
403403
WorkflowExistenceFunc)
404404
wfdupmutex := templatedWorkflow("mutex",
405405
` mutexes:
@@ -410,7 +410,7 @@ func TestDuplicates(t *testing.T) {
410410
assert.Error(t, err)
411411
})
412412
t.Run("Semaphore", func(t *testing.T) {
413-
syncManager := NewLockManager(syncLimitFunc, func(key string) {},
413+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {},
414414
WorkflowExistenceFunc)
415415
wfdupsemaphore := templatedWorkflow("semaphore",
416416
` semaphores:

workflow/sync/mutex_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func TestMutexLock(t *testing.T) {
114114
kube := fake.NewSimpleClientset()
115115
syncLimitFunc := GetSyncLimitFunc(kube)
116116
t.Run("InitializeSynchronization", func(t *testing.T) {
117-
syncManager := NewLockManager(syncLimitFunc, func(key string) {
117+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {
118118
}, WorkflowExistenceFunc)
119119
wf := wfv1.MustUnmarshalWorkflow(mutexwfstatus)
120120
wfclientset := fakewfclientset.NewSimpleClientset(wf)
@@ -127,7 +127,7 @@ func TestMutexLock(t *testing.T) {
127127
})
128128
t.Run("WfLevelMutexAcquireAndRelease", func(t *testing.T) {
129129
var nextWorkflow string
130-
syncManager := NewLockManager(syncLimitFunc, func(key string) {
130+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {
131131
nextWorkflow = key
132132
}, WorkflowExistenceFunc)
133133
wf := wfv1.MustUnmarshalWorkflow(mutexWf)
@@ -208,7 +208,7 @@ func TestMutexLock(t *testing.T) {
208208

209209
t.Run("WfLevelMutexOthernamespace", func(t *testing.T) {
210210
var nextWorkflow string
211-
syncManager := NewLockManager(syncLimitFunc, func(key string) {
211+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {
212212
nextWorkflow = key
213213
}, WorkflowExistenceFunc)
214214
wf := wfv1.MustUnmarshalWorkflow(mutexWfNamespaced)
@@ -399,7 +399,7 @@ func TestMutexTmplLevel(t *testing.T) {
399399
syncLimitFunc := GetSyncLimitFunc(kube)
400400
t.Run("TemplateLevelAcquireAndRelease", func(t *testing.T) {
401401
// var nextKey string
402-
syncManager := NewLockManager(syncLimitFunc, func(key string) {
402+
syncManager := NewLockManager(syncLimitFunc, 0, func(key string) {
403403
// nextKey = key
404404
}, WorkflowExistenceFunc)
405405
wf := wfv1.MustUnmarshalWorkflow(mutexWfWithTmplLevel)

workflow/sync/semaphore.go

+24-13
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,27 @@ import (
1010
)
1111

1212
type prioritySemaphore struct {
13-
name string
14-
limit int
15-
pending *priorityQueue
16-
semaphore *sema.Weighted
17-
lockHolder map[string]bool
18-
nextWorkflow NextWorkflow
19-
log *log.Entry
13+
name string
14+
limit int
15+
limitTimestamp time.Time
16+
pending *priorityQueue
17+
semaphore *sema.Weighted
18+
lockHolder map[string]bool
19+
nextWorkflow NextWorkflow
20+
log *log.Entry
2021
}
2122

2223
var _ semaphore = &prioritySemaphore{}
2324

2425
func NewSemaphore(name string, limit int, nextWorkflow NextWorkflow, lockType string) *prioritySemaphore {
2526
return &prioritySemaphore{
26-
name: name,
27-
limit: limit,
28-
pending: &priorityQueue{itemByKey: make(map[string]*item)},
29-
semaphore: sema.NewWeighted(int64(limit)),
30-
lockHolder: make(map[string]bool),
31-
nextWorkflow: nextWorkflow,
27+
name: name,
28+
limit: limit,
29+
limitTimestamp: nowFn(),
30+
pending: &priorityQueue{itemByKey: make(map[string]*item)},
31+
semaphore: sema.NewWeighted(int64(limit)),
32+
lockHolder: make(map[string]bool),
33+
nextWorkflow: nextWorkflow,
3234
log: log.WithFields(log.Fields{
3335
lockType: name,
3436
}),
@@ -43,6 +45,14 @@ func (s *prioritySemaphore) getLimit() int {
4345
return s.limit
4446
}
4547

48+
func (s *prioritySemaphore) getLimitTimestamp() time.Time {
49+
return s.limitTimestamp
50+
}
51+
52+
func (s *prioritySemaphore) resetLimitTimestamp() {
53+
s.limitTimestamp = nowFn()
54+
}
55+
4656
func (s *prioritySemaphore) getCurrentPending() []string {
4757
var keys []string
4858
for _, item := range s.pending.items {
@@ -72,6 +82,7 @@ func (s *prioritySemaphore) resize(n int) bool {
7282
s.log.Infof("%s semaphore resized from %d to %d", s.name, cur, n)
7383
s.semaphore = semaphore
7484
s.limit = n
85+
s.limitTimestamp = nowFn()
7586
}
7687
return status
7788
}

workflow/sync/sync_manager.go

+20-11
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strings"
77
"sync"
8+
"time"
89

910
log "github.com/sirupsen/logrus"
1011
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
@@ -19,20 +20,22 @@ type (
1920
)
2021

2122
type Manager struct {
22-
syncLockMap map[string]semaphore
23-
lock *sync.RWMutex
24-
nextWorkflow NextWorkflow
25-
getSyncLimit GetSyncLimit
26-
isWFDeleted IsWorkflowDeleted
23+
syncLockMap map[string]semaphore
24+
lock *sync.RWMutex
25+
nextWorkflow NextWorkflow
26+
getSyncLimit GetSyncLimit
27+
syncLimitCacheTTL time.Duration
28+
isWFDeleted IsWorkflowDeleted
2729
}
2830

29-
func NewLockManager(getSyncLimit GetSyncLimit, nextWorkflow NextWorkflow, isWFDeleted IsWorkflowDeleted) *Manager {
31+
func NewLockManager(getSyncLimit GetSyncLimit, syncLimitCacheTTL time.Duration, nextWorkflow NextWorkflow, isWFDeleted IsWorkflowDeleted) *Manager {
3032
return &Manager{
31-
syncLockMap: make(map[string]semaphore),
32-
lock: &sync.RWMutex{},
33-
nextWorkflow: nextWorkflow,
34-
getSyncLimit: getSyncLimit,
35-
isWFDeleted: isWFDeleted,
33+
syncLockMap: make(map[string]semaphore),
34+
lock: &sync.RWMutex{},
35+
nextWorkflow: nextWorkflow,
36+
getSyncLimit: getSyncLimit,
37+
syncLimitCacheTTL: syncLimitCacheTTL,
38+
isWFDeleted: isWFDeleted,
3639
}
3740
}
3841

@@ -462,12 +465,18 @@ func (sm *Manager) isSemaphoreSizeChanged(semaphore semaphore) (bool, int, error
462465
}
463466

464467
func (sm *Manager) checkAndUpdateSemaphoreSize(semaphore semaphore) error {
468+
if nowFn().Sub(semaphore.getLimitTimestamp()) < sm.syncLimitCacheTTL {
469+
return nil
470+
}
471+
465472
changed, newLimit, err := sm.isSemaphoreSizeChanged(semaphore)
466473
if err != nil {
467474
return err
468475
}
469476
if changed {
470477
semaphore.resize(newLimit)
478+
} else {
479+
semaphore.resetLimitTimestamp()
471480
}
472481
return nil
473482
}

0 commit comments

Comments
 (0)