Skip to content

Commit e0ea36e

Browse files
author
Klaus Ma
authored
Merge pull request #9 from volcano-sh/add_kb
Added actions/plugins.
2 parents 888fb85 + 12e16e8 commit e0ea36e

File tree

19 files changed

+338
-60
lines changed

19 files changed

+338
-60
lines changed

cmd/scheduler/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import (
2626
"k8s.io/apimachinery/pkg/util/wait"
2727
"k8s.io/apiserver/pkg/util/flag"
2828

29-
_ "volcano.sh/volcano/pkg/scheduler/algorithm"
29+
_ "volcano.sh/volcano/pkg/scheduler/actions"
30+
_ "volcano.sh/volcano/pkg/scheduler/plugins"
3031

3132
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app"
3233
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import (
2121

2222
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
2323
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
24-
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
24+
25+
"volcano.sh/volcano/pkg/scheduler/util"
2526
)
2627

2728
type allocateAction struct {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package allocate
18+
19+
import (
20+
"fmt"
21+
22+
"reflect"
23+
"sync"
24+
"testing"
25+
"time"
26+
27+
"k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/api/resource"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/types"
31+
"k8s.io/client-go/tools/record"
32+
33+
kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
34+
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
35+
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache"
36+
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf"
37+
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
38+
39+
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
40+
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
41+
)
42+
43+
func buildResourceList(cpu string, memory string) v1.ResourceList {
44+
return v1.ResourceList{
45+
v1.ResourceCPU: resource.MustParse(cpu),
46+
v1.ResourceMemory: resource.MustParse(memory),
47+
api.GPUResourceName: resource.MustParse("0"),
48+
}
49+
}
50+
51+
func buildResourceListWithGPU(cpu string, memory string, GPU string) v1.ResourceList {
52+
return v1.ResourceList{
53+
v1.ResourceCPU: resource.MustParse(cpu),
54+
v1.ResourceMemory: resource.MustParse(memory),
55+
api.GPUResourceName: resource.MustParse(GPU),
56+
}
57+
}
58+
59+
func buildNode(name string, alloc v1.ResourceList, labels map[string]string) *v1.Node {
60+
return &v1.Node{
61+
ObjectMeta: metav1.ObjectMeta{
62+
Name: name,
63+
Labels: labels,
64+
},
65+
Status: v1.NodeStatus{
66+
Capacity: alloc,
67+
Allocatable: alloc,
68+
},
69+
}
70+
}
71+
72+
func buildPod(ns, n, nn string, p v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string) *v1.Pod {
73+
return &v1.Pod{
74+
ObjectMeta: metav1.ObjectMeta{
75+
UID: types.UID(fmt.Sprintf("%v-%v", ns, n)),
76+
Name: n,
77+
Namespace: ns,
78+
Labels: labels,
79+
Annotations: map[string]string{
80+
kbv1.GroupNameAnnotationKey: groupName,
81+
},
82+
},
83+
Status: v1.PodStatus{
84+
Phase: p,
85+
},
86+
Spec: v1.PodSpec{
87+
NodeName: nn,
88+
NodeSelector: selector,
89+
Containers: []v1.Container{
90+
{
91+
Resources: v1.ResourceRequirements{
92+
Requests: req,
93+
},
94+
},
95+
},
96+
},
97+
}
98+
}
99+
100+
type fakeBinder struct {
101+
sync.Mutex
102+
binds map[string]string
103+
c chan string
104+
}
105+
106+
func (fb *fakeBinder) Bind(p *v1.Pod, hostname string) error {
107+
fb.Lock()
108+
defer fb.Unlock()
109+
110+
key := fmt.Sprintf("%v/%v", p.Namespace, p.Name)
111+
fb.binds[key] = hostname
112+
113+
fb.c <- key
114+
115+
return nil
116+
}
117+
118+
type fakeStatusUpdater struct {
119+
}
120+
121+
func (ftsu *fakeStatusUpdater) UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) {
122+
// do nothing here
123+
return nil, nil
124+
}
125+
126+
func (ftsu *fakeStatusUpdater) UpdatePodGroup(pg *kbv1.PodGroup) (*kbv1.PodGroup, error) {
127+
// do nothing here
128+
return nil, nil
129+
}
130+
131+
type fakeVolumeBinder struct {
132+
}
133+
134+
func (fvb *fakeVolumeBinder) AllocateVolumes(task *api.TaskInfo, hostname string) error {
135+
return nil
136+
}
137+
func (fvb *fakeVolumeBinder) BindVolumes(task *api.TaskInfo) error {
138+
return nil
139+
}
140+
141+
func TestAllocate(t *testing.T) {
142+
framework.RegisterPluginBuilder("drf", drf.New)
143+
framework.RegisterPluginBuilder("proportion", proportion.New)
144+
defer framework.CleanupPluginBuilders()
145+
146+
tests := []struct {
147+
name string
148+
podGroups []*kbv1.PodGroup
149+
pods []*v1.Pod
150+
nodes []*v1.Node
151+
queues []*kbv1.Queue
152+
expected map[string]string
153+
}{
154+
{
155+
name: "one Job with two Pods on one node",
156+
podGroups: []*kbv1.PodGroup{
157+
{
158+
ObjectMeta: metav1.ObjectMeta{
159+
Name: "pg1",
160+
Namespace: "c1",
161+
},
162+
Spec: kbv1.PodGroupSpec{
163+
Queue: "c1",
164+
},
165+
},
166+
},
167+
pods: []*v1.Pod{
168+
buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
169+
buildPod("c1", "p2", "", v1.PodPending, buildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
170+
},
171+
nodes: []*v1.Node{
172+
buildNode("n1", buildResourceList("2", "4Gi"), make(map[string]string)),
173+
},
174+
queues: []*kbv1.Queue{
175+
{
176+
ObjectMeta: metav1.ObjectMeta{
177+
Name: "c1",
178+
},
179+
Spec: kbv1.QueueSpec{
180+
Weight: 1,
181+
},
182+
},
183+
},
184+
expected: map[string]string{
185+
"c1/p1": "n1",
186+
"c1/p2": "n1",
187+
},
188+
},
189+
{
190+
name: "two Jobs on one node",
191+
podGroups: []*kbv1.PodGroup{
192+
{
193+
ObjectMeta: metav1.ObjectMeta{
194+
Name: "pg1",
195+
Namespace: "c1",
196+
},
197+
Spec: kbv1.PodGroupSpec{
198+
Queue: "c1",
199+
},
200+
},
201+
{
202+
ObjectMeta: metav1.ObjectMeta{
203+
Name: "pg2",
204+
Namespace: "c2",
205+
},
206+
Spec: kbv1.PodGroupSpec{
207+
Queue: "c2",
208+
},
209+
},
210+
},
211+
212+
pods: []*v1.Pod{
213+
// pending pod with owner1, under c1
214+
buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
215+
// pending pod with owner1, under c1
216+
buildPod("c1", "p2", "", v1.PodPending, buildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
217+
// pending pod with owner2, under c2
218+
buildPod("c2", "p1", "", v1.PodPending, buildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
219+
// pending pod with owner, under c2
220+
buildPod("c2", "p2", "", v1.PodPending, buildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
221+
},
222+
nodes: []*v1.Node{
223+
buildNode("n1", buildResourceList("2", "4G"), make(map[string]string)),
224+
},
225+
queues: []*kbv1.Queue{
226+
{
227+
ObjectMeta: metav1.ObjectMeta{
228+
Name: "c1",
229+
},
230+
Spec: kbv1.QueueSpec{
231+
Weight: 1,
232+
},
233+
},
234+
{
235+
ObjectMeta: metav1.ObjectMeta{
236+
Name: "c2",
237+
},
238+
Spec: kbv1.QueueSpec{
239+
Weight: 1,
240+
},
241+
},
242+
},
243+
expected: map[string]string{
244+
"c2/p1": "n1",
245+
"c1/p1": "n1",
246+
},
247+
},
248+
}
249+
250+
allocate := New()
251+
252+
for i, test := range tests {
253+
binder := &fakeBinder{
254+
binds: map[string]string{},
255+
c: make(chan string),
256+
}
257+
schedulerCache := &cache.SchedulerCache{
258+
Nodes: make(map[string]*api.NodeInfo),
259+
Jobs: make(map[api.JobID]*api.JobInfo),
260+
Queues: make(map[api.QueueID]*api.QueueInfo),
261+
Binder: binder,
262+
StatusUpdater: &fakeStatusUpdater{},
263+
VolumeBinder: &fakeVolumeBinder{},
264+
265+
Recorder: record.NewFakeRecorder(100),
266+
}
267+
for _, node := range test.nodes {
268+
schedulerCache.AddNode(node)
269+
}
270+
for _, pod := range test.pods {
271+
schedulerCache.AddPod(pod)
272+
}
273+
274+
for _, ss := range test.podGroups {
275+
schedulerCache.AddPodGroup(ss)
276+
}
277+
278+
for _, q := range test.queues {
279+
schedulerCache.AddQueue(q)
280+
}
281+
282+
ssn := framework.OpenSession(schedulerCache, []conf.Tier{
283+
{
284+
Plugins: []conf.PluginOption{
285+
{
286+
Name: "drf",
287+
},
288+
{
289+
Name: "proportion",
290+
},
291+
},
292+
},
293+
})
294+
defer framework.CloseSession(ssn)
295+
296+
allocate.Execute(ssn)
297+
298+
for i := 0; i < len(test.expected); i++ {
299+
select {
300+
case <-binder.c:
301+
case <-time.After(3 * time.Second):
302+
t.Errorf("Failed to get binding request.")
303+
}
304+
}
305+
306+
if !reflect.DeepEqual(test.expected, binder.binds) {
307+
t.Errorf("case %d (%s): expected: %v, got %v ", i, test.name, test.expected, binder.binds)
308+
}
309+
}
310+
}
+4-4
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ package actions
1919
import (
2020
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
2121

22-
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate"
23-
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill"
24-
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt"
25-
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim"
22+
"volcano.sh/volcano/pkg/scheduler/actions/allocate"
23+
"volcano.sh/volcano/pkg/scheduler/actions/backfill"
24+
"volcano.sh/volcano/pkg/scheduler/actions/preempt"
25+
"volcano.sh/volcano/pkg/scheduler/actions/reclaim"
2626
)
2727

2828
func init() {
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import (
2424
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
2525
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
2626
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
27-
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
27+
28+
"volcano.sh/volcano/pkg/scheduler/util"
2829
)
2930

3031
type preemptAction struct {
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2018 The Vulcan Authors.
2+
Copyright 2018 The Kubernetes Authors.
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -14,17 +14,19 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package algorithm
17+
package preempt
1818

1919
import (
20+
"testing"
21+
2022
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
21-
// Import default actions/plugins.
22-
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions"
23-
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins"
2423

25-
"volcano.sh/volcano/pkg/scheduler/algorithm/fairshare"
24+
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
2625
)
2726

28-
func init() {
29-
framework.RegisterPluginBuilder("fairshare", fairshare.New)
27+
func TestPreempt(t *testing.T) {
28+
framework.RegisterPluginBuilder("drf", drf.New)
29+
defer framework.CleanupPluginBuilders()
30+
31+
// TODO (k82cn): Add UT cases here.
3032
}

0 commit comments

Comments
 (0)