Skip to content

Commit 1be22b5

Browse files
author
Klaus Ma
authored
Merge pull request volcano-sh#92 from hzxuzhonghu/job-controller
2 parents 60e8741 + 22879b0 commit 1be22b5

File tree

4 files changed

+91
-168
lines changed

4 files changed

+91
-168
lines changed

pkg/apis/helpers/helpers.go

-10
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,6 @@ func CreateConfigMapIfNotExist(job *vkv1.Job, kubeClients *kubernetes.Clientset,
9494
}
9595

9696
func DeleteConfigmap(job *vkv1.Job, kubeClients *kubernetes.Clientset, cmName string) error {
97-
if _, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Get(cmName, metav1.GetOptions{}); err != nil {
98-
if !apierrors.IsNotFound(err) {
99-
glog.V(3).Infof("Failed to get Configmap for Job <%s/%s>: %v",
100-
job.Namespace, job.Name, err)
101-
return err
102-
} else {
103-
return nil
104-
}
105-
}
106-
10797
if err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Delete(cmName, nil); err != nil {
10898
if !apierrors.IsNotFound(err) {
10999
glog.Errorf("Failed to delete Configmap of Job %v/%v: %v",

pkg/controllers/job/job_controller.go

+6-46
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@ limitations under the License.
1717
package job
1818

1919
import (
20-
"fmt"
21-
2220
"github.com/golang/glog"
2321

2422
"k8s.io/api/core/v1"
25-
"k8s.io/apimachinery/pkg/util/runtime"
2623
"k8s.io/apimachinery/pkg/util/wait"
2724
"k8s.io/client-go/informers"
2825
"k8s.io/client-go/kubernetes"
@@ -38,8 +35,6 @@ import (
3835
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
3936
kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"
4037

41-
v1corev1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
42-
"volcano.sh/volcano/pkg/apis/helpers"
4338
vkver "volcano.sh/volcano/pkg/client/clientset/versioned"
4439
vkscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
4540
vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
@@ -127,53 +122,18 @@ func NewJobController(config *rest.Config) *Controller {
127122
cc.jobSynced = cc.jobInformer.Informer().HasSynced
128123

129124
cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands()
130-
cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
131-
FilterFunc: func(obj interface{}) bool {
132-
switch t := obj.(type) {
133-
case *v1corev1.Command:
134-
return helpers.ControlledBy(t, helpers.JobKind)
135-
case cache.DeletedFinalStateUnknown:
136-
if cmd, ok := t.Obj.(*v1corev1.Command); ok {
137-
return helpers.ControlledBy(cmd, helpers.JobKind)
138-
}
139-
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Command", obj))
140-
return false
141-
default:
142-
runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
143-
return false
144-
}
145-
},
146-
Handler: cache.ResourceEventHandlerFuncs{
147-
AddFunc: cc.addCommand,
148-
},
125+
cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
126+
AddFunc: cc.addCommand,
149127
})
150128
cc.cmdLister = cc.cmdInformer.Lister()
151129
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced
152130

153131
cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
154132
podInformer := cc.sharedInformers.Core().V1().Pods()
155-
156-
podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
157-
FilterFunc: func(obj interface{}) bool {
158-
switch t := obj.(type) {
159-
case *v1.Pod:
160-
return helpers.ControlledBy(t, helpers.JobKind)
161-
case cache.DeletedFinalStateUnknown:
162-
if pod, ok := t.Obj.(*v1.Pod); ok {
163-
return helpers.ControlledBy(pod, helpers.JobKind)
164-
}
165-
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
166-
return false
167-
default:
168-
runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
169-
return false
170-
}
171-
},
172-
Handler: cache.ResourceEventHandlerFuncs{
173-
AddFunc: cc.addPod,
174-
UpdateFunc: cc.updatePod,
175-
DeleteFunc: cc.deletePod,
176-
},
133+
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
134+
AddFunc: cc.addPod,
135+
UpdateFunc: cc.updatePod,
136+
DeleteFunc: cc.deletePod,
177137
})
178138

179139
cc.podLister = podInformer.Lister()

pkg/controllers/job/job_controller_actions.go

+59-97
Original file line numberDiff line numberDiff line change
@@ -66,38 +66,20 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
6666
continue
6767
}
6868

69-
switch pod.Status.Phase {
70-
case v1.PodRunning:
71-
err := cc.deleteJobPod(job.Name, pod)
72-
if err != nil {
73-
running++
74-
errs = append(errs, err)
75-
continue
76-
}
69+
if err := cc.deleteJobPod(job.Name, pod); err == nil {
7770
terminating++
78-
case v1.PodPending:
79-
err := cc.deleteJobPod(job.Name, pod)
80-
if err != nil {
71+
} else {
72+
errs = append(errs, err)
73+
switch pod.Status.Phase {
74+
case v1.PodRunning:
75+
running++
76+
case v1.PodPending:
8177
pending++
82-
errs = append(errs, err)
83-
continue
84-
}
85-
terminating++
86-
case v1.PodSucceeded:
87-
err := cc.deleteJobPod(job.Name, pod)
88-
if err != nil {
78+
case v1.PodSucceeded:
8979
succeeded++
90-
errs = append(errs, err)
91-
continue
92-
}
93-
case v1.PodFailed:
94-
err := cc.deleteJobPod(job.Name, pod)
95-
if err != nil {
80+
case v1.PodFailed:
9681
failed++
97-
errs = append(errs, err)
98-
continue
9982
}
100-
terminating++
10183
}
10284
}
10385
}
@@ -220,32 +202,23 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
220202
}
221203
podToCreate = append(podToCreate, newPod)
222204
} else {
205+
delete(pods, podName)
223206
if pod.DeletionTimestamp != nil {
224207
glog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
225208
terminating++
226-
delete(pods, podName)
227209
continue
228210
}
229211

230212
switch pod.Status.Phase {
231213
case v1.PodPending:
232-
if pod.DeletionTimestamp != nil {
233-
terminating++
234-
} else {
235-
pending++
236-
}
214+
pending++
237215
case v1.PodRunning:
238-
if pod.DeletionTimestamp != nil {
239-
terminating++
240-
} else {
241-
running++
242-
}
216+
running++
243217
case v1.PodSucceeded:
244218
succeeded++
245219
case v1.PodFailed:
246220
failed++
247221
}
248-
delete(pods, podName)
249222
}
250223
}
251224

