Skip to content

Commit 84bb8e0

Browse files
kaichiachencraigcondit
authored andcommitted
[YUNIKORN-2153] Refactor placement of predicateCheckResult (#1011)
Closes: #1011 Signed-off-by: Craig Condit <[email protected]>
1 parent ee8d7fa commit 84bb8e0

File tree

4 files changed

+426
-173
lines changed

4 files changed

+426
-173
lines changed

pkg/scheduler/objects/predicates.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package objects
20+
21+
import (
22+
"fmt"
23+
"strings"
24+
"sync"
25+
26+
"go.uber.org/zap"
27+
28+
"github.com/apache/yunikorn-core/pkg/log"
29+
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
30+
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
31+
)
32+
33+
type predicateCheckResult struct {
34+
allocationKey string
35+
nodeID string
36+
success bool
37+
index int
38+
victims []*Allocation
39+
}
40+
41+
func (pcr *predicateCheckResult) betterThan(other *predicateCheckResult, allocationsByNode map[string][]*Allocation) bool {
42+
return pcr.getSolutionScore(allocationsByNode) < other.getSolutionScore(allocationsByNode)
43+
}
44+
45+
func (pcr *predicateCheckResult) getSolutionScore(allocationsByNode map[string][]*Allocation) uint64 {
46+
if pcr == nil || !pcr.success {
47+
return scoreUnfit
48+
}
49+
allocations, ok := allocationsByNode[pcr.nodeID]
50+
if !ok {
51+
return scoreUnfit
52+
}
53+
54+
var score uint64 = 0
55+
if pcr.index < 0 {
56+
return score
57+
}
58+
if pcr.index >= len(allocations) {
59+
// shouldn't happen
60+
return scoreUnfit
61+
}
62+
for i := 0; i <= pcr.index; i++ {
63+
allocation := allocations[i]
64+
if allocation.IsOriginator() {
65+
score |= scoreOriginator
66+
}
67+
if !allocation.IsAllowPreemptSelf() {
68+
score |= scoreNoPreempt
69+
}
70+
}
71+
score += uint64(pcr.index) + 1 // need to add 1 to differentiate between no preemption and preempt 1 container
72+
73+
return score
74+
}
75+
76+
func (pcr *predicateCheckResult) isSatisfactory(allocationsByNode map[string][]*Allocation) bool {
77+
return pcr.getSolutionScore(allocationsByNode) < scoreFitMax
78+
}
79+
80+
func (pcr *predicateCheckResult) populateVictims(victimsByNode map[string][]*Allocation) {
81+
if pcr == nil {
82+
return
83+
}
84+
pcr.victims = nil
85+
if !pcr.success {
86+
return
87+
}
88+
89+
// abort if node was not found
90+
victimList, ok := victimsByNode[pcr.nodeID]
91+
if !ok {
92+
log.Log(log.SchedPreemption).Warn("BUG: Unable to find node in victim map", zap.String("nodeID", pcr.nodeID))
93+
pcr.success = false
94+
pcr.index = -1
95+
return
96+
}
97+
98+
// abort if index is too large
99+
if pcr.index >= len(victimList) {
100+
log.Log(log.SchedPreemption).Warn("BUG: Got invalid index into allocation list",
101+
zap.String("nodeID", pcr.nodeID),
102+
zap.Int("index", pcr.index),
103+
zap.Int("length", len(victimList)))
104+
pcr.success = false
105+
pcr.index = -1
106+
return
107+
}
108+
109+
pcr.victims = make([]*Allocation, 0)
110+
for i := 0; i <= pcr.index; i++ {
111+
victim := victimList[i]
112+
pcr.victims = append(pcr.victims, victim)
113+
}
114+
}
115+
116+
// preemptPredicateCheck performs a single predicate check and reports the resultType on a channel
117+
func preemptPredicateCheck(plugin api.ResourceManagerCallback, ch chan<- *predicateCheckResult, wg *sync.WaitGroup, args *si.PreemptionPredicatesArgs) {
118+
defer wg.Done()
119+
result := &predicateCheckResult{
120+
allocationKey: args.AllocationKey,
121+
nodeID: args.NodeID,
122+
success: false,
123+
index: -1,
124+
}
125+
if len(args.PreemptAllocationKeys) == 0 {
126+
// normal check; there are sufficient resources to run on this node
127+
if err := plugin.Predicates(&si.PredicatesArgs{
128+
AllocationKey: args.AllocationKey,
129+
NodeID: args.NodeID,
130+
Allocate: true,
131+
}); err == nil {
132+
result.success = true
133+
result.index = -1
134+
} else {
135+
log.Log(log.SchedPreemption).Debug("Normal predicate check failed",
136+
zap.String("AllocationKey", args.AllocationKey),
137+
zap.String("NodeID", args.NodeID),
138+
zap.Error(err))
139+
}
140+
} else if response := plugin.PreemptionPredicates(args); response != nil {
141+
// preemption check; at least one allocation will need preemption
142+
result.success = response.GetSuccess()
143+
if result.success {
144+
result.index = int(response.GetIndex())
145+
}
146+
}
147+
ch <- result
148+
}
149+
150+
func (p *predicateCheckResult) String() string {
151+
if p.nodeID == "" {
152+
return ""
153+
}
154+
var result strings.Builder
155+
result.WriteString(fmt.Sprintf("node: %s, ", p.nodeID))
156+
result.WriteString(fmt.Sprintf("alloc: %s, ", p.allocationKey))
157+
result.WriteString(fmt.Sprintf("success: %v, ", p.success))
158+
result.WriteString(fmt.Sprintf("index: %d", p.index))
159+
if len(p.victims) > 0 {
160+
result.WriteString(", victims: [")
161+
for i, victim := range p.victims {
162+
if i > 0 {
163+
result.WriteString(", ")
164+
}
165+
result.WriteString(victim.String())
166+
}
167+
result.WriteString("]")
168+
}
169+
return result.String()
170+
}

0 commit comments

Comments
 (0)