Skip to content

Commit 58351d3

Browse files
committed
Simplify controller watches using EnqueueRequestsFromMapFunc
1 parent ed8ca88 commit 58351d3

File tree

9 files changed

+100
-209
lines changed

9 files changed

+100
-209
lines changed

internal/bridge/crunchybridgecluster/crunchybridgecluster_controller.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ import (
2020
"sigs.k8s.io/controller-runtime/pkg/client"
2121
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
2222
"sigs.k8s.io/controller-runtime/pkg/event"
23+
"sigs.k8s.io/controller-runtime/pkg/handler"
2324

2425
"github.com/crunchydata/postgres-operator/internal/bridge"
2526
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
26-
pgoRuntime "github.com/crunchydata/postgres-operator/internal/controller/runtime"
27+
"github.com/crunchydata/postgres-operator/internal/initialize"
2728
"github.com/crunchydata/postgres-operator/internal/naming"
2829
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
2930
)
@@ -54,15 +55,23 @@ func (r *CrunchyBridgeClusterReconciler) SetupWithManager(
5455
For(&v1beta1.CrunchyBridgeCluster{}).
5556
Owns(&corev1.Secret{}).
5657
// Wake periodically to check Bridge API for all CrunchyBridgeClusters.
57-
// Potentially replace with different requeue times, remove the Watch function
58-
// Smarter: retry after a certain time for each cluster: https://gist.github.com/cbandy/a5a604e3026630c5b08cfbcdfffd2a13
58+
// Potentially replace with different requeue times
59+
// Smarter: retry after a certain time for each cluster
5960
WatchesRawSource(
60-
pgoRuntime.NewTickerImmediate(5*time.Minute, event.GenericEvent{}, r.Watch()),
61+
runtime.NewTickerImmediate(5*time.Minute, event.GenericEvent{},
62+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []ctrl.Request {
63+
var list v1beta1.CrunchyBridgeClusterList
64+
_ = r.List(ctx, &list)
65+
return runtime.Requests(initialize.Pointers(list.Items...)...)
66+
}),
67+
),
6168
).
6269
// Watch secrets and filter for secrets mentioned by CrunchyBridgeClusters
6370
Watches(
6471
&corev1.Secret{},
65-
r.watchForRelatedSecret(),
72+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
73+
return runtime.Requests(r.findCrunchyBridgeClustersForSecret(ctx, client.ObjectKeyFromObject(secret))...)
74+
}),
6675
).
6776
Complete(r)
6877
}

internal/bridge/crunchybridgecluster/watches.go

+1-65
Original file line numberDiff line numberDiff line change
@@ -7,48 +7,11 @@ package crunchybridgecluster
77
import (
88
"context"
99

10-
"k8s.io/client-go/util/workqueue"
11-
ctrl "sigs.k8s.io/controller-runtime"
1210
"sigs.k8s.io/controller-runtime/pkg/client"
13-
"sigs.k8s.io/controller-runtime/pkg/event"
14-
"sigs.k8s.io/controller-runtime/pkg/handler"
15-
"sigs.k8s.io/controller-runtime/pkg/reconcile"
1611

1712
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1813
)
1914

20-
// watchForRelatedSecret handles create/update/delete events for secrets,
21-
// passing the Secret ObjectKey to findCrunchyBridgeClustersForSecret
22-
func (r *CrunchyBridgeClusterReconciler) watchForRelatedSecret() handler.EventHandler {
23-
handle := func(ctx context.Context, secret client.Object, q workqueue.RateLimitingInterface) {
24-
key := client.ObjectKeyFromObject(secret)
25-
26-
for _, cluster := range r.findCrunchyBridgeClustersForSecret(ctx, key) {
27-
q.Add(ctrl.Request{
28-
NamespacedName: client.ObjectKeyFromObject(cluster),
29-
})
30-
}
31-
}
32-
33-
return handler.Funcs{
34-
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
35-
handle(ctx, e.Object, q)
36-
},
37-
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
38-
handle(ctx, e.ObjectNew, q)
39-
},
40-
// If the secret is deleted, we want to reconcile
41-
// in order to emit an event/status about this problem.
42-
// We will also emit a matching event/status about this problem
43-
// when we reconcile the cluster and can't find the secret.
44-
// That way, users will get two alerts: one when the secret is deleted
45-
// and another when the cluster is being reconciled.
46-
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
47-
handle(ctx, e.Object, q)
48-
},
49-
}
50-
}
51-
5215
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list}
5316

