Skip to content

Commit bdc3ff1

Browse files
author
mada 00483107
committed
Merge branch 'opt/opt_task_related' into 'master'
Fix infinite loop when JobInfo is incomplete. Closes bug volcano-sh#39 by allowing an existing JobInfo object in the cache to be updated when its Job attribute is empty. Also adds some useful log output. Issue info: Issue ID: 39. Title: Infinite loop in vk-controller when a pod event comes ahead of the job event. Issue URL: CBU-PaaS/Community/volcano/volcano#39. See merge request CBU-PaaS/Community/volcano/volcano!26
2 parents 823b7d4 + 6ce976c commit bdc3ff1

File tree

4 files changed

+29
-14
lines changed

4 files changed

+29
-14
lines changed

pkg/controllers/job/apis/types.go

+7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package apis
1818

1919
import (
20+
"fmt"
2021
"k8s.io/api/core/v1"
2122

2223
vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
@@ -35,3 +36,9 @@ type Request struct {
3536
Event vkbatchv1.Event
3637
Action vkbatchv1.Action
3738
}
39+
40+
// String renders the request as a single human-readable line containing the
// job namespace and name, the task name, the event and the action.
// With this method Request satisfies fmt.Stringer, so fmt-style verbs such as
// %v and %s print this summary instead of the raw struct fields.
func (r Request) String() string {
41+
return fmt.Sprintf(
42+
"Job: %s/%s, Task:%s, Event:%s, Action:%s",
43+
r.Namespace, r.JobName, r.TaskName, r.Event, r.Action)
44+
}

pkg/controllers/job/cache/cache.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,14 @@ func (jc *jobCache) Add(obj *v1alpha1.Job) error {
116116
defer jc.Unlock()
117117

118118
key := JobKey(obj)
119-
if _, found := jc.jobs[key]; found {
120-
return fmt.Errorf("duplicated job <%v>", key)
119+
if job, found := jc.jobs[key]; found {
120+
// JobInfo was created by pod event previously,
121+
// therefore we will have an empty Job attribute.
122+
if job.Job == nil {
123+
job.Job = obj
124+
} else {
125+
return fmt.Errorf("duplicated job <%v>", key)
126+
}
121127
}
122128

123129
jc.jobs[key] = &apis.JobInfo{

pkg/controllers/job/job_controller.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,9 @@ func (cc *Controller) worker() {
218218
}
219219

220220
if jobInfo.Job == nil {
221-
glog.V(3).Infof("Cache is out of sync for <%v>, retry it.", req)
221+
glog.V(3).Infof(
222+
"Cache is incomplete for request <%v>, will retry it later.",
223+
req)
222224
cc.queue.AddRateLimited(req)
223225
return
224226
}

pkg/controllers/job/job_controller_handler.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (cc *Controller) addJob(obj interface{}) {
7474

7575
// TODO(k82cn): if failed to add job, the cache should be refresh
7676
if err := cc.cache.Add(job); err != nil {
77-
glog.Errorf("Failed to add job <%s/%s>: %v",
77+
glog.Errorf("Failed to add job <%s/%s>: %v in cache",
7878
job.Namespace, job.Name, err)
7979
}
8080
cc.queue.Add(req)
@@ -95,7 +95,7 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
9595
}
9696

9797
if err := cc.cache.Update(newJob); err != nil {
98-
glog.Errorf("Failed to update job <%s/%s>: %v",
98+
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
9999
newJob.Namespace, newJob.Name, err)
100100
}
101101

@@ -110,7 +110,7 @@ func (cc *Controller) deleteJob(obj interface{}) {
110110
}
111111

112112
if err := cc.cache.Delete(job); err != nil {
113-
glog.Errorf("Failed to delete job <%s/%s>: %v",
113+
glog.Errorf("Failed to delete job <%s/%s>: %v in cache",
114114
job.Namespace, job.Name, err)
115115
}
116116
}
@@ -124,7 +124,7 @@ func (cc *Controller) addPod(obj interface{}) {
124124

125125
jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
126126
if !found {
127-
glog.Errorf("Failed to find jobName of Pod <%s/%s>",
127+
glog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
128128
pod.Namespace, pod.Name)
129129
return
130130
}
@@ -137,7 +137,7 @@ func (cc *Controller) addPod(obj interface{}) {
137137
}
138138

139139
if err := cc.cache.AddPod(pod); err != nil {
140-
glog.Errorf("Failed to add Pod <%s/%s>: %v",
140+
glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
141141
pod.Namespace, pod.Name, err)
142142
}
143143
cc.queue.Add(req)
@@ -158,14 +158,14 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
158158

159159
taskName, found := newPod.Annotations[vkbatchv1.TaskSpecKey]
160160
if !found {
161-
glog.Errorf("Failed to find taskName of Pod <%s/%s>",
161+
glog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
162162
newPod.Namespace, newPod.Name)
163163
return
164164
}
165165

166166
jobName, found := newPod.Annotations[vkbatchv1.JobNameKey]
167167
if !found {
168-
glog.Errorf("Failed to find jobName of Pod <%s/%s>",
168+
glog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
169169
newPod.Namespace, newPod.Name)
170170
return
171171
}
@@ -185,7 +185,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
185185
}
186186

187187
if err := cc.cache.UpdatePod(newPod); err != nil {
188-
glog.Errorf("Failed to update Pod <%s/%s>: %v",
188+
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
189189
newPod.Namespace, newPod.Name, err)
190190
}
191191

@@ -211,14 +211,14 @@ func (cc *Controller) deletePod(obj interface{}) {
211211

212212
taskName, found := pod.Annotations[vkbatchv1.TaskSpecKey]
213213
if !found {
214-
glog.Errorf("Failed to find taskName of Pod <%s/%s>",
214+
glog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
215215
pod.Namespace, pod.Name)
216216
return
217217
}
218218

219219
jobName, found := pod.Annotations[vkbatchv1.JobNameKey]
220220
if !found {
221-
glog.Errorf("Failed to find jobName of Pod <%s/%s>",
221+
glog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
222222
pod.Namespace, pod.Name)
223223
return
224224
}
@@ -232,7 +232,7 @@ func (cc *Controller) deletePod(obj interface{}) {
232232
}
233233

234234
if err := cc.cache.DeletePod(pod); err != nil {
235-
glog.Errorf("Failed to update Pod <%s/%s>: %v",
235+
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
236236
pod.Namespace, pod.Name, err)
237237
}
238238

0 commit comments

Comments
 (0)