This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 68b458c

Merge pull request #602 from k82cn/automated-cherry-pick-of-#596-upstream-release-0.4
Automated cherry pick of #596: Set default PodGroup for Pods.
2 parents 68a0b9c + 2608273, commit 68b458c

File tree: 8 files changed (+148, -56 lines)

8 files changed

+148
-56
lines changed

pkg/scheduler/api/job_info.go (+49, -13)
@@ -27,7 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"

 	"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
-	arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
+	"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
 	"github.com/kubernetes-sigs/kube-batch/pkg/apis/utils"
 )

@@ -40,6 +40,8 @@ type TaskInfo struct {
 	Name      string
 	Namespace string

+	PodGroup *v1alpha1.PodGroup
+
 	Resreq *Resource

 	NodeName string

@@ -50,15 +52,34 @@ type TaskInfo struct {
 	Pod *v1.Pod
 }

-func getJobID(pod *v1.Pod) JobID {
+func getOwners(pod *v1.Pod) (JobID, *v1alpha1.PodGroup) {
 	if len(pod.Annotations) != 0 {
-		if gn, found := pod.Annotations[arbcorev1.GroupNameAnnotationKey]; found && len(gn) != 0 {
+		if gn, found := pod.Annotations[v1alpha1.GroupNameAnnotationKey]; found && len(gn) != 0 {
 			// Make sure Pod and PodGroup belong to the same namespace.
 			jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn)
-			return JobID(jobID)
+			return JobID(jobID), nil
 		}
 	}
-	return JobID(utils.GetController(pod))
+
+	jobID := JobID(utils.GetController(pod))
+	if len(jobID) == 0 {
+		jobID = JobID(pod.UID)
+	}
+
+	pg := &v1alpha1.PodGroup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: pod.Namespace,
+			Name:      string(jobID),
+			Annotations: map[string]string{
+				ShadowPodGroupKey: string(jobID),
+			},
+		},
+		Spec: v1alpha1.PodGroupSpec{
+			MinMember: 1,
+		},
+	}
+
+	return jobID, pg
 }

 func NewTaskInfo(pod *v1.Pod) *TaskInfo {

@@ -69,9 +90,12 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
 		req.Add(NewResource(c.Resources.Requests))
 	}

+	jobID, pg := getOwners(pod)
+
 	ti := &TaskInfo{
 		UID:       TaskID(pod.UID),
-		Job:       getJobID(pod),
+		Job:       jobID,
+		PodGroup:  pg,
 		Name:      pod.Name,
 		Namespace: pod.Namespace,
 		NodeName:  pod.Spec.NodeName,

@@ -94,6 +118,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
 		Job:       ti.Job,
 		Name:      ti.Name,
 		Namespace: ti.Namespace,
+		PodGroup:  ti.PodGroup,
 		NodeName:  ti.NodeName,
 		Status:    ti.Status,
 		Priority:  ti.Priority,

@@ -104,8 +129,8 @@
 }

 func (ti TaskInfo) String() string {
-	return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, resreq %v",
-		ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq)
+	return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, type %v, pri %v, resreq %v",
+		ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.PodGroup, ti.Priority, ti.Resreq)
 }

 // JobID is the type of JobInfo's ID.

@@ -138,14 +163,14 @@ type JobInfo struct {
 	TotalRequest *Resource

 	CreationTimestamp metav1.Time
-	PodGroup          *arbcorev1.PodGroup
+	PodGroup          *v1alpha1.PodGroup

 	// TODO(k82cn): keep backward compatbility, removed it when v1alpha1 finalized.
 	PDB *policyv1.PodDisruptionBudget
 }

-func NewJobInfo(uid JobID) *JobInfo {
-	return &JobInfo{
+func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo {
+	job := &JobInfo{
 		UID: uid,

 		MinAvailable: 0,

@@ -157,13 +182,19 @@ func NewJobInfo(uid JobID) *JobInfo {
 		TaskStatusIndex: map[TaskStatus]tasksMap{},
 		Tasks:           tasksMap{},
 	}
+
+	for _, task := range tasks {
+		job.AddTaskInfo(task)
+	}
+
+	return job
 }

 func (ji *JobInfo) UnsetPodGroup() {
 	ji.PodGroup = nil
 }

-func (ji *JobInfo) SetPodGroup(pg *arbcorev1.PodGroup) {
+func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) {
 	ji.Name = pg.Name
 	ji.Namespace = pg.Namespace
 	ji.MinAvailable = pg.Spec.MinMember

@@ -234,6 +265,10 @@ func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
 	if AllocatedStatus(ti.Status) {
 		ji.Allocated.Add(ti.Resreq)
 	}
+
+	if ji.PodGroup == nil && ti.PodGroup != nil {
+		ji.SetPodGroup(ti.PodGroup)
+	}
 }

 func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error {

@@ -321,7 +356,8 @@ func (ji JobInfo) String() string {
 		i++
 	}

-	return fmt.Sprintf("Job (%v): name %v, minAvailable %d", ji.UID, ji.Name, ji.MinAvailable) + res
+	return fmt.Sprintf("Job (%v): namespace %v (%v), name %v, minAvailable %d, podGroup %+v",
+		ji.UID, ji.Namespace, ji.Queue, ji.Name, ji.MinAvailable, ji.PodGroup) + res
 }

 // Error returns detailed information on why a job's task failed to fit on
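
The net effect of these changes: a Pod that carries no PodGroup annotation is no longer identified only by its controller UID; getOwners also fabricates an in-memory "shadow" PodGroup (MinMember 1, tagged with ShadowPodGroupKey), which NewTaskInfo attaches to the task and AddTaskInfo later promotes onto the job. A rough test-style sketch of that fallback path, assuming utils.GetController returns an empty string for a Pod with no controller owner; the pod fixture and test name are invented for illustration:

package api

import (
	"testing"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// Hypothetical sketch, not part of this commit: a Pod with neither the
// PodGroup annotation nor a controller owner falls back to its own UID as
// the job ID and receives a one-member shadow PodGroup.
func TestGetOwnersDefaultsShadowPodGroup(t *testing.T) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "standalone-pod",
			UID:       types.UID("pod-uid-1"),
		},
	}

	jobID, pg := getOwners(pod)

	if jobID != JobID(pod.UID) {
		t.Errorf("expected job ID to fall back to the pod UID, got %v", jobID)
	}
	if pg == nil || pg.Spec.MinMember != 1 {
		t.Fatalf("expected a shadow PodGroup with MinMember 1, got %+v", pg)
	}
	if _, found := pg.Annotations[ShadowPodGroupKey]; !found {
		t.Errorf("expected the shadow annotation %q on the default PodGroup", ShadowPodGroupKey)
	}
}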

pkg/scheduler/api/job_info_test.go (+32, -13)
@@ -35,17 +35,20 @@ func jobInfoEqual(l, r *JobInfo) bool {
 func TestAddTaskInfo(t *testing.T) {
 	// case1
 	case01_uid := JobID("uid")
+	case01_ns := "c1"
 	case01_owner := buildOwnerReference("uid")

-	case01_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod1 := buildPod(case01_ns, "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
 	case01_task1 := NewTaskInfo(case01_pod1)
-	case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod2 := buildPod(case01_ns, "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
 	case01_task2 := NewTaskInfo(case01_pod2)
-	case01_pod3 := buildPod("c1", "p3", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod3 := buildPod(case01_ns, "p3", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
 	case01_task3 := NewTaskInfo(case01_pod3)
-	case01_pod4 := buildPod("c1", "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod4 := buildPod(case01_ns, "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
 	case01_task4 := NewTaskInfo(case01_pod4)

+	_, pg := getOwners(case01_pod1)
+
 	tests := []struct {
 		name     string
 		uid      JobID

@@ -58,7 +61,11 @@ func TestAddTaskInfo(t *testing.T) {
 			pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3, case01_pod4},
 			expected: &JobInfo{
 				UID:          case01_uid,
-				MinAvailable: 0,
+				Namespace:    case01_ns,
+				Queue:        QueueID(case01_ns),
+				Name:         string(case01_uid),
+				MinAvailable: 1,
+				PodGroup:     pg,
 				Allocated:    buildResource("4000m", "4G"),
 				TotalRequest: buildResource("5000m", "5G"),
 				Tasks: tasksMap{

@@ -103,21 +110,25 @@ func TestAddTaskInfo(t *testing.T) {
 func TestDeleteTaskInfo(t *testing.T) {
 	// case1
 	case01_uid := JobID("owner1")
+	case01_ns := "c1"
 	case01_owner := buildOwnerReference(string(case01_uid))
-	case01_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod1 := buildPod(case01_ns, "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
 	case01_task1 := NewTaskInfo(case01_pod1)
-	case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
-	case01_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod2 := buildPod(case01_ns, "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
+	case01_pod3 := buildPod(case01_ns, "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
 	case01_task3 := NewTaskInfo(case01_pod3)
+	_, case01_job := getOwners(case01_pod1)

 	// case2
 	case02_uid := JobID("owner2")
+	case02_ns := "c2"
 	case02_owner := buildOwnerReference(string(case02_uid))
-	case02_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
+	case02_pod1 := buildPod(case02_ns, "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
 	case02_task1 := NewTaskInfo(case02_pod1)
-	case02_pod2 := buildPod("c1", "p2", "n1", v1.PodPending, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
-	case02_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
+	case02_pod2 := buildPod(case02_ns, "p2", "n1", v1.PodPending, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
+	case02_pod3 := buildPod(case02_ns, "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
 	case02_task3 := NewTaskInfo(case02_pod3)
+	_, case02_job := getOwners(case02_pod1)

 	tests := []struct {
 		name     string

@@ -133,7 +144,11 @@ func TestDeleteTaskInfo(t *testing.T) {
 			rmPods: []*v1.Pod{case01_pod2},
 			expected: &JobInfo{
 				UID:          case01_uid,
-				MinAvailable: 0,
+				Namespace:    case01_ns,
+				Name:         string(case01_uid),
+				Queue:        QueueID(case01_ns),
+				MinAvailable: 1,
+				PodGroup:     case01_job,
 				Allocated:    buildResource("3000m", "3G"),
 				TotalRequest: buildResource("4000m", "4G"),
 				Tasks: tasksMap{

@@ -155,7 +170,11 @@ func TestDeleteTaskInfo(t *testing.T) {
 			rmPods: []*v1.Pod{case02_pod2},
 			expected: &JobInfo{
 				UID:          case02_uid,
-				MinAvailable: 0,
+				Namespace:    case02_ns,
+				Name:         string(case02_uid),
+				Queue:        QueueID(case02_ns),
+				MinAvailable: 1,
+				PodGroup:     case02_job,
 				Allocated:    buildResource("3000m", "3G"),
 				TotalRequest: buildResource("4000m", "4G"),
 				Tasks: tasksMap{

pkg/scheduler/api/types.go (+4)
@@ -53,6 +53,10 @@ const (
 	Unknown
 )

+const (
+	ShadowPodGroupKey = "kube-batch/shadow-pod-group"
+)
+
 func (ts TaskStatus) String() string {
 	switch ts {
 	case Pending:

pkg/scheduler/api/util.go (+29, new file)
@@ -0,0 +1,29 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
+
+func ShadowPodGroup(pg *v1alpha1.PodGroup) bool {
+	if pg == nil {
+		return true
+	}
+
+	_, found := pg.Annotations[ShadowPodGroupKey]
+
+	return found
+}
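
Together with the ShadowPodGroupKey constant added in types.go, this helper is the single question other components ask before treating job.PodGroup as a real API object. An illustrative walk-through of the three inputs it distinguishes; the wrapper function is hypothetical, while the types and constant come from the diffs above:

package api

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
)

// Hypothetical walk-through, not part of this commit.
func shadowPodGroupExamples() (bool, bool, bool) {
	// No PodGroup at all: treated as shadow, nothing to update in the cluster.
	none := ShadowPodGroup(nil)

	// A group synthesized by getOwners carries ShadowPodGroupKey: also shadow.
	defaulted := ShadowPodGroup(&v1alpha1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{ShadowPodGroupKey: "job-1"},
		},
	})

	// A PodGroup created through the API has no such annotation: not shadow.
	real := ShadowPodGroup(&v1alpha1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pg-1"},
	})

	return none, defaulted, real // true, true, false
}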

pkg/scheduler/cache/cache.go (+20, -23)
@@ -575,14 +575,7 @@ func (sc *SchedulerCache) String() string {
 	if len(sc.Jobs) != 0 {
 		str = str + "Jobs:\n"
 		for _, job := range sc.Jobs {
-			str = str + fmt.Sprintf("\t Job(%s) name(%s) minAvailable(%v)\n",
-				job.UID, job.Name, job.MinAvailable)
-
-			i := 0
-			for _, task := range job.Tasks {
-				str = str + fmt.Sprintf("\t\t %d: %v\n", i, task)
-				i++
-			}
+			str = str + fmt.Sprintf("\t %s\n", job)
 		}
 	}

@@ -593,17 +586,19 @@
 func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
 	jobErrMsg := job.FitError()

-	pgUnschedulable := job.PodGroup != nil &&
-		(job.PodGroup.Status.Phase == v1alpha1.PodGroupUnknown ||
-			job.PodGroup.Status.Phase == v1alpha1.PodGroupPending)
-	pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0
-
-	// If pending or unschedulable, record unschedulable event.
-	if pgUnschedulable || pdbUnschedulabe {
-		msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
-			len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
-		sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning,
-			string(v1alpha1.PodGroupUnschedulableType), msg)
+	if !kbapi.ShadowPodGroup(job.PodGroup) {
+		pgUnschedulable := job.PodGroup != nil &&
+			(job.PodGroup.Status.Phase == v1alpha1.PodGroupUnknown ||
+				job.PodGroup.Status.Phase == v1alpha1.PodGroupPending)
+		pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0
+
+		// If pending or unschedulable, record unschedulable event.
+		if pgUnschedulable || pdbUnschedulabe {
+			msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
+				len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
+			sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning,
+				string(v1alpha1.PodGroupUnschedulableType), msg)
+		}
 	}

 	// Update podCondition for tasks Allocated and Pending before job discarded

@@ -619,11 +614,13 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {

 // UpdateJobStatus update the status of job and its tasks.
 func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) {
-	pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
-	if err != nil {
-		return nil, err
+	if !kbapi.ShadowPodGroup(job.PodGroup) {
+		pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
+		if err != nil {
+			return nil, err
+		}
+		job.PodGroup = pg
 	}
-	job.PodGroup = pg

 	sc.RecordJobStatusEvent(job)
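
The guard generalizes: any code path that writes PodGroup status or emits events against job.PodGroup has to check for a shadow group first, because the synthesized object never exists in the cluster. A minimal sketch of that pattern as a standalone helper; the function name is hypothetical, SchedulerCache, StatusUpdater, and the kbapi alias are the ones used in the diff above, and the kbapi import path is inferred from the repository layout:

package cache

import (
	kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

// Hypothetical helper, not part of this commit: mirrors the UpdateJobStatus
// change by skipping API writes for shadow PodGroups.
func syncPodGroupStatus(sc *SchedulerCache, job *kbapi.JobInfo) error {
	if kbapi.ShadowPodGroup(job.PodGroup) {
		// The PodGroup exists only inside the scheduler cache; there is no
		// object in the cluster to update, so do nothing.
		return nil
	}

	pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
	if err != nil {
		return err
	}
	job.PodGroup = pg

	return nil
}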

pkg/scheduler/cache/cache_test.go (+10, -4)
@@ -137,9 +137,7 @@ func TestAddPod(t *testing.T) {
 		[]metav1.OwnerReference{owner}, make(map[string]string))
 	pi2 := api.NewTaskInfo(pod2)

-	j1 := api.NewJobInfo(api.JobID("j1"))
-	j1.AddTaskInfo(pi1)
-	j1.AddTaskInfo(pi2)
+	j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2)

 	node1 := buildNode("n1", buildResourceList("2000m", "10G"))
 	ni1 := api.NewNodeInfo(node1)

@@ -186,7 +184,6 @@
 }

 func TestAddNode(t *testing.T) {
-
 	// case 1
 	node1 := buildNode("n1", buildResourceList("2000m", "10G"))
 	pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),

@@ -198,6 +195,11 @@ func TestAddNode(t *testing.T) {
 	ni1 := api.NewNodeInfo(node1)
 	ni1.AddTask(pi2)

+	j1 := api.NewJobInfo("c1-p1")
+	j1.AddTaskInfo(api.NewTaskInfo(pod1))
+	j2 := api.NewJobInfo("c1-p2")
+	j2.AddTaskInfo(api.NewTaskInfo(pod2))
+
 	tests := []struct {
 		pods  []*v1.Pod
 		nodes []*v1.Node

@@ -210,6 +212,10 @@
 				Nodes: map[string]*api.NodeInfo{
 					"n1": ni1,
 				},
+				Jobs: map[api.JobID]*api.JobInfo{
+					"c1-p1": j1,
+					"c1-p2": j2,
+				},
 			},
 		},
 	}
