Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 5bd2487

Browse files
committed
Take init containers into account when getting pod resource request
1 parent 7ef45da commit 5bd2487

File tree

8 files changed

+278
-27
lines changed

8 files changed

+278
-27
lines changed

pkg/scheduler/actions/allocate/allocate.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
143143
selectedNodes := util.SelectBestNode(nodeScores)
144144
for _, node := range selectedNodes {
145145
// Allocate idle resource to the task.
146-
if task.Resreq.LessEqual(node.Idle) {
146+
if task.ResreqWithInitContainers.LessEqual(node.Idle) {
147147
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
148148
task.Namespace, task.Name, node.Name)
149149
if err := ssn.Allocate(task, node.Name); err != nil {
@@ -162,9 +162,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
162162
}
163163

164164
// Allocate releasing resource to the task if any.
165-
if task.Resreq.LessEqual(node.Releasing) {
165+
if task.ResreqWithInitContainers.LessEqual(node.Releasing) {
166166
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
167-
task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
167+
task.Namespace, task.Name, node.Name, task.ResreqWithInitContainers, node.Releasing)
168168
if err := ssn.Pipeline(task, node.Name); err != nil {
169169
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
170170
task.UID, node.Name, ssn.UID)

pkg/scheduler/actions/backfill/backfill.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
4444
// TODO (k82cn): When backfilling, we also need to balance between Queues.
4545
for _, job := range ssn.Jobs {
4646
for _, task := range job.TaskStatusIndex[api.Pending] {
47-
if task.Resreq.IsEmpty() {
47+
if task.ResreqWithInitContainers.IsEmpty() {
4848
// As the task did not request resources, it only needs to meet predicates.
4949
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
5050
for _, node := range ssn.Nodes {

pkg/scheduler/actions/preempt/preempt.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func preempt(
203203

204204
var preemptees []*api.TaskInfo
205205
preempted := api.EmptyResource()
206-
resreq := preemptor.Resreq.Clone()
206+
resreq := preemptor.ResreqWithInitContainers.Clone()
207207

208208
for _, task := range node.Tasks {
209209
if filter == nil {
@@ -239,9 +239,9 @@ func preempt(
239239

240240
metrics.RegisterPreemptionAttempts()
241241
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
242-
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)
242+
preempted, preemptor.Namespace, preemptor.Name, preemptor.ResreqWithInitContainers)
243243

244-
if preemptor.Resreq.LessEqual(preempted) {
244+
if preemptor.ResreqWithInitContainers.LessEqual(preempted) {
245245
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
246246
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
247247
preemptor.Namespace, preemptor.Name, node.Name)

pkg/scheduler/actions/reclaim/reclaim.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,13 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
110110
}
111111

112112
assigned := false
113-
114113
for _, n := range ssn.Nodes {
115114
// If predicates failed, next node.
116115
if err := ssn.PredicateFn(task, n); err != nil {
117116
continue
118117
}
119118

120-
resreq := task.Resreq.Clone()
119+
resreq := task.ResreqWithInitContainers.Clone()
121120
reclaimed := api.EmptyResource()
122121

123122
glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
@@ -172,9 +171,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
172171
}
173172

174173
glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
175-
reclaimed, task.Namespace, task.Name, task.Resreq)
174+
reclaimed, task.Namespace, task.Name, task.ResreqWithInitContainers)
176175

177-
if task.Resreq.LessEqual(reclaimed) {
176+
if task.ResreqWithInitContainers.LessEqual(reclaimed) {
178177
if err := ssn.Pipeline(task, n.Name); err != nil {
179178
glog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
180179
task.Namespace, task.Name, n.Name)

pkg/scheduler/api/job_info.go

+18-16
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ type TaskInfo struct {
3838
Name string
3939
Namespace string
4040

41-
Resreq *Resource
41+
// Resreq is pod resource request, and it does not contain init containers resource request.
42+
// ResreqWithInitContainers is also pod resource request, however it contains init containers resource request.
43+
// To be consistent with kubernetes default scheduler, ResreqWithInitContainers is only used for predicates
44+
// of actions (e.g. allocate, backfill, preempt, reclaim); please use Resreq for other cases.
45+
Resreq *Resource
46+
ResreqWithInitContainers *Resource
4247

4348
NodeName string
4449
Status TaskStatus
@@ -61,25 +66,22 @@ func getJobID(pod *v1.Pod) JobID {
6166
}
6267

6368
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
64-
req := EmptyResource()
65-
66-
// TODO(k82cn): also includes initContainers' resource.
67-
for _, c := range pod.Spec.Containers {
68-
req.Add(NewResource(c.Resources.Requests))
69-
}
69+
req := GetPodResourceWithoutInitContainers(pod)
70+
resreqWithInitContainers := GetPodResourceRequest(pod)
7071

7172
jobID := getJobID(pod)
7273

7374
ti := &TaskInfo{
74-
UID: TaskID(pod.UID),
75-
Job: jobID,
76-
Name: pod.Name,
77-
Namespace: pod.Namespace,
78-
NodeName: pod.Spec.NodeName,
79-
Status: getTaskStatus(pod),
80-
Priority: 1,
81-
Pod: pod,
82-
Resreq: req,
75+
UID: TaskID(pod.UID),
76+
Job: jobID,
77+
Name: pod.Name,
78+
Namespace: pod.Namespace,
79+
NodeName: pod.Spec.NodeName,
80+
Status: getTaskStatus(pod),
81+
Priority: 1,
82+
Pod: pod,
83+
Resreq: req,
84+
ResreqWithInitContainers: resreqWithInitContainers,
8385
}
8486

8587
if pod.Spec.Priority != nil {

pkg/scheduler/api/pod_info.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"k8s.io/api/core/v1"
21+
)
22+
23+
// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
24+
//
25+
// GetResourceRequest returns a *Resource that covers the largest width in each resource dimension.
26+
// Because init-containers run sequentially, we collect the max in each dimension iteratively.
27+
// In contrast, we sum the resource vectors for regular containers since they run simultaneously.
28+
//
29+
// To be consistent with kubernetes default scheduler, it is only used for predicates of actions(e.g.
30+
// allocate, backfill, preempt, reclaim), please use GetPodResourceWithoutInitContainers for other cases.
31+
//
32+
// Example:
33+
//
34+
// Pod:
35+
// InitContainers
36+
// IC1:
37+
// CPU: 2
38+
// Memory: 1G
39+
// IC2:
40+
// CPU: 2
41+
// Memory: 3G
42+
// Containers
43+
// C1:
44+
// CPU: 2
45+
// Memory: 1G
46+
// C2:
47+
// CPU: 1
48+
// Memory: 1G
49+
//
50+
// Result: CPU: 3, Memory: 3G
51+
func GetPodResourceRequest(pod *v1.Pod) *Resource {
52+
result := GetPodResourceWithoutInitContainers(pod)
53+
54+
// take max_resource(sum_pod, any_init_container)
55+
for _, container := range pod.Spec.InitContainers {
56+
result.SetMaxResource(NewResource(container.Resources.Requests))
57+
}
58+
59+
return result
60+
}
61+
62+
// GetPodResourceWithoutInitContainers returns Pod's resource request, it does not contain
63+
// init containers' resource request.
64+
func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {
65+
result := EmptyResource()
66+
for _, container := range pod.Spec.Containers {
67+
result.Add(NewResource(container.Resources.Requests))
68+
}
69+
70+
return result
71+
}

pkg/scheduler/api/pod_info_test.go

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
23+
"k8s.io/api/core/v1"
24+
)
25+
26+
func TestGetPodResourceRequest(t *testing.T) {
27+
tests := []struct {
28+
name string
29+
pod *v1.Pod
30+
expectedResource *Resource
31+
}{
32+
{
33+
name: "get resource for pod without init containers",
34+
pod: &v1.Pod{
35+
Spec: v1.PodSpec{
36+
Containers: []v1.Container{
37+
{
38+
Resources: v1.ResourceRequirements{
39+
Requests: buildResourceList("1000m", "1G"),
40+
},
41+
},
42+
{
43+
Resources: v1.ResourceRequirements{
44+
Requests: buildResourceList("2000m", "1G"),
45+
},
46+
},
47+
},
48+
},
49+
},
50+
expectedResource: NewResource(buildResourceList("3000m", "2G")),
51+
},
52+
{
53+
name: "get resource for pod with init containers",
54+
pod: &v1.Pod{
55+
Spec: v1.PodSpec{
56+
InitContainers: []v1.Container{
57+
{
58+
Resources: v1.ResourceRequirements{
59+
Requests: buildResourceList("2000m", "5G"),
60+
},
61+
},
62+
{
63+
Resources: v1.ResourceRequirements{
64+
Requests: buildResourceList("2000m", "1G"),
65+
},
66+
},
67+
},
68+
Containers: []v1.Container{
69+
{
70+
Resources: v1.ResourceRequirements{
71+
Requests: buildResourceList("1000m", "1G"),
72+
},
73+
},
74+
{
75+
Resources: v1.ResourceRequirements{
76+
Requests: buildResourceList("2000m", "1G"),
77+
},
78+
},
79+
},
80+
},
81+
},
82+
expectedResource: NewResource(buildResourceList("3000m", "5G")),
83+
},
84+
}
85+
86+
for i, test := range tests {
87+
req := GetPodResourceRequest(test.pod)
88+
if !reflect.DeepEqual(req, test.expectedResource) {
89+
t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n",
90+
i, test.name, test.expectedResource, req)
91+
}
92+
}
93+
}
94+
95+
func TestGetPodResourceWithoutInitContainers(t *testing.T) {
96+
tests := []struct {
97+
name string
98+
pod *v1.Pod
99+
expectedResource *Resource
100+
}{
101+
{
102+
name: "get resource for pod without init containers",
103+
pod: &v1.Pod{
104+
Spec: v1.PodSpec{
105+
Containers: []v1.Container{
106+
{
107+
Resources: v1.ResourceRequirements{
108+
Requests: buildResourceList("1000m", "1G"),
109+
},
110+
},
111+
{
112+
Resources: v1.ResourceRequirements{
113+
Requests: buildResourceList("2000m", "1G"),
114+
},
115+
},
116+
},
117+
},
118+
},
119+
expectedResource: NewResource(buildResourceList("3000m", "2G")),
120+
},
121+
{
122+
name: "get resource for pod with init containers",
123+
pod: &v1.Pod{
124+
Spec: v1.PodSpec{
125+
InitContainers: []v1.Container{
126+
{
127+
Resources: v1.ResourceRequirements{
128+
Requests: buildResourceList("2000m", "5G"),
129+
},
130+
},
131+
{
132+
Resources: v1.ResourceRequirements{
133+
Requests: buildResourceList("2000m", "1G"),
134+
},
135+
},
136+
},
137+
Containers: []v1.Container{
138+
{
139+
Resources: v1.ResourceRequirements{
140+
Requests: buildResourceList("1000m", "1G"),
141+
},
142+
},
143+
{
144+
Resources: v1.ResourceRequirements{
145+
Requests: buildResourceList("2000m", "1G"),
146+
},
147+
},
148+
},
149+
},
150+
},
151+
expectedResource: NewResource(buildResourceList("3000m", "2G")),
152+
},
153+
}
154+
155+
for i, test := range tests {
156+
req := GetPodResourceWithoutInitContainers(test.pod)
157+
if !reflect.DeepEqual(req, test.expectedResource) {
158+
t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n",
159+
i, test.name, test.expectedResource, req)
160+
}
161+
}
162+
}

pkg/scheduler/api/resource_info.go

+17
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ func (r *Resource) Sub(rr *Resource) *Resource {
109109
r, rr))
110110
}
111111

112+
// SetMaxResource compares with ResourceList and takes max value for each Resource.
113+
func (r *Resource) SetMaxResource(rr *Resource) {
114+
if r == nil || rr == nil {
115+
return
116+
}
117+
118+
if rr.MilliCPU > r.MilliCPU {
119+
r.MilliCPU = rr.MilliCPU
120+
}
121+
if rr.Memory > r.Memory {
122+
r.Memory = rr.Memory
123+
}
124+
if rr.MilliGPU > r.MilliGPU {
125+
r.MilliGPU = rr.MilliGPU
126+
}
127+
}
128+
112129
//Computes the delta between a resource object representing available
113130
//resources and an operand representing resources being requested. Any
114131
//field that is less than 0 after the operation represents an

0 commit comments

Comments
 (0)