Skip to content

Commit 37251c4

Browse files
authored
Merge pull request #288 from volcano-sh/scheduler-in-tree
Scheduler in tree
2 parents 82b4ec4 + 2bf4884 commit 37251c4

File tree

111 files changed

+12996
-41
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

111 files changed

+12996
-41
lines changed

cmd/admission/app/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"net/http"
2424

2525
"github.com/golang/glog"
26-
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
26+
"volcano.sh/volcano/pkg/client/clientset/versioned"
2727

2828
"k8s.io/api/admission/v1beta1"
2929
"k8s.io/apimachinery/pkg/runtime"

cmd/controllers/app/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import (
3838
"k8s.io/client-go/tools/leaderelection/resourcelock"
3939
"k8s.io/client-go/tools/record"
4040

41-
kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
41+
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
4242

4343
"volcano.sh/volcano/cmd/controllers/app/options"
4444
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"

cmd/kube-batch/app/options/options.go

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package options
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"github.com/spf13/pflag"
24+
)
25+
26+
const (
27+
defaultSchedulerName = "volcano"
28+
defaultSchedulerPeriod = time.Second
29+
defaultQueue = "default"
30+
defaultListenAddress = ":8080"
31+
32+
defaultQPS = 50.0
33+
defaultBurst = 100
34+
)
35+
36+
// ServerOption is the main context object for the controller manager.
37+
type ServerOption struct {
38+
Master string
39+
Kubeconfig string
40+
SchedulerName string
41+
SchedulerConf string
42+
SchedulePeriod time.Duration
43+
EnableLeaderElection bool
44+
LockObjectNamespace string
45+
DefaultQueue string
46+
PrintVersion bool
47+
ListenAddress string
48+
EnablePriorityClass bool
49+
KubeAPIBurst int
50+
KubeAPIQPS float32
51+
}
52+
53+
// ServerOpts server options
54+
var ServerOpts *ServerOption
55+
56+
// NewServerOption creates a new CMServer with a default config.
57+
func NewServerOption() *ServerOption {
58+
s := ServerOption{}
59+
return &s
60+
}
61+
62+
// AddFlags adds flags for a specific CMServer to the specified FlagSet
63+
func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
64+
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
65+
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information")
66+
// kube-batch will ignore pods with scheduler names other than specified with the option
67+
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods whose .spec.SchedulerName is same as scheduler-name")
68+
fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
69+
fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle")
70+
fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job")
71+
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection,
72+
"Start a leader election client and gain leadership before "+
73+
"executing the main loop. Enable this when running replicated kube-batch for high availability")
74+
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
75+
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object that is used for leader election")
76+
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
77+
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
78+
"Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false")
79+
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
80+
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
81+
}
82+
83+
// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled
84+
func (s *ServerOption) CheckOptionOrDie() error {
85+
if s.EnableLeaderElection && s.LockObjectNamespace == "" {
86+
return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
87+
}
88+
89+
return nil
90+
}
91+
92+
// RegisterOptions registers options
93+
func (s *ServerOption) RegisterOptions() {
94+
ServerOpts = s
95+
}
+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package options
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
"time"
23+
24+
"github.com/spf13/pflag"
25+
)
26+
27+
func TestAddFlags(t *testing.T) {
28+
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
29+
s := NewServerOption()
30+
s.AddFlags(fs)
31+
32+
args := []string{
33+
"--schedule-period=5m",
34+
"--priority-class=false",
35+
}
36+
fs.Parse(args)
37+
38+
// This is a snapshot of expected options parsed by args.
39+
expected := &ServerOption{
40+
SchedulerName: defaultSchedulerName,
41+
SchedulePeriod: 5 * time.Minute,
42+
DefaultQueue: defaultQueue,
43+
ListenAddress: defaultListenAddress,
44+
KubeAPIBurst: defaultBurst,
45+
KubeAPIQPS: defaultQPS,
46+
}
47+
48+
if !reflect.DeepEqual(expected, s) {
49+
t.Errorf("Got different run options than expected.\nGot: %+v\nExpected: %+v\n", s, expected)
50+
}
51+
}

