Skip to content

Commit dd574ce

Browse files
committed
workquque: use typed interface
Signed-off-by: zhangzujian <[email protected]>
1 parent c9d0a5f commit dd574ce

32 files changed

+446
-3048
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/parnurzeal/gorequest v0.3.0
3232
github.com/prometheus-community/pro-bing v0.4.1
3333
github.com/prometheus/client_golang v1.20.0
34+
github.com/puzpuzpuz/xsync/v3 v3.4.0
3435
github.com/scylladb/go-set v1.0.2
3536
github.com/sirupsen/logrus v1.9.3
3637
github.com/spf13/pflag v1.0.5

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
514514
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
515515
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
516516
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
517+
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
518+
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
517519
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
518520
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
519521
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=

pkg/controller/admin_network_policy.go

Lines changed: 10 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"reflect"
77
"strings"
8-
"time"
98
"unicode"
109

1110
"github.com/scylladb/go-set/strset"
@@ -38,7 +37,7 @@ type ChangedName struct {
3837
curRuleName string
3938
}
4039

41-
type ChangedDelta struct {
40+
type AdminNetworkPolicyChangedDelta struct {
4241
key string
4342
ruleNames [util.AnpMaxRules]ChangedName
4443
field ChangedField
@@ -56,14 +55,9 @@ func (c *Controller) enqueueAddAnp(obj interface{}) {
5655
}
5756

5857
func (c *Controller) enqueueDeleteAnp(obj interface{}) {
59-
var key string
60-
var err error
61-
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
62-
utilruntime.HandleError(err)
63-
return
64-
}
65-
klog.V(3).Infof("enqueue delete anp %s", key)
66-
c.deleteAnpQueue.Add(obj)
58+
anp := obj.(*v1alpha1.AdminNetworkPolicy)
59+
klog.V(3).Infof("enqueue delete anp %s", anp.Name)
60+
c.deleteAnpQueue.Add(anp)
6761
}
6862

6963
func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
@@ -104,7 +98,7 @@ func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
10498
// The remaining changes do not affect the acls. The port-group or address-set should be updated.
10599
// The port-group for anp should be updated
106100
if !reflect.DeepEqual(oldAnpObj.Spec.Subject, newAnpObj.Spec.Subject) {
107-
c.updateAnpQueue.Add(ChangedDelta{key: newAnpObj.Name, field: ChangedSubject})
101+
c.updateAnpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newAnpObj.Name, field: ChangedSubject})
108102
}
109103