5417
// findCrunchyBridgeClustersForSecret returns CrunchyBridgeClusters
@@ -60,7 +23,7 @@ func (r *CrunchyBridgeClusterReconciler) findCrunchyBridgeClustersForSecret(
6023
var clusters v1beta1.CrunchyBridgeClusterList
6124

6225
// NOTE: If this becomes slow due to a large number of CrunchyBridgeClusters in a single
63-
// namespace, we can configure the [ctrl.Manager] field indexer and pass a
26+
// namespace, we can configure the [manager.Manager] field indexer and pass a
6427
// [fields.Selector] here.
6528
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
6629
if err := r.List(ctx, &clusters, &client.ListOptions{
@@ -74,30 +37,3 @@ func (r *CrunchyBridgeClusterReconciler) findCrunchyBridgeClustersForSecret(
7437
}
7538
return matching
7639
}
77-
78-
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="crunchybridgeclusters",verbs={list}
79-
80-
// Watch enqueues all existing CrunchyBridgeClusters for reconciles.
81-
func (r *CrunchyBridgeClusterReconciler) Watch() handler.EventHandler {
82-
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []reconcile.Request {
83-
log := ctrl.LoggerFrom(ctx)
84-
85-
crunchyBridgeClusterList := &v1beta1.CrunchyBridgeClusterList{}
86-
if err := r.List(ctx, crunchyBridgeClusterList); err != nil {
87-
log.Error(err, "Error listing CrunchyBridgeClusters.")
88-
}
89-
90-
reconcileRequests := []reconcile.Request{}
91-
for index := range crunchyBridgeClusterList.Items {
92-
reconcileRequests = append(reconcileRequests,
93-
reconcile.Request{
94-
NamespacedName: client.ObjectKeyFromObject(
95-
&crunchyBridgeClusterList.Items[index],
96-
),
97-
},
98-
)
99-
}
100-
101-
return reconcileRequests
102-
})
103-
}

internal/controller/pgupgrade/pgupgrade_controller.go

+3-28
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ import (
1414
"k8s.io/apimachinery/pkg/api/meta"
1515
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616
"k8s.io/client-go/tools/record"
17-
"k8s.io/client-go/util/workqueue"
1817
ctrl "sigs.k8s.io/controller-runtime"
1918
"sigs.k8s.io/controller-runtime/pkg/client"
20-
"sigs.k8s.io/controller-runtime/pkg/event"
2119
"sigs.k8s.io/controller-runtime/pkg/handler"
2220

2321
"github.com/crunchydata/postgres-operator/internal/config"
@@ -50,7 +48,9 @@ func (r *PGUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
5048
Owns(&batchv1.Job{}).
5149
Watches(
5250
v1beta1.NewPostgresCluster(),
53-
r.watchPostgresClusters(),
51+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, cluster client.Object) []ctrl.Request {
52+
return runtime.Requests(r.findUpgradesForPostgresCluster(ctx, client.ObjectKeyFromObject(cluster))...)
53+
}),
5454
).
5555
Complete(r)
5656
}
@@ -80,31 +80,6 @@ func (r *PGUpgradeReconciler) findUpgradesForPostgresCluster(
8080
return matching
8181
}
8282

83-
// watchPostgresClusters returns a [handler.EventHandler] for PostgresClusters.
84-
func (r *PGUpgradeReconciler) watchPostgresClusters() handler.Funcs {
85-
handle := func(ctx context.Context, cluster client.Object, q workqueue.RateLimitingInterface) {
86-
key := client.ObjectKeyFromObject(cluster)
87-
88-
for _, upgrade := range r.findUpgradesForPostgresCluster(ctx, key) {
89-
q.Add(ctrl.Request{
90-
NamespacedName: client.ObjectKeyFromObject(upgrade),
91-
})
92-
}
93-
}
94-
95-
return handler.Funcs{
96-
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
97-
handle(ctx, e.Object, q)
98-
},
99-
UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
100-
handle(ctx, e.ObjectNew, q)
101-
},
102-
DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
103-
handle(ctx, e.Object, q)
104-
},
105-
}
106-
}
107-
10883
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgupgrades",verbs={get}
10984
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgupgrades/status",verbs={patch}
11085
//+kubebuilder:rbac:groups="batch",resources="jobs",verbs={delete}

