@@ -26,6 +26,7 @@ import (
26
26
27
27
"k8s.io/api/core/v1"
28
28
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
+ "k8s.io/apimachinery/pkg/util/wait"
29
30
"k8s.io/client-go/informers"
30
31
infov1 "k8s.io/client-go/informers/core/v1"
31
32
policyv1 "k8s.io/client-go/informers/policy/v1beta1"
@@ -35,6 +36,7 @@ import (
35
36
"k8s.io/client-go/rest"
36
37
"k8s.io/client-go/tools/cache"
37
38
"k8s.io/client-go/tools/record"
39
+ "k8s.io/client-go/util/workqueue"
38
40
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
39
41
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
40
42
@@ -79,8 +81,8 @@ type SchedulerCache struct {
79
81
Nodes map [string ]* kbapi.NodeInfo
80
82
Queues map [kbapi.QueueID ]* kbapi.QueueInfo
81
83
82
- errTasks * cache. FIFO
83
- deletedJobs * cache. FIFO
84
+ errTasks workqueue. RateLimitingInterface
85
+ deletedJobs workqueue. RateLimitingInterface
84
86
85
87
namespaceAsQueue bool
86
88
}
@@ -129,12 +131,12 @@ type defaultStatusUpdater struct {
129
131
}
130
132
131
133
// Update pod with podCondition
132
- func (su * defaultStatusUpdater ) UpdatePod (pod * v1.Pod , condition * v1.PodCondition ) (* v1.Pod , error ) {
134
+ func (su * defaultStatusUpdater ) UpdatePodCondition (pod * v1.Pod , condition * v1.PodCondition ) (* v1.Pod , error ) {
133
135
glog .V (3 ).Infof ("Updating pod condition for %s/%s to (%s==%s)" , pod .Namespace , pod .Name , condition .Type , condition .Status )
134
136
if podutil .UpdatePodCondition (& pod .Status , condition ) {
135
137
return su .kubeclient .CoreV1 ().Pods (pod .Namespace ).UpdateStatus (pod )
136
138
}
137
- return nil , fmt . Errorf ( "failed to update pod condition" )
139
+ return pod , nil
138
140
}
139
141
140
142
// Update pod with podCondition
@@ -164,41 +166,13 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
164
166
return dvb .volumeBinder .Binder .BindPodVolumes (task .Pod )
165
167
}
166
168
167
- func taskKey (obj interface {}) (string , error ) {
168
- if obj == nil {
169
- return "" , fmt .Errorf ("the object is nil" )
170
- }
171
-
172
- task , ok := obj .(* kbapi.TaskInfo )
173
-
174
- if ! ok {
175
- return "" , fmt .Errorf ("failed to convert %v to TaskInfo" , obj )
176
- }
177
-
178
- return string (task .UID ), nil
179
- }
180
-
181
- func jobKey (obj interface {}) (string , error ) {
182
- if obj == nil {
183
- return "" , fmt .Errorf ("the object is nil" )
184
- }
185
-
186
- job , ok := obj .(* kbapi.JobInfo )
187
-
188
- if ! ok {
189
- return "" , fmt .Errorf ("failed to convert %v to TaskInfo" , obj )
190
- }
191
-
192
- return string (job .UID ), nil
193
- }
194
-
195
169
func newSchedulerCache (config * rest.Config , schedulerName string , nsAsQueue bool ) * SchedulerCache {
196
170
sc := & SchedulerCache {
197
171
Jobs : make (map [kbapi.JobID ]* kbapi.JobInfo ),
198
172
Nodes : make (map [string ]* kbapi.NodeInfo ),
199
173
Queues : make (map [kbapi.QueueID ]* kbapi.QueueInfo ),
200
- errTasks : cache . NewFIFO ( taskKey ),
201
- deletedJobs : cache . NewFIFO ( jobKey ),
174
+ errTasks : workqueue . NewRateLimitingQueue ( workqueue . DefaultControllerRateLimiter () ),
175
+ deletedJobs : workqueue . NewRateLimitingQueue ( workqueue . DefaultControllerRateLimiter () ),
202
176
kubeclient : kubernetes .NewForConfigOrDie (config ),
203
177
kbclient : kbver .NewForConfigOrDie (config ),
204
178
namespaceAsQueue : nsAsQueue ,
@@ -324,10 +298,10 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
324
298
}
325
299
326
300
// Re-sync error tasks.
327
- go sc . resync ( )
301
+ go wait . Until ( sc . processResyncTask , 0 , stopCh )
328
302
329
303
// Cleanup jobs.
330
- go sc . cleanupJobs ( )
304
+ go wait . Until ( sc . processCleanupJob , 0 , stopCh )
331
305
}
332
306
333
307
func (sc * SchedulerCache ) WaitForCacheSync (stopCh <- chan struct {}) bool {
@@ -388,7 +362,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
388
362
}
389
363
390
364
// Add new task to node.
391
- node .UpdateTask (task )
365
+ if err := node .UpdateTask (task ); err != nil {
366
+ return err
367
+ }
392
368
393
369
p := task .Pod
394
370
@@ -430,7 +406,9 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error
430
406
task .NodeName = hostname
431
407
432
408
// Add task to the node.
433
- node .AddTask (task )
409
+ if err := node .AddTask (task ); err != nil {
410
+ return err
411
+ }
434
412
435
413
p := task .Pod
436
414
@@ -461,7 +439,7 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string)
461
439
pod := task .Pod .DeepCopy ()
462
440
463
441
sc .Recorder .Eventf (pod , v1 .EventTypeWarning , string (v1 .PodReasonUnschedulable ), message )
464
- if _ , err := sc .StatusUpdater .UpdatePod (pod , & v1.PodCondition {
442
+ if _ , err := sc .StatusUpdater .UpdatePodCondition (pod , & v1.PodCondition {
465
443
Type : v1 .PodScheduled ,
466
444
Status : v1 .ConditionFalse ,
467
445
Reason : v1 .PodReasonUnschedulable ,
@@ -476,74 +454,52 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string)
476
454
func (sc * SchedulerCache ) deleteJob (job * kbapi.JobInfo ) {
477
455
glog .V (3 ).Infof ("Try to delete Job <%v:%v/%v>" , job .UID , job .Namespace , job .Name )
478
456
479
- time .AfterFunc (5 * time .Second , func () {
480
- sc .deletedJobs .AddIfNotPresent (job )
481
- })
457
+ sc .deletedJobs .AddRateLimited (job )
482
458
}
483
459
484
- func (sc * SchedulerCache ) processCleanupJob () error {
485
- _ , err := sc .deletedJobs .Pop (func (obj interface {}) error {
486
- job , ok := obj .(* kbapi.JobInfo )
487
- if ! ok {
488
- return fmt .Errorf ("failed to convert %v to *v1.Pod" , obj )
489
- }
490
-
491
- func () {
492
- sc .Mutex .Lock ()
493
- defer sc .Mutex .Unlock ()
494
-
495
- if kbapi .JobTerminated (job ) {
496
- delete (sc .Jobs , job .UID )
497
- glog .V (3 ).Infof ("Job <%v:%v/%v> was deleted." , job .UID , job .Namespace , job .Name )
498
- } else {
499
- // Retry
500
- sc .deleteJob (job )
501
- }
502
- }()
460
+ func (sc * SchedulerCache ) processCleanupJob () {
461
+ obj , shutdown := sc .deletedJobs .Get ()
462
+ if shutdown {
463
+ return
464
+ }
503
465
504
- return nil
505
- })
466
+ job , found := obj .(* kbapi.JobInfo )
467
+ if ! found {
468
+ glog .Errorf ("Failed to convert <%v> to *JobInfo" , obj )
469
+ return
470
+ }
506
471
507
- return err
508
- }
472
+ sc . Mutex . Lock ()
473
+ defer sc . Mutex . Unlock ()
509
474
510
- func ( sc * SchedulerCache ) cleanupJobs ( ) {
511
- for {
512
- err := sc . processCleanupJob ( )
513
- if err != nil {
514
- glog . Errorf ( "Failed to process job clean up: %v" , err )
515
- }
475
+ if kbapi . JobTerminated ( job ) {
476
+ delete ( sc . Jobs , job . UID )
477
+ glog . V ( 3 ). Infof ( "Job <%v:%v/%v> was deleted." , job . UID , job . Namespace , job . Name )
478
+ } else {
479
+ // Retry
480
+ sc . deleteJob ( job )
516
481
}
517
482
}
518
483
519
484
// resyncTask queues task on the rate-limited errTasks queue so that
// processResyncTask will try to sync it again later.
func (sc *SchedulerCache) resyncTask(task *kbapi.TaskInfo) {
	sc.errTasks.AddRateLimited(task)
}
522
487
523
- func (sc * SchedulerCache ) resync () {
524
- for {
525
- err := sc .processResyncTask ()
526
- if err != nil {
527
- glog .Errorf ("Failed to process resync: %v" , err )
528
- }
488
+ func (sc * SchedulerCache ) processResyncTask () {
489
+ obj , shutdown := sc .errTasks .Get ()
490
+ if shutdown {
491
+ return
492
+ }
493
+ task , ok := obj .(* kbapi.TaskInfo )
494
+ if ! ok {
495
+ glog .Errorf ("failed to convert %v to *v1.Pod" , obj )
496
+ return
529
497
}
530
- }
531
-
532
- func (sc * SchedulerCache ) processResyncTask () error {
533
- _ , err := sc .errTasks .Pop (func (obj interface {}) error {
534
- task , ok := obj .(* kbapi.TaskInfo )
535
- if ! ok {
536
- return fmt .Errorf ("failed to convert %v to *v1.Pod" , obj )
537
- }
538
-
539
- if err := sc .syncTask (task ); err != nil {
540
- glog .Errorf ("Failed to sync pod <%v/%v>" , task .Namespace , task .Name )
541
- return err
542
- }
543
- return nil
544
- })
545
498
546
- return err
499
+ if err := sc .syncTask (task ); err != nil {
500
+ glog .Errorf ("Failed to sync pod <%v/%v>, retry it." , task .Namespace , task .Name )
501
+ sc .resyncTask (task )
502
+ }
547
503
}
548
504
549
505
func (sc * SchedulerCache ) Snapshot () * kbapi.ClusterInfo {
0 commit comments