Skip to content

Commit c55a528

Browse files
Merge pull request #2645 from justinsb/opt_in_to_new_controller
feat: allow users to opt-in to direct reconciliation
2 parents e78019c + 2db88e0 commit c55a528

File tree

12 files changed

+1242
-50
lines changed

12 files changed

+1242
-50
lines changed

pkg/controller/dcl/controller.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
2828
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
2929
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
30-
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
30+
kccpredicate "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
3131
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
3232
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourceactuation"
3333
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
@@ -64,6 +64,7 @@ import (
6464
"sigs.k8s.io/controller-runtime/pkg/handler"
6565
"sigs.k8s.io/controller-runtime/pkg/log"
6666
"sigs.k8s.io/controller-runtime/pkg/manager"
67+
"sigs.k8s.io/controller-runtime/pkg/predicate"
6768
"sigs.k8s.io/controller-runtime/pkg/reconcile"
6869
"sigs.k8s.io/controller-runtime/pkg/source"
6970
)
@@ -98,10 +99,15 @@ type Reconciler struct {
9899
}
99100

100101
func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, converter *conversion.Converter,
101-
dclConfig *mmdcl.Config, serviceMappingLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator) (k8s.SchemaReferenceUpdater, error) {
102+
dclConfig *mmdcl.Config, serviceMappingLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator,
103+
additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
102104
if jitterGenerator == nil {
103105
return nil, fmt.Errorf("jitter generator not initialized")
104106
}
107+
predicates := []predicate.Predicate{kccpredicate.UnderlyingResourceOutOfSyncPredicate{}}
108+
if additionalPredicate != nil {
109+
predicates = append(predicates, additionalPredicate)
110+
}
105111
kind := crd.Spec.Names.Kind
106112
apiVersion := k8s.GetAPIVersionFromCRD(crd)
107113
controllerName := fmt.Sprintf("%v-controller", strings.ToLower(kind))
@@ -122,7 +128,7 @@ func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, conve
122128
Named(controllerName).
123129
WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
124130
WatchesRawSource(&source.Channel{Source: immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
125-
For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
131+
For(obj, builder.OnlyMetadata, builder.WithPredicates(predicates...)).
126132
Build(r)
127133
if err != nil {
128134
return nil, fmt.Errorf("error creating new controller: %w", err)

pkg/controller/direct/monitoring/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (w *visitorWalker) visitAny(path string, v reflect.Value) {
108108
}
109109

110110
switch v.Kind() {
111-
case reflect.Ptr:
111+
case reflect.Ptr, reflect.Interface:
112112
if v.IsNil() {
113113
return
114114
}

pkg/controller/direct/registry/registry.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,8 @@ func Init(ctx context.Context, config *config.ControllerConfig) error {
9292
}
9393

9494
func RegisterModel(gvk schema.GroupVersionKind, modelFn ModelFactoryFunc) {
95-
if singleton.registrations == nil {
96-
singleton.registrations = make(map[schema.GroupKind]*registration)
97-
}
98-
singleton.registrations[gvk.GroupKind()] = &registration{
99-
gvk: gvk,
100-
factory: modelFn,
101-
}
95+
rg := &predicate.OptInToDirectReconciliation{}
96+
RegisterModelWithReconcileGate(gvk, modelFn, rg)
10297
}
10398

10499
func RegisterModelWithReconcileGate(gvk schema.GroupVersionKind, modelFn ModelFactoryFunc, rg predicate.ReconcileGate) {

pkg/controller/direct/sql/sqlinstance_controller.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,16 @@ func init() {
4343
registry.RegisterModelWithReconcileGate(krm.SQLInstanceGVK, newSQLInstanceModel, rg)
4444
}
4545

46-
type SQLInstanceReconcileGate struct{}
46+
type SQLInstanceReconcileGate struct {
47+
optIn kccpredicate.OptInToDirectReconciliation
48+
}
4749

4850
var _ kccpredicate.ReconcileGate = &SQLInstanceReconcileGate{}
4951

50-
func (*SQLInstanceReconcileGate) ShouldReconcile(o *unstructured.Unstructured) bool {
52+
func (r *SQLInstanceReconcileGate) ShouldReconcile(o *unstructured.Unstructured) bool {
53+
if r.optIn.ShouldReconcile(o) {
54+
return true
55+
}
5156
obj := &krm.SQLInstance{}
5257
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.Object, &obj); err != nil {
5358
return false

pkg/controller/predicate/optin.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package predicate
16+
17+
import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
18+
19+
// AnnotationKeyAlphaReconciler allows customers to opt-in to using the direct reconciler.
20+
const AnnotationKeyAlphaReconciler = "alpha.cnrm.cloud.google.com/reconciler"
21+
22+
// OptInToDirectReconciliation allows users to opt in to direct reconciliation
23+
// by specifying an AnnotationKeyAlphaReconciler annotation.
24+
type OptInToDirectReconciliation struct {
25+
}
26+
27+
var _ ReconcileGate = &OptInToDirectReconciliation{}
28+
29+
// ShouldReconcile returns true if the reconciler should be used to for the resource.
30+
func (r *OptInToDirectReconciliation) ShouldReconcile(o *unstructured.Unstructured) bool {
31+
v := o.GetAnnotations()[AnnotationKeyAlphaReconciler]
32+
return v == "direct"
33+
}

pkg/controller/registration/registration_controller.go

+52-34
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"sigs.k8s.io/controller-runtime/pkg/handler"
5353
crlog "sigs.k8s.io/controller-runtime/pkg/log"
5454
"sigs.k8s.io/controller-runtime/pkg/manager"
55+
"sigs.k8s.io/controller-runtime/pkg/predicate"
5556
"sigs.k8s.io/controller-runtime/pkg/reconcile"
5657
"sigs.k8s.io/controller-runtime/pkg/source"
5758
)
@@ -238,55 +239,72 @@ func registerDefaultController(r *ReconcileRegistration, config *config.Controll
238239
}
239240
}
240241

241-
// register controllers for dcl-based CRDs
242-
if val, ok := crd.Labels[k8s.DCL2CRDLabel]; ok && val == "true" {
243-
su, err := dclcontroller.Add(r.mgr, crd, r.dclConverter, r.dclConfig, r.smLoader, r.defaulters, r.jitterGenerator)
244-
if err != nil {
245-
return nil, fmt.Errorf("error adding dcl controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
242+
hasDirectController := registry.IsDirectByGK(gvk.GroupKind())
243+
hasTerraformController := crd.Labels[crdgeneration.TF2CRDLabel] == "true"
244+
hasDCLController := crd.Labels[k8s.DCL2CRDLabel] == "true"
245+
246+
var useDirectReconcilerPredicate predicate.Predicate
247+
var useLegacyPredicate predicate.Predicate
248+
249+
// If we have a choice of controllers, construct predicates to choose between them
250+
if hasDirectController && (hasTerraformController || hasDCLController) {
251+
reconcileGate := registry.GetReconcileGate(gvk.GroupKind())
252+
if reconcileGate != nil {
253+
// If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will
254+
// run the direct reconciler only when the reconcile gate returns true.
255+
useDirectReconcilerPredicate = kccpredicate.NewReconcilePredicate(r.mgr.GetClient(), gvk, reconcileGate)
256+
useLegacyPredicate = kccpredicate.NewInverseReconcilePredicate(r.mgr.GetClient(), gvk, reconcileGate)
246257
}
247-
return su, nil
248-
}
249-
// register controllers for tf-based CRDs
250-
if val, ok := crd.Labels[crdgeneration.TF2CRDLabel]; ok && val == "true" {
251-
su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, nil)
252-
if err != nil {
253-
return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
258+
259+
if !hasTerraformController && !hasDCLController {
260+
// We're always going to use the direct reconciler
261+
useDirectReconcilerPredicate = nil
262+
useLegacyPredicate = nil
263+
}
264+
265+
if (hasTerraformController || hasDCLController) && useDirectReconcilerPredicate == nil {
266+
logger.Error(fmt.Errorf("no predicate where we have multiple controllers"), "skipping direct controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind)
267+
hasDirectController = false
254268
}
255-
return su, nil
256269
}
270+
257271
// register controllers for direct CRDs
258-
if registry.IsDirectByGK(gvk.GroupKind()) {
272+
if hasDirectController {
259273
model, err := registry.GetModel(gvk.GroupKind())
260274
if err != nil {
261275
return nil, err
262276
}
263277
deps := directbase.Deps{
264-
JitterGenerator: r.jitterGenerator,
265-
}
266-
rg := registry.GetReconcileGate(gvk.GroupKind())
267-
if rg != nil {
268-
// If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will
269-
// run the direct reconciler only when the reconcile gate returns true.
270-
rp := kccpredicate.NewReconcilePredicate(r.mgr.GetClient(), gvk, rg)
271-
deps.ReconcilePredicate = rp
278+
JitterGenerator: r.jitterGenerator,
279+
ReconcilePredicate: useDirectReconcilerPredicate,
272280
}
273281
if err := directbase.AddController(r.mgr, gvk, model, deps); err != nil {
274282
return nil, fmt.Errorf("error adding direct controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
275283
}
276-
if rg != nil {
277-
// If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will
278-
// run the terraform-based reconciler when the reconcile gate returns false.
279-
irp := kccpredicate.NewInverseReconcilePredicate(r.mgr.GetClient(), gvk, rg)
280-
su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, irp)
281-
if err != nil {
282-
return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
283-
}
284-
return su, nil
284+
}
285+
286+
// register controllers for dcl-based CRDs
287+
if hasDCLController {
288+
su, err := dclcontroller.Add(r.mgr, crd, r.dclConverter, r.dclConfig, r.smLoader, r.defaulters, r.jitterGenerator, useLegacyPredicate)
289+
if err != nil {
290+
return nil, fmt.Errorf("error adding dcl controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
285291
}
286-
return schemaUpdater, nil
292+
return su, nil
287293
}
288-
logger.Error(fmt.Errorf("unrecognized CRD: %v", crd.Spec.Names.Kind), "skipping controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind)
289-
return nil, nil
294+
// register controllers for tf-based CRDs
295+
if hasTerraformController {
296+
su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, useLegacyPredicate)
297+
if err != nil {
298+
return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
299+
}
300+
return su, nil
301+
}
302+
303+
if !hasDCLController && !hasTerraformController && !hasDirectController {
304+
logger.Error(fmt.Errorf("unrecognized CRD: %v", crd.Spec.Names.Kind), "skipping controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind)
305+
return nil, nil
306+
}
307+
290308
}
291309
return schemaUpdater, nil
292310
}

pkg/controller/tf/controller.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type Reconciler struct {
8080
resourceWatcherRoutines *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies
8181
}
8282

83-
func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, irp predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
83+
func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
8484
kind := crd.Spec.Names.Kind
8585
apiVersion := k8s.GetAPIVersionFromCRD(crd)
8686
controllerName := fmt.Sprintf("%v-controller", strings.ToLower(kind))
@@ -97,8 +97,8 @@ func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provi
9797
},
9898
}
9999
predicateList := []predicate.Predicate{kccpredicate.UnderlyingResourceOutOfSyncPredicate{}}
100-
if irp != nil {
101-
predicateList = append(predicateList, irp)
100+
if additionalPredicate != nil {
101+
predicateList = append(predicateList, additionalPredicate)
102102
}
103103
_, err = builder.
104104
ControllerManagedBy(mgr).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
apiVersion: dataflow.cnrm.cloud.google.com/v1beta1
2+
kind: DataflowFlexTemplateJob
3+
metadata:
4+
annotations:
5+
alpha.cnrm.cloud.google.com/reconciler: direct
6+
cnrm.cloud.google.com/management-conflict-prevention-policy: none
7+
cnrm.cloud.google.com/on-delete: cancel
8+
cnrm.cloud.google.com/project-id: ${projectId}
9+
finalizers:
10+
- cnrm.cloud.google.com/finalizer
11+
- cnrm.cloud.google.com/deletion-defender
12+
generation: 1
13+
labels:
14+
cnrm-test: "true"
15+
name: dataflowflextemplatejob-${uniqueId}
16+
namespace: ${uniqueId}
17+
spec:
18+
containerSpecGcsPath: gs://dataflow-templates/2022-10-03-00_RC00/flex/File_Format_Conversion
19+
parameters:
20+
inputFileFormat: csv
21+
inputFileSpec: gs://config-connector-samples/dataflowflextemplate/numbertest.csv
22+
outputBucket: gs://storagebucket-${uniqueId}
23+
outputFileFormat: avro
24+
schema: gs://config-connector-samples/dataflowflextemplate/numbers.avsc
25+
region: us-central1
26+
status:
27+
conditions:
28+
- lastTransitionTime: "1970-01-01T00:00:00Z"
29+
message: The resource is up to date
30+
reason: UpToDate
31+
status: "True"
32+
type: Ready
33+
jobId: ${jobID}
34+
observedGeneration: 1

0 commit comments

Comments
 (0)