Skip to content

Commit 6c069d4

Browse files
author
mada 00483107
committed
Merge branch 'dev' into 'master'
【Issue volcano-sh#57】【BUG2019040101375 】【BUG2019032102153 】增加policy中action和events的校验,去掉RestartTask action 单号: BUG2019040101375; BUG2019032102153 特性/模块名称: volcano-admission 修改原因: 1. policy中RestartTask action配置后不生效 2. policy中action和event缺少校验 修改内容: 1. RestartTask未被state实现,去掉这个action 2. 增加policy中可配置action和event的校验,action的可取值为"AbortJob","RestartJob","TerminateJob","CompleteJob","ResumeJob" events的可取值为"*","PodFailed","PodEvicted","Unknown",“TaskCompleted” 自验情况 自验通过 验证yaml [policy.yaml](/uploads/2d684ceab9b5a399571c4aff233e790b/policy.yaml) 验证结果 ![image](/uploads/13973e45fd75d45b611791fd039488be/image.png) ![image](/uploads/d0b9cb5e7a501dc87d53f78a4ac29477/image.png) Issues info: Issue ID: 57 Title: 【BUG2019040101375 】restartTask不生效【BUG2019032102153 】envents/action组合不合理 Issue url: CBU-PaaS/Community/volcano/volcano#57 See merge request CBU-PaaS/Community/volcano/volcano!92
2 parents e3c9966 + aae4f20 commit 6c069d4

File tree

6 files changed

+82
-13
lines changed

6 files changed

+82
-13
lines changed

docs/design/job-api.md

+11-8
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ const (
206206
OutOfSyncEvent Event = "OutOfSync"
207207
// CommandIssuedEvent is triggered if a command is raised by user
208208
CommandIssuedEvent Event = "CommandIssued"
209+
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
210+
TaskCompletedEvent Event = "TaskCompleted"
209211
)
210212

