Skip to content

Commit 31cb3b7

Browse files
author
Klaus Ma
authored
Merge pull request volcano-sh#81 from volcano-sh/exit-code
2 parents 2b4eeba + 81024f7 commit 31cb3b7

25 files changed

+1429
-23
lines changed

Gopkg.lock

+17-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

+4
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,7 @@ required = [
100100
name = "k8s.io/code-generator"
101101
unused-packages = false
102102

103+
104+
[[constraint]]
105+
name = "github.com/hashicorp/go-multierror"
106+
version = "1.0.0"

pkg/admission/admission_controller.go

+34-11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/golang/glog"
2323

24+
"github.com/hashicorp/go-multierror"
2425
"k8s.io/api/admission/v1beta1"
2526
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
2627
corev1 "k8s.io/api/core/v1"
@@ -61,27 +62,49 @@ func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse {
6162
}
6263
}
6364

64-
func CheckPolicyDuplicate(policies []v1alpha1.LifecyclePolicy) (string, bool) {
65-
policyEvents := map[v1alpha1.Event]v1alpha1.Event{}
66-
hasDuplicate := false
67-
var duplicateInfo string
65+
func ValidatePolicies(policies []v1alpha1.LifecyclePolicy) error {
66+
var err error
67+
policyEvents := map[v1alpha1.Event]struct{}{}
68+
exitCodes := map[int32]struct{}{}
6869

6970
for _, policy := range policies {
70-
if _, found := policyEvents[policy.Event]; found {
71-
hasDuplicate = true
72-
duplicateInfo = fmt.Sprintf("%v", policy.Event)
71+
if policy.Event != "" && policy.ExitCode != nil {
72+
err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously"))
7373
break
74+
}
75+
76+
if policy.Event == "" && policy.ExitCode == nil {
77+
err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified"))
78+
break
79+
}
80+
81+
if policy.Event != "" {
82+
// TODO: check that the event is one of the supported Events
83+
if _, found := policyEvents[policy.Event]; found {
84+
err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event))
85+
break
86+
} else {
87+
policyEvents[policy.Event] = struct{}{}
88+
}
7489
} else {
75-
policyEvents[policy.Event] = policy.Event
90+
if *policy.ExitCode == 0 {
91+
err = multierror.Append(err, fmt.Errorf("0 is not a valid error code"))
92+
break
93+
}
94+
if _, found := exitCodes[*policy.ExitCode]; found {
95+
err = multierror.Append(err, fmt.Errorf("duplicate exitCode %v", *policy.ExitCode))
96+
break
97+
} else {
98+
exitCodes[*policy.ExitCode] = struct{}{}
99+
}
76100
}
77101
}
78102

79103
if _, found := policyEvents[v1alpha1.AnyEvent]; found && len(policyEvents) > 1 {
80-
hasDuplicate = true
81-
duplicateInfo = "if there's * here, no other policy should be here"
104+
err = multierror.Append(err, fmt.Errorf("if there's * here, no other policy should be here"))
82105
}
83106

84-
return duplicateInfo, hasDuplicate
107+
return err
85108
}
86109

