Skip to content

Commit f0b3bbf

Browse files
author
Klaus Ma
authored
Merge pull request #53 from volcano-sh/kb_042
Upgrade kube-batch to 0.4.2
2 parents 880e274 + 4052ecf commit f0b3bbf

File tree

26 files changed

+357
-102
lines changed

26 files changed

+357
-102
lines changed

Gopkg.lock

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ required = [
3838

3939
[[constraint]]
4040
name = "github.com/kubernetes-sigs/kube-batch"
41-
version = "0.4.1"
41+
version = "0.4.2"
4242

4343
[[constraint]]
4444
name = "github.com/onsi/ginkgo"

pkg/scheduler/actions/allocate/allocate.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,16 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
4747
jobsMap := map[api.QueueID]*util.PriorityQueue{}
4848

4949
for _, job := range ssn.Jobs {
50-
if _, found := jobsMap[job.Queue]; !found {
51-
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
52-
}
53-
5450
if queue, found := ssn.Queues[job.Queue]; found {
5551
queues.Push(queue)
52+
} else {
53+
glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
54+
job.Namespace, job.Name, job.Queue)
55+
continue
56+
}
57+
58+
if _, found := jobsMap[job.Queue]; !found {
59+
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
5660
}
5761

5862
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
@@ -144,12 +148,12 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
144148
selectedNodes := util.SelectBestNode(nodeScores)
145149
for _, node := range selectedNodes {
146150
// Allocate idle resource to the task.
147-
if task.Resreq.LessEqual(node.Idle) {
151+
if task.InitResreq.LessEqual(node.Idle) {
148152
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
149153
task.Namespace, task.Name, node.Name)
150154
if err := ssn.Allocate(task, node.Name); err != nil {
151-
glog.Errorf("Failed to bind Task %v on %v in Session %v",
152-
task.UID, node.Name, ssn.UID)
155+
glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
156+
task.UID, node.Name, ssn.UID, err)
153157
continue
154158
}
155159
assigned = true
@@ -163,9 +167,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
163167
}
164168

165169
// Allocate releasing resource to the task if any.
166-
if task.Resreq.LessEqual(node.Releasing) {
170+
if task.InitResreq.LessEqual(node.Releasing) {
167171
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
168-
task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
172+
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
169173
if err := ssn.Pipeline(task, node.Name); err != nil {
170174
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
171175
task.UID, node.Name, ssn.UID)

pkg/scheduler/actions/backfill/backfill.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
4444
// TODO (k82cn): When backfill, it's also need to balance between Queues.
4545
for _, job := range ssn.Jobs {
4646
for _, task := range job.TaskStatusIndex[api.Pending] {
47-
if task.Resreq.IsEmpty() {
47+
if task.InitResreq.IsEmpty() {
4848
// As task did not request resources, so it only need to meet predicates.
4949
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
5050
for _, node := range ssn.Nodes {

pkg/scheduler/actions/preempt/preempt.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func preempt(
204204

205205
var preemptees []*api.TaskInfo
206206
preempted := api.EmptyResource()
207-
resreq := preemptor.Resreq.Clone()
207+
resreq := preemptor.InitResreq.Clone()
208208

209209
for _, task := range node.Tasks {
210210
if filter == nil {
@@ -221,8 +221,15 @@ func preempt(
221221
continue
222222
}
223223

224-
// Preempt victims for tasks.
225-
for _, preemptee := range victims {
224+
victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
225+
return !ssn.TaskOrderFn(l, r)
226+
})
227+
for _, victim := range victims {
228+
victimsQueue.Push(victim)
229+
}
230+
// Preempt victims for tasks, pick lowest priority task first.
231+
for !victimsQueue.Empty() {
232+
preemptee := victimsQueue.Pop().(*api.TaskInfo)
226233
glog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
227234
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
228235
if err := stmt.Evict(preemptee, "preempt"); err != nil {
@@ -239,9 +246,9 @@ func preempt(
239246

240247
metrics.RegisterPreemptionAttempts()
241248
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
242-
preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)
249+
preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)
243250

244-
if preemptor.Resreq.LessEqual(preempted) {
251+
if preemptor.InitResreq.LessEqual(preempted) {
245252
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
246253
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
247254
preemptor.Namespace, preemptor.Name, node.Name)

pkg/scheduler/actions/reclaim/reclaim.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,13 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
111111
}
112112

113113
assigned := false
114-
115114
for _, n := range ssn.Nodes {
116115
// If predicates failed, next node.
117116
if err := ssn.PredicateFn(task, n); err != nil {
118117
continue
119118
}
120119

121-
resreq := task.Resreq.Clone()
120+
resreq := task.InitResreq.Clone()
122121
reclaimed := api.EmptyResource()
123122

124123
glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
@@ -172,11 +171,11 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
172171
}
173172

174173
glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
175-
reclaimed, task.Namespace, task.Name, task.Resreq)
174+
reclaimed, task.Namespace, task.Name, task.InitResreq)
176175

177-
if task.Resreq.LessEqual(reclaimed) {
176+
if task.InitResreq.LessEqual(reclaimed) {
178177
if err := ssn.Pipeline(task, n.Name); err != nil {
179-
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
178+
glog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
180179
task.Namespace, task.Name, n.Name)
181180
}
182181

pkg/scheduler/plugins/conformance/conformance.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
)
2626

2727
type conformancePlugin struct {
28+
// Arguments given for the plugin
29+
pluginArguments map[string]string
2830
}
2931

30-
func New() framework.Plugin {
31-
return &conformancePlugin{}
32+
func New(arguments map[string]string) framework.Plugin {
33+
return &conformancePlugin{pluginArguments: arguments}
3234
}
3335

3436
func (pp *conformancePlugin) Name() string {

pkg/scheduler/plugins/drf/drf.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,16 @@ type drfPlugin struct {
3939

4040
// Key is Job ID
4141
jobOpts map[api.JobID]*drfAttr
42+
43+
// Arguments given for the plugin
44+
pluginArguments map[string]string
4245
}
4346

44-
func New() framework.Plugin {
47+
func New(arguments map[string]string) framework.Plugin {
4548
return &drfPlugin{
46-
totalResource: api.EmptyResource(),
47-
jobOpts: map[api.JobID]*drfAttr{},
49+
totalResource: api.EmptyResource(),
50+
jobOpts: map[api.JobID]*drfAttr{},
51+
pluginArguments: arguments,
4852
}
4953
}
5054

@@ -110,8 +114,8 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
110114
lv := l.(*api.JobInfo)
111115
rv := r.(*api.JobInfo)
112116

113-
glog.V(4).Infof("DRF JobOrderFn: <%v/%v> is ready: %d, <%v/%v> is ready: %d",
114-
lv.Namespace, lv.Name, lv.Priority, rv.Namespace, rv.Name, rv.Priority)
117+
glog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %d, <%v/%v> share state: %d",
118+
lv.Namespace, lv.Name, drf.jobOpts[lv.UID].share, rv.Namespace, rv.Name, drf.jobOpts[rv.UID].share)
115119

116120
if drf.jobOpts[lv.UID].share == drf.jobOpts[rv.UID].share {
117121
return 0

pkg/scheduler/plugins/gang/gang.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ import (
3131
)
3232

3333
type gangPlugin struct {
34+
// Arguments given for the plugin
35+
pluginArguments map[string]string
3436
}
3537

36-
func New() framework.Plugin {
37-
return &gangPlugin{}
38+
func New(arguments map[string]string) framework.Plugin {
39+
return &gangPlugin{pluginArguments: arguments}
3840
}
3941

4042
func (gp *gangPlugin) Name() string {

pkg/scheduler/plugins/nodeorder/nodeorder.go

+113-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package nodeorder
1818

1919
import (
2020
"fmt"
21+
"strconv"
2122

2223
"github.com/golang/glog"
2324

@@ -32,7 +33,20 @@ import (
3233
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
3334
)
3435

36+
const (
37+
// NodeAffinityWeight is the key for providing Node Affinity Priority Weight in YAML
38+
NodeAffinityWeight = "nodeaffinity.weight"
39+
// PodAffinityWeight is the key for providing Pod Affinity Priority Weight in YAML
40+
PodAffinityWeight = "podaffinity.weight"
41+
// LeastRequestedWeight is the key for providing Least Requested Priority Weight in YAML
42+
LeastRequestedWeight = "leastrequested.weight"
43+
// BalancedResourceWeight is the key for providing Balanced Resource Priority Weight in YAML
44+
BalancedResourceWeight = "balancedresource.weight"
45+
)
46+
3547
type nodeOrderPlugin struct {
48+
// Arguments given for the plugin
49+
pluginArguments map[string]string
3650
}
3751

3852
func getInterPodAffinityScore(name string, interPodAffinityScore schedulerapi.HostPriorityList) int {
@@ -145,17 +159,100 @@ func (nl *nodeLister) List() ([]*v1.Node, error) {
145159
}
146160

147161
//New function returns prioritizePlugin object
148-
func New() framework.Plugin {
149-
return &nodeOrderPlugin{}
162+
func New(aruguments map[string]string) framework.Plugin {
163+
return &nodeOrderPlugin{pluginArguments: aruguments}
150164
}
151165

152166
func (pp *nodeOrderPlugin) Name() string {
153167
return "nodeorder"
154168
}
155169

170+
type priorityWeight struct {
171+
leastReqWeight int
172+
nodeAffinityWeight int
173+
podAffinityWeight int
174+
balancedRescourceWeight int
175+
}
176+
177+
func calculateWeight(args map[string]string) priorityWeight {
178+
/*
179+
User Should give priorityWeight in this format(nodeaffinity.weight, podaffinity.weight, leastrequested.weight, balancedresource.weight).
180+
Currently supported only for nodeaffinity, podaffinity, leastrequested, balancedresouce priorities.
181+
182+
actions: "reclaim, allocate, backfill, preempt"
183+
tiers:
184+
- plugins:
185+
- name: priority
186+
- name: gang
187+
- name: conformance
188+
- plugins:
189+
- name: drf
190+
- name: predicates
191+
- name: proportion
192+
- name: nodeorder
193+
arguments:
194+
nodeaffinity.weight: 2
195+
podaffinity.weight: 2
196+
leastrequested.weight: 2
197+
balancedresource.weight: 2
198+
*/
199+
200+
// Values are initialized to 1.
201+
weight := priorityWeight{
202+
leastReqWeight: 1,
203+
nodeAffinityWeight: 1,
204+
podAffinityWeight: 1,
205+
balancedRescourceWeight: 1,
206+
}
207+
208+
// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
209+
if args[NodeAffinityWeight] != "" {
210+
val, err := strconv.Atoi(args[NodeAffinityWeight])
211+
if err != nil {
212+
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[NodeAffinityWeight], err)
213+
} else {
214+
weight.nodeAffinityWeight = val
215+
}
216+
}
217+
218+
// Checks whether podaffinity.weight is provided or not, if given, modifies the value in weight struct.
219+
if args[PodAffinityWeight] != "" {
220+
val, err := strconv.Atoi(args[PodAffinityWeight])
221+
if err != nil {
222+
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[PodAffinityWeight], err)
223+
} else {
224+
weight.podAffinityWeight = val
225+
}
226+
}
227+
228+
// Checks whether leastrequested.weight is provided or not, if given, modifies the value in weight struct.
229+
if args[LeastRequestedWeight] != "" {
230+
val, err := strconv.Atoi(args[LeastRequestedWeight])
231+
if err != nil {
232+
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[LeastRequestedWeight], err)
233+
} else {
234+
weight.leastReqWeight = val
235+
}
236+
}
237+
238+
// Checks whether balancedresource.weight is provided or not, if given, modifies the value in weight struct.
239+
if args[BalancedResourceWeight] != "" {
240+
val, err := strconv.Atoi(args[BalancedResourceWeight])
241+
if err != nil {
242+
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[BalancedResourceWeight], err)
243+
} else {
244+
weight.balancedRescourceWeight = val
245+
}
246+
}
247+
248+
return weight
249+
}
250+
156251
func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
157252
nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (int, error) {
158253

254+
weight := calculateWeight(pp.pluginArguments)
255+
159256
pl := &podLister{
160257
session: ssn,
161258
}
@@ -186,14 +283,24 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
186283
glog.Warningf("Least Requested Priority Failed because of Error: %v", err)
187284
return 0, err
188285
}
189-
score = score + host.Score
286+
// If leastReqWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
287+
score = score + (host.Score * weight.leastReqWeight)
288+
289+
host, err = priorities.BalancedResourceAllocationMap(task.Pod, nil, nodeInfo)
290+
if err != nil {
291+
glog.Warningf("Balanced Resource Allocation Priority Failed because of Error: %v", err)
292+
return 0, err
293+
}
294+
// If balancedRescourceWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
295+
score = score + (host.Score * weight.balancedRescourceWeight)
190296

191297
host, err = priorities.CalculateNodeAffinityPriorityMap(task.Pod, nil, nodeInfo)
192298
if err != nil {
193299
glog.Warningf("Calculate Node Affinity Priority Failed because of Error: %v", err)
194300
return 0, err
195301
}
196-
score = score + host.Score
302+
// If nodeAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
303+
score = score + (host.Score * weight.nodeAffinityWeight)
197304

198305
mapFn := priorities.NewInterPodAffinityPriority(cn, nl, pl, v1.DefaultHardPodAffinitySymmetricWeight)
199306
interPodAffinityScore, err = mapFn(task.Pod, nodeMap, nodeSlice)
@@ -202,7 +309,8 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
202309
return 0, err
203310
}
204311
hostScore := getInterPodAffinityScore(node.Name, interPodAffinityScore)
205-
score = score + hostScore
312+
// If podAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score.
313+
score = score + (hostScore * weight.podAffinityWeight)
206314

207315
glog.V(4).Infof("Total Score for that node is: %d", score)
208316
return score, nil

0 commit comments

Comments
 (0)