Skip to content

Commit ce6596d

Browse files
authored
Merge pull request #12 from zmberg/fix-rbg-start-failed
Support dynamically watching the LWS CRD
2 parents a0fbf84 + d580404 commit ce6596d

File tree

2 files changed

+51
-11
lines changed

2 files changed

+51
-11
lines changed

internal/controller/workloads/rolebasedgroup_controller.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,17 @@ package workloads
1919
import (
2020
"context"
2121
"fmt"
22-
apierrors "k8s.io/apimachinery/pkg/api/errors"
23-
"k8s.io/apimachinery/pkg/util/errors"
2422
"reflect"
25-
lwsv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
26-
27-
"k8s.io/apimachinery/pkg/runtime/schema"
23+
"sync"
2824

2925
appsv1 "k8s.io/api/apps/v1"
3026
corev1 "k8s.io/api/core/v1"
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3128
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3229
"k8s.io/apimachinery/pkg/runtime"
30+
"k8s.io/apimachinery/pkg/runtime/schema"
3331
"k8s.io/apimachinery/pkg/types"
32+
"k8s.io/apimachinery/pkg/util/errors"
3433
"k8s.io/client-go/tools/record"
3534
"k8s.io/klog/v2"
3635
ctrl "sigs.k8s.io/controller-runtime"
@@ -40,12 +39,23 @@ import (
4039
"sigs.k8s.io/controller-runtime/pkg/event"
4140
"sigs.k8s.io/controller-runtime/pkg/log"
4241
"sigs.k8s.io/controller-runtime/pkg/predicate"
42+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
43+
lwsv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
4344
workloadsv1alpha1 "sigs.k8s.io/rbgs/api/workloads/v1alpha1"
4445
"sigs.k8s.io/rbgs/pkg/dependency"
4546
"sigs.k8s.io/rbgs/pkg/reconciler"
4647
"sigs.k8s.io/rbgs/pkg/utils"
4748
)
4849

50+
var (
51+
runtimeController *builder.TypedBuilder[reconcile.Request]
52+
watchedWorkload sync.Map
53+
)
54+
55+
func init() {
56+
watchedWorkload = sync.Map{}
57+
}
58+
4959
// RoleBasedGroupReconciler reconciles a RoleBasedGroup object
5060
type RoleBasedGroupReconciler struct {
5161
client client.Client
@@ -94,6 +104,8 @@ func (r *RoleBasedGroupReconciler) Reconcile(ctx context.Context, req ctrl.Reque
94104
var roleStatuses []workloadsv1alpha1.RoleStatus
95105
var updateStatus bool
96106
for _, role := range sortedRoles {
107+
// first check whether watch lws cr
108+
dynamicWatchCustomCRD(ctx, role.Workload)
97109
// Check dependencies first
98110
ready, err := dependencyManager.CheckDependencyReady(ctx, rbg, role)
99111
if err != nil {
@@ -229,20 +241,21 @@ func (r *RoleBasedGroupReconciler) updateRBGStatus(ctx context.Context, rbg *wor
229241

230242
// SetupWithManager sets up the controller with the Manager.
231243
func (r *RoleBasedGroupReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
232-
controller := ctrl.NewControllerManagedBy(mgr).
244+
runtimeController = ctrl.NewControllerManagedBy(mgr).
233245
WithOptions(options).
234246
For(&workloadsv1alpha1.RoleBasedGroup{}, builder.WithPredicates(RBGPredicate())).
235247
Owns(&appsv1.StatefulSet{}, builder.WithPredicates(WorkloadPredicate())).
236248
Owns(&appsv1.Deployment{}, builder.WithPredicates(WorkloadPredicate())).
237249
Owns(&corev1.Service{}).
238250
Named("workloads-rolebasedgroup")
239251

240-
err := utils.CheckCrdExists(r.apiReader, "leaderworkersets.leaderworkerset.x-k8s.io")
252+
err := utils.CheckCrdExists(r.apiReader, reconciler.LwsCrdName)
241253
if err == nil {
242-
controller = controller.Owns(&lwsv1.LeaderWorkerSet{}, builder.WithPredicates(WorkloadPredicate()))
254+
watchedWorkload.LoadOrStore(reconciler.LwsCrdName, struct{}{})
255+
runtimeController.Owns(&lwsv1.LeaderWorkerSet{}, builder.WithPredicates(WorkloadPredicate()))
243256
}
244257

245-
return controller.Complete(r)
258+
return runtimeController.Complete(r)
246259
}
247260

248261
// CheckCrdExists checks if the specified Custom Resource Definition (CRD) exists in the Kubernetes cluster.
@@ -357,3 +370,20 @@ func hasValidOwnerRef(obj client.Object, targetGVK schema.GroupVersionKind) bool
357370
// getRbgGVK returns the GroupVersionKind formed from
// workloadsv1alpha1.GroupVersion and the kind "RoleBasedGroup".
func getRbgGVK() schema.GroupVersionKind {
	const kind = "RoleBasedGroup"
	apiVersion := workloadsv1alpha1.GroupVersion.String()
	return schema.FromAPIVersionAndKind(apiVersion, kind)
}
373+
374+
func getLwsGVK() schema.GroupVersionKind {
375+
return schema.FromAPIVersionAndKind(lwsv1.GroupVersion.String(), "LeaderWorkerSet")
376+
}
377+
378+
func dynamicWatchCustomCRD(ctx context.Context, workload workloadsv1alpha1.WorkloadSpec) {
379+
logger := log.FromContext(ctx)
380+
switch workload.Kind {
381+
case getLwsGVK().Kind:
382+
_, lwsExist := watchedWorkload.Load(reconciler.LwsCrdName)
383+
if !lwsExist {
384+
watchedWorkload.LoadOrStore(reconciler.LwsCrdName, struct{}{})
385+
runtimeController.Owns(&lwsv1.LeaderWorkerSet{}, builder.WithPredicates(WorkloadPredicate()))
386+
logger.Info("rbgs controller watch LeaderWorkerSet CRD")
387+
}
388+
}
389+
}

pkg/reconciler/lws_reconciler.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"reflect"
8+
"time"
9+
710
corev1 "k8s.io/api/core/v1"
811
apierrors "k8s.io/apimachinery/pkg/api/errors"
912
"k8s.io/apimachinery/pkg/runtime"
@@ -12,14 +15,16 @@ import (
1215
"k8s.io/apimachinery/pkg/util/wait"
1316
metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
1417
utilpointer "k8s.io/utils/pointer"
15-
"reflect"
1618
"sigs.k8s.io/controller-runtime/pkg/client"
1719
"sigs.k8s.io/controller-runtime/pkg/log"
1820
lwsv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
1921
lwsapplyv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1"
2022
workloadsv1alpha1 "sigs.k8s.io/rbgs/api/workloads/v1alpha1"
2123
"sigs.k8s.io/rbgs/pkg/utils"
22-
"time"
24+
)
25+
26+
// LwsCrdName is the name of the LeaderWorkerSet CustomResourceDefinition
// that is passed to utils.CheckCrdExists.
const LwsCrdName = "leaderworkersets.leaderworkerset.x-k8s.io"
2429

2530
type LeaderWorkerSetReconciler struct {
@@ -103,6 +108,11 @@ func (r *LeaderWorkerSetReconciler) CheckWorkloadReady(ctx context.Context, rbg
103108

104109
func (r *LeaderWorkerSetReconciler) CleanupOrphanedWorkloads(ctx context.Context, rbg *workloadsv1alpha1.RoleBasedGroup) error {
105110
logger := log.FromContext(ctx)
111+
err := utils.CheckCrdExists(r.client, LwsCrdName)
112+
if err != nil {
113+
logger.Info(fmt.Sprintf("LeaderWorkerSetReconciler CleanupOrphanedWorkloads check lws crd failed: %s", err.Error()))
114+
return nil
115+
}
106116
// list lws managed by rbg
107117
lwsList := &lwsv1.LeaderWorkerSetList{}
108118
if err := r.client.List(ctx, lwsList, client.InNamespace(rbg.Namespace),

0 commit comments

Comments
 (0)