Skip to content

Commit 970e005

Browse files
author
mada 00483107
committed
Merge branch 'performance' into 'master'
[Issue volcano-sh#61] scheduler性能优化,减少大量deepcopy操作,增加qps/burst ## 优化逻辑 1. 增加kubeclient出口限流的配置,并配置为 default-scheduler 对应的 qps 50 / burst 100 2. 在predicates和nodeorder内减少大量deepcopy的操作 ## 遗留问题 1. scheduler的cache内部使用全局大锁,此处需要优化 2. nodeorder内nodeInfo每次执行nodeOrderFn的时候均会对node进行重建,此处消耗性能较大 ## 性能图 ![profile001.svg](/uploads/7e323f66f04fe8e8023766fddc1f1835/profile001.svg) ## 优化结果 带有1000pod,minA为1的volcanojob,创建时间从55s变为22s Issues info: Issue ID: 61 Title: scheduler性能优化,减少大量deepcopy操作,增加qps/burst Issue url: CBU-PaaS/Community/volcano/volcano#61 See merge request CBU-PaaS/Community/volcano/volcano!103
2 parents 45bb2c3 + 10ac1ad commit 970e005

File tree

7 files changed

+95
-29
lines changed

7 files changed

+95
-29
lines changed

cmd/controllers/app/options/options.go

+9
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@ import (
2222
"github.com/spf13/pflag"
2323
)
2424

25+
const (
26+
defaultQPS = 50.0
27+
defaultBurst = 100
28+
)
29+
2530
// ServerOption is the main context object for the controller manager.
2631
type ServerOption struct {
2732
Master string
2833
Kubeconfig string
2934
EnableLeaderElection bool
3035
LockObjectNamespace string
36+
KubeAPIBurst int
37+
KubeAPIQPS float32
3138
}
3239

3340
// NewServerOption creates a new CMServer with a default config.
@@ -43,6 +50,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
4350
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+
4451
"executing the main loop. Enable this when running replicated kar-scheduler for high availability.")
4552
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object.")
53+
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
54+
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
4655
}
4756

