This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit bffcc78

Merge pull request #685 from asifdxtreme/automated-cherry-pick-of-#643-#642-#638-#645-#647-#651-#652-#655-#658-#649-#660-#666-#671-#673-upstream-release-0.4

Automated cherry pick of:

- #643: Return err in Allocate if any error occurs
- #642: Add event when task is scheduled
- #638: Take init containers into account when getting pod resource
- #645: Order task by CreationTimestamp first, then by UID
- #647: In allocate, skip adding Job if its queue is not found
- #651: Return err in functions of session.go if any error occurs
- #652: Change run option SchedulePeriod's type to make it clear
- #655: Do graceful eviction using default policy
- #658: Address helm install error in tutorial.md
- #649: Preempt lowest priority task first
- #660: Fix sub exception in reclaim and preempt
- #666: Fix wrong calculation for deserved in proportion plugin
- #671: Change base image to alpine to reduce image size
- #673: Do not create PodGroup and Job for task whose scheduler is

2 parents: ddf0eef + 3af9100

20 files changed: +460 −69 lines

Makefile (+1 −1)

```diff
@@ -26,7 +26,7 @@ generate-code:
 
 rel_bins:
 	go get github.com/mitchellh/gox
-	gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \
+	CGO_ENABLED=0 gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \
 	-output=${BIN_DIR}/{{.OS}}/{{.Arch}}/kube-batch ./cmd/kube-batch
 
 images: rel_bins
```
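A note on why this pairs with the Dockerfile change below (#671): `CGO_ENABLED=0` produces a pure-Go, statically linked binary with no glibc dependency, which is what allows the base image to switch from ubuntu to musl-based alpine without the binary failing to start.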

cmd/kube-batch/app/options/options.go (+12 −8)

```diff
@@ -23,13 +23,20 @@ import (
 	"github.com/spf13/pflag"
 )
 
+const (
+	defaultSchedulerName   = "kube-batch"
+	defaultSchedulerPeriod = time.Second
+	defaultQueue           = "default"
+	defaultListenAddress   = ":8080"
+)
+
 // ServerOption is the main context object for the controller manager.
 type ServerOption struct {
 	Master               string
 	Kubeconfig           string
 	SchedulerName        string
 	SchedulerConf        string
-	SchedulePeriod       string
+	SchedulePeriod       time.Duration
 	EnableLeaderElection bool
 	LockObjectNamespace  string
 	DefaultQueue         string
@@ -48,25 +55,22 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
 	fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information")
 	// kube-batch will ignore pods with scheduler names other than specified with the option
-	fs.StringVar(&s.SchedulerName, "scheduler-name", "kube-batch", "kube-batch will handle pods with the scheduler-name")
+	fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods with the scheduler-name")
 	fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
-	fs.StringVar(&s.SchedulePeriod, "schedule-period", "1s", "The period between each scheduling cycle")
-	fs.StringVar(&s.DefaultQueue, "default-queue", "default", "The default queue name of the job")
+	fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle")
+	fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job")
 	fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection,
 		"Start a leader election client and gain leadership before "+
 			"executing the main loop. Enable this when running replicated kube-batch for high availability")
 	fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
 	fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object")
-	fs.StringVar(&s.ListenAddress, "listen-address", ":8080", "The address to listen on for HTTP requests.")
+	fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
 }
 
 func (s *ServerOption) CheckOptionOrDie() error {
 	if s.EnableLeaderElection && s.LockObjectNamespace == "" {
 		return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
 	}
-	if _, err := time.ParseDuration(s.SchedulePeriod); err != nil {
-		return fmt.Errorf("failed to parse --schedule-period: %v", err)
-	}
 
 	return nil
 }
```
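The payoff of #652 is that `pflag.DurationVar` validates the value at parse time, which is why the manual `time.ParseDuration` check in `CheckOptionOrDie` can be dropped. A minimal, self-contained sketch of the pattern (a standalone program, not kube-batch code):

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)

	// DurationVar parses values like "1s" or "5m" during fs.Parse,
	// so malformed input fails right here instead of needing a
	// separate time.ParseDuration validation step later.
	var period time.Duration
	fs.DurationVar(&period, "schedule-period", time.Second, "scheduling cycle period")

	if err := fs.Parse([]string{"--schedule-period=5m"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(period) // 5m0s
}
```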
cmd/kube-batch/app/options/options_test.go (+48, new file)
```diff
@@ -0,0 +1,48 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package options
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/spf13/pflag"
+)
+
+func TestAddFlags(t *testing.T) {
+	fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
+	s := NewServerOption()
+	s.AddFlags(fs)
+
+	args := []string{
+		"--schedule-period=5m",
+	}
+	fs.Parse(args)
+
+	// This is a snapshot of expected options parsed by args.
+	expected := &ServerOption{
+		SchedulerName:  defaultSchedulerName,
+		SchedulePeriod: 5 * time.Minute,
+		DefaultQueue:   defaultQueue,
+		ListenAddress:  defaultListenAddress,
+	}
+
+	if !reflect.DeepEqual(expected, s) {
+		t.Errorf("Got different run options than expected.\nGot: %+v\nExpected: %+v\n", s, expected)
+	}
+}
```
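Worth noting for readers of this test: pflag, like the standard library's flag package, writes a flag's default into the target field at registration time, so after `AddFlags` plus a `Parse` that only sets `--schedule-period`, the remaining fields still hold their registered defaults and the `reflect.DeepEqual` snapshot comparison passes.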

deployment/images/Dockerfile (+1 −1)

```diff
@@ -1,4 +1,4 @@
-From ubuntu:18.04
+From alpine:3.9
 
 ADD kube-batch /usr/local/bin
 
```
doc/usage/tutorial.md (+2)

````diff
@@ -35,6 +35,8 @@ Run the `kube-batch` as kubernetes scheduler
 # helm install $GOPATH/src/github.com/kubernetes-sigs/kube-batch/deployment/kube-batch --namespace kube-system
 ```
 
+Note: If there is an error `Error: apiVersion "scheduling.incubator.k8s.io/v1alpha1" in kube-batch/templates/default.yaml is not available`, please update your helm to latest version and try it again.
+
 Verify the release
 
 ```bash
````

pkg/scheduler/actions/allocate/allocate.go (+13 −9)

```diff
@@ -46,12 +46,16 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 	jobsMap := map[api.QueueID]*util.PriorityQueue{}
 
 	for _, job := range ssn.Jobs {
-		if _, found := jobsMap[job.Queue]; !found {
-			jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
-		}
-
 		if queue, found := ssn.Queues[job.Queue]; found {
 			queues.Push(queue)
+		} else {
+			glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
+				job.Namespace, job.Name, job.Queue)
+			continue
+		}
+
+		if _, found := jobsMap[job.Queue]; !found {
+			jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
 		}
 
 		glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
@@ -143,12 +147,12 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 	selectedNodes := util.SelectBestNode(nodeScores)
 	for _, node := range selectedNodes {
 		// Allocate idle resource to the task.
-		if task.Resreq.LessEqual(node.Idle) {
+		if task.InitResreq.LessEqual(node.Idle) {
 			glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
 				task.Namespace, task.Name, node.Name)
 			if err := ssn.Allocate(task, node.Name); err != nil {
-				glog.Errorf("Failed to bind Task %v on %v in Session %v",
-					task.UID, node.Name, ssn.UID)
+				glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
+					task.UID, node.Name, ssn.UID, err)
 				continue
 			}
 			assigned = true
@@ -162,9 +166,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 		}
 
 		// Allocate releasing resource to the task if any.
-		if task.Resreq.LessEqual(node.Releasing) {
+		if task.InitResreq.LessEqual(node.Releasing) {
 			glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
-				task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
+				task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
 			if err := ssn.Pipeline(task, node.Name); err != nil {
 				glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
 					task.UID, node.Name, ssn.UID)
```
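The reordering from #647 is a guard-first pattern: validate the job's queue before creating any per-queue state, so an unknown queue never leaves an orphan entry in `jobsMap`. A simplified sketch of the same control flow (illustrative types, not kube-batch's API):

```go
package main

import "fmt"

type job struct{ name, queue string }

func main() {
	queues := map[string]bool{"default": true}
	jobsByQueue := map[string][]job{}

	for _, j := range []job{{"a", "default"}, {"b", "missing"}} {
		// Guard first: skip the job entirely when its queue is unknown,
		// so no per-queue state is ever created for it.
		if !queues[j.queue] {
			fmt.Printf("skip job %s: queue %s not found\n", j.name, j.queue)
			continue
		}
		jobsByQueue[j.queue] = append(jobsByQueue[j.queue], j)
	}

	fmt.Println(jobsByQueue) // map[default:[{a default}]]
}
```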

pkg/scheduler/actions/backfill/backfill.go (+1 −1)

```diff
@@ -44,7 +44,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
 	// TODO (k82cn): When backfill, it's also need to balance between Queues.
 	for _, job := range ssn.Jobs {
 		for _, task := range job.TaskStatusIndex[api.Pending] {
-			if task.Resreq.IsEmpty() {
+			if task.InitResreq.IsEmpty() {
 				// As task did not request resources, so it only need to meet predicates.
 				// TODO (k82cn): need to prioritize nodes to avoid pod hole.
 				for _, node := range ssn.Nodes {
```

pkg/scheduler/actions/preempt/preempt.go (+13 −7)

```diff
@@ -203,7 +203,7 @@ func preempt(
 
 	var preemptees []*api.TaskInfo
 	preempted := api.EmptyResource()
-	resreq := preemptor.Resreq.Clone()
+	resreq := preemptor.InitResreq.Clone()
 
 	for _, task := range node.Tasks {
 		if filter == nil {
@@ -220,8 +220,15 @@ func preempt(
 		continue
 	}
 
-	// Preempt victims for tasks.
-	for _, preemptee := range victims {
+	victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
+		return !ssn.TaskOrderFn(l, r)
+	})
+	for _, victim := range victims {
+		victimsQueue.Push(victim)
+	}
+	// Preempt victims for tasks, pick lowest priority task first.
+	for !victimsQueue.Empty() {
+		preemptee := victimsQueue.Pop().(*api.TaskInfo)
 		glog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
 			preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
 		if err := stmt.Evict(preemptee, "preempt"); err != nil {
@@ -231,17 +238,16 @@ func preempt(
 		}
 		preempted.Add(preemptee.Resreq)
 		// If reclaimed enough resources, break loop to avoid Sub panic.
-		if resreq.LessEqual(preemptee.Resreq) {
+		if resreq.LessEqual(preempted) {
 			break
 		}
-		resreq.Sub(preemptee.Resreq)
 	}
 
 	metrics.RegisterPreemptionAttempts()
 	glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
-		preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)
+		preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)
 
-	if preemptor.Resreq.LessEqual(preempted) {
+	if preemptor.InitResreq.LessEqual(preempted) {
 		if err := stmt.Pipeline(preemptor, node.Name); err != nil {
 			glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
 				preemptor.Namespace, preemptor.Name, node.Name)
```
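The core of #649 is comparator inversion: `TaskOrderFn` orders tasks best-first, so negating it makes the priority queue yield the lowest-priority victim first. A standalone sketch of the inversion, using a sort in place of kube-batch's `util.PriorityQueue` (illustrative types only):

```go
package main

import (
	"fmt"
	"sort"
)

type task struct {
	name     string
	priority int32
}

// taskOrder mirrors a TaskOrderFn: it reports whether l should be
// considered before r, i.e. higher priority first.
func taskOrder(l, r task) bool { return l.priority > r.priority }

func main() {
	victims := []task{{"a", 10}, {"b", 1}, {"c", 5}}

	// Negating the comparator reverses the order, so the victim the
	// scheduler values least (lowest priority) comes out first.
	sort.Slice(victims, func(i, j int) bool {
		return !taskOrder(victims[i], victims[j])
	})

	fmt.Println(victims) // [{b 1} {c 5} {a 10}]
}
```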

pkg/scheduler/actions/reclaim/reclaim.go (+4 −6)

```diff
@@ -110,14 +110,13 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
 	}
 
 	assigned := false
-
 	for _, n := range ssn.Nodes {
 		// If predicates failed, next node.
 		if err := ssn.PredicateFn(task, n); err != nil {
 			continue
 		}
 
-		resreq := task.Resreq.Clone()
+		resreq := task.InitResreq.Clone()
 		reclaimed := api.EmptyResource()
 
 		glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
@@ -165,16 +164,15 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
 			}
 			reclaimed.Add(reclaimee.Resreq)
 			// If reclaimed enough resources, break loop to avoid Sub panic.
-			if resreq.LessEqual(reclaimee.Resreq) {
+			if resreq.LessEqual(reclaimed) {
 				break
 			}
-			resreq.Sub(reclaimee.Resreq)
 		}
 
 		glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
-			reclaimed, task.Namespace, task.Name, task.Resreq)
+			reclaimed, task.Namespace, task.Name, task.InitResreq)
 
-		if task.Resreq.LessEqual(reclaimed) {
+		if task.InitResreq.LessEqual(reclaimed) {
 			if err := ssn.Pipeline(task, n.Name); err != nil {
 				glog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
 					task.Namespace, task.Name, n.Name)
```
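The #660 fix, applied here and in preempt above, replaces decrement-and-check (`resreq.Sub` can underflow and panic when a victim frees more than what remains needed) with accumulate-and-compare against a running total. The arithmetic, sketched with plain floats rather than the `api.Resource` type (illustrative only):

```go
package main

import "fmt"

func main() {
	need := 4.0 // resources the task still requires
	victims := []float64{1.0, 2.5, 3.0, 1.0}

	reclaimed := 0.0
	for _, v := range victims {
		reclaimed += v
		// Compare the running total against the fixed requirement.
		// Subtracting v from need instead could go negative on the
		// last victim, which is what made api.Resource.Sub panic.
		if need <= reclaimed {
			break
		}
	}

	fmt.Println(reclaimed) // 6.5: the first three victims cover the need
}
```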

pkg/scheduler/api/job_info.go (+16 −15)

```diff
@@ -38,7 +38,10 @@ type TaskInfo struct {
 	Name      string
 	Namespace string
 
+	// Resreq is the resource that used when task running.
 	Resreq *Resource
+	// InitResreq is the resource that used to launch a task.
+	InitResreq *Resource
 
 	NodeName string
 	Status   TaskStatus
@@ -61,25 +64,22 @@ func getJobID(pod *v1.Pod) JobID {
 }
 
 func NewTaskInfo(pod *v1.Pod) *TaskInfo {
-	req := EmptyResource()
-
-	// TODO(k82cn): also includes initContainers' resource.
-	for _, c := range pod.Spec.Containers {
-		req.Add(NewResource(c.Resources.Requests))
-	}
+	req := GetPodResourceWithoutInitContainers(pod)
+	initResreq := GetPodResourceRequest(pod)
 
 	jobID := getJobID(pod)
 
 	ti := &TaskInfo{
-		UID:       TaskID(pod.UID),
-		Job:       jobID,
-		Name:      pod.Name,
-		Namespace: pod.Namespace,
-		NodeName:  pod.Spec.NodeName,
-		Status:    getTaskStatus(pod),
-		Priority:  1,
-		Pod:       pod,
-		Resreq:    req,
+		UID:        TaskID(pod.UID),
+		Job:        jobID,
+		Name:       pod.Name,
+		Namespace:  pod.Namespace,
+		NodeName:   pod.Spec.NodeName,
+		Status:     getTaskStatus(pod),
+		Priority:   1,
+		Pod:        pod,
+		Resreq:     req,
+		InitResreq: initResreq,
 	}
 
 	if pod.Spec.Priority != nil {
@@ -100,6 +100,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
 		Priority:    ti.Priority,
 		Pod:         ti.Pod,
 		Resreq:      ti.Resreq.Clone(),
+		InitResreq:  ti.InitResreq.Clone(),
 		VolumeReady: ti.VolumeReady,
 	}
 }
```

pkg/scheduler/api/pod_info.go (+71, new file)

```diff
@@ -0,0 +1,71 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+	"k8s.io/api/core/v1"
+)
+
+// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest.
+//
+// GetResourceRequest returns a *Resource that covers the largest width in each resource dimension.
+// Because init-containers run sequentially, we collect the max in each dimension iteratively.
+// In contrast, we sum the resource vectors for regular containers since they run simultaneously.
+//
+// To be consistent with kubernetes default scheduler, it is only used for predicates of actions(e.g.
+// allocate, backfill, preempt, reclaim), please use GetPodResourceWithoutInitContainers for other cases.
+//
+// Example:
+//
+// Pod:
+//   InitContainers
+//     IC1:
+//       CPU: 2
+//       Memory: 1G
+//     IC2:
+//       CPU: 2
+//       Memory: 3G
+//   Containers
+//     C1:
+//       CPU: 2
+//       Memory: 1G
+//     C2:
+//       CPU: 1
+//       Memory: 1G
+//
+// Result: CPU: 3, Memory: 3G
+func GetPodResourceRequest(pod *v1.Pod) *Resource {
+	result := GetPodResourceWithoutInitContainers(pod)
+
+	// take max_resource(sum_pod, any_init_container)
+	for _, container := range pod.Spec.InitContainers {
+		result.SetMaxResource(NewResource(container.Resources.Requests))
+	}
+
+	return result
+}
+
+// GetPodResourceWithoutInitContainers returns Pod's resource request, it does not contain
+// init containers' resource request.
+func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource {
+	result := EmptyResource()
+	for _, container := range pod.Spec.Containers {
+		result.Add(NewResource(container.Resources.Requests))
+	}
+
+	return result
+}
```
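The doc comment's example can be verified by hand: the regular containers sum to (CPU 3, Memory 2G), the init containers peak at (CPU 2, Memory 3G), and the element-wise max of the two is (CPU 3, Memory 3G). A tiny standalone sketch of the same sum-then-max arithmetic (plain structs, not the `api.Resource` type):

```go
package main

import "fmt"

type res struct {
	cpu float64
	mem float64 // in GiB
}

func (r *res) add(o res) { r.cpu += o.cpu; r.mem += o.mem }

func (r *res) setMax(o res) {
	if o.cpu > r.cpu {
		r.cpu = o.cpu
	}
	if o.mem > r.mem {
		r.mem = o.mem
	}
}

func main() {
	containers := []res{{2, 1}, {1, 1}}     // run concurrently: sum them
	initContainers := []res{{2, 1}, {2, 3}} // run sequentially: take the max

	total := res{}
	for _, c := range containers {
		total.add(c)
	}
	for _, ic := range initContainers {
		total.setMax(ic)
	}

	fmt.Printf("CPU: %v, Memory: %vG\n", total.cpu, total.mem) // CPU: 3, Memory: 3G
}
```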
