Skip to content

Commit a8363a5

Browse files
author
Zachary Seguin
committed
feat(listers): Switch to informers + listers to reduce the number of API calls to the Kubernetes API server
1 parent 0f3a9ba commit a8363a5

15 files changed

+190
-718
lines changed

go.mod

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,26 @@ module github.com/StatCan/jupyter-apis
33
go 1.14
44

55
require (
6-
github.com/StatCan/kubeflow-controller v0.0.0-20200805150330-c19fa4b0fcb6
6+
github.com/StatCan/kubeflow-controller v0.0.0-20200811133651-33215007413e
77
github.com/andanhm/go-prettytime v1.0.0
8+
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
9+
github.com/golang/protobuf v1.4.2 // indirect
810
github.com/gorilla/handlers v1.4.2
911
github.com/gorilla/mux v1.7.4
10-
github.com/hashicorp/vault/api v1.0.4 // indirect
12+
github.com/hashicorp/golang-lru v0.5.4 // indirect
1113
github.com/imdario/mergo v0.3.10 // indirect
14+
github.com/json-iterator/go v1.1.10 // indirect
1215
github.com/kr/pretty v0.2.0 // indirect
1316
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 // indirect
1417
golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
1518
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
19+
golang.org/x/text v0.3.3 // indirect
1620
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
21+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
1722
k8s.io/api v0.18.6
1823
k8s.io/apimachinery v0.18.6
1924
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
20-
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000
25+
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451 // indirect
2126
)
2227

2328
replace k8s.io/client-go => k8s.io/client-go v0.18.6

go.sum

Lines changed: 2 additions & 226 deletions
Large diffs are not rendered by default.

listers.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
kubeflowinformers "github.com/StatCan/kubeflow-controller/pkg/generated/informers/externalversions"
10+
"k8s.io/client-go/informers"
11+
"k8s.io/client-go/tools/cache"
12+
)
13+
14+
func (s *server) setupListers(ctx context.Context) error {
15+
factory := informers.NewSharedInformerFactory(s.clientsets.kubernetes, 5*time.Minute)
16+
kubeflowFactory := kubeflowinformers.NewSharedInformerFactory(s.clientsets.kubeflow, time.Minute*5)
17+
18+
// Events
19+
eventsInformer := factory.Core().V1().Events()
20+
go eventsInformer.Informer().Run(ctx.Done())
21+
22+
s.listers.events = eventsInformer.Lister()
23+
24+
// StorageClasses
25+
storageClassesInformer := factory.Storage().V1().StorageClasses()
26+
go storageClassesInformer.Informer().Run(ctx.Done())
27+
28+
s.listers.storageClasses = storageClassesInformer.Lister()
29+
30+
// PersistentVolumeClaims
31+
pvcInformer := factory.Core().V1().PersistentVolumeClaims()
32+
go pvcInformer.Informer().Run(ctx.Done())
33+
34+
s.listers.persistentVolumeClaims = pvcInformer.Lister()
35+
36+
// PodDefaults
37+
podDefaultsInformer := kubeflowFactory.Kubeflow().V1alpha1().PodDefaults()
38+
go podDefaultsInformer.Informer().Run(ctx.Done())
39+
40+
s.listers.podDefaults = podDefaultsInformer.Lister()
41+
42+
// Notebooks
43+
notebooksInformer := kubeflowFactory.Kubeflow().V1().Notebooks()
44+
go notebooksInformer.Informer().Run(ctx.Done())
45+
46+
s.listers.notebooks = notebooksInformer.Lister()
47+
48+
// Wait until sync
49+
log.Printf("synching caches...")
50+
tctx, _ := context.WithTimeout(ctx, time.Minute)
51+
if !cache.WaitForCacheSync(tctx.Done(), eventsInformer.Informer().HasSynced, storageClassesInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced, podDefaultsInformer.Informer().HasSynced, notebooksInformer.Informer().HasSynced) {
52+
return fmt.Errorf("timeout synching caches")
53+
}
54+
log.Printf("done synching caches")
55+
56+
return nil
57+
}

