
Commit ee19552

Take init containers into account when getting pod resource request
1 parent 7ef45da commit ee19552

8 files changed: +265 -17 lines changed
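This commit makes the scheduler actions compute a pod-level request instead of reusing task.Resreq directly: regular containers run concurrently, so their requests are summed, while init containers run one at a time, so only the largest init-container request in each dimension can ever be needed at once. A minimal, self-contained sketch of that rule, using a hypothetical effectiveRequest helper over plain maps rather than the scheduler's api.Resource type:

// Sketch only: illustrates the request rule this commit adopts; names and
// types here are illustrative, not part of the commit.
package main

import "fmt"

// effectiveRequest returns, per resource name, the max of the summed
// regular-container requests and each individual init-container request.
func effectiveRequest(containers, initContainers []map[string]float64) map[string]float64 {
	result := map[string]float64{}
	for _, c := range containers {
		for name, v := range c {
			result[name] += v // regular containers run concurrently: sum
		}
	}
	for _, ic := range initContainers {
		for name, v := range ic {
			if v > result[name] { // init containers run sequentially: take the max
				result[name] = v
			}
		}
	}
	return result
}

func main() {
	containers := []map[string]float64{
		{"cpu": 2, "memory": 1e9},
		{"cpu": 1, "memory": 1e9},
	}
	initContainers := []map[string]float64{
		{"cpu": 2, "memory": 1e9},
		{"cpu": 2, "memory": 3e9},
	}
	// Matches the worked example in pkg/scheduler/api/pod_info.go below:
	// cpu: 3, memory: 3G.
	fmt.Println(effectiveRequest(containers, initContainers))
}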

pkg/scheduler/actions/allocate/allocate.go  (+4 -3)

@@ -141,9 +141,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 			}
 		}
 		selectedNodes := util.SelectBestNode(nodeScores)
+		req := api.GetPodResourceRequest(task.Pod)
 		for _, node := range selectedNodes {
 			// Allocate idle resource to the task.
-			if task.Resreq.LessEqual(node.Idle) {
+			if req.LessEqual(node.Idle) {
 				glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
 					task.Namespace, task.Name, node.Name)
 				if err := ssn.Allocate(task, node.Name); err != nil {
@@ -162,9 +163,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 			}

 			// Allocate releasing resource to the task if any.
-			if task.Resreq.LessEqual(node.Releasing) {
+			if req.LessEqual(node.Releasing) {
 				glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
-					task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
+					task.Namespace, task.Name, node.Name, req, node.Releasing)
 				if err := ssn.Pipeline(task, node.Name); err != nil {
 					glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
 						task.UID, node.Name, ssn.UID)

pkg/scheduler/actions/backfill/backfill.go  (+2 -1)

@@ -44,7 +44,8 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
 	// TODO (k82cn): When backfill, it's also need to balance between Queues.
 	for _, job := range ssn.Jobs {
 		for _, task := range job.TaskStatusIndex[api.Pending] {
-			if task.Resreq.IsEmpty() {
+			req := api.GetPodResourceRequest(task.Pod)
+			if req.IsEmpty() {
 				// As task did not request resources, so it only need to meet predicates.
 				// TODO (k82cn): need to prioritize nodes to avoid pod hole.
 				for _, node := range ssn.Nodes {

pkg/scheduler/actions/preempt/preempt.go  (+4 -3)

@@ -197,13 +197,14 @@ func preempt(
 		}
 	}
 	selectedNodes := util.SelectBestNode(nodeScores)
+	preemptorResreq := api.GetPodResourceRequest(preemptor.Pod)
 	for _, node := range selectedNodes {
 		glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
 			preemptor.Namespace, preemptor.Name, node.Name)

 		var preemptees []*api.TaskInfo
 		preempted := api.EmptyResource()
-		resreq := preemptor.Resreq.Clone()
+		resreq := preemptorResreq.Clone()

 		for _, task := range node.Tasks {
 			if filter == nil {
@@ -239,9 +240,9 @@ func preempt(

 		metrics.RegisterPreemptionAttempts()
 		glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
-			preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)
+			preempted, preemptor.Namespace, preemptor.Name, preemptorResreq)

-		if preemptor.Resreq.LessEqual(preempted) {
+		if preemptorResreq.LessEqual(preempted) {
 			if err := stmt.Pipeline(preemptor, node.Name); err != nil {
 				glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
 					preemptor.Namespace, preemptor.Name, node.Name)

pkg/scheduler/actions/reclaim/reclaim.go  (+4 -4)

@@ -110,14 +110,14 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
 			}

 			assigned := false
-
+			taskResqeq := api.GetPodResourceRequest(task.Pod)
 			for _, n := range ssn.Nodes {
 				// If predicates failed, next node.
 				if err := ssn.PredicateFn(task, n); err != nil {
 					continue
 				}

-				resreq := task.Resreq.Clone()
+				resreq := taskResqeq.Clone()
 				reclaimed := api.EmptyResource()

 				glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
@@ -172,9 +172,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
 				}

 				glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
-					reclaimed, task.Namespace, task.Name, task.Resreq)
+					reclaimed, task.Namespace, task.Name, taskResqeq)

-				if task.Resreq.LessEqual(reclaimed) {
+				if taskResqeq.LessEqual(reclaimed) {
 					if err := ssn.Pipeline(task, n.Name); err != nil {
 						glog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
 							task.Namespace, task.Name, n.Name)

pkg/scheduler/api/job_info.go  (+1 -6)

@@ -61,12 +61,7 @@ func getJobID(pod *v1.Pod) JobID {
 }

 func NewTaskInfo(pod *v1.Pod) *TaskInfo {
-	req := EmptyResource()
-
-	// TODO(k82cn): also includes initContainers' resource.
-	for _, c := range pod.Spec.Containers {
-		req.Add(NewResource(c.Resources.Requests))
-	}
+	req := GetPodResourceWithoutInitContainers(pod)

 	jobID := getJobID(pod)

pkg/scheduler/api/pod_info.go  (new file, +71)

@@ -0,0 +1,71 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+	"k8s.io/api/core/v1"
+)
+
+// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
+//
+// GetPodResourceRequest returns a *Resource that covers the largest width in each resource dimension.
+// Because init-containers run sequentially, we collect the max in each dimension iteratively.
+// In contrast, we sum the resource vectors for regular containers since they run simultaneously.
+//
+// To be consistent with the kubernetes default scheduler, use it when deciding whether a pod
+// fits on a node; please use GetPodResourceWithoutInitContainers for other cases.
+//
+// Example:
+//
+// Pod:
+//   InitContainers
+//     IC1:
+//       CPU: 2
+//       Memory: 1G
+//     IC2:
+//       CPU: 2
+//       Memory: 3G
+//   Containers
+//     C1:
+//       CPU: 2
+//       Memory: 1G
+//     C2:
+//       CPU: 1
+//       Memory: 1G
+//
+// Result: CPU: 3, Memory: 3G
+func GetPodResourceRequest(pod *v1.Pod) *Resource {
+	result := GetPodResourceWithoutInitContainers(pod)
+
+	// take max_resource(sum_pod, any_init_container)
+	for _, container := range pod.Spec.InitContainers {
+		result.SetMaxResource(NewResource(container.Resources.Requests))
+	}
+
+	return result
+}
+
+// GetPodResourceWithoutInitContainers returns the Pod's resource request; it does not include
+// the init containers' resource requests.
+func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {
+	result := EmptyResource()
+	for _, container := range pod.Spec.Containers {
+		result.Add(NewResource(container.Resources.Requests))
+	}
+
+	return result
+}

pkg/scheduler/api/pod_info_test.go  (new file, +162)

@@ -0,0 +1,162 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+	"reflect"
+	"testing"
+
+	"k8s.io/api/core/v1"
+)
+
+func TestGetPodResourceRequest(t *testing.T) {
+	tests := []struct {
+		name             string
+		pod              *v1.Pod
+		expectedResource *Resource
+	}{
+		{
+			name: "get resource for pod without init containers",
+			pod: &v1.Pod{
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("1000m", "1G"),
+							},
+						},
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "1G"),
+							},
+						},
+					},
+				},
+			},
+			expectedResource: NewResource(buildResourceList("3000m", "2G")),
+		},
+		{
+			name: "get resource for pod with init containers",
+			pod: &v1.Pod{
+				Spec: v1.PodSpec{
+					InitContainers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "5G"),
+							},
+						},
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "1G"),
+							},
+						},
+					},
+					Containers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("1000m", "1G"),
+							},
+						},
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "1G"),
+							},
+						},
+					},
+				},
+			},
+			expectedResource: NewResource(buildResourceList("3000m", "5G")),
+		},
+	}
+
+	for i, test := range tests {
+		req := GetPodResourceRequest(test.pod)
+		if !reflect.DeepEqual(req, test.expectedResource) {
+			t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n",
+				i, test.name, test.expectedResource, req)
+		}
+	}
+}
+
+func TestGetPodResourceWithoutInitContainers(t *testing.T) {
+	tests := []struct {
+		name             string
+		pod              *v1.Pod
+		expectedResource *Resource
+	}{
+		{
+			name: "get resource for pod without init containers",
+			pod: &v1.Pod{
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("1000m", "1G"),
+							},
+						},
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "1G"),
+							},
+						},
+					},
+				},
+			},
+			expectedResource: NewResource(buildResourceList("3000m", "2G")),
+		},
+		{
+			name: "get resource for pod with init containers",
+			pod: &v1.Pod{
+				Spec: v1.PodSpec{
+					InitContainers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "5G"),
+							},
+						},
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "1G"),
+							},
+						},
+					},
+					Containers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("1000m", "1G"),
+							},
+						},
+						{
+							Resources: v1.ResourceRequirements{
+								Requests: buildResourceList("2000m", "1G"),
+							},
+						},
+					},
+				},
+			},
+			expectedResource: NewResource(buildResourceList("3000m", "2G")),
+		},
+	}
+
+	for i, test := range tests {
+		req := GetPodResourceWithoutInitContainers(test.pod)
+		if !reflect.DeepEqual(req, test.expectedResource) {
+			t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n",
+				i, test.name, test.expectedResource, req)
+		}
+	}
+}
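Assuming a standard Go toolchain and that the buildResourceList helper used above is already defined elsewhere in the api package's tests, the new tests can presumably be run from the repository root with, for example:

go test ./pkg/scheduler/api/ -run TestGetPodResource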

pkg/scheduler/api/resource_info.go  (+17)

@@ -109,6 +109,23 @@ func (r *Resource) Sub(rr *Resource) *Resource {
 		r, rr))
 }

+// SetMaxResource compares r with another Resource and keeps the larger value in each dimension.
+func (r *Resource) SetMaxResource(rr *Resource) {
+	if r == nil || rr == nil {
+		return
+	}
+
+	if rr.MilliCPU > r.MilliCPU {
+		r.MilliCPU = rr.MilliCPU
+	}
+	if rr.Memory > r.Memory {
+		r.Memory = rr.Memory
+	}
+	if rr.MilliGPU > r.MilliGPU {
+		r.MilliGPU = rr.MilliGPU
+	}
+}
+
 //Computes the delta between a resource oject representing available
 //resources an operand representing resources being requested. Any
 //field that is less than 0 after the operation represents an
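As a usage illustration (not part of the commit), the following sketch reproduces the worked example from pod_info.go by combining Add and SetMaxResource directly. The import path and the ability to build Resource values with struct literals are assumptions; the numbers are only illustrative.

package main

import (
	"fmt"

	// Assumed import path for the scheduler api package in this repository.
	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

func main() {
	// Sum the regular containers (C1 + C2 from the pod_info.go example).
	sum := api.EmptyResource()
	sum.Add(&api.Resource{MilliCPU: 2000, Memory: 1e9}) // C1: CPU 2, Memory 1G
	sum.Add(&api.Resource{MilliCPU: 1000, Memory: 1e9}) // C2: CPU 1, Memory 1G

	// Fold in each init container with SetMaxResource.
	sum.SetMaxResource(&api.Resource{MilliCPU: 2000, Memory: 1e9}) // IC1
	sum.SetMaxResource(&api.Resource{MilliCPU: 2000, Memory: 3e9}) // IC2

	fmt.Println(sum) // expected: MilliCPU 3000, Memory 3G
}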