cmd/kube-batch/app/server.go

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package app
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net/http"
23+
"os"
24+
"time"
25+
26+
"github.com/golang/glog"
27+
"github.com/prometheus/client_golang/prometheus/promhttp"
28+
"volcano.sh/volcano/cmd/kube-batch/app/options"
29+
"volcano.sh/volcano/pkg/scheduler"
30+
"volcano.sh/volcano/pkg/version"
31+
32+
v1 "k8s.io/api/core/v1"
33+
"k8s.io/apimachinery/pkg/util/uuid"
34+
clientset "k8s.io/client-go/kubernetes"
35+
"k8s.io/client-go/kubernetes/scheme"
36+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
37+
38+
// Register gcp auth
39+
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
40+
"k8s.io/client-go/rest"
41+
restclient "k8s.io/client-go/rest"
42+
"k8s.io/client-go/tools/clientcmd"
43+
"k8s.io/client-go/tools/leaderelection"
44+
"k8s.io/client-go/tools/leaderelection/resourcelock"
45+
"k8s.io/client-go/tools/record"
46+
)
47+
48+
const (
49+
leaseDuration = 15 * time.Second
50+
renewDeadline = 10 * time.Second
51+
retryPeriod = 5 * time.Second
52+
apiVersion = "v1alpha1"
53+
)
54+
55+
func buildConfig(opt *options.ServerOption) (*rest.Config, error) {
56+
var cfg *rest.Config
57+
var err error
58+
59+
master := opt.Master
60+
kubeconfig := opt.Kubeconfig
61+
if master != "" || kubeconfig != "" {
62+
cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
63+
} else {
64+
cfg, err = rest.InClusterConfig()
65+
}
66+
if err != nil {
67+
return nil, err
68+
}
69+
cfg.QPS = opt.KubeAPIQPS
70+
cfg.Burst = opt.KubeAPIBurst
71+
72+
return cfg, nil
73+
}
74+
75+
// Run the kubeBatch scheduler
76+
func Run(opt *options.ServerOption) error {
77+
if opt.PrintVersion {
78+
version.PrintVersionAndExit()
79+
}
80+
81+
config, err := buildConfig(opt)
82+
if err != nil {
83+
return err
84+
}
85+
86+
// Start policy controller to allocate resources.
87+
sched, err := scheduler.NewScheduler(config,
88+
opt.SchedulerName,
89+
opt.SchedulerConf,
90+
opt.SchedulePeriod,
91+
opt.DefaultQueue)
92+
if err != nil {
93+
panic(err)
94+
}
95+
96+
go func() {
97+
http.Handle("/metrics", promhttp.Handler())
98+
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
99+
}()
100+
101+
run := func(ctx context.Context) {
102+
sched.Run(ctx.Done())
103+
<-ctx.Done()
104+
}
105+
106+
if !opt.EnableLeaderElection {
107+
run(context.TODO())
108+
return fmt.Errorf("finished without leader elect")
109+
}
110+
111+
leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(config, "leader-election"))
112+
if err != nil {
113+
return err
114+
}
115+
116+
// Prepare event clients.
117+
broadcaster := record.NewBroadcaster()
118+
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
119+
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: opt.SchedulerName})
120+
121+
hostname, err := os.Hostname()
122+
if err != nil {
123+
return fmt.Errorf("unable to get hostname: %v", err)
124+
}
125+
// add a uniquifier so that two processes on the same host don't accidentally both become active
126+
id := hostname + "_" + string(uuid.NewUUID())
127+
128+
rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
129+
opt.LockObjectNamespace,
130+
"kube-batch",
131+
leaderElectionClient.CoreV1(),
132+
resourcelock.ResourceLockConfig{
133+
Identity: id,
134+
EventRecorder: eventRecorder,
135+
})
136+
if err != nil {
137+
return fmt.Errorf("couldn't create resource lock: %v", err)
138+
}
139+
140+
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
141+
Lock: rl,
142+
LeaseDuration: leaseDuration,
143+
RenewDeadline: renewDeadline,
144+
RetryPeriod: retryPeriod,
145+
Callbacks: leaderelection.LeaderCallbacks{
146+
OnStartedLeading: run,
147+
OnStoppedLeading: func() {
148+
glog.Fatalf("leaderelection lost")
149+
},
150+
},
151+
})
152+
return fmt.Errorf("lost lease")
153+
}

cmd/kube-batch/main.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2018 The Vulcan Authors.
2+
Copyright 2017 The Kubernetes Authors.
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -18,24 +18,34 @@ package main
1818
import (
1919
"fmt"
2020
"os"
21+
"runtime"
2122
"time"
2223

24+
// init pprof server
25+
_ "net/http/pprof"
26+
2327
"github.com/golang/glog"
2428
"github.com/spf13/pflag"
2529

2630
"k8s.io/apimachinery/pkg/util/wait"
2731
"k8s.io/apiserver/pkg/util/flag"
2832

29-
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions"
30-
_ "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins"
33+
"volcano.sh/volcano/cmd/kube-batch/app"
34+
"volcano.sh/volcano/cmd/kube-batch/app/options"
35+
36+
// Import default actions/plugins.
37+
_ "volcano.sh/volcano/pkg/scheduler/actions"
38+
_ "volcano.sh/volcano/pkg/scheduler/plugins"
3139

32-
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app"
33-
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
40+
// init assert
41+
_ "volcano.sh/volcano/pkg/scheduler/util/assert"
3442
)
3543

3644
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
3745

3846
func main() {
47+
runtime.GOMAXPROCS(runtime.NumCPU())
48+
3949
s := options.NewServerOption()
4050
s.AddFlags(pflag.CommandLine)
4151
s.RegisterOptions()

hack/.golint_failures

+7
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
1+
volcano.sh/volcano/pkg/apis/scheduling/v1alpha1
2+
volcano.sh/volcano/pkg/apis/utils
3+
volcano.sh/volcano/pkg/scheduler/actions/allocate
4+
volcano.sh/volcano/pkg/scheduler/actions/backfill
5+
volcano.sh/volcano/pkg/scheduler/actions/enqueue
6+
volcano.sh/volcano/pkg/scheduler/actions/preempt
7+
volcano.sh/volcano/pkg/scheduler/actions/reclaim
18
volcano.sh/volcano/test/e2e

hack/update-gencode.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-ge
2929
# instead of the $GOPATH directly. For normal projects this can be dropped.
3030
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
3131
volcano.sh/volcano/pkg/client volcano.sh/volcano/pkg/apis \
32-
"batch:v1alpha1 bus:v1alpha1" \
32+
"batch:v1alpha1 bus:v1alpha1 scheduling:v1alpha1" \
3333
--go-header-file ${SCRIPT_ROOT}/hack/boilerplate/boilerplate.go.txt
3434

3535
# To use your own boilerplate text use:

pkg/admission/admit_job.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"strings"
2222

2323
"github.com/golang/glog"
24-
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
24+
"volcano.sh/volcano/pkg/client/clientset/versioned"
2525

2626
"k8s.io/api/admission/v1beta1"
2727
"k8s.io/api/core/v1"

0 commit comments

Comments
 (0)