
Commit 09ec01d

xulinfei1996 and xulinfei.xlf authored
scheduler: coscheduling plugin sync scheduled in controller (#2032)
Signed-off-by: xulinfei.xlf <[email protected]>
Co-authored-by: xulinfei.xlf <[email protected]>
1 parent b6e7bc0 commit 09ec01d
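
In short: PodGroup Status.Scheduled was previously patched from the coscheduling plugin's PostBind hook, which (as the new "bugCase" regression test shows) could leave the count stale. This commit makes the PodGroup controller recompute it on every sync via a new Manager.GetBoundPodNumber method that reads the scheduler's gang cache. PostBind now only updates that cache, getBoundPodNum returns int32 to match the status field, and the tests are adjusted: TestPostBind is removed and a regression case is added to the controller test.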

File tree (6 files changed: 67 additions, 153 deletions)

pkg/scheduler/plugins/coscheduling/controller/podgroup.go
pkg/scheduler/plugins/coscheduling/controller/podgroup_test.go
pkg/scheduler/plugins/coscheduling/core/core.go
pkg/scheduler/plugins/coscheduling/core/core_test.go
pkg/scheduler/plugins/coscheduling/core/gang.go
pkg/scheduler/plugins/coscheduling/core/gang_test.go


pkg/scheduler/plugins/coscheduling/controller/podgroup.go

Lines changed: 21 additions & 16 deletions
@@ -245,33 +245,38 @@ func (ctrl *PodGroupController) syncHandler(key string) error {
             fillOccupiedObj(pgCopy, pods[0])
         }
     default:
+        if len(pods) == 0 {
+            pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
+            break
+        }
+
         var (
             running   int32 = 0
             succeeded int32 = 0
             failed    int32 = 0
         )
-        if len(pods) != 0 {
-            for _, pod := range pods {
-                switch pod.Status.Phase {
-                case v1.PodRunning:
-                    running++
-                case v1.PodSucceeded:
-                    succeeded++
-                case v1.PodFailed:
-                    failed++
-                }
+
+        for _, pod := range pods {
+            switch pod.Status.Phase {
+            case v1.PodRunning:
+                running++
+            case v1.PodSucceeded:
+                succeeded++
+            case v1.PodFailed:
+                failed++
             }
         }
         pgCopy.Status.Failed = failed
         pgCopy.Status.Succeeded = succeeded
         pgCopy.Status.Running = running
+        pgCopy.Status.Scheduled = ctrl.pgManager.GetBoundPodNumber(util.GetId(pg.Namespace, pg.Name))

-        if len(pods) == 0 {
-            pgCopy.Status.Phase = schedv1alpha1.PodGroupPending
-            break
-        }
-
-        if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember && pgCopy.Status.Phase == schedv1alpha1.PodGroupScheduling {
+        if pgCopy.Status.Scheduled < pgCopy.Spec.MinMember {
+            pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduling
+            if pgCopy.Status.ScheduleStartTime.IsZero() {
+                pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
+            }
+        } else {
             pgCopy.Status.Phase = schedv1alpha1.PodGroupScheduled
         }
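To make the new control flow easier to follow outside the diff, here is a minimal, runnable Go sketch of the branch above. The string phases and the names syncPhase and boundPods are illustrative stand-ins, not part of the commit: they model schedv1alpha1.PodGroupPhase and the value the controller now gets from ctrl.pgManager.GetBoundPodNumber.

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for the schedv1alpha1.PodGroupPhase constants.
const (
    podGroupPending    = "Pending"
    podGroupScheduling = "Scheduling"
    podGroupScheduled  = "Scheduled"
)

// syncPhase mirrors the controller branch above: an empty group is Pending;
// otherwise the phase follows from comparing the bound-pod count (now read
// from the scheduler's gang cache) against the group's MinMember.
func syncPhase(numPods int, boundPods, minMember int32, scheduleStart *time.Time) string {
    if numPods == 0 {
        return podGroupPending
    }
    if boundPods < minMember {
        if scheduleStart.IsZero() {
            *scheduleStart = time.Now() // record when the group began waiting, once
        }
        return podGroupScheduling
    }
    return podGroupScheduled
}

func main() {
    var start time.Time
    fmt.Println(syncPhase(4, 3, 4, &start)) // Scheduling: only 3 of 4 members bound
    fmt.Println(syncPhase(4, 4, 4, &start)) // Scheduled: MinMember satisfied
}

Note the direction of the rewritten comparison: the old code only promoted a group already in Scheduling to Scheduled, whereas the new code sets the phase from the count alone, so a stale Scheduled value is corrected on every sync.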
pkg/scheduler/plugins/coscheduling/controller/podgroup_test.go

Lines changed: 23 additions & 2 deletions
@@ -24,8 +24,10 @@ import (
     "testing"
     "time"

+    "github.com/stretchr/testify/assert"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes/fake"
@@ -159,6 +161,15 @@ func Test_Run(t *testing.T) {
             previousPhase:     v1alpha1.PodGroupRunning,
             desiredGroupPhase: v1alpha1.PodGroupPending,
         },
+        {
+            name:              "Group status convert from scheduling to scheduled, bugCase",
+            pgName:            "pg11",
+            minMember:         4,
+            podNames:          []string{"pod11-1", "pod11-2", "pod11-3", "pod11-4"},
+            podPhase:          v1.PodRunning,
+            previousPhase:     v1alpha1.PodGroupScheduling,
+            desiredGroupPhase: v1alpha1.PodGroupRunning,
+        },
     }
     for _, c := range cases {
         t.Run(c.name, func(t *testing.T) {
@@ -179,6 +190,9 @@ func Test_Run(t *testing.T) {
                 if pg.Status.Phase != c.desiredGroupPhase {
                     return false, fmt.Errorf("want %v, got %v", c.desiredGroupPhase, pg.Status.Phase)
                 }
+                if c.name == "Group status convert from scheduling to scheduled, bugCase" {
+                    assert.Equal(t, int32(4), pg.Status.Scheduled)
+                }
                 return true, nil
             })
             if err != nil {
@@ -268,10 +282,16 @@ func setUp(ctx context.Context, podNames []string, pgName string, podPhase v1.Po
     if len(podNames) == 0 {
         kubeClient = fake.NewSimpleClientset()
     } else {
-        ps := makePods(podNames, pgName, podPhase, podOwnerReference)
-        kubeClient = fake.NewSimpleClientset(ps[0], ps[1])
+        var objs []runtime.Object
+        for _, pod := range makePods(podNames, pgName, podPhase, podOwnerReference) {
+            objs = append(objs, pod)
+        }
+        kubeClient = fake.NewSimpleClientset(objs...)
     }
     pg := makePG(pgName, minMember, groupPhase, podGroupCreateTime)
+    if pg.Name == "pg11" {
+        pg.Status.Scheduled = 3
+    }
     pgClient := pgfake.NewSimpleClientset(pg)

     informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
@@ -297,6 +317,7 @@ func makePods(podNames []string, pgName string, phase v1.PodPhase, reference []m
         if reference != nil && len(reference) != 0 {
             pod.OwnerReferences = reference
         }
+        pod.Spec.NodeName = "test"
         pds = append(pds, pod)
     }
     return pds

pkg/scheduler/plugins/coscheduling/core/core.go

Lines changed: 10 additions & 40 deletions
@@ -76,6 +76,7 @@ type Manager interface {
     IsGangMinSatisfied(*corev1.Pod) bool
     GetChildScheduleCycle(*corev1.Pod) int
     GetLastScheduleTime(*corev1.Pod, time.Time) time.Time
+    GetBoundPodNumber(gangId string) int32
 }

 // PodGroupManager defines the scheduling operation called
@@ -447,47 +448,8 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod
         klog.Warningf("Pod %q missing Gang", klog.KObj(pod))
         return
     }
-    // first update gang in cache
+    // update gang in cache
     gang.addBoundPod(pod)
-
-    // update PodGroup
-    _, pg := pgMgr.GetPodGroup(pod)
-    if pg == nil {
-        return
-    }
-    pgCopy := pg.DeepCopy()
-
-    pgCopy.Status.Scheduled = int32(gang.getBoundPodNum())
-
-    if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember {
-        pgCopy.Status.Phase = v1alpha1.PodGroupScheduled
-        klog.InfoS("PostBind has got enough bound child for gang", "gang", gang.Name, "pod", klog.KObj(pod))
-    } else {
-        pgCopy.Status.Phase = v1alpha1.PodGroupScheduling
-        klog.InfoS("PostBind has not got enough bound child for gang", "gang", gang.Name, "pod", klog.KObj(pod))
-        if pgCopy.Status.ScheduleStartTime.IsZero() {
-            pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
-        }
-    }
-    if pgCopy.Status.Phase != pg.Status.Phase {
-        pg, err := pgMgr.pgLister.PodGroups(pgCopy.Namespace).Get(pgCopy.Name)
-        if err != nil {
-            klog.ErrorS(err, "PosFilter failed to get PodGroup", "podGroup", klog.KObj(pgCopy))
-            return
-        }
-        patch, err := util.CreateMergePatch(pg, pgCopy)
-        if err != nil {
-            klog.ErrorS(err, "PostFilter failed to create merge patch", "podGroup", klog.KObj(pg), "podGroup", klog.KObj(pgCopy))
-            return
-        }
-        if err := pgMgr.PatchPodGroup(pg.Name, pg.Namespace, patch); err != nil {
-            klog.ErrorS(err, "PostFilter Failed to patch", "podGroup", klog.KObj(pg))
-            return
-        } else {
-            klog.InfoS("PostFilter success to patch podGroup", "podGroup", klog.KObj(pgCopy))
-        }
-    }
-
 }

 func (pgMgr *PodGroupManager) GetCreatTime(podInfo *framework.QueuedPodInfo) time.Time {
@@ -600,3 +562,11 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {

     return gang.getChildScheduleCycle(pod)
 }
+
+func (pgMgr *PodGroupManager) GetBoundPodNumber(gangId string) int32 {
+    gang := pgMgr.cache.getGangFromCacheByGangId(gangId, false)
+    if gang == nil {
+        return 0
+    }
+    return gang.getBoundPodNum()
+}
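
Design note: with the patching block gone, PostBind touches only the in-memory gang cache, and the controller becomes the single writer of PodGroup status. The bound count reaches the API object on the controller's next sync through GetBoundPodNumber, rather than through patches issued at bind time, which previously ran alongside the controller's own status updates.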

pkg/scheduler/plugins/coscheduling/core/core_test.go

Lines changed: 5 additions & 93 deletions
@@ -24,11 +24,9 @@ import (

     "github.com/stretchr/testify/assert"
     corev1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/informers"
     clientsetfake "k8s.io/client-go/kubernetes/fake"
-    "k8s.io/client-go/util/retry"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     st "k8s.io/kubernetes/pkg/scheduler/testing"

@@ -485,7 +483,7 @@ func TestPermit(t *testing.T) {
                 st.MakePod().Name("pod3-1").UID("pod3-1").Namespace("gangA_ns").Label(v1alpha1.PodGroupLabel, "gangA").Obj(),
             },
             runningPods: []*corev1.Pod{
-                st.MakePod().Name("pod3-2").UID("pod3-2").Namespace("gangA_ns").Label(v1alpha1.PodGroupLabel, "gangA").Obj(),
+                st.MakePod().Name("pod3-2").UID("pod3-2").Namespace("gangA_ns").Label(v1alpha1.PodGroupLabel, "gangA").Node("n1").Obj(),
             },
             pgs:         []*v1alpha1.PodGroup{makePg("gangA", "gangA_ns", 3, &gangACreatedTime, nil)},
             onceSatisfy: true,
@@ -500,7 +498,7 @@ func TestPermit(t *testing.T) {
                 st.MakePod().Name("pod3-1").UID("pod3-1").Namespace("gangA_ns").Label(v1alpha1.PodGroupLabel, "gangA").Obj(),
             },
             runningPods: []*corev1.Pod{
-                st.MakePod().Name("pod3-2").UID("pod3-2").Namespace("gangA_ns").Label(v1alpha1.PodGroupLabel, "gangA").Obj(),
+                st.MakePod().Name("pod3-2").UID("pod3-2").Namespace("gangA_ns").Label(v1alpha1.PodGroupLabel, "gangA").Node("n1").Obj(),
             },
             pgs:         []*v1alpha1.PodGroup{makePg("gangA", "gangA_ns", 3, &gangACreatedTime, nil)},
             onceSatisfy: true,
@@ -584,99 +582,13 @@ func TestPermit(t *testing.T) {
                 mgr.cache.onPodAdd(pod)
                 mgr.PostBind(ctx, pod, "tmp")
             }
+            if len(tt.runningPods) != 0 {
+                assert.Equal(t, int32(len(tt.runningPods)), mgr.GetBoundPodNumber(util.GetId(tt.runningPods[0].Namespace, util.GetGangNameByPod(tt.runningPods[0]))))
+            }
             mgr.cache.onPodAdd(tt.pod)
             timeout, status := mgr.Permit(ctx, tt.pod)
             assert.Equal(t, tt.wantWaittime, timeout)
             assert.Equal(t, tt.wantStatus, status)
         })
     }
 }
-
-// Unreserve also tested in the Coscheduling_test
-
-func TestPostBind(t *testing.T) {
-    tests := []struct {
-        name              string
-        pod               *corev1.Pod
-        pg                *v1alpha1.PodGroup
-        desiredGroupPhase v1alpha1.PodGroupPhase
-        desiredScheduled  int32
-        // case
-        originalScheduled int
-        phase             v1alpha1.PodGroupPhase
-        annotation        map[string]string
-    }{
-        {
-            name:              "pg status convert to scheduled",
-            pod:               st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg").Obj(),
-            pg:                makePg("pg", "ns1", 1, nil, nil),
-            desiredGroupPhase: v1alpha1.PodGroupScheduled,
-            desiredScheduled:  1,
-        },
-        {
-            name:              "pg status convert to scheduling",
-            pod:               st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(),
-            pg:                makePg("pg1", "ns1", 2, nil, nil),
-            desiredGroupPhase: v1alpha1.PodGroupScheduling,
-            desiredScheduled:  1,
-        },
-        {
-            name:              "pg status does not convert, although scheduled pods change",
-            pod:               st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(),
-            pg:                makePg("pg2", "ns1", 3, nil, nil),
-            desiredGroupPhase: v1alpha1.PodGroupScheduling,
-            desiredScheduled:  1,
-            phase:             v1alpha1.PodGroupScheduling,
-            originalScheduled: 1,
-        },
-    }
-    for _, tt := range tests {
-        t.Run(tt.name, func(t *testing.T) {
-            bigMgr := NewManagerForTest()
-            mgr, pginforme := bigMgr.pgMgr, bigMgr.pgInformer
-            // pg create
-            if tt.annotation != nil {
-                if tt.pod.Annotations == nil {
-                    tt.pod.Annotations = map[string]string{}
-                }
-                tt.pod.Annotations = tt.annotation
-                mgr.cache.onPodAdd(tt.pod)
-            }
-            if tt.pg != nil {
-                err := retry.OnError(
-                    retry.DefaultRetry,
-                    errors.IsTooManyRequests,
-                    func() error {
-                        var err error
-                        _, err = mgr.pgClient.SchedulingV1alpha1().PodGroups(tt.pg.Namespace).Create(context.TODO(), tt.pg, metav1.CreateOptions{})
-                        return err
-                    })
-                if err != nil {
-                    t.Errorf("pgclient create pg err: %v", err)
-                }
-                pginforme.Informer().GetStore().Add(tt.pg)
-                mgr.cache.onPodGroupAdd(tt.pg)
-            }
-            ctx := context.TODO()
-            // create pods
-            mgr.cache.onPodAdd(tt.pod)
-            mgr.PostBind(ctx, tt.pod, "test")
-            // get the pg cr
-            var pg *v1alpha1.PodGroup
-            err := retry.OnError(
-                retry.DefaultRetry,
-                errors.IsTooManyRequests,
-                func() error {
-                    var err error
-                    pg, err = mgr.pgClient.SchedulingV1alpha1().PodGroups(tt.pod.Namespace).Get(context.TODO(), util.GetGangNameByPod(tt.pod), metav1.GetOptions{})
-                    return err
-                })
-            if err != nil {
-                t.Errorf("pgclient get pg err: %v", err)
-            }
-            assert.Equal(t, tt.desiredGroupPhase, pg.Status.Phase)
-            assert.Equal(t, tt.desiredScheduled, pg.Status.Scheduled)
-        })
-    }
-
-}

pkg/scheduler/plugins/coscheduling/core/gang.go

Lines changed: 2 additions & 2 deletions
@@ -292,10 +292,10 @@ func (gang *Gang) getGangTotalNum() int {
     return gang.TotalChildrenNum
 }

-func (gang *Gang) getBoundPodNum() int {
+func (gang *Gang) getBoundPodNum() int32 {
     gang.lock.Lock()
     defer gang.lock.Unlock()
-    return len(gang.BoundChildren)
+    return int32(len(gang.BoundChildren))
 }

 func (gang *Gang) getGangMode() string {
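
The switch to int32 lets callers assign gang.getBoundPodNum() directly to PodGroupStatus.Scheduled, itself an int32 field, dropping the int32(...) conversion the old PostBind code carried.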

pkg/scheduler/plugins/coscheduling/core/gang_test.go

Lines changed: 6 additions & 0 deletions
@@ -30,6 +30,12 @@ func TestGangGroupInfo_SetGangGroupInfo(t *testing.T) {
     gang.TotalChildrenNum = 2
     gang.SetGangGroupInfo(gangGroupInfo)
     assert.Equal(t, gang.GangGroupInfo.GangTotalChildrenNumMap["aa"], 2)
+
+    gang.BoundChildren = map[string]*corev1.Pod{
+        "pod1": {},
+        "pod2": {},
+    }
+    assert.Equal(t, int32(2), gang.getBoundPodNum())
 }

 func TestDeletePod(t *testing.T) {
