Skip to content

Commit 577131f

Browse files
author
Klaus Ma
authored
Merge pull request volcano-sh#36 from volcano-sh/feature/support_task_level_lifecycle
2 parents 277cf5c + f375255 commit 577131f

File tree

10 files changed

+166
-5
lines changed

10 files changed

+166
-5
lines changed

pkg/apis/batch/v1alpha1/job.go

+6
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ const (
9797
OutOfSyncEvent Event = "OutOfSync"
9898
// CommandIssuedEvent is triggered if a command is raised by user
9999
CommandIssuedEvent Event = "CommandIssued"
100+
// TaskCompletedEvent is triggered when 'Replicas' pods of one task have succeeded
101+
TaskCompletedEvent Event = "TaskCompleted"
100102
)
101103

102104
// Action is the action that Job controller will take according to the event.
@@ -114,6 +116,8 @@ const (
114116
// TerminateJobAction if this action is set, the whole job will be terminated
115117
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
116118
TerminateJobAction Action = "TerminateJob"
119+
// CompleteJobAction if this action is set, the unfinished pods will be killed and the job completes.
120+
CompleteJobAction Action = "CompleteJob"
117121

118122
// ResumeJobAction is the action to resume an aborted job.
119123
ResumeJobAction Action = "ResumeJob"
@@ -170,6 +174,8 @@ const (
170174
Running JobPhase = "Running"
171175
// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
172176
Restarting JobPhase = "Restarting"
177+
// Completing is the phase that required tasks of job are completed, job starts to clean up
178+
Completing JobPhase = "Completing"
173179
// Completed is the phase that all tasks of Job are completed
174180
Completed JobPhase = "Completed"
175181
// Terminating is the phase that the Job is terminated, waiting for releasing pods

pkg/controllers/job/cache/cache.go

+31
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,37 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) {
230230
wait.Until(jc.processCleanupJob, 0, stopCh)
231231
}
232232

233+
func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
234+
var taskReplicas, completed int32
235+
236+
jobInfo, found := jc.jobs[jobKey]
237+
if !found {
238+
return false
239+
}
240+
241+
taskPods, found := jobInfo.Pods[taskName]
242+
243+
if !found {
244+
return false
245+
}
246+
247+
for _, task := range jobInfo.Job.Spec.Tasks {
248+
if task.Name == taskName {
249+
taskReplicas = task.Replicas
250+
}
251+
}
252+
if taskReplicas <= 0 {
253+
return false
254+
}
255+
256+
for _, pod := range taskPods {
257+
if pod.Status.Phase == v1.PodSucceeded {
258+
completed += 1
259+
}
260+
}
261+
return completed >= taskReplicas
262+
}
263+
233264
func (jc *jobCache) processCleanupJob() {
234265
obj, shutdown := jc.deletedJobs.Get()
235266
if shutdown {

pkg/controllers/job/cache/interface.go

+2
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ type Cache interface {
3535
AddPod(pod *v1.Pod) error
3636
UpdatePod(pod *v1.Pod) error
3737
DeletePod(pod *v1.Pod) error
38+
39+
TaskCompleted(jobKey, taskName string) bool
3840
}

pkg/controllers/job/job_controller_handler.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,24 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
197197
return
198198
}
199199

200+
if err := cc.cache.UpdatePod(newPod); err != nil {
201+
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
202+
newPod.Namespace, newPod.Name, err)
203+
}
204+
200205
event := vkbatchv1.OutOfSyncEvent
201206
if oldPod.Status.Phase != v1.PodFailed &&
202207
newPod.Status.Phase == v1.PodFailed {
203208
event = vkbatchv1.PodFailedEvent
204209
}
205210

211+
if oldPod.Status.Phase != v1.PodSucceeded &&
212+
newPod.Status.Phase == v1.PodSucceeded {
213+
if cc.cache.TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
214+
event = vkbatchv1.TaskCompletedEvent
215+
}
216+
}
217+
206218
req := apis.Request{
207219
Namespace: newPod.Namespace,
208220
JobName: jobName,
@@ -212,11 +224,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
212224
JobVersion: int32(dVersion),
213225
}
214226

