This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 70177dc

Merge pull request #468 from adam-marek/detailed-resource-info

Detailed 'unschedulable' events

2 parents cdc85dc + b555449

6 files changed: +98 -17 lines

pkg/scheduler/actions/allocate/allocate.go (+11)

@@ -102,6 +102,13 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 			glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
 				len(ssn.Nodes), job.Namespace, job.Name)
 
+			// Any task that doesn't fit will be the last one processed
+			// within this loop, so any existing contents of NodesFitDelta
+			// are for tasks that eventually did fit on a node and can be
+			// discarded before this task is reconsidered.
+			if len(job.NodesFitDelta) > 0 {
+				job.NodesFitDelta = make(api.NodeResourceMap)
+			}
 			for _, node := range ssn.Nodes {
 				glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
 					task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
@@ -124,6 +131,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 				}
 				assigned = true
 				break
+			} else {
+				// Store information about the missing resources.
+				job.NodesFitDelta[node.Name] = node.Idle.Clone()
+				job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
 			}
 
 			// Allocate releasing resource to the task if any.
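For readers tracing the new bookkeeping, here is a minimal, self-contained Go sketch of the clone-then-FitDelta pattern this hunk introduces. The Resource type is a two-field stand-in for the scheduler's api.Resource and the epsilon constants are omitted, so it illustrates the flow rather than the exact arithmetic:

package main

import "fmt"

// Resource is a simplified stand-in for the scheduler's api.Resource.
type Resource struct {
	MilliCPU float64
	Memory   float64
}

func (r *Resource) Clone() *Resource { c := *r; return &c }

// fitDelta mimics Resource.FitDelta: subtract the request from what is
// idle; a negative field marks an insufficient resource.
func (r *Resource) fitDelta(req *Resource) *Resource {
	if req.MilliCPU > 0 {
		r.MilliCPU -= req.MilliCPU
	}
	if req.Memory > 0 {
		r.Memory -= req.Memory
	}
	return r
}

func main() {
	idle := &Resource{MilliCPU: 2000, Memory: 4096} // node.Idle (MiB for brevity)
	req := &Resource{MilliCPU: 2500, Memory: 1024}  // task.Resreq

	nodesFitDelta := map[string]*Resource{} // job.NodesFitDelta
	nodesFitDelta["node-1"] = idle.Clone()
	nodesFitDelta["node-1"].fitDelta(req)

	// Prints {MilliCPU:-500 Memory:3072}: node-1 is 500m CPU short.
	fmt.Printf("%+v\n", *nodesFitDelta["node-1"])
}

Since the map is cleared at the top of the task loop whenever it is non-empty, the deltas that survive to session close describe the last task that failed to fit.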

pkg/scheduler/api/job_info.go (+51 -12)

@@ -26,6 +26,8 @@ import (
 	"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
 	arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
 	"github.com/kubernetes-sigs/kube-batch/pkg/apis/utils"
+	"sort"
+	"strings"
 )
 
 type TaskID types.UID
@@ -73,9 +75,8 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
 		NodeName: pod.Spec.NodeName,
 		Status:   getTaskStatus(pod),
 		Priority: 1,
-
-		Pod:    pod,
-		Resreq: req,
+		Pod:      pod,
+		Resreq:   req,
 	}
 
 	if pod.Spec.Priority != nil {
@@ -109,6 +110,8 @@ type JobID types.UID
 
 type tasksMap map[TaskID]*TaskInfo
 
+type NodeResourceMap map[string]*Resource
+
 type JobInfo struct {
 	UID JobID
@@ -122,6 +125,8 @@ type JobInfo struct {
 	NodeSelector map[string]string
 	MinAvailable int32
 
+	NodesFitDelta NodeResourceMap
+
 	// All tasks of the Job.
 	TaskStatusIndex map[TaskStatus]tasksMap
 	Tasks           tasksMap
@@ -140,11 +145,11 @@ func NewJobInfo(uid JobID) *JobInfo {
 	return &JobInfo{
 		UID: uid,
 
-		MinAvailable: 0,
-		NodeSelector: make(map[string]string),
-
-		Allocated:    EmptyResource(),
-		TotalRequest: EmptyResource(),
+		MinAvailable:  0,
+		NodeSelector:  make(map[string]string),
+		NodesFitDelta: make(NodeResourceMap),
+		Allocated:     EmptyResource(),
+		TotalRequest:  EmptyResource(),
 
 		TaskStatusIndex: map[TaskStatus]tasksMap{},
 		Tasks:           tasksMap{},
@@ -278,10 +283,11 @@ func (ji *JobInfo) Clone() *JobInfo {
 		Namespace: ji.Namespace,
 		Queue:     ji.Queue,
 
-		MinAvailable: ji.MinAvailable,
-		NodeSelector: map[string]string{},
-		Allocated:    ji.Allocated.Clone(),
-		TotalRequest: ji.TotalRequest.Clone(),
+		MinAvailable:  ji.MinAvailable,
+		NodeSelector:  map[string]string{},
+		Allocated:     ji.Allocated.Clone(),
+		TotalRequest:  ji.TotalRequest.Clone(),
+		NodesFitDelta: make(NodeResourceMap),
 
 		PDB:      ji.PDB,
 		PodGroup: ji.PodGroup,
@@ -314,3 +320,36 @@ func (ji JobInfo) String() string {
 
 	return fmt.Sprintf("Job (%v): name %v, minAvailable %d", ji.UID, ji.Name, ji.MinAvailable) + res
 }
+
+// FitError returns detailed information on why a job's task failed to fit on
+// each available node.
+func (f *JobInfo) FitError() string {
+	if len(f.NodesFitDelta) == 0 {
+		reasonMsg := fmt.Sprintf("0 nodes are available")
+		return reasonMsg
+	}
+
+	reasons := make(map[string]int)
+	for _, v := range f.NodesFitDelta {
+		if v.Get(v1.ResourceCPU) < 0 {
+			reasons["cpu"]++
+		}
+		if v.Get(v1.ResourceMemory) < 0 {
+			reasons["memory"]++
+		}
+		if v.Get(GPUResourceName) < 0 {
+			reasons["GPU"]++
+		}
+	}
+
+	sortReasonsHistogram := func() []string {
+		reasonStrings := []string{}
+		for k, v := range reasons {
+			reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k))
+		}
+		sort.Strings(reasonStrings)
+		return reasonStrings
+	}
+	reasonMsg := fmt.Sprintf("0/%v nodes are available, %v.", len(f.NodesFitDelta), strings.Join(sortReasonsHistogram(), ", "))
+	return reasonMsg
+}
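To make the resulting message concrete, this detached sketch runs the same histogram-and-sort logic as FitError over a hand-built delta map; the delta struct and node names are hypothetical stand-ins for the api.Resource values the real code reads via Get:

package main

import (
	"fmt"
	"sort"
	"strings"
)

type delta struct{ cpu, memory float64 }

func main() {
	// Hypothetical NodesFitDelta after an allocate pass: a negative
	// field means the node came up short for the last unfit task.
	deltas := map[string]delta{
		"node-1": {cpu: -500, memory: 1024},
		"node-2": {cpu: -1500, memory: -2048},
	}

	reasons := make(map[string]int)
	for _, d := range deltas {
		if d.cpu < 0 {
			reasons["cpu"]++
		}
		if d.memory < 0 {
			reasons["memory"]++
		}
	}

	reasonStrings := []string{}
	for k, v := range reasons {
		reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k))
	}
	sort.Strings(reasonStrings)

	// Prints: 0/2 nodes are available, 1 insufficient memory, 2 insufficient cpu.
	fmt.Printf("0/%v nodes are available, %v.\n", len(deltas), strings.Join(reasonStrings, ", "))
}

Sorting the reason strings keeps the message deterministic despite Go's randomized map iteration order.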

pkg/scheduler/api/job_info_test.go (+6 -3)

@@ -79,7 +79,8 @@ func TestAddTaskInfo(t *testing.T) {
 						case01_task4.UID: case01_task4,
 					},
 				},
-				NodeSelector: make(map[string]string),
+				NodeSelector:  make(map[string]string),
+				NodesFitDelta: make(NodeResourceMap),
 			},
 		},
 	}
@@ -143,7 +144,8 @@ func TestDeleteTaskInfo(t *testing.T) {
 					Pending: {case01_task1.UID: case01_task1},
 					Running: {case01_task3.UID: case01_task3},
 				},
-				NodeSelector: make(map[string]string),
+				NodeSelector:  make(map[string]string),
+				NodesFitDelta: make(NodeResourceMap),
 			},
 		},
 		{
@@ -168,7 +170,8 @@ func TestDeleteTaskInfo(t *testing.T) {
 						case02_task3.UID: case02_task3,
 					},
 				},
-				NodeSelector: make(map[string]string),
+				NodeSelector:  make(map[string]string),
+				NodesFitDelta: make(NodeResourceMap),
 			},
 		},
 	}

pkg/scheduler/api/resource_info.go (+19)

@@ -109,6 +109,25 @@ func (r *Resource) Sub(rr *Resource) *Resource {
 		r, rr))
 }
 
+// FitDelta computes the delta between a resource object representing
+// available resources and an operand representing the resources being
+// requested. Any field that is less than 0 after the operation
+// represents an insufficient resource.
+func (r *Resource) FitDelta(rr *Resource) *Resource {
+	if rr.MilliCPU > 0 {
+		r.MilliCPU -= rr.MilliCPU + minMilliCPU
+	}
+
+	if rr.Memory > 0 {
+		r.Memory -= rr.Memory + minMemory
+	}
+
+	if rr.MilliGPU > 0 {
+		r.MilliGPU -= rr.MilliGPU + minMilliGPU
+	}
+	return r
+}
+
 func (r *Resource) Multi(ratio float64) *Resource {
 	r.MilliCPU = r.MilliCPU * ratio
 	r.Memory = r.Memory * ratio
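A worked example with illustrative numbers: a node with 2000 idle MilliCPU facing a 2500-MilliCPU request ends up with MilliCPU = 2000 - (2500 + minMilliCPU), slightly below -500, so FitError counts the node as "insufficient cpu". The min* epsilons also push an exactly-equal request slightly negative; since the allocate action only records a delta for a node the task already failed to fit on, at least one dimension of every stored delta should read as insufficient.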

pkg/scheduler/cache/cache.go (+6)

@@ -550,5 +550,11 @@ func (sc *SchedulerCache) Backoff(job *arbapi.JobInfo, event arbcorev1.Event, reason string) error {
 		return fmt.Errorf("no scheduling specification for job")
 	}
 
+	for _, tasks := range job.TaskStatusIndex {
+		for _, t := range tasks {
+			sc.recorder.Eventf(t.Pod, v1.EventTypeWarning, string(event), reason)
+		}
+	}
+
 	return nil
 }
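The fan-out is easy to exercise with client-go's FakeRecorder, which buffers events on a channel instead of posting them to the API server. In the sketch below, the empty pods and the message are placeholders, and the "Unschedulable" reason is an assumption about what string(event) yields here:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	rec := record.NewFakeRecorder(16)

	// Stand-ins for the pods behind each task in job.TaskStatusIndex.
	pods := []*corev1.Pod{{}, {}, {}}
	reason := "2/3 tasks in gang unschedulable: 0/2 nodes are available, 2 insufficient cpu."

	// The same per-task fan-out Backoff now performs.
	for _, p := range pods {
		rec.Eventf(p, corev1.EventTypeWarning, "Unschedulable", reason)
	}

	for range pods {
		fmt.Println(<-rec.Events) // "Warning Unschedulable 2/3 tasks in gang ..."
	}
}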

pkg/scheduler/plugins/gang/gang.go (+5 -2)

@@ -17,6 +17,7 @@ limitations under the License.
 package gang
 
 import (
+	"fmt"
 	"github.com/golang/glog"
 
 	arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
@@ -144,8 +145,10 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
 
 func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
 	for _, job := range ssn.Jobs {
-		if len(job.TaskStatusIndex[api.Allocated]) != 0 {
-			ssn.Backoff(job, arbcorev1.UnschedulableEvent, "not enough resource for job")
+		if len(job.TaskStatusIndex[api.Pending]) != 0 {
+			glog.V(3).Infof("Gang: <%v/%v> allocated: %v, pending: %v", job.Namespace, job.Name, len(job.TaskStatusIndex[api.Allocated]), len(job.TaskStatusIndex[api.Pending]))
+			msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
+			ssn.Backoff(job, arbcorev1.UnschedulableEvent, msg)
 		}
 	}
 }
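Net effect: instead of the fixed "not enough resource for job" string, a job with pending tasks is now backed off with a message assembled from the two formats above, e.g. (numbers illustrative) "2/3 tasks in gang unschedulable: 0/4 nodes are available, 3 insufficient cpu, 1 insufficient memory.", which cache.Backoff then records as a Warning event on every task's pod.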
