Skip to content
This repository was archived by the owner on Apr 25, 2023. It is now read-only.

Commit 2628b60

Browse files
authored
Merge pull request #1052 from marun/vary-apiversion
Support varying the apiVersion of target resources
2 parents bc86147 + 0b2eb69 commit 2628b60

14 files changed

+164
-124
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
# Unreleased
2+
- [#1052](https://github.com/kubernetes-sigs/kubefed/pull/1052)
3+
Support has been added for varying the `apiVersion` of target
4+
resources. This is intended to allow a federated type to manage
5+
more than one version of the target type across member clusters.
6+
`apiVersion` can be set either in the template of a federated
7+
resource or via override.
28
- [#951](https://github.com/kubernetes-sigs/kubefed/issues/951)
39
Propagation status for a namespaced federated resource whose
410
containing namespace is not federated now indicates an unhealthy

pkg/client/generic/genericclient.go

+6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Client interface {
3434
Delete(ctx context.Context, obj runtime.Object, namespace, name string) error
3535
List(ctx context.Context, obj runtime.Object, namespace string) error
3636
UpdateStatus(ctx context.Context, obj runtime.Object) error
37+
38+
ListWithOptions(ctx context.Context, opts *client.ListOptions, obj runtime.Object) error
3739
}
3840

3941
type genericClient struct {
@@ -85,6 +87,10 @@ func (c *genericClient) List(ctx context.Context, obj runtime.Object, namespace
8587
return c.client.List(ctx, &client.ListOptions{Namespace: namespace}, obj)
8688
}
8789

90+
func (c *genericClient) ListWithOptions(ctx context.Context, opts *client.ListOptions, obj runtime.Object) error {
91+
return c.client.List(ctx, opts, obj)
92+
}
93+
8894
func (c *genericClient) UpdateStatus(ctx context.Context, obj runtime.Object) error {
8995
return c.client.Status().Update(ctx, obj)
9096
}

pkg/controller/sync/controller.go

+15-13
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3131
pkgruntime "k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/runtime/schema"
3233
"k8s.io/apimachinery/pkg/util/runtime"
3334
"k8s.io/apimachinery/pkg/util/sets"
3435
"k8s.io/apimachinery/pkg/util/wait"
@@ -246,11 +247,12 @@ func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util
246247
return util.StatusError
247248
}
248249
if possibleOrphan {
249-
targetKind := s.typeConfig.GetTargetType().Kind
250-
klog.V(2).Infof("Ensuring the removal of the label %q from %s %q in member clusters.", util.ManagedByKubeFedLabelKey, targetKind, qualifiedName)
251-
err = s.removeManagedLabel(targetKind, qualifiedName)
250+
apiResource := s.typeConfig.GetTargetType()
251+
gvk := apiResourceToGVK(&apiResource)
252+
klog.V(2).Infof("Ensuring the removal of the label %q from %s %q in member clusters.", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
253+
err = s.removeManagedLabel(gvk, qualifiedName)
252254
if err != nil {
253-
wrappedErr := errors.Wrapf(err, "failed to remove the label %q from %s %q in member clusters", util.ManagedByKubeFedLabelKey, targetKind, qualifiedName)
255+
wrappedErr := errors.Wrapf(err, "failed to remove the label %q from %s %q in member clusters", util.ManagedByKubeFedLabelKey, gvk.Kind, qualifiedName)
254256
runtime.HandleError(wrappedErr)
255257
return util.StatusError
256258
}
@@ -453,7 +455,7 @@ func (s *KubeFedSyncController) ensureDeletion(fedResource FederatedResource) ut
453455
return util.StatusError
454456
}
455457
klog.V(2).Infof("Initiating the removal of the label %q from resources previously managed by %s %q.", util.ManagedByKubeFedLabelKey, kind, key)
456-
err = s.removeManagedLabel(fedResource.TargetKind(), fedResource.TargetName())
458+
err = s.removeManagedLabel(fedResource.TargetGVK(), fedResource.TargetName())
457459
if err != nil {
458460
wrappedErr := errors.Wrapf(err, "failed to remove the label %q from all resources previously managed by %s %q", util.ManagedByKubeFedLabelKey, kind, key)
459461
runtime.HandleError(wrappedErr)
@@ -477,8 +479,8 @@ func (s *KubeFedSyncController) ensureDeletion(fedResource FederatedResource) ut
477479

478480
// removeManagedLabel attempts to remove the managed label from
479481
// resources with the given name in member clusters.
480-
func (s *KubeFedSyncController) removeManagedLabel(kind string, qualifiedName util.QualifiedName) error {
481-
ok, err := s.handleDeletionInClusters(kind, qualifiedName, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
482+
func (s *KubeFedSyncController) removeManagedLabel(gvk schema.GroupVersionKind, qualifiedName util.QualifiedName) error {
483+
ok, err := s.handleDeletionInClusters(gvk, qualifiedName, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
482484
if clusterObj.GetDeletionTimestamp() != nil {
483485
return
484486
}
@@ -495,11 +497,11 @@ func (s *KubeFedSyncController) removeManagedLabel(kind string, qualifiedName ut
495497
}
496498

497499
func (s *KubeFedSyncController) deleteFromClusters(fedResource FederatedResource) (bool, error) {
498-
kind := fedResource.TargetKind()
500+
gvk := fedResource.TargetGVK()
499501
qualifiedName := fedResource.TargetName()
500502

501503
remainingClusters := []string{}
502-
ok, err := s.handleDeletionInClusters(kind, qualifiedName, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
504+
ok, err := s.handleDeletionInClusters(gvk, qualifiedName, func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured) {
503505
// If the containing namespace of a FederatedNamespace is
504506
// marked for deletion, it is impossible to require the
505507
// removal of the namespace in advance of removal of the sync
@@ -557,7 +559,7 @@ func (s *KubeFedSyncController) ensureRemovedOrUnmanaged(fedResource FederatedRe
557559
return errors.Wrap(err, "failed to get a list of clusters")
558560
}
559561

560-
dispatcher := dispatch.NewCheckUnmanagedDispatcher(s.informer.GetClientForCluster, fedResource.TargetKind(), fedResource.TargetName())
562+
dispatcher := dispatch.NewCheckUnmanagedDispatcher(s.informer.GetClientForCluster, fedResource.TargetGVK(), fedResource.TargetName())
561563
unreadyClusters := []string{}
562564
for _, cluster := range clusters {
563565
if !util.IsClusterReady(&cluster.Status) {
@@ -581,15 +583,15 @@ func (s *KubeFedSyncController) ensureRemovedOrUnmanaged(fedResource FederatedRe
581583

582584
// handleDeletionInClusters invokes the provided deletion handler for
583585
// each managed resource in member clusters.
584-
func (s *KubeFedSyncController) handleDeletionInClusters(kind string, qualifiedName util.QualifiedName,
586+
func (s *KubeFedSyncController) handleDeletionInClusters(gvk schema.GroupVersionKind, qualifiedName util.QualifiedName,
585587
deletionFunc func(dispatcher dispatch.UnmanagedDispatcher, clusterName string, clusterObj *unstructured.Unstructured)) (bool, error) {
586588

587589
clusters, err := s.informer.GetClusters()
588590
if err != nil {
589591
return false, errors.Wrap(err, "failed to get a list of clusters")
590592
}
591593

592-
dispatcher := dispatch.NewUnmanagedDispatcher(s.informer.GetClientForCluster, kind, qualifiedName)
594+
dispatcher := dispatch.NewUnmanagedDispatcher(s.informer.GetClientForCluster, gvk, qualifiedName)
593595
key := qualifiedName.String()
594596
retrievalFailureClusters := []string{}
595597
unreadyClusters := []string{}
@@ -603,7 +605,7 @@ func (s *KubeFedSyncController) handleDeletionInClusters(kind string, qualifiedN
603605

604606
rawClusterObj, _, err := s.informer.GetTargetStore().GetByKey(clusterName, key)
605607
if err != nil {
606-
wrappedErr := errors.Wrapf(err, "failed to retrieve %s %q for cluster %q", kind, key, clusterName)
608+
wrappedErr := errors.Wrapf(err, "failed to retrieve %s %q for cluster %q", gvk.Kind, key, clusterName)
607609
runtime.HandleError(wrappedErr)
608610
retrievalFailureClusters = append(retrievalFailureClusters, clusterName)
609611
continue

pkg/controller/sync/dispatch/checkunmanaged.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@ limitations under the License.
1717
package dispatch
1818

1919
import (
20+
"context"
21+
2022
"github.com/pkg/errors"
2123

2224
apierrors "k8s.io/apimachinery/pkg/api/errors"
23-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2426
pkgruntime "k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
2528
"k8s.io/apimachinery/pkg/util/runtime"
2629
"k8s.io/klog"
2730

31+
"sigs.k8s.io/kubefed/pkg/client/generic"
2832
"sigs.k8s.io/kubefed/pkg/controller/util"
2933
)
3034

@@ -39,16 +43,16 @@ type CheckUnmanagedDispatcher interface {
3943
type checkUnmanagedDispatcherImpl struct {
4044
dispatcher *operationDispatcherImpl
4145

46+
targetGVK schema.GroupVersionKind
4247
targetName util.QualifiedName
43-
targetKind string
4448
}
4549

46-
func NewCheckUnmanagedDispatcher(clientAccessor clientAccessorFunc, targetKind string, targetName util.QualifiedName) CheckUnmanagedDispatcher {
50+
func NewCheckUnmanagedDispatcher(clientAccessor clientAccessorFunc, targetGVK schema.GroupVersionKind, targetName util.QualifiedName) CheckUnmanagedDispatcher {
4751
dispatcher := newOperationDispatcher(clientAccessor, nil)
4852
return &checkUnmanagedDispatcherImpl{
4953
dispatcher: dispatcher,
54+
targetGVK: targetGVK,
5055
targetName: targetName,
51-
targetKind: targetKind,
5256
}
5357
}
5458

@@ -63,10 +67,12 @@ func (d *checkUnmanagedDispatcherImpl) CheckRemovedOrUnlabeled(clusterName strin
6367
d.dispatcher.incrementOperationsInitiated()
6468
const op = "check for deletion of resource or removal of managed label from"
6569
const opContinuous = "Checking for deletion of resource or removal of managed label from"
66-
go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus {
67-
klog.V(2).Infof(eventTemplate, opContinuous, d.targetKind, d.targetName, clusterName)
70+
go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
71+
klog.V(2).Infof(eventTemplate, opContinuous, d.targetGVK.Kind, d.targetName, clusterName)
6872

69-
clusterObj, err := client.Resources(d.targetName.Namespace).Get(d.targetName.Name, metav1.GetOptions{})
73+
clusterObj := &unstructured.Unstructured{}
74+
clusterObj.SetGroupVersionKind(d.targetGVK)
75+
err := client.Get(context.Background(), clusterObj, d.targetName.Namespace, d.targetName.Name)
7076
if apierrors.IsNotFound(err) {
7177
return util.StatusAllOK
7278
}
@@ -95,5 +101,5 @@ func (d *checkUnmanagedDispatcherImpl) CheckRemovedOrUnlabeled(clusterName strin
95101
}
96102

97103
func (d *checkUnmanagedDispatcherImpl) wrapOperationError(err error, clusterName, operation string) error {
98-
return wrapOperationError(err, operation, d.targetKind, d.targetName.String(), clusterName)
104+
return wrapOperationError(err, operation, d.targetGVK.Kind, d.targetName.String(), clusterName)
99105
}

pkg/controller/sync/dispatch/managed.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@ limitations under the License.
1717
package dispatch
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"strings"
2223
"sync"
2324

2425
"github.com/pkg/errors"
2526

2627
apierrors "k8s.io/apimachinery/pkg/api/errors"
27-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2929
pkgruntime "k8s.io/apimachinery/pkg/runtime"
30+
"k8s.io/apimachinery/pkg/runtime/schema"
3031
"k8s.io/apimachinery/pkg/util/sets"
3132

33+
"sigs.k8s.io/kubefed/pkg/client/generic"
3234
"sigs.k8s.io/kubefed/pkg/controller/sync/status"
3335
"sigs.k8s.io/kubefed/pkg/controller/util"
3436
)
@@ -38,6 +40,7 @@ import (
3840
type FederatedResourceForDispatch interface {
3941
TargetName() util.QualifiedName
4042
TargetKind() string
43+
TargetGVK() schema.GroupVersionKind
4144
Object() *unstructured.Unstructured
4245
VersionForCluster(clusterName string) (string, error)
4346
ObjectForCluster(clusterName string) (*unstructured.Unstructured, error)
@@ -80,7 +83,7 @@ func NewManagedDispatcher(clientAccessor clientAccessorFunc, fedResource Federat
8083
skipAdoptingResources: skipAdoptingResources,
8184
}
8285
d.dispatcher = newOperationDispatcher(clientAccessor, d)
83-
d.unmanagedDispatcher = newUnmanagedDispatcher(d.dispatcher, d, fedResource.TargetKind(), fedResource.TargetName())
86+
d.unmanagedDispatcher = newUnmanagedDispatcher(d.dispatcher, d, fedResource.TargetGVK(), fedResource.TargetName())
8487
return d
8588
}
8689

@@ -126,7 +129,7 @@ func (d *managedDispatcherImpl) Create(clusterName string) {
126129

127130
d.dispatcher.incrementOperationsInitiated()
128131
const op = "create"
129-
go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus {
132+
go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
130133
d.recordEvent(clusterName, op, "Creating")
131134

132135
obj, err := d.fedResource.ObjectForCluster(clusterName)
@@ -139,9 +142,9 @@ func (d *managedDispatcherImpl) Create(clusterName string) {
139142
return d.recordOperationError(status.ApplyOverridesFailed, clusterName, op, err)
140143
}
141144

142-
createdObj, err := client.Resources(obj.GetNamespace()).Create(obj, metav1.CreateOptions{})
145+
err = client.Create(context.Background(), obj)
143146
if err == nil {
144-
version := util.ObjectVersion(createdObj)
147+
version := util.ObjectVersion(obj)
145148
d.recordVersion(clusterName, version)
146149
return util.StatusAllOK
147150
}
@@ -155,19 +158,19 @@ func (d *managedDispatcherImpl) Create(clusterName string) {
155158

156159
// Attempt to update the existing resource to ensure that it
157160
// is labeled as a managed resource.
158-
clusterObj, err := client.Resources(obj.GetNamespace()).Get(obj.GetName(), metav1.GetOptions{})
161+
err = client.Get(context.Background(), obj, obj.GetNamespace(), obj.GetName())
159162
if err != nil {
160163
wrappedErr := errors.Wrapf(err, "failed to retrieve object potentially requiring adoption")
161164
return d.recordOperationError(status.RetrievalFailed, clusterName, op, wrappedErr)
162165
}
163166

164-
if d.skipAdoptingResources && !d.fedResource.IsNamespaceInHostCluster(clusterObj) {
167+
if d.skipAdoptingResources && !d.fedResource.IsNamespaceInHostCluster(obj) {
165168
_ = d.recordOperationError(status.AlreadyExists, clusterName, op, errors.Errorf("Resource pre-exist in cluster"))
166169
return util.StatusAllOK
167170
}
168171

169172
d.recordError(clusterName, op, errors.Errorf("An update will be attempted instead of a creation due to an existing resource"))
170-
d.Update(clusterName, clusterObj)
173+
d.Update(clusterName, obj)
171174
return util.StatusAllOK
172175
})
173176
}
@@ -177,7 +180,7 @@ func (d *managedDispatcherImpl) Update(clusterName string, clusterObj *unstructu
177180

178181
d.dispatcher.incrementOperationsInitiated()
179182
const op = "update"
180-
go d.dispatcher.clusterOperation(clusterName, op, func(client util.ResourceClient) util.ReconciliationStatus {
183+
go d.dispatcher.clusterOperation(clusterName, op, func(client generic.Client) util.ReconciliationStatus {
181184
obj, err := d.fedResource.ObjectForCluster(clusterName)
182185
if err != nil {
183186
return d.recordOperationError(status.ComputeResourceFailed, clusterName, op, err)
@@ -206,11 +209,11 @@ func (d *managedDispatcherImpl) Update(clusterName string, clusterObj *unstructu
206209
// Only record an event if the resource is not current
207210
d.recordEvent(clusterName, op, "Updating")
208211

209-
updatedObj, err := client.Resources(obj.GetNamespace()).Update(obj, metav1.UpdateOptions{})
212+
err = client.Update(context.Background(), obj)
210213
if err != nil {
211214
return d.recordOperationError(status.UpdateFailed, clusterName, op, err)
212215
}
213-
version = util.ObjectVersion(updatedObj)
216+
version = util.ObjectVersion(obj)
214217
d.recordVersion(clusterName, version)
215218
return util.StatusAllOK
216219
})

pkg/controller/sync/dispatch/operation.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ import (
2424

2525
"k8s.io/apimachinery/pkg/util/runtime"
2626

27+
"sigs.k8s.io/kubefed/pkg/client/generic"
2728
"sigs.k8s.io/kubefed/pkg/controller/sync/status"
2829
"sigs.k8s.io/kubefed/pkg/controller/util"
2930
)
3031

31-
type clientAccessorFunc func(clusterName string) (util.ResourceClient, error)
32+
type clientAccessorFunc func(clusterName string) (generic.Client, error)
3233

3334
type dispatchRecorder interface {
3435
recordEvent(clusterName, operation, operationContinuous string)
@@ -91,9 +92,8 @@ func (d *operationDispatcherImpl) Wait() (bool, error) {
9192
return ok, nil
9293
}
9394

94-
func (d *operationDispatcherImpl) clusterOperation(clusterName, op string, opFunc func(util.ResourceClient) util.ReconciliationStatus) {
95-
// TODO(marun) Update to generic client and support cancellation
96-
// on timeout.
95+
func (d *operationDispatcherImpl) clusterOperation(clusterName, op string, opFunc func(generic.Client) util.ReconciliationStatus) {
96+
// TODO(marun) Support cancellation of client calls on timeout.
9797
client, err := d.clientAccessor(clusterName)
9898
if err != nil {
9999
wrappedErr := errors.Wrapf(err, "Error retrieving client for cluster")

0 commit comments

Comments
 (0)