Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 1eed8a4

Browse files
authored
Merge pull request #619 from k82cn/automated-cherry-pick-of-#603-upstream-release-0.4
Automated cherry pick of #603: Added PriorityClass to PodGroup.
2 parents 550044e + f6f84ab commit 1eed8a4

File tree

4 files changed

+141
-12
lines changed

4 files changed

+141
-12
lines changed

pkg/apis/scheduling/v1alpha1/types.go

+9
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ type PodGroupSpec struct {
114114
// Queue defines the queue to allocate resource for PodGroup; if queue does not exist,
115115
// the PodGroup will not be scheduled.
116116
Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"`
117+
118+
// If specified, indicates the PodGroup's priority. "system-node-critical" and
119+
// "system-cluster-critical" are two special keywords which indicate the
120+
// highest priorities with the former being the highest priority. Any other
121+
// name must be defined by creating a PriorityClass object with that name.
122+
// If not specified, the PodGroup priority will be default or zero if there is no
123+
// default.
124+
// +optional
125+
PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"`
117126
}
118127

119128
// PodGroupStatus represents the current state of a pod group.

pkg/scheduler/api/job_info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ type JobInfo struct {
124124

125125
Queue QueueID
126126

127-
Priority int
127+
Priority int32
128128

129129
NodeSelector map[string]string
130130
MinAvailable int32

pkg/scheduler/cache/cache.go

+36-11
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ import (
2525
"github.com/golang/glog"
2626

2727
"k8s.io/api/core/v1"
28+
"k8s.io/api/scheduling/v1beta1"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/runtime"
3031
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3132
"k8s.io/apimachinery/pkg/util/wait"
3233
"k8s.io/client-go/informers"
3334
infov1 "k8s.io/client-go/informers/core/v1"
3435
policyv1 "k8s.io/client-go/informers/policy/v1beta1"
36+
schedv1 "k8s.io/client-go/informers/scheduling/v1beta1"
3537
storagev1 "k8s.io/client-go/informers/storage/v1"
3638
"k8s.io/client-go/kubernetes"
3739
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -82,6 +84,7 @@ type SchedulerCache struct {
8284
pvInformer infov1.PersistentVolumeInformer
8385
pvcInformer infov1.PersistentVolumeClaimInformer
8486
scInformer storagev1.StorageClassInformer
87+
pcInformer schedv1.PriorityClassInformer
8588

8689
Binder Binder
8790
Evictor Evictor
@@ -90,9 +93,12 @@ type SchedulerCache struct {
9093

9194
Recorder record.EventRecorder
9295

93-
Jobs map[kbapi.JobID]*kbapi.JobInfo
94-
Nodes map[string]*kbapi.NodeInfo
95-
Queues map[kbapi.QueueID]*kbapi.QueueInfo
96+
Jobs map[kbapi.JobID]*kbapi.JobInfo
97+
Nodes map[string]*kbapi.NodeInfo
98+
Queues map[kbapi.QueueID]*kbapi.QueueInfo
99+
PriorityClasses map[string]*v1beta1.PriorityClass
100+
defaultPriorityClass *v1beta1.PriorityClass
101+
defaultPriority int32
96102

97103
errTasks workqueue.RateLimitingInterface
98104
deletedJobs workqueue.RateLimitingInterface
@@ -179,14 +185,15 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
179185

180186
func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache {
181187
sc := &SchedulerCache{
182-
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
183-
Nodes: make(map[string]*kbapi.NodeInfo),
184-
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
185-
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
186-
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
187-
kubeclient: kubernetes.NewForConfigOrDie(config),
188-
kbclient: kbver.NewForConfigOrDie(config),
189-
defaultQueue: defaultQueue,
188+
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
189+
Nodes: make(map[string]*kbapi.NodeInfo),
190+
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
191+
PriorityClasses: make(map[string]*v1beta1.PriorityClass),
192+
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
193+
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
194+
kubeclient: kubernetes.NewForConfigOrDie(config),
195+
kbclient: kbver.NewForConfigOrDie(config),
196+
defaultQueue: defaultQueue,
190197
}
191198

192199
// Prepare event clients.
@@ -263,6 +270,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
263270
DeleteFunc: sc.DeletePDB,
264271
})
265272

273+
sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
274+
sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
275+
AddFunc: sc.AddPriorityClass,
276+
UpdateFunc: sc.UpdatePriorityClass,
277+
DeleteFunc: sc.DeletePriorityClass,
278+
})
279+
266280
kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)
267281
// create informer for PodGroup information
268282
sc.podGroupInformer = kbinformer.Scheduling().V1alpha1().PodGroups()
@@ -292,6 +306,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
292306
go sc.pvcInformer.Informer().Run(stopCh)
293307
go sc.scInformer.Informer().Run(stopCh)
294308
go sc.queueInformer.Informer().Run(stopCh)
309+
go sc.pcInformer.Informer().Run(stopCh)
295310