4857
func (s *ServerOption) CheckOptionOrDie() error {

cmd/controllers/app/server.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,28 @@ const (
4646
retryPeriod = 5 * time.Second
4747
)
4848

49-
func buildConfig(master, kubeconfig string) (*rest.Config, error) {
49+
func buildConfig(opt *options.ServerOption) (*rest.Config, error) {
50+
var cfg *rest.Config
51+
var err error
52+
53+
master := opt.Master
54+
kubeconfig := opt.Kubeconfig
5055
if master != "" || kubeconfig != "" {
51-
return clientcmd.BuildConfigFromFlags(master, kubeconfig)
56+
cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
57+
} else {
58+
cfg, err = rest.InClusterConfig()
59+
}
60+
if err != nil {
61+
return nil, err
5262
}
53-
return rest.InClusterConfig()
63+
cfg.QPS = opt.KubeAPIQPS
64+
cfg.Burst = opt.KubeAPIBurst
65+
66+
return cfg, nil
5467
}
5568

5669
func Run(opt *options.ServerOption) error {
57-
config, err := buildConfig(opt.Master, opt.Kubeconfig)
70+
config, err := buildConfig(opt)
5871
if err != nil {
5972
return err
6073
}

cmd/kube-batch/app/options/options.go

+6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const (
2828
defaultSchedulerPeriod = time.Second
2929
defaultQueue = "default"
3030
defaultListenAddress = ":8080"
31+
defaultQPS = 50.0
32+
defaultBurst = 100
3133
)
3234

3335
// ServerOption is the main context object for the controller manager.
@@ -43,6 +45,8 @@ type ServerOption struct {
4345
PrintVersion bool
4446
ListenAddress string
4547
EnablePriorityClass bool
48+
KubeAPIBurst int
49+
KubeAPIQPS float32
4650
}
4751

4852
// ServerOpts server options
@@ -71,6 +75,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
7175
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
7276
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
7377
"Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false")
78+
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
79+
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
7480
}
7581

7682
// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled

cmd/kube-batch/app/server.go

+20-5
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@ import (
2020
"context"
2121
"fmt"
2222
"net/http"
23+
_ "net/http/pprof"
2324
"os"
2425
"time"
2526

2627
"github.com/golang/glog"
28+
29+
"github.com/prometheus/client_golang/prometheus/promhttp"
2730
"volcano.sh/volcano/cmd/kube-batch/app/options"
2831
"volcano.sh/volcano/pkg/scheduler"
2932
"volcano.sh/volcano/pkg/version"
30-
"github.com/prometheus/client_golang/prometheus/promhttp"
3133

3234
v1 "k8s.io/api/core/v1"
3335
"k8s.io/apimachinery/pkg/util/uuid"
@@ -51,11 +53,24 @@ const (
5153
apiVersion = "v1alpha1"
5254
)
5355

54-
func buildConfig(master, kubeconfig string) (*rest.Config, error) {
56+
func buildConfig(opt *options.ServerOption) (*rest.Config, error) {
57+
var cfg *rest.Config
58+
var err error
59+
60+
master := opt.Master
61+
kubeconfig := opt.Kubeconfig
5562
if master != "" || kubeconfig != "" {
56-
return clientcmd.BuildConfigFromFlags(master, kubeconfig)
63+
cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
64+
} else {
65+
cfg, err = rest.InClusterConfig()
5766
}
58-
return rest.InClusterConfig()
67+
if err != nil {
68+
return nil, err
69+
}
70+
cfg.QPS = opt.KubeAPIQPS
71+
cfg.Burst = opt.KubeAPIBurst
72+
73+
return cfg, nil
5974
}
6075

6176
// Run the kubeBatch scheduler
@@ -64,7 +79,7 @@ func Run(opt *options.ServerOption) error {
6479
version.PrintVersionAndExit(apiVersion)
6580
}
6681

67-
config, err := buildConfig(opt.Master, opt.Kubeconfig)
82+
config, err := buildConfig(opt)
6883
if err != nil {
6984
return err
7085
}

pkg/scheduler/plugins/nodeorder/nodeorder.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
"github.com/golang/glog"
2323

24-
v1 "k8s.io/api/core/v1"
24+
"k8s.io/api/core/v1"
2525
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
2626
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
2727
"k8s.io/kubernetes/pkg/scheduler/cache"
@@ -153,22 +153,19 @@ func calculateWeight(args framework.Arguments) priorityWeight {
153153
}
154154

155155
func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
156-
nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
157-
158-
weight := calculateWeight(pp.pluginArguments)
156+
weight := calculateWeight(pp.pluginArguments)
159157

160-
pl := &util.PodLister{
161-
Session: ssn,
162-
}
158+
pl := util.NewPodLister(ssn)
163159

164-
nl := &util.NodeLister{
165-
Session: ssn,
166-
}
160+
nl := &util.NodeLister{
161+
Session: ssn,
162+
}
167163

168-
cn := &cachedNodeInfo{
169-
session: ssn,
170-
}
164+
cn := &cachedNodeInfo{
165+
session: ssn,
166+
}
171167

168+
nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
172169
var nodeMap map[string]*cache.NodeInfo
173170
var nodeSlice []*v1.Node
174171
var interPodAffinityScore schedulerapi.HostPriorityList

pkg/scheduler/plugins/predicates/predicates.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,7 @@ func formatReason(reasons []algorithm.PredicateFailureReason) string {
5555
}
5656

5757
func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
58-
pl := &util.PodLister{
59-
Session: ssn,
60-
}
58+
pl := util.NewPodLister(ssn)
6159

6260
ni := &util.CachedNodeInfo{
6361
Session: ssn,

pkg/scheduler/plugins/util/util.go

+32-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package util
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
"k8s.io/api/core/v1"
2324
"k8s.io/apimachinery/pkg/labels"
@@ -30,6 +31,35 @@ import (
3031
// PodLister is used in predicate and nodeorder plugin
3132
type PodLister struct {
3233
Session *framework.Session
34+
35+
podLock sync.RWMutex
36+
UpdatedPod map[api.TaskID]*v1.Pod
37+
}
38+
39+
func NewPodLister(ssn *framework.Session) *PodLister {
40+
return &PodLister{
41+
Session: ssn,
42+
UpdatedPod: make(map[api.TaskID]*v1.Pod),
43+
}
44+
}
45+
46+
func (pl *PodLister) updateTask(task *api.TaskInfo) *v1.Pod {
47+
pl.podLock.RLock()
48+
pod, found := pl.UpdatedPod[task.UID]
49+
pl.podLock.RUnlock()
50+
51+
if !found {
52+
pod = task.Pod.DeepCopy()
53+
pod.Spec.NodeName = task.NodeName
54+
55+
pl.podLock.Lock()
56+
pl.UpdatedPod[task.UID] = pod
57+
pl.podLock.Unlock()
58+
} else {
59+
pod.Spec.NodeName = task.NodeName
60+
}
61+
62+
return pod
3363
}
3464

3565
// List method is used to list all the pods
@@ -44,8 +74,7 @@ func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
4474
for _, task := range tasks {
4575
if selector.Matches(labels.Set(task.Pod.Labels)) {
4676
if task.NodeName != task.Pod.Spec.NodeName {
47-
pod := task.Pod.DeepCopy()
48-
pod.Spec.NodeName = task.NodeName
77+
pod := pl.updateTask(task)
4978
pods = append(pods, pod)
5079
} else {
5180
pods = append(pods, task.Pod)
@@ -70,8 +99,7 @@ func (pl *PodLister) FilteredList(podFilter algorithm.PodFilter, selector labels
7099
for _, task := range tasks {
71100
if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) {
72101
if task.NodeName != task.Pod.Spec.NodeName {
73-
pod := task.Pod.DeepCopy()
74-
pod.Spec.NodeName = task.NodeName
102+
pod := pl.updateTask(task)
75103
pods = append(pods, pod)
76104
} else {
77105
pods = append(pods, task.Pod)

0 commit comments

Comments
 (0)