211213
// Action is the type of event handling
@@ -217,12 +219,11 @@ const (
217219
AbortJobAction Action = "AbortJob"
218220
// RestartJobAction if this action is set, the whole job will be restarted
219221
RestartJobAction Action = "RestartJob"
220-
// RestartTaskAction if this action is set, only the task will be restarted; default action.
221-
// This action can not work together with job level events, e.g. JobUnschedulable
222-
RestartTaskAction Action = "RestartTask"
223222
// TerminateJobAction if this action is set, the whole job wil be terminated
224223
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
225224
TerminateJobAction Action = "TerminateJob"
225+
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
226+
CompleteJobAction Action = "CompleteJob"
226227

227228
// ResumeJobAction is the action to resume an aborted job.
228229
ResumeJobAction Action = "ResumeJob"
@@ -300,8 +301,8 @@ spec:
300301
```
301302

302303
Some BigData framework (e.g. Spark) may have different requirements. Take Spark as example, the whole job will be restarted
303-
if 'driver' tasks failed and only restart the task if 'executor' tasks failed. As `RestartTask` is the default action of
304-
task events, `RestartJob` is set for driver `spec.tasks.policies` as follow.
304+
if 'driver' tasks failed and only restart the task if 'executor' tasks failed. `OnFailure` restartPolicy is set for executor
305+
and `RestartJob` is set for driver `spec.tasks.policies` as follow.
305306

306307
```yaml
307308
apiVersion: batch.volcano.sh/v1alpha1
@@ -327,6 +328,7 @@ spec:
327328
containers:
328329
- name: executor
329330
image: executor-img
331+
restartPolicy: OnFailure
330332
```
331333
332334
## Features Interaction
@@ -508,6 +510,8 @@ const (
508510
OutOfSyncEvent Event = "OutOfSync"
509511
// CommandIssuedEvent is triggered if a command is raised by user
510512
CommandIssuedEvent Event = "CommandIssued"
513+
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
514+
TaskCompletedEvent Event = "TaskCompleted"
511515
)
512516

513517
// Action is the action that Job controller will take according to the event.
@@ -519,12 +523,11 @@ const (
519523
AbortJobAction Action = "AbortJob"
520524
// RestartJobAction if this action is set, the whole job will be restarted
521525
RestartJobAction Action = "RestartJob"
522-
// RestartTaskAction if this action is set, only the task will be restarted; default action.
523-
// This action can not work together with job level events, e.g. JobUnschedulable
524-
RestartTaskAction Action = "RestartTask"
525526
// TerminateJobAction if this action is set, the whole job wil be terminated
526527
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
527528
TerminateJobAction Action = "TerminateJob"
529+
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
530+
CompleteJobAction Action = "CompleteJob"
528531

529532
// ResumeJobAction is the action to resume an aborted job.
530533
ResumeJobAction Action = "ResumeJob"

example/tensorflow-benchmark.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ spec:
1010
policies:
1111
- event: PodEvicted
1212
action: RestartJob
13-
- event: PodFailed
14-
action: RestartTask
1513
tasks:
1614
- replicas: 2
1715
name: ps

pkg/admission/admission_controller.go

+59
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/runtime"
2929
"k8s.io/apimachinery/pkg/runtime/serializer"
30+
"k8s.io/apimachinery/pkg/util/validation/field"
3031

3132
v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
3233
)
@@ -43,6 +44,27 @@ type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse
4344
var scheme = runtime.NewScheme()
4445
var Codecs = serializer.NewCodecFactory(scheme)
4546

47+
// policyEventMap defines all policy events and whether to allow external use
48+
var policyEventMap = map[v1alpha1.Event]bool{
49+
v1alpha1.AnyEvent: true,
50+
v1alpha1.PodFailedEvent: true,
51+
v1alpha1.PodEvictedEvent: true,
52+
v1alpha1.JobUnknownEvent: true,
53+
v1alpha1.TaskCompletedEvent: true,
54+
v1alpha1.OutOfSyncEvent: false,
55+
v1alpha1.CommandIssuedEvent: false,
56+
}
57+
58+
// policyActionMap defines all policy actions and whether to allow external use
59+
var policyActionMap = map[v1alpha1.Action]bool{
60+
v1alpha1.AbortJobAction: true,
61+
v1alpha1.RestartJobAction: true,
62+
v1alpha1.TerminateJobAction: true,
63+
v1alpha1.CompleteJobAction: true,
64+
v1alpha1.ResumeJobAction: true,
65+
v1alpha1.SyncJobAction: false,
66+
}
67+
4668
func init() {
4769
addToScheme(scheme)
4870
}
@@ -102,3 +124,40 @@ func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource
102124

103125
return job, nil
104126
}
127+
128+
func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path) error {
129+
errs := field.ErrorList{}
130+
for _, p := range policies {
131+
if allow, ok := policyEventMap[p.Event]; !ok || !allow {
132+
errs = append(errs, field.Invalid(fldPath, p.Event, fmt.Sprintf("invalid policy event")))
133+
}
134+
135+
if allow, ok := policyActionMap[p.Action]; !ok || !allow {
136+
errs = append(errs, field.Invalid(fldPath, p.Action, fmt.Sprintf("invalid policy action")))
137+
}
138+
}
139+
140+
return errs.ToAggregate()
141+
}
142+
143+
func getValidEvents() []v1alpha1.Event {
144+
var events []v1alpha1.Event
145+
for e, allow := range policyEventMap {
146+
if allow {
147+
events = append(events, e)
148+
}
149+
}
150+
151+
return events
152+
}
153+
154+
func getValidActions() []v1alpha1.Action {
155+
var actions []v1alpha1.Action
156+
for a, allow := range policyActionMap {
157+
if allow {
158+
actions = append(actions, a)
159+
}
160+
}
161+
162+
return actions
163+
}

pkg/admission/admit_job.go

+11
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"k8s.io/api/admission/v1beta1"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/util/validation/field"
2829

2930
v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
3031
)
@@ -99,6 +100,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
99100
if duplicateInfo, ok := CheckPolicyDuplicate(task.Policies); ok {
100101
msg = msg + fmt.Sprintf(" duplicated task event policies: %s;", duplicateInfo)
101102
}
103+
104+
if err := validatePolicies(task.Policies, field.NewPath("spec.tasks.policies")); err != nil {
105+
msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
106+
getValidEvents(), getValidActions())
107+
}
102108
}
103109

104110
if totalReplicas < job.Spec.MinAvailable {
@@ -110,6 +116,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
110116
msg = msg + fmt.Sprintf(" duplicated job event policies: %s;", duplicateInfo)
111117
}
112118

119+
if err := validatePolicies(job.Spec.Policies, field.NewPath("spec.policies")); err != nil {
120+
msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
121+
getValidEvents(), getValidActions())
122+
}
123+
113124
if msg != "" {
114125
reviewResponse.Allowed = false
115126
}

pkg/apis/batch/v1alpha1/job.go

-3
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,6 @@ const (
119119
AbortJobAction Action = "AbortJob"
120120
// RestartJobAction if this action is set, the whole job will be restarted
121121
RestartJobAction Action = "RestartJob"
122-
// RestartTaskAction if this action is set, only the task will be restarted; default action.
123-
// This action can not work together with job level events, e.g. JobUnschedulable
124-
RestartTaskAction Action = "RestartTask"
125122
// TerminateJobAction if this action is set, the whole job wil be terminated
126123
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
127124
TerminateJobAction Action = "TerminateJob"

test/e2e/mpi.go

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ var _ = Describe("MPI E2E Test", func() {
2525
plugins: map[string][]string{
2626
"ssh": []string{},
2727
"env": []string{},
28+
"svc": []string{},
2829
},
2930
tasks: []taskSpec{
3031
{

0 commit comments

Comments
 (0)