Skip to content

Commit 3f7ae6c

Browse files
committed
Extended status check for reconciliation
1 parent e509604 commit 3f7ae6c

File tree

2 files changed

+111
-0
lines changed

2 files changed

+111
-0
lines changed

internal/controller/etcdcluster_controller.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type EtcdClusterReconciler struct {
4848
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
4949
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
5050
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
51+
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
5152
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
5253
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
5354
// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch
@@ -72,6 +73,57 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
7273
return reconcile.Result{}, nil
7374
}
7475

76+
sts := appsv1.StatefulSet{}
77+
// create two services and the pdb, try fetching the sts
78+
{
79+
c := make(chan error)
80+
go func(chan<- error) {
81+
err := factory.CreateOrUpdateClientService(ctx, instance, r.Client)
82+
if err != nil {
83+
err = fmt.Errorf("couldn't ensure client service: %w", err)
84+
}
85+
c <- err
86+
}(c)
87+
go func(chan<- error) {
88+
err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client)
89+
if err != nil {
90+
err = fmt.Errorf("couldn't ensure headless service: %w", err)
91+
}
92+
c <- err
93+
}(c)
94+
go func(chan<- error) {
95+
err := factory.CreateOrUpdatePdb(ctx, instance, r.Client)
96+
if err != nil {
97+
err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err)
98+
}
99+
c <- err
100+
}(c)
101+
go func(chan<- error) {
102+
err := r.Get(ctx, req.NamespacedName, &sts)
103+
if client.IgnoreNotFound(err) != nil {
104+
err = fmt.Errorf("couldn't get statefulset: %w", err)
105+
}
106+
c <- err
107+
}(c)
108+
for i := 0; i < 4; i++ {
109+
if err := <-c; err != nil {
110+
return ctrl.Result{}, err
111+
}
112+
}
113+
}
114+
/*
115+
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
116+
if err != nil {
117+
return ctrl.Result{}, err
118+
}
119+
if clusterClient == nil || singleClients == nil {
120+
// TODO: no endpoints case
121+
122+
}
123+
if sts.UID != "" {
124+
r.Patch()
125+
}
126+
*/
75127
// fill conditions
76128
if len(instance.Status.Conditions) == 0 {
77129
factory.FillConditions(instance)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package factory
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/aenix-io/etcd-operator/api/v1alpha1"
8+
clientv3 "go.etcd.io/etcd/client/v3"
9+
v1 "k8s.io/api/core/v1"
10+
"k8s.io/apimachinery/pkg/types"
11+
"sigs.k8s.io/controller-runtime/pkg/client"
12+
)
13+
14+
func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) {
15+
cfg, err := configFromCluster(ctx, cluster, client)
16+
if err != nil {
17+
return nil, nil, err
18+
}
19+
if len(cfg.Endpoints) == 0 {
20+
return nil, nil, nil
21+
}
22+
eps := cfg.Endpoints
23+
clusterClient, err := clientv3.New(cfg)
24+
if err != nil {
25+
return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
26+
}
27+
singleClients := make([]*clientv3.Client, len(eps))
28+
for i, ep := range eps {
29+
cfg.Endpoints = []string{ep}
30+
singleClients[i], err = clientv3.New(cfg)
31+
if err != nil {
32+
return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
33+
}
34+
}
35+
return clusterClient, singleClients, nil
36+
}
37+
38+
func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) {
39+
ep := v1.Endpoints{}
40+
err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
41+
if err != nil {
42+
return clientv3.Config{}, err
43+
}
44+
names := map[string]struct{}{}
45+
urls := make([]string, 0, 8)
46+
for _, v := range ep.Subsets {
47+
for _, addr := range v.Addresses {
48+
names[addr.Hostname] = struct{}{}
49+
}
50+
for _, addr := range v.NotReadyAddresses {
51+
names[addr.Hostname] = struct{}{}
52+
}
53+
}
54+
for name := range names {
55+
urls = append(urls, fmt.Sprintf("%s:%s", name, "2379"))
56+
}
57+
58+
return clientv3.Config{Endpoints: urls}, nil
59+
}

0 commit comments

Comments
 (0)