@@ -260,7 +233,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
260233
go func(pod *v1.Pod) {
261234
defer waitCreationGroup.Done()
262235
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
263-
if err != nil {
236+
if err != nil && !apierrors.IsAlreadyExists(err) {
264237
// Failed to create Pod, waitCreationGroup a moment and then create it again
265238
// This is to ensure all podsMap under the same Job created
266239
// So gang-scheduling could schedule the Job successfully
@@ -280,6 +253,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
280253
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
281254
}
282255

256+
// TODO: It is hard to imagine a case in which the pod deletion step below is actually necessary.
283257
// Delete unnecessary pods.
284258
waitDeletionGroup := sync.WaitGroup{}
285259
waitDeletionGroup.Add(len(podToDelete))
@@ -337,16 +311,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
337311
return nil
338312
}
339313

340-
func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 {
341-
if current == 0 {
342-
current += 1
343-
}
344-
if bumpVersion {
345-
current += 1
346-
}
347-
return current
348-
}
349-
350314
func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error {
351315
// If Service does not exist, create one for Job.
352316
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
@@ -397,68 +361,66 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
397361
// If input/output PVC does not exist, create them for Job.
398362
inputPVC := job.Annotations[admissioncontroller.PVCInputName]
399363
outputPVC := job.Annotations[admissioncontroller.PVCOutputName]
400-
if job.Spec.Input != nil {
401-
if job.Spec.Input.VolumeClaim != nil {
402-
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
403-
if !apierrors.IsNotFound(err) {
404-
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
405-
job.Namespace, job.Name, err)
406-
return err
407-
}
364+
if job.Spec.Input != nil && job.Spec.Input.VolumeClaim != nil {
365+
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
366+
if !apierrors.IsNotFound(err) {
367+
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
368+
job.Namespace, job.Name, err)
369+
return err
370+
}
408371

409-
pvc := &v1.PersistentVolumeClaim{
410-
ObjectMeta: metav1.ObjectMeta{
411-
Namespace: job.Namespace,
412-
Name: inputPVC,
413-
OwnerReferences: []metav1.OwnerReference{
414-
*metav1.NewControllerRef(job, helpers.JobKind),
415-
},
372+
pvc := &v1.PersistentVolumeClaim{
373+
ObjectMeta: metav1.ObjectMeta{
374+
Namespace: job.Namespace,
375+
Name: inputPVC,
376+
OwnerReferences: []metav1.OwnerReference{
377+
*metav1.NewControllerRef(job, helpers.JobKind),
416378
},
417-
Spec: *job.Spec.Input.VolumeClaim,
418-
}
379+
},
380+
Spec: *job.Spec.Input.VolumeClaim,
381+
}
419382

420-
glog.V(3).Infof("Try to create input PVC: %v", pvc)
383+
glog.V(3).Infof("Try to create input PVC: %v", pvc)
421384

422-
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
423-
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
424-
job.Namespace, job.Name, err)
425-
return err
426-
}
385+
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
386+
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
387+
job.Namespace, job.Name, err)
388+
return err
427389
}
428390
}
429391
}
430-
if job.Spec.Output != nil {
431-
if job.Spec.Output.VolumeClaim != nil {
432-
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
433-
if !apierrors.IsNotFound(err) {
434-
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
435-
job.Namespace, job.Name, err)
436-
return err
437-
}
438392

439-
pvc := &v1.PersistentVolumeClaim{
440-
ObjectMeta: metav1.ObjectMeta{
441-
Namespace: job.Namespace,
442-
Name: outputPVC,
443-
OwnerReferences: []metav1.OwnerReference{
444-
*metav1.NewControllerRef(job, helpers.JobKind),
445-
},
393+
if job.Spec.Output != nil && job.Spec.Output.VolumeClaim != nil {
394+
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
395+
if !apierrors.IsNotFound(err) {
396+
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
397+
job.Namespace, job.Name, err)
398+
return err
399+
}
400+
401+
pvc := &v1.PersistentVolumeClaim{
402+
ObjectMeta: metav1.ObjectMeta{
403+
Namespace: job.Namespace,
404+
Name: outputPVC,
405+
OwnerReferences: []metav1.OwnerReference{
406+
*metav1.NewControllerRef(job, helpers.JobKind),
446407
},
447-
Spec: *job.Spec.Output.VolumeClaim,
448-
}
408+
},
409+
Spec: *job.Spec.Output.VolumeClaim,
410+
}
449411

450-
glog.V(3).Infof("Try to create output PVC: %v", pvc)
412+
glog.V(3).Infof("Try to create output PVC: %v", pvc)
451413

452-
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
453-
if !apierrors.IsAlreadyExists(err) {
454-
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
455-
job.Namespace, job.Name, err)
456-
return err
457-
}
414+
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
415+
if !apierrors.IsAlreadyExists(err) {
416+
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
417+
job.Namespace, job.Name, err)
418+
return err
458419
}
459420
}
460421
}
461422
}
423+
462424
return nil
463425
}
464426

0 commit comments

Comments
 (0)