main.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,36 +11,48 @@ import (
1111
"sync"
1212
"time"
1313

14-
notebooksclient "github.com/StatCan/jupyter-apis/notebooks"
15-
notebooksv1 "github.com/StatCan/jupyter-apis/notebooks/api/v1"
14+
kubeflowv1 "github.com/StatCan/kubeflow-controller/pkg/apis/kubeflowcontroller/v1"
1615
kubeflowv1alpha1 "github.com/StatCan/kubeflow-controller/pkg/apis/kubeflowcontroller/v1alpha1"
1716
kubeflow "github.com/StatCan/kubeflow-controller/pkg/generated/clientset/versioned"
17+
kubeflowv1listers "github.com/StatCan/kubeflow-controller/pkg/generated/listers/kubeflowcontroller/v1"
18+
kubeflowv1alpha1listers "github.com/StatCan/kubeflow-controller/pkg/generated/listers/kubeflowcontroller/v1alpha1"
1819
"github.com/gorilla/handlers"
1920
"github.com/gorilla/mux"
2021
authorizationv1 "k8s.io/api/authorization/v1"
2122
corev1 "k8s.io/api/core/v1"
2223
"k8s.io/client-go/kubernetes"
24+
v1listers "k8s.io/client-go/listers/core/v1"
25+
storagev1listers "k8s.io/client-go/listers/storage/v1"
2326
"k8s.io/client-go/tools/clientcmd"
2427
"k8s.io/client-go/util/homedir"
2528
)
2629

2730
var kubeconfig string
2831
var userIDHeader string
2932

33+
type listers struct {
34+
events v1listers.EventLister
35+
storageClasses storagev1listers.StorageClassLister
36+
persistentVolumeClaims v1listers.PersistentVolumeClaimLister
37+
podDefaults kubeflowv1alpha1listers.PodDefaultLister
38+
notebooks kubeflowv1listers.NotebookLister
39+
}
40+
3041
type clientsets struct {
3142
kubernetes *kubernetes.Clientset
3243
kubeflow *kubeflow.Clientset
33-
notebooks *notebooksclient.Clientset
3444
}
3545

3646
type server struct {
3747
mux sync.Mutex
3848

3949
clientsets clientsets
50+
listers listers
4051
}
4152

