@@ -620,3 +620,104 @@ func getPodOfJob(ctx *context, jobName string) []*v1.Pod {
620
620
621
621
return qjpod
622
622
}
623
+
624
+ func createPodGroup (
625
+ context * context ,
626
+ jobName , name string ,
627
+ min int32 ,
628
+ ) {
629
+ jns , _ , _ := splictJobName (context , jobName )
630
+
631
+ pg := & arbv1.PodGroup {
632
+ ObjectMeta : metav1.ObjectMeta {
633
+ Name : name ,
634
+ Namespace : jns ,
635
+ },
636
+ Spec : arbv1.PodGroupSpec {
637
+ MinMember : min ,
638
+ },
639
+ }
640
+
641
+ pg , err := context .karclient .Scheduling ().PodGroups (jns ).Create (pg )
642
+ Expect (err ).NotTo (HaveOccurred ())
643
+ }
644
+
645
+ func createJobWithoutPodGroup (
646
+ context * context ,
647
+ name string ,
648
+ rep int32 ,
649
+ img string ,
650
+ req v1.ResourceList ,
651
+ pg string ,
652
+ ) * batchv1.Job {
653
+ containers := createContainers (img , req , 0 )
654
+ jns , jn , _ := splictJobName (context , name )
655
+
656
+ job := & batchv1.Job {
657
+ ObjectMeta : metav1.ObjectMeta {
658
+ Name : jn ,
659
+ Namespace : jns ,
660
+ },
661
+ Spec : batchv1.JobSpec {
662
+ Parallelism : & rep ,
663
+ Completions : & rep ,
664
+ Template : v1.PodTemplateSpec {
665
+ ObjectMeta : metav1.ObjectMeta {
666
+ Annotations : map [string ]string {arbv1 .GroupNameAnnotationKey : pg },
667
+ },
668
+ Spec : v1.PodSpec {
669
+ SchedulerName : "kube-batch" ,
670
+ RestartPolicy : v1 .RestartPolicyNever ,
671
+ Containers : containers ,
672
+ },
673
+ },
674
+ },
675
+ }
676
+
677
+ job , err := context .kubeclient .BatchV1 ().Jobs (job .Namespace ).Create (job )
678
+ Expect (err ).NotTo (HaveOccurred ())
679
+
680
+ return job
681
+ }
682
+
683
+ func waitJobReadyWithPodGroup (ctx * context , pgName string , jobNames ... string ) error {
684
+ return wait .Poll (100 * time .Millisecond , oneMinute , taskReadyWithPodGroup (ctx , - 1 , pgName , jobNames ))
685
+ }
686
+
687
+ func taskReadyWithPodGroup (ctx * context , taskNum int , pgName string , jobNames []string ) wait.ConditionFunc {
688
+
689
+ return func () (bool , error ) {
690
+ pg := & arbv1.PodGroup {}
691
+
692
+ readyTaskNum := 0
693
+ for _ , name := range jobNames {
694
+ jns , jn , _ := splictJobName (ctx , name )
695
+
696
+ queueJob , err := ctx .kubeclient .BatchV1 ().Jobs (jns ).Get (jn , metav1.GetOptions {})
697
+ Expect (err ).NotTo (HaveOccurred ())
698
+
699
+ pods , err := ctx .kubeclient .CoreV1 ().Pods (jns ).List (metav1.ListOptions {})
700
+ Expect (err ).NotTo (HaveOccurred ())
701
+
702
+ for _ , pod := range pods .Items {
703
+ labelSelector := labels .SelectorFromSet (queueJob .Spec .Selector .MatchLabels )
704
+ if ! labelSelector .Matches (labels .Set (pod .Labels )) ||
705
+ ! metav1 .IsControlledBy (& pod , queueJob ) {
706
+ continue
707
+ }
708
+ if pod .Status .Phase == v1 .PodRunning || pod .Status .Phase == v1 .PodSucceeded {
709
+ readyTaskNum ++
710
+ }
711
+ }
712
+
713
+ pg , err = ctx .karclient .Scheduling ().PodGroups (jns ).Get (pgName , metav1.GetOptions {})
714
+ Expect (err ).NotTo (HaveOccurred ())
715
+ }
716
+
717
+ if taskNum < 0 {
718
+ taskNum = int (pg .Spec .MinMember )
719
+ }
720
+
721
+ return taskNum <= readyTaskNum , nil
722
+ }
723
+ }
0 commit comments