Skip to content

Commit cbe4f8a

Browse files
authored
Expose job controller's workqueue rate limiting configs (#674)
* Expose controller workqueue config via options Signed-off-by: Rotem Elad <[email protected]> * Fix double hyphen typo Signed-off-by: Rotem Elad <[email protected]> * Generate Signed-off-by: Rotem Elad <[email protected]> --------- Signed-off-by: Rotem Elad <[email protected]>
1 parent c50eb45 commit cbe4f8a

File tree

6 files changed

+40
-15
lines changed

6 files changed

+40
-15
lines changed

ADOPTERS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ This page contains a list of organizations who are using MPI Operator. If you'd
1919
| [PITS Global Data Recovery Services](https://www.pitsdatarecovery.net/) | [Benjamin Trudeau](https://github.com/benjx1990) |
2020
| [Polyaxon](https://polyaxon.com/) | [Mourad Mourafiq](https://github.com/mouradmourafiq) |
2121
| [Qutoutiao](https://www.qutoutiao.net/) | [Zhaojing Yu](https://github.com/yuzhaojing) |
22+
| [Run:AI](https://www.run.ai/) | [Rotem Elad](https://github.com/roteme-runai) |
2223
| [Tencent](http://tencent.com/en-us/) | [Lei Xue](https://github.com/carmark) |

cmd/mpi-operator/app/options/options.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,18 @@ const (
2828

2929
// ServerOption is the main context object for the controller manager.
3030
type ServerOption struct {
31-
Kubeconfig string
32-
MasterURL string
33-
Threadiness int
34-
MonitoringPort int
35-
PrintVersion bool
36-
GangSchedulingName string
37-
Namespace string
38-
LockNamespace string
39-
QPS int
40-
Burst int
31+
Kubeconfig string
32+
MasterURL string
33+
Threadiness int
34+
MonitoringPort int
35+
PrintVersion bool
36+
GangSchedulingName string
37+
Namespace string
38+
LockNamespace string
39+
QPS int
40+
Burst int
41+
ControllerRateLimit int
42+
ControllerBurst int
4143
}
4244

4345
// NewServerOption creates a new CMServer with a default config.
@@ -75,4 +77,7 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
7577

7678
fs.IntVar(&s.QPS, "kube-api-qps", 5, "QPS indicates the maximum QPS to the master from this client.")
7779
fs.IntVar(&s.Burst, "kube-api-burst", 10, "Maximum burst for throttle.")
80+
81+
fs.IntVar(&s.ControllerRateLimit, "controller-queue-rate-limit", 10, "Rate limit of the controller events queue .")
82+
fs.IntVar(&s.ControllerBurst, "controller-queue-burst", 100, "Maximum burst of the controller events queue.")
7883
}

cmd/mpi-operator/app/server.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/prometheus/client_golang/prometheus"
2525
"github.com/prometheus/client_golang/prometheus/promauto"
26+
"golang.org/x/time/rate"
2627
corev1 "k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/api/errors"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,6 +39,7 @@ import (
3839
election "k8s.io/client-go/tools/leaderelection"
3940
"k8s.io/client-go/tools/leaderelection/resourcelock"
4041
"k8s.io/client-go/tools/record"
42+
"k8s.io/client-go/util/workqueue"
4143
"k8s.io/klog"
4244
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
4345
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
@@ -67,6 +69,9 @@ var (
6769
// allowed for timeout. Checks within the timeout period after the lease
6870
// expires will still return healthy.
6971
leaderHealthzAdaptorTimeout = time.Second * 20
72+
//exponential workqueue rate limiting config
73+
workqueueExponentialBaseDelay = 5 * time.Millisecond
74+
workqueueExponentialMaxDelay = 1000 * time.Second
7075
)
7176

7277
var (
@@ -141,6 +146,11 @@ func Run(opt *options.ServerOption) error {
141146
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
142147
kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...)
143148

149+
workqueueRateLimiter := workqueue.NewTypedMaxOfRateLimiter(
150+
workqueue.NewTypedItemExponentialFailureRateLimiter[any](workqueueExponentialBaseDelay, workqueueExponentialMaxDelay),
151+
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(opt.ControllerRateLimit), opt.ControllerBurst)},
152+
)
153+
144154
controller, err := controllersv1.NewMPIJobController(
145155
kubeClient,
146156
mpiJobClientSet,
@@ -153,7 +163,8 @@ func Run(opt *options.ServerOption) error {
153163
kubeInformerFactory.Core().V1().Pods(),
154164
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
155165
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
156-
namespace, opt.GangSchedulingName)
166+
namespace, opt.GangSchedulingName,
167+
workqueueRateLimiter)
157168
if err != nil {
158169
klog.Fatalf("Failed to setup the controller")
159170
}

pkg/controller/mpi_job_controller.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,11 @@ func NewMPIJobController(
269269
podInformer coreinformers.PodInformer,
270270
priorityClassInformer schedulinginformers.PriorityClassInformer,
271271
mpiJobInformer informers.MPIJobInformer,
272-
namespace, gangSchedulingName string) (*MPIJobController, error) {
272+
namespace, gangSchedulingName string,
273+
workqueueRateLimiter workqueue.TypedRateLimiter[any]) (*MPIJobController, error) {
273274
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
274275
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
275-
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
276+
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName, workqueueRateLimiter)
276277
}
277278

278279
// NewMPIJobControllerWithClock returns a new MPIJob controller.
@@ -289,7 +290,8 @@ func NewMPIJobControllerWithClock(
289290
priorityClassInformer schedulinginformers.PriorityClassInformer,
290291
mpiJobInformer informers.MPIJobInformer,
291292
clock clock.WithTicker,
292-
namespace, gangSchedulingName string) (*MPIJobController, error) {
293+
namespace, gangSchedulingName string,
294+
workqueueRateLimiter workqueue.TypedRateLimiter[any]) (*MPIJobController, error) {
293295

294296
// Create event broadcaster.
295297
klog.V(4).Info("Creating event broadcaster")
@@ -336,7 +338,7 @@ func NewMPIJobControllerWithClock(
336338
priorityClassSynced: priorityClassSynced,
337339
mpiJobLister: mpiJobInformer.Lister(),
338340
mpiJobSynced: mpiJobInformer.Informer().HasSynced,
339-
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "MPIJob"}),
341+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueueRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: "MPIJob"}),
340342
recorder: recorder,
341343
clock: clock,
342344
}

pkg/controller/mpi_job_controller_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
core "k8s.io/client-go/testing"
3636
"k8s.io/client-go/tools/cache"
3737
"k8s.io/client-go/tools/record"
38+
"k8s.io/client-go/util/workqueue"
3839
"k8s.io/utils/clock"
3940
clocktesting "k8s.io/utils/clock/testing"
4041
"k8s.io/utils/ptr"
@@ -163,6 +164,7 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
163164
f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...)
164165
i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
165166
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc())
167+
workqueueRateLimiter := workqueue.DefaultTypedControllerRateLimiter[any]()
166168