215-
if err := cc.cache.UpdatePod(newPod); err != nil {
216-
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
217-
newPod.Namespace, newPod.Name, err)
218-
}
219-
220227
cc.queue.Add(req)
221228
}
222229

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
Copyright 2019 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package state
18+
19+
import (
20+
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
21+
"volcano.sh/volcano/pkg/controllers/job/apis"
22+
)
23+
24+
type completingState struct {
25+
job *apis.JobInfo
26+
}
27+
28+
func (ps *completingState) Execute(action vkv1.Action) error {
29+
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
30+
// If any "alive" pods, still in Completing phase
31+
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
32+
return vkv1.JobState{
33+
Phase: vkv1.Completing,
34+
}
35+
}
36+
37+
return vkv1.JobState{
38+
Phase: vkv1.Completed,
39+
}
40+
})
41+
}

pkg/controllers/job/state/factory.go

+2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func NewState(jobInfo *apis.JobInfo) State {
5353
return &abortingState{job: jobInfo}
5454
case vkv1.Aborted:
5555
return &abortedState{job: jobInfo}
56+
case vkv1.Completing:
57+
return &completingState{job: jobInfo}
5658
}
5759

5860
// It's pending by default.

pkg/controllers/job/state/pending.go

+11
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
4646
phase = vkv1.Aborting
4747
}
4848

49+
return vkv1.JobState{
50+
Phase: phase,
51+
}
52+
})
53+
case vkv1.CompleteJobAction:
54+
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
55+
phase := vkv1.Completed
56+
if status.Terminating != 0 {
57+
phase = vkv1.Completing
58+
}
59+
4960
return vkv1.JobState{
5061
Phase: phase,
5162
}

pkg/controllers/job/state/running.go

+11
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ func (ps *runningState) Execute(action vkv1.Action) error {
5656
phase = vkv1.Terminating
5757
}
5858

59+
return vkv1.JobState{
60+
Phase: phase,
61+
}
62+
})
63+
case vkv1.CompleteJobAction:
64+
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
65+
phase := vkv1.Completed
66+
if status.Terminating != 0 {
67+
phase = vkv1.Completing
68+
}
69+
5970
return vkv1.JobState{
6071
Phase: phase,
6172
}

test/e2e/job_error_handling.go

+40
Original file line numberDiff line numberDiff line change
@@ -429,4 +429,44 @@ var _ = Describe("Job Error Handling", func() {
429429
Expect(err).NotTo(HaveOccurred())
430430
})
431431

432+
// Verifies that a job-level LifecyclePolicy mapping TaskCompletedEvent to
// CompleteJobAction drives the job through Completing to Completed once the
// short-lived task has finished, even though the other task is still running.
It("job level LifecyclePolicy, Event: TaskCompleted; Action: CompleteJob", func() {
	By("init test context")
	context := initTestContext()
	defer cleanupTestContext(context)

	By("create job")
	job := createJob(context, &jobSpec{
		name: "any-restart-job",
		policies: []vkv1.LifecyclePolicy{
			{
				Action: vkv1.CompleteJobAction,
				Event:  vkv1.TaskCompletedEvent,
			},
		},
		tasks: []taskSpec{
			{
				name: "completed-task",
				img:  defaultBusyBoxImage,
				min:  2,
				rep:  2,
				// Sleep 5 seconds to ensure the job reaches the running state
				command: "sleep 5",
			},
			{
				name: "terminating-task",
				img:  defaultNginxImage,
				min:  2,
				rep:  2,
			},
		},
	})

	By("job scheduled, then task 'completed-task' finished and job finally complete")
	// job phase: pending -> running -> completing -> completed
	err := waitJobStates(context, job, []vkv1.JobPhase{
		vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed})
	Expect(err).NotTo(HaveOccurred())
})
471+
432472
})

test/e2e/util.go

+10
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,16 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
455455
return nil
456456
}
457457

458+
func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
459+
for _, phase := range phases {
460+
err := wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, phase))
461+
if err != nil {
462+
return err
463+
}
464+
}
465+
return nil
466+
}
467+
458468
func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error {
459469
return wait.Poll(100*time.Millisecond, twoMinute, func() (bool, error) {
460470
newJob, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})

0 commit comments

Comments
 (0)