internal/controller/runtime/reconcile.go

+12
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,21 @@ package runtime
77
import (
88
"time"
99

10+
"sigs.k8s.io/controller-runtime/pkg/client"
1011
"sigs.k8s.io/controller-runtime/pkg/reconcile"
1112
)
1213

14+
// Requests converts objects to a slice of [reconcile.Request].
15+
func Requests[T client.Object](objects ...T) []reconcile.Request {
16+
result := make([]reconcile.Request, len(objects))
17+
for i := range objects {
18+
result[i] = reconcile.Request{
19+
NamespacedName: client.ObjectKeyFromObject(objects[i]),
20+
}
21+
}
22+
return result
23+
}
24+
1325
// ErrorWithBackoff returns a Result and error that indicate a non-nil err
1426
// should be logged and measured and its [reconcile.Request] should be retried
1527
// later. When err is nil, nothing is logged and the Request is not retried.

internal/controller/runtime/reconcile_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,36 @@ import (
1010
"time"
1111

1212
"gotest.tools/v3/assert"
13+
"gotest.tools/v3/assert/cmp"
14+
corev1 "k8s.io/api/core/v1"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
1316
"sigs.k8s.io/controller-runtime/pkg/reconcile"
1417
)
1518

19+
func TestRequests(t *testing.T) {
20+
none := Requests[client.Object]()
21+
assert.Assert(t, none != nil, "does not return nil slice")
22+
assert.DeepEqual(t, none, []reconcile.Request{})
23+
24+
assert.Assert(t, cmp.Panics(func() {
25+
Requests[client.Object](nil)
26+
}), "expected nil pointer dereference")
27+
28+
// Empty request when no metadata.
29+
assert.DeepEqual(t, Requests(new(corev1.Secret)), []reconcile.Request{{}})
30+
31+
secret := new(corev1.Secret)
32+
secret.Namespace = "asdf"
33+
34+
expected := reconcile.Request{}
35+
expected.Namespace = "asdf"
36+
assert.DeepEqual(t, Requests(secret), []reconcile.Request{expected})
37+
38+
secret.Name = "123"
39+
expected.Name = "123"
40+
assert.DeepEqual(t, Requests(secret), []reconcile.Request{expected})
41+
}
42+
1643
func TestErrorWithBackoff(t *testing.T) {
1744
result, err := ErrorWithBackoff(nil)
1845
assert.Assert(t, result.IsZero())

internal/controller/standalone_pgadmin/controller.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ import (
1616
ctrl "sigs.k8s.io/controller-runtime"
1717
"sigs.k8s.io/controller-runtime/pkg/client"
1818
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
19+
"sigs.k8s.io/controller-runtime/pkg/handler"
1920

20-
controllerruntime "github.com/crunchydata/postgres-operator/internal/controller/runtime"
21+
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
2122
"github.com/crunchydata/postgres-operator/internal/logging"
2223
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
2324
)
@@ -46,7 +47,7 @@ type PGAdminReconciler struct {
4647
func (r *PGAdminReconciler) SetupWithManager(mgr ctrl.Manager) error {
4748
if r.PodExec == nil {
4849
var err error
49-
r.PodExec, err = controllerruntime.NewPodExecutor(mgr.GetConfig())
50+
r.PodExec, err = runtime.NewPodExecutor(mgr.GetConfig())
5051
if err != nil {
5152
return err
5253
}
@@ -61,11 +62,15 @@ func (r *PGAdminReconciler) SetupWithManager(mgr ctrl.Manager) error {
6162
Owns(&corev1.Service{}).
6263
Watches(
6364
v1beta1.NewPostgresCluster(),
64-
r.watchPostgresClusters(),
65+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, cluster client.Object) []ctrl.Request {
66+
return runtime.Requests(r.findPGAdminsForPostgresCluster(ctx, cluster)...)
67+
}),
6568
).
6669
Watches(
6770
&corev1.Secret{},
68-
r.watchForRelatedSecret(),
71+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, secret client.Object) []ctrl.Request {
72+
return runtime.Requests(r.findPGAdminsForSecret(ctx, client.ObjectKeyFromObject(secret))...)
73+
}),
6974
).
7075
Complete(r)
7176
}