167169
c, err := NewMPIJobControllerWithClock(
168170
f.kubeClient,
@@ -179,6 +181,7 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
179181
clock,
180182
metav1.NamespaceAll,
181183
f.gangSchedulingName,
184+
workqueueRateLimiter,
182185
)
183186
if err != nil {
184187
fmt.Println("Failed to setup the controller")

test/integration/mpi_job_controller_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
kubeinformers "k8s.io/client-go/informers"
3333
"k8s.io/client-go/kubernetes"
3434
"k8s.io/client-go/tools/reference"
35+
"k8s.io/client-go/util/workqueue"
3536
"k8s.io/utils/ptr"
3637
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
3738
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
@@ -909,6 +910,7 @@ func startController(
909910
) {
910911
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kClient, 0)
911912
mpiInformerFactory := informers.NewSharedInformerFactory(mpiClient, 0)
913+
workqueueRateLimiter := workqueue.DefaultTypedControllerRateLimiter[any]()
912914
var (
913915
volcanoClient volcanoclient.Interface
914916
schedClient schedclientset.Interface
@@ -935,6 +937,7 @@ func startController(
935937
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
936938
mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(),
937939
metav1.NamespaceAll, schedulerName,
940+
workqueueRateLimiter,
938941
)
939942
if err != nil {
940943
panic(err)

0 commit comments

Comments
 (0)