87110
func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1alpha1.Job, error) {

pkg/admission/admit_job.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"fmt"
2121
"reflect"
2222
"strings"
23-
"volcano.sh/volcano/pkg/controllers/job/plugins"
2423

2524
"github.com/golang/glog"
2625

@@ -29,6 +28,7 @@ import (
2928
"k8s.io/apimachinery/pkg/util/validation"
3029

3130
v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
31+
"volcano.sh/volcano/pkg/controllers/job/plugins"
3232
)
3333

3434
// job admit.
@@ -98,22 +98,20 @@ func validateJobSpec(jobSpec v1alpha1.JobSpec, reviewResponse *v1beta1.Admission
9898
taskNames[task.Name] = task.Name
9999
}
100100

101-
//duplicate task event policies
102-
if duplicateInfo, ok := CheckPolicyDuplicate(task.Policies); ok {
103-
msg = msg + fmt.Sprintf(" duplicated task event policies: %s;", duplicateInfo)
101+
if err := ValidatePolicies(task.Policies); err != nil {
102+
msg = msg + err.Error()
104103
}
105104
}
106105

107106
if totalReplicas < jobSpec.MinAvailable {
108107
msg = msg + " 'minAvailable' should not be greater than total replicas in tasks;"
109108
}
110109

111-
//duplicate job event policies
112-
if duplicateInfo, ok := CheckPolicyDuplicate(jobSpec.Policies); ok {
113-
msg = msg + fmt.Sprintf(" duplicated job event policies: %s;", duplicateInfo)
110+
if err := ValidatePolicies(jobSpec.Policies); err != nil {
111+
msg = msg + err.Error()
114112
}
115113

116-
//invalid job plugins
114+
// invalid job plugins
117115
if len(jobSpec.Plugins) != 0 {
118116
for name := range jobSpec.Plugins {
119117
if _, found := plugins.GetPluginBuilder(name); !found {

pkg/controllers/apis/job_info.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,14 @@ type Request struct {
133133
TaskName string
134134

135135
Event v1alpha1.Event
136+
ExitCode int32
136137
Action v1alpha1.Action
137138
JobVersion int32
138139
}
139140

140141
func (r Request) String() string {
141142
return fmt.Sprintf(
142-
"Job: %s/%s, Task:%s, Event:%s, Action:%s, JobVersion: %d",
143-
r.Namespace, r.JobName, r.TaskName, r.Event, r.Action, r.JobVersion)
143+
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
144+
r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)
144145

145146
}

pkg/controllers/job/job_controller_handler.go

+5
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,13 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
203203
}
204204

205205
event := vkbatchv1.OutOfSyncEvent
206+
var exitCode int32
206207
if oldPod.Status.Phase != v1.PodFailed &&
207208
newPod.Status.Phase == v1.PodFailed {
208209
event = vkbatchv1.PodFailedEvent
210+
// TODO: volcano currently supports only single-container pods.
211+
// Once multi-container pods are supported, update this accordingly.
212+
exitCode = newPod.Status.ContainerStatuses[0].State.Terminated.ExitCode
209213
}
210214

211215
if oldPod.Status.Phase != v1.PodSucceeded &&
@@ -221,6 +225,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
221225
TaskName: taskName,
222226

223227
Event: event,
228+
ExitCode: exitCode,
224229
JobVersion: int32(dVersion),
225230
}
226231

pkg/controllers/job/job_controller_util.go

+10
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
185185
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
186186
return policy.Action
187187
}
188+
189+
// 0 is not an error code; it is rejected by the validating admission controller
190+
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
191+
return policy.Action
192+
}
188193
}
189194
break
190195
}
@@ -196,6 +201,11 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
196201
if policy.Event == req.Event || policy.Event == vkv1.AnyEvent {
197202
return policy.Action
198203
}
204+
205+
// 0 is not an error code; it is rejected by the validating admission controller
206+
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
207+
return policy.Action
208+
}
199209
}
200210

201211
return vkv1.SyncJobAction

test/e2e/admission.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ var _ = Describe("Job E2E Test: Test Admission service", func() {
9292
stError, ok := err.(*errors.StatusError)
9393
Expect(ok).To(Equal(true))
9494
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
95-
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated job event policies"))
95+
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicate event PodFailed"))
9696
})
9797

9898
It("Min Available illegal", func() {

test/e2e/job_error_handling.go

+38
Original file line numberDiff line numberDiff line change
@@ -469,4 +469,42 @@ var _ = Describe("Job Error Handling", func() {
469469

470470
})
471471

472+
It("job level LifecyclePolicy, error code: 3; Action: RestartJob", func() {
473+
By("init test context")
474+
context := initTestContext()
475+
defer cleanupTestContext(context)
476+
477+
By("create job")
478+
var erroCode int32 = 3
479+
job := createJob(context, &jobSpec{
480+
name: "errorcode-restart-job",
481+
policies: []vkv1.LifecyclePolicy{
482+
{
483+
Action: vkv1.RestartJobAction,
484+
ExitCode: &erroCode,
485+
},
486+
},
487+
tasks: []taskSpec{
488+
{
489+
name: "success",
490+
img: defaultNginxImage,
491+
min: 1,
492+
rep: 1,
493+
},
494+
{
495+
name: "fail",
496+
img: defaultNginxImage,
497+
min: 1,
498+
rep: 1,
499+
command: "sleep 10s && exit 3",
500+
restartPolicy: v1.RestartPolicyNever,
501+
},
502+
},
503+
})
504+
505+
// job phase: pending -> running -> restarting
506+
err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting})
507+
Expect(err).NotTo(HaveOccurred())
508+
})
509+
472510
})

0 commit comments

Comments
 (0)