Skip to content
This repository was archived by the owner on Sep 12, 2023. It is now read-only.

Commit 521f3dd

Browse files
authored
Create expectation package (#74)
* Create expectation package The purpose of this PR is to get ride of direct Kubernetes dependency. Expectation is being used in JobController to tell the number of pods/services they expect. This is the only library we reference in common code base. It makes sense to copy to our code base. In the future, Instead of “import k8s.io/kubernetes/pkg/controller”, User need to “import github.com/kubeflow/common/pkg/controller.v1/expectation" and use `expectation.NewControllerExpectations` instead. Signed-off-by: Jiaxin Shan <[email protected]> * Fix indirect usage of prometheus * Use linter_config in CI
1 parent 5f6f32b commit 521f3dd

File tree

10 files changed

+368
-21
lines changed

10 files changed

+368
-21
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ install:
1414
script:
1515
- ./hack/verify-codegen.sh
1616
- go build ./...
17-
- golangci-lint run ./...
17+
- golangci-lint run --config=linter_config.yaml ./...
1818
# Here we run all tests in pkg and we have to use `-ignore`
1919
# since goveralls uses `filepath.Match` to match ignore files
2020
# and it does not support patterns like `**`.

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/google/btree v1.0.0 // indirect
1010
github.com/googleapis/gnostic v0.2.0 // indirect
1111
github.com/imdario/mergo v0.3.7 // indirect
12-
github.com/prometheus/client_golang v1.5.1 // indirect
12+
github.com/prometheus/client_golang v1.5.1
1313
github.com/sirupsen/logrus v1.4.2
1414
github.com/stretchr/testify v1.4.0
1515
gopkg.in/inf.v0 v0.9.1 // indirect

linter_config.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ linters:
3232
enable:
3333
- bodyclose
3434
- deadcode
35-
- errcheck
3635
- misspell
3736
- lll
3837
- typecheck

pkg/controller.v1/common/job_controller.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"k8s.io/client-go/tools/cache"
2222
"k8s.io/client-go/tools/record"
2323
"k8s.io/client-go/util/workqueue"
24-
"k8s.io/kubernetes/pkg/controller"
2524
"volcano.sh/volcano/pkg/apis/scheduling/v1beta1"
25+
"github.com/kubeflow/common/pkg/controller.v1/expectation"
2626
volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned"
2727
)
2828