296311
// Re-sync error tasks.
297312
go wait.Until(sc.processResyncTask, 0, stopCh)
@@ -311,6 +326,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
311326
sc.pvcInformer.Informer().HasSynced,
312327
sc.scInformer.Informer().HasSynced,
313328
sc.queueInformer.Informer().HasSynced,
329+
sc.pcInformer.Informer().HasSynced,
314330
)
315331
}
316332

@@ -527,6 +543,15 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
527543
continue
528544
}
529545

546+
if value.PodGroup != nil {
547+
value.Priority = sc.defaultPriority
548+
549+
priName := value.PodGroup.Spec.PriorityClassName
550+
if priorityClass, found := sc.PriorityClasses[priName]; found {
551+
value.Priority = priorityClass.Value
552+
}
553+
}
554+
530555
snapshot.Jobs[value.UID] = value.Clone()
531556
}
532557

pkg/scheduler/cache/event_handlers.go

+95
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache
1818

1919
import (
2020
"fmt"
21+
"k8s.io/api/scheduling/v1beta1"
2122
"reflect"
2223

2324
"github.com/golang/glog"
@@ -671,3 +672,97 @@ func (sc *SchedulerCache) deleteQueue(queue *kbv1.Queue) error {
671672

672673
return nil
673674
}
675+
676+
func (sc *SchedulerCache) DeletePriorityClass(obj interface{}) {
677+
var ss *v1beta1.PriorityClass
678+
switch t := obj.(type) {
679+
case *v1beta1.PriorityClass:
680+
ss = t
681+
case cache.DeletedFinalStateUnknown:
682+
var ok bool
683+
ss, ok = t.Obj.(*v1beta1.PriorityClass)
684+
if !ok {
685+
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t.Obj)
686+
return
687+
}
688+
default:
689+
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t)
690+
return
691+
}
692+
693+
sc.Mutex.Lock()
694+
defer sc.Mutex.Unlock()
695+
696+
sc.deletePriorityClass(ss)
697+
}
698+
699+
func (sc *SchedulerCache) UpdatePriorityClass(oldObj, newObj interface{}) {
700+
oldSS, ok := oldObj.(*v1beta1.PriorityClass)
701+
if !ok {
702+
glog.Errorf("Cannot convert oldObj to *v1beta1.PriorityClass: %v", oldObj)
703+
704+
return
705+
706+
}
707+
708+
newSS, ok := newObj.(*v1beta1.PriorityClass)
709+
if !ok {
710+
glog.Errorf("Cannot convert newObj to *v1beta1.PriorityClass: %v", newObj)
711+
712+
return
713+
714+
}
715+
716+
sc.Mutex.Lock()
717+
defer sc.Mutex.Unlock()
718+
719+
sc.deletePriorityClass(oldSS)
720+
sc.addPriorityClass(newSS)
721+
}
722+
723+
func (sc *SchedulerCache) AddPriorityClass(obj interface{}) {
724+
var ss *v1beta1.PriorityClass
725+
switch t := obj.(type) {
726+
case *v1beta1.PriorityClass:
727+
ss = t
728+
case cache.DeletedFinalStateUnknown:
729+
var ok bool
730+
ss, ok = t.Obj.(*v1beta1.PriorityClass)
731+
if !ok {
732+
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t.Obj)
733+
return
734+
}
735+
default:
736+
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t)
737+
return
738+
}
739+
740+
sc.Mutex.Lock()
741+
defer sc.Mutex.Unlock()
742+
743+
sc.addPriorityClass(ss)
744+
}
745+
746+
func (sc *SchedulerCache) deletePriorityClass(pc *v1beta1.PriorityClass) {
747+
if pc.GlobalDefault {
748+
sc.defaultPriorityClass = nil
749+
sc.defaultPriority = 0
750+
751+
}
752+
753+
delete(sc.PriorityClasses, pc.Name)
754+
}
755+
756+
func (sc *SchedulerCache) addPriorityClass(pc *v1beta1.PriorityClass) {
757+
if pc.GlobalDefault {
758+
if sc.defaultPriorityClass != nil {
759+
glog.Errorf("Updated default priority class from <%s> to <%s> forcefully.",
760+
sc.defaultPriorityClass.Name, pc.Name)
761+
762+
}
763+
sc.defaultPriorityClass = pc
764+
sc.defaultPriority = pc.Value
765+
}
766+
767+
sc.PriorityClasses[pc.Name] = pc
768+
}

0 commit comments

Comments
 (0)