Skip to content

Commit 12c0765

Browse files
author
Klaus Ma
authored
Merge pull request volcano-sh#57 from volcano-sh/controller
2 parents f8f8722 + 3f533a4 commit 12c0765

File tree

3 files changed

+44
-30
lines changed

3 files changed

+44
-30
lines changed

pkg/controllers/job/cache/cache.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"sync"
2222

2323
"github.com/golang/glog"
24-
2524
"k8s.io/api/core/v1"
2625
"k8s.io/apimachinery/pkg/util/wait"
2726
"k8s.io/client-go/util/workqueue"
@@ -268,17 +267,22 @@ func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
268267
return completed >= taskReplicas
269268
}
270269

271-
func (jc *jobCache) processCleanupJob() {
270+
func (jc *jobCache) worker() {
271+
for jc.processCleanupJob() {
272+
}
273+
}
274+
275+
func (jc *jobCache) processCleanupJob() bool {
272276
obj, shutdown := jc.deletedJobs.Get()
273277
if shutdown {
274-
return
278+
return false
275279
}
276280
defer jc.deletedJobs.Done(obj)
277281

278282
job, ok := obj.(*apis.JobInfo)
279283
if !ok {
280284
glog.Errorf("failed to convert %v to *apis.JobInfo", obj)
281-
return
285+
return true
282286
}
283287

284288
jc.Mutex.Lock()
@@ -293,6 +297,7 @@ func (jc *jobCache) processCleanupJob() {
293297
// Retry
294298
jc.deleteJob(job)
295299
}
300+
return true
296301
}
297302

298303
func (jc *jobCache) deleteJob(job *apis.JobInfo) {

pkg/controllers/job/job_controller.go

+27-24
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"k8s.io/apimachinery/pkg/util/runtime"
2626
"k8s.io/apimachinery/pkg/util/wait"
2727
"k8s.io/client-go/informers"
28-
coreinformers "k8s.io/client-go/informers/core/v1"
2928
"k8s.io/client-go/kubernetes"
3029
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
3130
corelisters "k8s.io/client-go/listers/core/v1"
@@ -60,12 +59,10 @@ type Controller struct {
6059
vkClients *vkver.Clientset
6160
kbClients *kbver.Clientset
6261

63-
jobInformer vkbatchinfo.JobInformer
64-
podInformer coreinformers.PodInformer
65-
pvcInformer coreinformers.PersistentVolumeClaimInformer
66-
pgInformer kbinfo.PodGroupInformer
67-
svcInformer coreinformers.ServiceInformer
68-
cmdInformer vkcoreinfo.CommandInformer
62+
jobInformer vkbatchinfo.JobInformer
63+
pgInformer kbinfo.PodGroupInformer
64+
cmdInformer vkcoreinfo.CommandInformer
65+
sharedInformers informers.SharedInformerFactory
6966

7067
// A store of jobs
7168
jobLister vkbatchlister.JobLister
@@ -152,9 +149,10 @@ func NewJobController(config *rest.Config) *Controller {
152149
cc.cmdLister = cc.cmdInformer.Lister()
153150
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced
154151

155-
cc.podInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Pods()
152+
cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
153+
podInformer := cc.sharedInformers.Core().V1().Pods()
156154

157-
cc.podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
155+
podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
158156
FilterFunc: func(obj interface{}) bool {
159157
switch t := obj.(type) {
160158
case *v1.Pod:
@@ -177,16 +175,16 @@ func NewJobController(config *rest.Config) *Controller {
177175
},
178176
})
179177

180-
cc.podLister = cc.podInformer.Lister()
181-
cc.podSynced = cc.podInformer.Informer().HasSynced
178+
cc.podLister = podInformer.Lister()
179+
cc.podSynced = podInformer.Informer().HasSynced
182180

183-
cc.pvcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().PersistentVolumeClaims()
184-
cc.pvcLister = cc.pvcInformer.Lister()
185-
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
181+
pvcInformer := cc.sharedInformers.Core().V1().PersistentVolumeClaims()
182+
cc.pvcLister = pvcInformer.Lister()
183+
cc.pvcSynced = pvcInformer.Informer().HasSynced
186184

187-
cc.svcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Services()
188-
cc.svcLister = cc.svcInformer.Lister()
189-
cc.svcSynced = cc.svcInformer.Informer().HasSynced
185+
svcInformer := cc.sharedInformers.Core().V1().Services()
186+
cc.svcLister = svcInformer.Lister()
187+
cc.svcSynced = svcInformer.Informer().HasSynced
190188

191189
cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha1().PodGroups()
192190
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -205,11 +203,9 @@ func NewJobController(config *rest.Config) *Controller {
205203
// Run start JobController
206204
func (cc *Controller) Run(stopCh <-chan struct{}) {
207205
go cc.jobInformer.Informer().Run(stopCh)
208-
go cc.podInformer.Informer().Run(stopCh)
209-
go cc.pvcInformer.Informer().Run(stopCh)
210206
go cc.pgInformer.Informer().Run(stopCh)
211-
go cc.svcInformer.Informer().Run(stopCh)
212207
go cc.cmdInformer.Informer().Run(stopCh)
208+
go cc.sharedInformers.Start(stopCh)
213209

214210
cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
215211
cc.svcSynced, cc.cmdSynced, cc.pvcSynced)
@@ -223,10 +219,15 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
223219
}
224220

225221
func (cc *Controller) worker() {
222+
for cc.processNextReq() {
223+
}
224+
}
225+
226+
func (cc *Controller) processNextReq() bool {
226227
obj, shutdown := cc.queue.Get()
227228
if shutdown {
228229
glog.Errorf("Fail to pop item from queue")
229-
return
230+
return false
230231
}
231232

232233
req := obj.(apis.Request)
@@ -238,14 +239,14 @@ func (cc *Controller) worker() {
238239
if err != nil {
239240
// TODO(k82cn): ignore not-ready error.
240241
glog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
241-
return
242+
return true
242243
}
243244

244245
st := state.NewState(jobInfo)
245246
if st == nil {
246247
glog.Errorf("Invalid state <%s> of Job <%v/%v>",
247248
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
248-
return
249+
return true
249250
}
250251

251252
action := applyPolicies(jobInfo.Job, &req)
@@ -257,9 +258,11 @@ func (cc *Controller) worker() {
257258
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
258259
// If any error, requeue it.
259260
cc.queue.AddRateLimited(req)
260-
return
261+
return true
261262
}
262263

263264
// If no error, forget it.
264265
cc.queue.Forget(req)
266+
267+
return true
265268
}

pkg/controllers/job/job_controller_handler.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -301,17 +301,22 @@ func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.Job
301301
}
302302

303303
func (cc *Controller) handleCommands() {
304+
for cc.processNextCommand() {
305+
}
306+
}
307+
308+
func (cc *Controller) processNextCommand() bool {
304309
obj, shutdown := cc.commandQueue.Get()
305310
if shutdown {
306-
return
311+
return false
307312
}
308313
cmd := obj.(*vkbusv1.Command)
309314
defer cc.commandQueue.Done(cmd)
310315

311316
if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil {
312317
glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
313318
cc.commandQueue.AddRateLimited(cmd)
314-
return
319+
return true
315320
}
316321
cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name,
317322
vkbatchv1.CommandIssued,
@@ -326,6 +331,7 @@ func (cc *Controller) handleCommands() {
326331

327332
cc.queue.Add(req)
328333

334+
return true
329335
}
330336

331337
func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {

0 commit comments

Comments
 (0)