@@ -97,7 +97,7 @@ type JobController struct {
9797
// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
9898
// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
9999
// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
100-
Expectations controller.ControllerExpectationsInterface
100+
Expectations expectation.ControllerExpectationsInterface
101101

102102
// WorkQueue is a rate limited work queue. This is used to queue work to be
103103
// processed instead of performing it as soon as a change happens. This
@@ -136,7 +136,7 @@ func NewJobController(
136136
Config: jobControllerConfig,
137137
KubeClientSet: kubeClientSet,
138138
VolcanoClientSet: volcanoClientSet,
139-
Expectations: controller.NewControllerExpectations(),
139+
Expectations: expectation.NewControllerExpectations(),
140140
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
141141
Recorder: recorder,
142142
}

pkg/controller.v1/common/pod.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ package common
1616

1717
import (
1818
"fmt"
19-
"github.com/kubeflow/common/pkg/controller.v1/control"
2019
"reflect"
2120
"strconv"
2221
"strings"
2322

23+
"github.com/kubeflow/common/pkg/controller.v1/control"
24+
"github.com/kubeflow/common/pkg/controller.v1/expectation"
2425
log "github.com/sirupsen/logrus"
2526
"k8s.io/api/core/v1"
2627
"k8s.io/apimachinery/pkg/api/errors"
@@ -84,7 +85,7 @@ func (jc *JobController) AddPod(obj interface{}) {
8485
}
8586

8687
rtype := pod.Labels[apiv1.ReplicaTypeLabel]
87-
expectationPodsKey := GenExpectationPodsKey(jobKey, rtype)
88+
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
8889

8990
jc.Expectations.CreationObserved(expectationPodsKey)
9091
// TODO: we may need add backoff here
@@ -185,7 +186,7 @@ func (jc *JobController) DeletePod(obj interface{}) {
185186
}
186187

187188
rtype := pod.Labels[apiv1.ReplicaTypeLabel]
188-
expectationPodsKey := GenExpectationPodsKey(jobKey, rtype)
189+
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
189190

190191
jc.Expectations.DeletionObserved(expectationPodsKey)
191192
// TODO: we may need add backoff here
@@ -363,7 +364,7 @@ func (jc *JobController) createNewPod(job interface{}, rt, index string, spec *a
363364
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
364365
return err
365366
}
366-
expectationPodsKey := GenExpectationPodsKey(jobKey, rt)
367+
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)
367368
err = jc.Expectations.ExpectCreations(expectationPodsKey, 1)
368369
if err != nil {
369370
return err

pkg/controller.v1/common/service.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ package common
1515

1616
import (
1717
"fmt"
18-
"github.com/kubeflow/common/pkg/controller.v1/control"
1918
"strconv"
2019
"strings"
2120

2221
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
22+
"github.com/kubeflow/common/pkg/controller.v1/control"
23+
"github.com/kubeflow/common/pkg/controller.v1/expectation"
2324
commonutil "github.com/kubeflow/common/pkg/util"
2425
log "github.com/sirupsen/logrus"
2526
"k8s.io/api/core/v1"
@@ -72,7 +73,7 @@ func (jc *JobController) AddService(obj interface{}) {
7273
}
7374

7475
rtype := service.Labels[apiv1.ReplicaTypeLabel]
75-
expectationServicesKey := GenExpectationServicesKey(jobKey, rtype)
76+
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
7677

7778
jc.Expectations.CreationObserved(expectationServicesKey)
7879
// TODO: we may need add backoff here
@@ -244,7 +245,7 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
244245

245246
// Convert ReplicaType to lower string.
246247
rt := strings.ToLower(string(rtype))
247-
expectationServicesKey := GenExpectationServicesKey(jobKey, rt)
248+
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rt)
248249
err = jc.Expectations.ExpectCreations(expectationServicesKey, 1)
249250
if err != nil {
250251
return err

pkg/controller.v1/common/util.go

-8
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,6 @@ func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() er
4343
}
4444
}
4545

46-
func GenExpectationPodsKey(jobKey, replicaType string) string {
47-
return jobKey + "/" + strings.ToLower(replicaType) + "/pods"
48-
}
49-
50-
func GenExpectationServicesKey(jobKey, replicaType string) string {
51-
return jobKey + "/" + strings.ToLower(replicaType) + "/services"
52-
}
53-
5446
func MaxInt(x, y int) int {
5547
if x < y {
5648
return y
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package expectation
2+
3+
import (
4+
"fmt"
5+
log "github.com/sirupsen/logrus"
6+
"sync/atomic"
7+
"time"
8+
9+
"k8s.io/apimachinery/pkg/util/clock"
10+
"k8s.io/client-go/tools/cache"
11+
)
12+
13+
const (
14+
// If a watch drops a delete event for a pod, it'll take this long
15+
// before a dormant controller waiting for those packets is woken up anyway. It is
16+
// specifically targeted at the case where some problem prevents an update
17+
// of expectations, without it the controller could stay asleep forever. This should
18+
// be set based on the expected latency of watch events.
19+
//
20+
// Currently a controller can service (create *and* observe the watch events for said
21+
// creation) about 10 pods a second, so it takes about 1 min to service
22+
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
23+
// latency/pod at the scale of 3000 pods over 100 nodes.
24+
ExpectationsTimeout = 5 * time.Minute
25+
)
26+
27+
// Expectations are a way for controllers to tell the controller manager what they expect. eg:
28+
// ControllerExpectations: {
29+
// controller1: expects 2 adds in 2 minutes
30+
// controller2: expects 2 dels in 2 minutes
31+
// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met
32+
// }
33+
//
34+
// Implementation:
35+
// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
36+
// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
37+
//
38+
// * Once set expectations can only be lowered
39+
// * A controller isn't synced till its expectations are either fulfilled, or expire
40+
// * Controllers that don't set expectations will get woken up for every matching controllee
41+
42+
// ExpKeyFunc to parse out the key from a ControlleeExpectation
43+
var ExpKeyFunc = func(obj interface{}) (string, error) {
44+
if e, ok := obj.(*ControlleeExpectations); ok {
45+
return e.key, nil
46+
}
47+
return "", fmt.Errorf("could not find key for obj %#v", obj)
48+
}
49+
50+
// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
51+
// Only abstracted out for testing.
52+
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
53+
// types of controllers, because the keys might conflict across types.
54+
type ControllerExpectationsInterface interface {
55+
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
56+
SatisfiedExpectations(controllerKey string) bool
57+
DeleteExpectations(controllerKey string)
58+
SetExpectations(controllerKey string, add, del int) error
59+
ExpectCreations(controllerKey string, adds int) error
60+
ExpectDeletions(controllerKey string, dels int) error
61+
CreationObserved(controllerKey string)
62+
DeletionObserved(controllerKey string)
63+
RaiseExpectations(controllerKey string, add, del int)
64+
LowerExpectations(controllerKey string, add, del int)
65+
}
66+
67+
// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
68+
type ControllerExpectations struct {
69+
cache.Store
70+
}
71+
72+
// GetExpectations returns the ControlleeExpectations of the given controller.
73+
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
74+
exp, exists, err := r.GetByKey(controllerKey)
75+
if err == nil && exists {
76+
return exp.(*ControlleeExpectations), true, nil
77+
}
78+
return nil, false, err
79+
}
80+
81+
// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
82+
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
83+
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
84+
if err := r.Delete(exp); err != nil {
85+
log.Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
86+
}
87+
}
88+
}
89+
90+
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
91+
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
92+
// manager.
93+
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
94+
if exp, exists, err := r.GetExpectations(controllerKey); exists {
95+
if exp.Fulfilled() {
96+
log.Infof("Controller expectations fulfilled %#v", exp)
97+
return true
98+
} else if exp.isExpired() {
99+
log.Infof("Controller expectations expired %#v", exp)
100+
return true
101+
} else {
102+
log.Infof("Controller still waiting on expectations %#v", exp)
103+
return false
104+
}
105+
} else if err != nil {
106+
log.Infof("Error encountered while checking expectations %#v, forcing sync", err)
107+
} else {
108+
// When a new controller is created, it doesn't have expectations.
109+
// When it doesn't see expected watch events for > TTL, the expectations expire.
110+
// - In this case it wakes up, creates/deletes controllees, and sets expectations again.
111+
// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
112+
// - In this case it continues without setting expectations till it needs to create/delete controllees.
113+
log.Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
114+
}
115+
// Trigger a sync if we either encountered and error (which shouldn't happen since we're
116+
// getting from local store) or this controller hasn't established expectations.
117+
return true
118+
}
119+
120+
// TODO: Extend ExpirationCache to support explicit expiration.
121+
// TODO: Make this possible to disable in tests.
122+
// TODO: Support injection of clock.
123+
func (exp *ControlleeExpectations) isExpired() bool {
124+
return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
125+
}
126+
127+
// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
128+
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
129+
exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
130+
log.Infof("Setting expectations %#v", exp)
131+
return r.Add(exp)
132+
}
133+
134+
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
135+
return r.SetExpectations(controllerKey, adds, 0)
136+
}
137+
138+
func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
139+
return r.SetExpectations(controllerKey, 0, dels)
140+
}
141+
142+
// Decrements the expectation counts of the given controller.
143+
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
144+
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
145+
exp.Add(int64(-add), int64(-del))
146+
// The expectations might've been modified since the update on the previous line.
147+
log.Infof("Lowered expectations %#v", exp)
148+
}
149+
}
150+
151+
// Increments the expectation counts of the given controller.
152+
func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
153+
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
154+
exp.Add(int64(add), int64(del))
155+
// The expectations might've been modified since the update on the previous line.
156+
log.Infof("Raised expectations %#v", exp)
157+
}
158+
}
159+
160+
// CreationObserved atomically decrements the `add` expectation count of the given controller.
161+
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
162+
r.LowerExpectations(controllerKey, 1, 0)
163+
}
164+
165+
// DeletionObserved atomically decrements the `del` expectation count of the given controller.
166+
func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
167+
r.LowerExpectations(controllerKey, 0, 1)
168+
}
169+
170+
// Expectations are either fulfilled, or expire naturally.
171+
type Expectations interface {
172+
Fulfilled() bool
173+
}
174+
175+
// ControlleeExpectations track controllee creates/deletes.
176+
type ControlleeExpectations struct {
177+
// Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
178+
// See: https://golang.org/pkg/sync/atomic/ for more information
179+
add int64
180+
del int64
181+
key string
182+
timestamp time.Time
183+
}
184+
185+
// Add increments the add and del counters.
186+
func (e *ControlleeExpectations) Add(add, del int64) {
187+
atomic.AddInt64(&e.add, add)
188+
atomic.AddInt64(&e.del, del)
189+
}
190+
191+
// Fulfilled returns true if this expectation has been fulfilled.
192+
func (e *ControlleeExpectations) Fulfilled() bool {
193+
// TODO: think about why this line being atomic doesn't matter
194+
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
195+
}
196+
197+
// GetExpectations returns the add and del expectations of the controllee.
198+
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
199+
return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
200+
}
201+
202+
// NewControllerExpectations returns a store for ControllerExpectations.
203+
func NewControllerExpectations() *ControllerExpectations {
204+
return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
205+
}

0 commit comments

Comments
 (0)