4253
func main() {
4354
var err error
55+
gctx, gcancel := context.WithCancel(context.Background())
4456

4557
// Setup the default path to the of the kubeconfig file.
4658
// TODO: This breaks the in-cluster config and needs to be commented out in those instances. Need to find a fix.
@@ -73,11 +85,7 @@ func main() {
7385
log.Fatal(err)
7486
}
7587

76-
// Generate the Notebooks clientset
77-
s.clientsets.notebooks, err = notebooksclient.NewForConfig(config)
78-
if err != nil {
79-
log.Fatal(err)
80-
}
88+
err = s.setupListers(gctx)
8189

8290
// Generate the Gorilla Mux router
8391
router := mux.NewRouter()
@@ -88,30 +96,30 @@ func main() {
8896
router.HandleFunc("/api/namespaces/{namespace}/notebooks", s.checkAccess(authorizationv1.SubjectAccessReview{
8997
Spec: authorizationv1.SubjectAccessReviewSpec{
9098
ResourceAttributes: &authorizationv1.ResourceAttributes{
91-
Group: notebooksv1.GroupVersion.Group,
99+
Group: kubeflowv1.SchemeGroupVersion.Group,
92100
Verb: "list",
93101
Resource: "notebooks",
94-
Version: notebooksv1.GroupVersion.Version,
102+
Version: kubeflowv1.SchemeGroupVersion.Version,
95103
},
96104
},
97105
}, s.GetNotebooks)).Methods("GET")
98106
router.HandleFunc("/api/namespaces/{namespace}/notebooks", s.checkAccess(authorizationv1.SubjectAccessReview{
99107
Spec: authorizationv1.SubjectAccessReviewSpec{
100108
ResourceAttributes: &authorizationv1.ResourceAttributes{
101-
Group: notebooksv1.GroupVersion.Group,
109+
Group: kubeflowv1.SchemeGroupVersion.Group,
102110
Verb: "create",
103111
Resource: "notebooks",
104-
Version: notebooksv1.GroupVersion.Version,
112+
Version: kubeflowv1.SchemeGroupVersion.Version,
105113
},
106114
},
107115
}, s.NewNotebook)).Headers("Content-Type", "application/json").Methods("POST")
108116
router.HandleFunc("/api/namespaces/{namespace}/notebooks/{notebook}", s.checkAccess(authorizationv1.SubjectAccessReview{
109117
Spec: authorizationv1.SubjectAccessReviewSpec{
110118
ResourceAttributes: &authorizationv1.ResourceAttributes{
111-
Group: notebooksv1.GroupVersion.Group,
119+
Group: kubeflowv1.SchemeGroupVersion.Group,
112120
Verb: "delete",
113121
Resource: "notebooks",
114-
Version: notebooksv1.GroupVersion.Version,
122+
Version: kubeflowv1.SchemeGroupVersion.Version,
115123
},
116124
},
117125
}, s.DeleteNotebook)).Methods("DELETE")
@@ -166,6 +174,9 @@ func main() {
166174
// Block until we receive our signal
167175
<-c
168176

177+
// Cancel global context
178+
gcancel()
179+
169180
// Create a deadline to wait for
170181
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
171182
defer cancel()

notebooks.go

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import (
1010
"sort"
1111
"strings"
1212

13-
notebooksv1 "github.com/StatCan/jupyter-apis/notebooks/api/v1"
13+
kubeflowv1 "github.com/StatCan/kubeflow-controller/pkg/apis/kubeflowcontroller/v1"
1414
"github.com/andanhm/go-prettytime"
1515
"github.com/gorilla/mux"
1616
corev1 "k8s.io/api/core/v1"
1717
"k8s.io/apimachinery/pkg/api/resource"
1818
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/apimachinery/pkg/labels"
1920
)
2021

2122
const DefaultServiceAccountName string = "default-editor"
@@ -42,8 +43,8 @@ type volumerequest struct {
4243
}
4344

4445
type gpurequest struct {
45-
Quantity resource.Quantity `json:"num"`
46-
Vendor string `json:"vendor"`
46+
Quantity string `json:"num"`
47+
Vendor string `json:"vendor"`
4748
}
4849

4950
type newnotebookrequest struct {
@@ -105,21 +106,7 @@ const (
105106
GPUVendorAMD GPUVendor = "amd"
106107
)
107108

108-
type EventsByTimestamp []corev1.Event
109-
110-
func (e EventsByTimestamp) Len() int {
111-
return len(e)
112-
}
113-
114-
func (e EventsByTimestamp) Less(a, b int) bool {
115-
return e[b].CreationTimestamp.Before(&e[a].CreationTimestamp)
116-
}
117-
118-
func (e EventsByTimestamp) Swap(a, b int) {
119-
e[a], e[b] = e[b], e[a]
120-
}
121-
122-
func processStatus(notebook notebooksv1.Notebook, events []corev1.Event) (Status, string) {
109+
func processStatus(notebook *kubeflowv1.Notebook, events []*corev1.Event) (Status, string) {
123110
// Notebook is being deleted
124111
if notebook.DeletionTimestamp != nil {
125112
return StatusWaiting, "Deleting Notebook Server"
@@ -155,7 +142,7 @@ func processStatus(notebook notebooksv1.Notebook, events []corev1.Event) (Status
155142
return "", ""
156143
}
157144

158-
func processGPU(notebook notebooksv1.Notebook) (resource.Quantity, GPUVendor) {
145+
func processGPU(notebook *kubeflowv1.Notebook) (resource.Quantity, GPUVendor) {
159146
if limit, ok := notebook.Spec.Template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"]; ok {
160147
return limit, GPUVendorNvidia
161148
} else if limit, ok := notebook.Spec.Template.Spec.Containers[0].Resources.Limits["amd.com/gpu"]; ok {
@@ -171,36 +158,38 @@ func (s *server) GetNotebooks(w http.ResponseWriter, r *http.Request) {
171158

172159
log.Printf("loading notebooks for %q", namespace)
173160

174-
notebooks, err := s.clientsets.notebooks.V1().Notebooks(namespace).List(r.Context())
161+
notebooks, err := s.listers.notebooks.Notebooks(namespace).List(labels.Everything())
175162
if err != nil {
176163
s.error(w, r, err)
177164
return
178165
}
179166

167+
sort.Sort(notebooksByName(notebooks))
168+
180169
resp := notebooksresponse{
181170
APIResponse: APIResponse{
182171
Success: true,
183172
},
184173
Notebooks: make([]notebookresponse, 0),
185174
}
186175

187-
for _, notebook := range notebooks.Items {
176+
for _, notebook := range notebooks {
188177
// Load events
189-
allevents, err := s.clientsets.kubernetes.CoreV1().Events(notebook.Namespace).List(r.Context(), v1.ListOptions{
190-
FieldSelector: fmt.Sprintf("involvedObject.kind=Notebook,involvedObject.name=%s", notebook.Name),
191-
})
178+
allevents, err := s.listers.events.Events(notebook.Namespace).List(labels.Everything())
192179
if err != nil {
193180
log.Printf("failed to load events for %s/%s: %v", notebook.Namespace, notebook.Name, err)
194181
}
195182

196183
// Filter past events
197-
events := make([]corev1.Event, 0)
198-
for _, event := range allevents.Items {
199-
if !event.CreationTimestamp.Before(&notebook.CreationTimestamp) {
200-
events = append(events, event)
184+
events := make([]*corev1.Event, 0)
185+
for _, event := range allevents {
186+
if event.InvolvedObject.Kind != "Notebook" || event.InvolvedObject.Name != notebook.Name || event.CreationTimestamp.Before(&notebook.CreationTimestamp) {
187+
continue
201188
}
189+
190+
events = append(events, event)
202191
}
203-
sort.Sort(EventsByTimestamp(events))
192+
sort.Sort(eventsByTimestamp(events))
204193

205194
imageparts := strings.SplitAfter(notebook.Spec.Template.Spec.Containers[0].Image, "/")
206195

@@ -234,7 +223,7 @@ func (s *server) GetNotebooks(w http.ResponseWriter, r *http.Request) {
234223
s.respond(w, r, resp)
235224
}
236225

237-
func (s *server) handleVolume(ctx context.Context, req volumerequest, notebook *notebooksv1.Notebook) error {
226+
func (s *server) handleVolume(ctx context.Context, req volumerequest, notebook *kubeflowv1.Notebook) error {
238227
if req.Type == VolumeTypeNew {
239228
// Create the PVC
240229
pvc := corev1.PersistentVolumeClaim{
@@ -308,13 +297,13 @@ func (s *server) NewNotebook(w http.ResponseWriter, r *http.Request) {
308297

309298
// Setup the notebook
310299
// TODO: Work with default CPU/memory limits from config
311-
notebook := notebooksv1.Notebook{
300+
notebook := kubeflowv1.Notebook{
312301
ObjectMeta: v1.ObjectMeta{
313302
Name: req.Name,
314303
Namespace: namespace,
315304
},
316-
Spec: notebooksv1.NotebookSpec{
317-
Template: notebooksv1.NotebookTemplateSpec{
305+
Spec: kubeflowv1.NotebookSpec{
306+
Template: kubeflowv1.NotebookTemplateSpec{
318307
Spec: corev1.PodSpec{
319308
ServiceAccountName: DefaultServiceAccountName,
320309
Containers: []corev1.Container{
@@ -374,15 +363,21 @@ func (s *server) NewNotebook(w http.ResponseWriter, r *http.Request) {
374363
}
375364

376365
// Add GPU
377-
if !req.GPUs.Quantity.IsZero() {
378-
notebook.Spec.Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(req.GPUs.Vendor)] = req.GPUs.Quantity
379-
notebook.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(req.GPUs.Vendor)] = req.GPUs.Quantity
366+
if req.GPUs.Quantity != "none" {
367+
qty, err := resource.ParseQuantity(req.GPUs.Quantity)
368+
if err != nil {
369+
s.error(w, r, err)
370+
return
371+
}
372+
373+
notebook.Spec.Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(req.GPUs.Vendor)] = qty
374+
notebook.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(req.GPUs.Vendor)] = qty
380375
}
381376

382377
log.Printf("creating notebook %q for %q", notebook.ObjectMeta.Name, namespace)
383378

384379
// Submit the notebook to the API server
385-
_, err = s.clientsets.notebooks.V1().Notebooks(namespace).Create(r.Context(), &notebook)
380+
_, err = s.clientsets.kubeflow.KubeflowV1().Notebooks(namespace).Create(r.Context(), &notebook, v1.CreateOptions{})
386381
if err != nil {
387382
s.error(w, r, err)
388383
return
@@ -401,7 +396,7 @@ func (s *server) DeleteNotebook(w http.ResponseWriter, r *http.Request) {
401396
log.Printf("deleting notebook %q for %q", notebook, namespace)
402397

403398
propagation := v1.DeletePropagationForeground
404-
err := s.clientsets.notebooks.V1().Notebooks(namespace).Delete(r.Context(), notebook, &v1.DeleteOptions{
399+
err := s.clientsets.kubeflow.KubeflowV1().Notebooks(namespace).Delete(r.Context(), notebook, v1.DeleteOptions{
405400
PropagationPolicy: &propagation,
406401
})
407402
if err != nil {

0 commit comments

Comments
 (0)