internal/controller/standalone_pgadmin/postgrescluster.go renamed to internal/controller/standalone_pgadmin/related.go

+34-5
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ func (r *PGAdminReconciler) findPGAdminsForPostgresCluster(
2727
)
2828

2929
// NOTE: If this becomes slow due to a large number of pgadmins in a single
30-
// namespace, we can configure the [ctrl.Manager] field indexer and pass a
30+
// namespace, we can configure the [manager.Manager] field indexer and pass a
3131
// [fields.Selector] here.
3232
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
33-
if r.List(ctx, &pgadmins, &client.ListOptions{
33+
if r.Client.List(ctx, &pgadmins, &client.ListOptions{
3434
Namespace: cluster.GetNamespace(),
3535
}) == nil {
3636
for i := range pgadmins.Items {
@@ -50,7 +50,36 @@ func (r *PGAdminReconciler) findPGAdminsForPostgresCluster(
5050
return matching
5151
}
5252

53-
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="postgresclusters",verbs={list,watch}
53+
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgadmins",verbs={list}
54+
55+
// findPGAdminsForSecret returns PGAdmins that have a user or users that have their password
56+
// stored in the Secret
57+
func (r *PGAdminReconciler) findPGAdminsForSecret(
58+
ctx context.Context, secret client.ObjectKey,
59+
) []*v1beta1.PGAdmin {
60+
var matching []*v1beta1.PGAdmin
61+
var pgadmins v1beta1.PGAdminList
62+
63+
// NOTE: If this becomes slow due to a large number of PGAdmins in a single
64+
// namespace, we can configure the [manager.Manager] field indexer and pass a
65+
// [fields.Selector] here.
66+
// - https://book.kubebuilder.io/reference/watching-resources/externally-managed.html
67+
if err := r.Client.List(ctx, &pgadmins, &client.ListOptions{
68+
Namespace: secret.Namespace,
69+
}); err == nil {
70+
for i := range pgadmins.Items {
71+
for j := range pgadmins.Items[i].Spec.Users {
72+
if pgadmins.Items[i].Spec.Users[j].PasswordRef.Name == secret.Name {
73+
matching = append(matching, &pgadmins.Items[i])
74+
break
75+
}
76+
}
77+
}
78+
}
79+
return matching
80+
}
81+
82+
//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="postgresclusters",verbs={get,list}
5483

5584
// getClustersForPGAdmin returns clusters managed by the given pgAdmin
5685
func (r *PGAdminReconciler) getClustersForPGAdmin(
@@ -64,7 +93,7 @@ func (r *PGAdminReconciler) getClustersForPGAdmin(
6493
for _, serverGroup := range pgAdmin.Spec.ServerGroups {
6594
var cluster v1beta1.PostgresCluster
6695
if serverGroup.PostgresClusterName != "" {
67-
err = r.Get(ctx, client.ObjectKey{
96+
err = r.Client.Get(ctx, client.ObjectKey{
6897
Name: serverGroup.PostgresClusterName,
6998
Namespace: pgAdmin.GetNamespace(),
7099
}, &cluster)
@@ -75,7 +104,7 @@ func (r *PGAdminReconciler) getClustersForPGAdmin(
75104
}
76105
if selector, err = naming.AsSelector(serverGroup.PostgresClusterSelector); err == nil {
77106
var list v1beta1.PostgresClusterList
78-
err = r.List(ctx, &list,
107+
err = r.Client.List(ctx, &list,
79108
client.InNamespace(pgAdmin.Namespace),
80109
client.MatchingLabelsSelector{Selector: selector},
81110
)

0 commit comments

Comments (0)