110104
// Rule name or peer selector in ingress/egress rule has changed, the corresponding address-set need be updated
@@ -122,7 +116,7 @@ func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
122116
}
123117
}
124118
if ruleChanged {
125-
c.updateAnpQueue.Add(ChangedDelta{key: newAnpObj.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
119+
c.updateAnpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newAnpObj.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
126120
}
127121

128122
ruleChanged = false
@@ -138,113 +132,8 @@ func (c *Controller) enqueueUpdateAnp(oldObj, newObj interface{}) {
138132
}
139133
}
140134
if ruleChanged {
141-
c.updateAnpQueue.Add(ChangedDelta{key: newAnpObj.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
142-
}
143-
}
144-
145-
func (c *Controller) runAddAnpWorker() {
146-
for c.processNextAddAnpWorkItem() {
147-
}
148-
}
149-
150-
func (c *Controller) runUpdateAnpWorker() {
151-
for c.processNextUpdateAnpWorkItem() {
152-
}
153-
}
154-
155-
func (c *Controller) runDeleteAnpWorker() {
156-
for c.processNextDeleteAnpWorkItem() {
157-
}
158-
}
159-
160-
func (c *Controller) processNextAddAnpWorkItem() bool {
161-
obj, shutdown := c.addAnpQueue.Get()
162-
if shutdown {
163-
return false
164-
}
165-
now := time.Now()
166-
167-
err := func(obj interface{}) error {
168-
defer c.addAnpQueue.Done(obj)
169-
var key string
170-
var ok bool
171-
if key, ok = obj.(string); !ok {
172-
c.addAnpQueue.Forget(obj)
173-
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
174-
return nil
175-
}
176-
if err := c.handleAddAnp(key); err != nil {
177-
c.addAnpQueue.AddRateLimited(key)
178-
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
179-
}
180-
last := time.Since(now)
181-
klog.Infof("take %d ms to handle add anp %s", last.Milliseconds(), key)
182-
c.addAnpQueue.Forget(obj)
183-
return nil
184-
}(obj)
185-
if err != nil {
186-
utilruntime.HandleError(err)
187-
return true
188-
}
189-
return true
190-
}
191-
192-
func (c *Controller) processNextUpdateAnpWorkItem() bool {
193-
obj, shutdown := c.updateAnpQueue.Get()
194-
if shutdown {
195-
return false
196-
}
197-
198-
err := func(obj interface{}) error {
199-
defer c.updateAnpQueue.Done(obj)
200-
var key ChangedDelta
201-
var ok bool
202-
if key, ok = obj.(ChangedDelta); !ok {
203-
c.updateAnpQueue.Forget(obj)
204-
utilruntime.HandleError(fmt.Errorf("expected ChangedDelta in workqueue but got %#v", obj))
205-
return nil
206-
}
207-
if err := c.handleUpdateAnp(key); err != nil {
208-
c.updateAnpQueue.AddRateLimited(key)
209-
return fmt.Errorf("error syncing admin network policy %s: %w, requeuing", key.key, err)
210-
}
211-
c.updateAnpQueue.Forget(obj)
212-
return nil
213-
}(obj)
214-
if err != nil {
215-
utilruntime.HandleError(err)
216-
return true
217-
}
218-
return true
219-
}
220-
221-
func (c *Controller) processNextDeleteAnpWorkItem() bool {
222-
obj, shutdown := c.deleteAnpQueue.Get()
223-
if shutdown {
224-
return false
225-
}
226-
227-
err := func(obj interface{}) error {
228-
defer c.deleteAnpQueue.Done(obj)
229-
var anp *v1alpha1.AdminNetworkPolicy
230-
var ok bool
231-
if anp, ok = obj.(*v1alpha1.AdminNetworkPolicy); !ok {
232-
c.deleteAnpQueue.Forget(obj)
233-
utilruntime.HandleError(fmt.Errorf("expected anp object in workqueue but got %#v", obj))
234-
return nil
235-
}
236-
if err := c.handleDeleteAnp(anp); err != nil {
237-
c.deleteAnpQueue.AddRateLimited(obj)
238-
return fmt.Errorf("error syncing anp '%s': %s, requeuing", anp.Name, err.Error())
239-
}
240-
c.deleteAnpQueue.Forget(obj)
241-
return nil
242-
}(obj)
243-
if err != nil {
244-
utilruntime.HandleError(err)
245-
return true
135+
c.updateAnpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newAnpObj.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
246136
}
247-
return true
248137
}
249138

250139
func (c *Controller) handleAddAnp(key string) (err error) {
@@ -479,7 +368,7 @@ func (c *Controller) handleDeleteAnp(anp *v1alpha1.AdminNetworkPolicy) error {
479368
return nil
480369
}
481370

482-
func (c *Controller) handleUpdateAnp(changed ChangedDelta) error {
371+
func (c *Controller) handleUpdateAnp(changed *AdminNetworkPolicyChangedDelta) error {
483372
// Only handle updates that do not affect acls.
484373
c.anpKeyMutex.LockKey(changed.key)
485374
defer func() { _ = c.anpKeyMutex.UnlockKey(changed.key) }()
@@ -899,7 +788,7 @@ func (c *Controller) setAddrSetForAnpRule(anpName, pgName, ruleName string, inde
899788
func (c *Controller) updateAnpsByLabelsMatch(nsLabels, podLabels map[string]string) {
900789
anps, _ := c.anpsLister.List(labels.Everything())
901790
for _, anp := range anps {
902-
changed := ChangedDelta{
791+
changed := &AdminNetworkPolicyChangedDelta{
903792
key: anp.Name,
904793
}
905794

@@ -927,7 +816,7 @@ func (c *Controller) updateAnpsByLabelsMatch(nsLabels, podLabels map[string]stri
927816

928817
banps, _ := c.banpsLister.List(labels.Everything())
929818
for _, banp := range banps {
930-
changed := ChangedDelta{
819+
changed := &AdminNetworkPolicyChangedDelta{
931820
key: banp.Name,
932821
}
933822

pkg/controller/baseline_admin_network_policy.go

Lines changed: 7 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"reflect"
66
"strings"
7-
"time"
87

98
"github.com/scylladb/go-set/strset"
109
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -29,14 +28,9 @@ func (c *Controller) enqueueAddBanp(obj interface{}) {
2928
}
3029

3130
func (c *Controller) enqueueDeleteBanp(obj interface{}) {
32-
var key string
33-
var err error
34-
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
35-
utilruntime.HandleError(err)
36-
return
37-
}
38-
klog.V(3).Infof("enqueue delete banp %s", key)
39-
c.deleteBanpQueue.Add(obj)
31+
banp := obj.(*v1alpha1.BaselineAdminNetworkPolicy)
32+
klog.V(3).Infof("enqueue delete banp %s", banp.Name)
33+
c.deleteBanpQueue.Add(banp)
4034
}
4135

4236
func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
@@ -75,7 +69,7 @@ func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
7569
// The remaining changes do not affect the acls. The port-group or address-set should be updated.
7670
// The port-group for anp should be updated
7771
if !reflect.DeepEqual(oldBanp.Spec.Subject, newBanp.Spec.Subject) {
78-
c.updateBanpQueue.Add(ChangedDelta{key: newBanp.Name, field: ChangedSubject})
72+
c.updateBanpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newBanp.Name, field: ChangedSubject})
7973
}
8074

8175
// Rule name or peer selector in ingress/egress rule has changed, the corresponding address-set need be updated
@@ -94,7 +88,7 @@ func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
9488
}
9589
}
9690
if ruleChanged {
97-
c.updateBanpQueue.Add(ChangedDelta{key: newBanp.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
91+
c.updateBanpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newBanp.Name, ruleNames: changedIngressRuleNames, field: ChangedIngressRule})
9892
}
9993

10094
ruleChanged = false
@@ -110,113 +104,8 @@ func (c *Controller) enqueueUpdateBanp(oldObj, newObj interface{}) {
110104
}
111105
}
112106
if ruleChanged {
113-
c.updateBanpQueue.Add(ChangedDelta{key: newBanp.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
114-
}
115-
}
116-
117-
func (c *Controller) runAddBanpWorker() {
118-
for c.processNextAddBanpWorkItem() {
119-
}
120-
}
121-
122-
func (c *Controller) runUpdateBanpWorker() {
123-
for c.processNextUpdateBanpWorkItem() {
124-
}
125-
}
126-
127-
func (c *Controller) runDeleteBanpWorker() {
128-
for c.processNextDeleteBanpWorkItem() {
129-
}
130-
}
131-
132-
func (c *Controller) processNextAddBanpWorkItem() bool {
133-
obj, shutdown := c.addBanpQueue.Get()
134-
if shutdown {
135-
return false
136-
}
137-
now := time.Now()
138-
139-
err := func(obj interface{}) error {
140-
defer c.addBanpQueue.Done(obj)
141-
var key string
142-
var ok bool
143-
if key, ok = obj.(string); !ok {
144-
c.addBanpQueue.Forget(obj)
145-
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
146-
return nil
147-
}
148-
if err := c.handleAddBanp(key); err != nil {
149-
c.addBanpQueue.AddRateLimited(key)
150-
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
151-
}
152-
last := time.Since(now)
153-
klog.Infof("take %d ms to handle add banp %s", last.Milliseconds(), key)
154-
c.addBanpQueue.Forget(obj)
155-
return nil
156-
}(obj)
157-
if err != nil {
158-
utilruntime.HandleError(err)
159-
return true
160-
}
161-
return true
162-
}
163-
164-
func (c *Controller) processNextUpdateBanpWorkItem() bool {
165-
obj, shutdown := c.updateBanpQueue.Get()
166-
if shutdown {
167-
return false
168-
}
169-
170-
err := func(obj interface{}) error {
171-
defer c.updateBanpQueue.Done(obj)
172-
var key ChangedDelta
173-
var ok bool
174-
if key, ok = obj.(ChangedDelta); !ok {
175-
c.updateBanpQueue.Forget(obj)
176-
utilruntime.HandleError(fmt.Errorf("expected ChangedDelta in workqueue but got %#v", obj))
177-
return nil
178-
}
179-
if err := c.handleUpdateBanp(key); err != nil {
180-
c.updateBanpQueue.AddRateLimited(key)
181-
return fmt.Errorf("error syncing banp %s: %w, requeuing", key.key, err)
182-
}
183-
c.updateBanpQueue.Forget(obj)
184-
return nil
185-
}(obj)
186-
if err != nil {
187-
utilruntime.HandleError(err)
188-
return true
189-
}
190-
return true
191-
}
192-
193-
func (c *Controller) processNextDeleteBanpWorkItem() bool {
194-
obj, shutdown := c.deleteBanpQueue.Get()
195-
if shutdown {
196-
return false
197-
}
198-
199-
err := func(obj interface{}) error {
200-
defer c.deleteBanpQueue.Done(obj)
201-
var banp *v1alpha1.BaselineAdminNetworkPolicy
202-
var ok bool
203-
if banp, ok = obj.(*v1alpha1.BaselineAdminNetworkPolicy); !ok {
204-
c.deleteBanpQueue.Forget(obj)
205-
utilruntime.HandleError(fmt.Errorf("expected banp object in workqueue but got %#v", obj))
206-
return nil
207-
}
208-
if err := c.handleDeleteBanp(banp); err != nil {
209-
c.deleteBanpQueue.AddRateLimited(obj)
210-
return fmt.Errorf("error syncing banp '%s': %s, requeuing", banp.Name, err.Error())
211-
}
212-
c.deleteBanpQueue.Forget(obj)
213-
return nil
214-
}(obj)
215-
if err != nil {
216-
utilruntime.HandleError(err)
217-
return true
107+
c.updateBanpQueue.Add(&AdminNetworkPolicyChangedDelta{key: newBanp.Name, ruleNames: changedEgressRuleNames, field: ChangedEgressRule})
218108
}
219-
return true
220109
}
221110

222111
func (c *Controller) handleAddBanp(key string) (err error) {
@@ -436,7 +325,7 @@ func (c *Controller) handleDeleteBanp(banp *v1alpha1.BaselineAdminNetworkPolicy)
436325
return nil
437326
}
438327

439-
func (c *Controller) handleUpdateBanp(changed ChangedDelta) error {
328+
func (c *Controller) handleUpdateBanp(changed *AdminNetworkPolicyChangedDelta) error {
440329
// Only handle updates that do not affect acls.
441330
c.banpKeyMutex.LockKey(changed.key)
442331
defer func() { _ = c.banpKeyMutex.UnlockKey(changed.key) }()

0 commit comments

Comments
 (0)