Skip to content

Commit c1bd6db

Browse files
authored
Extended status check for reconciliation (#207)
2 parents 468c477 + f81648b commit c1bd6db

File tree

6 files changed

+286
-32
lines changed

6 files changed

+286
-32
lines changed

api/v1alpha1/etcdcluster_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type EtcdClusterSpec struct {
5555
// Condition names published in EtcdCluster status conditions.
const (
	EtcdConditionInitialized = "Initialized"
	EtcdConditionReady       = "Ready"
	EtcdConditionError       = "Error"
)

// EtcdCondType names the reason backing a status condition.
type EtcdCondType string
@@ -66,6 +67,7 @@ const (
6667
EtcdCondTypeWaitingForFirstQuorum EtcdCondType = "WaitingForFirstQuorum"
6768
EtcdCondTypeStatefulSetReady EtcdCondType = "StatefulSetReady"
6869
EtcdCondTypeStatefulSetNotReady EtcdCondType = "StatefulSetNotReady"
70+
EtcdCondTypeSplitbrain EtcdCondType = "Splitbrain"
6971
)
7072

7173
const (
@@ -74,6 +76,7 @@ const (
7476
EtcdReadyCondNegMessage EtcdCondMessage = "Cluster StatefulSet is not Ready"
7577
EtcdReadyCondPosMessage EtcdCondMessage = "Cluster StatefulSet is Ready"
7678
EtcdReadyCondNegWaitingForQuorum EtcdCondMessage = "Waiting for first quorum to be established"
79+
EtcdErrorCondSplitbrainMessage EtcdCondMessage = "Etcd endpoints reporting more than one unique cluster ID"
7780
)
7881

7982
// EtcdClusterStatus defines the observed state of EtcdCluster

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ rules:
1616
- patch
1717
- update
1818
- watch
19+
- apiGroups:
20+
- ""
21+
resources:
22+
- endpoints
23+
verbs:
24+
- get
25+
- list
26+
- watch
1927
- apiGroups:
2028
- ""
2129
resources:

internal/controller/etcdcluster_controller.go

Lines changed: 118 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"slices"
2626
"strconv"
2727
"strings"
28+
"sync"
2829
"time"
2930

3031
"github.com/aenix-io/etcd-operator/internal/log"
@@ -47,6 +48,10 @@ import (
4748
clientv3 "go.etcd.io/etcd/client/v3"
4849
)
4950

51+
const (
	// etcdDefaultTimeout bounds the time spent talking to etcd
	// endpoints during a single reconciliation pass.
	etcdDefaultTimeout = 5 * time.Second
)
54+
5055
// EtcdClusterReconciler reconciles a EtcdCluster object
5156
type EtcdClusterReconciler struct {
5257
client.Client
@@ -56,6 +61,7 @@ type EtcdClusterReconciler struct {
5661
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
5762
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
5863
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
64+
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
5965
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
6066
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
6167
// +kubebuilder:rbac:groups="",resources=secrets,verbs=view;list;watch
@@ -80,13 +86,68 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8086
return reconcile.Result{}, nil
8187
}
8288

89+
state := observables{}
90+
91+
// create two services and the pdb
92+
err = r.ensureUnconditionalObjects(ctx, instance)
93+
if err != nil {
94+
return ctrl.Result{}, err
95+
}
96+
97+
// fetch STS if exists
98+
err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
99+
if client.IgnoreNotFound(err) != nil {
100+
return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
101+
}
102+
state.stsExists = state.statefulSet.UID != ""
103+
104+
// fetch endpoints
105+
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
106+
if err != nil {
107+
return ctrl.Result{}, err
108+
}
109+
state.endpointsFound = clusterClient != nil && singleClients != nil
110+
111+
if !state.endpointsFound {
112+
if !state.stsExists {
113+
// TODO: happy path for new cluster creation
114+
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
115+
}
116+
}
117+
118+
// get status of every endpoint and member list from every endpoint
119+
state.etcdStatuses = make([]etcdStatus, len(singleClients))
120+
{
121+
var wg sync.WaitGroup
122+
ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
123+
for i := range singleClients {
124+
wg.Add(1)
125+
go func(i int) {
126+
defer wg.Done()
127+
state.etcdStatuses[i].fill(ctx, singleClients[i])
128+
}(i)
129+
}
130+
wg.Wait()
131+
cancel()
132+
}
133+
state.setClusterID()
134+
if state.inSplitbrain() {
135+
log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
136+
factory.SetCondition(instance, factory.NewCondition(etcdaenixiov1alpha1.EtcdConditionError).
137+
WithStatus(true).
138+
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain)).
139+
WithMessage(string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage)).
140+
Complete(),
141+
)
142+
return r.updateStatus(ctx, instance)
143+
}
83144
// fill conditions
84145
if len(instance.Status.Conditions) == 0 {
85146
factory.FillConditions(instance)
86147
}
87148

88149
// ensure managed resources
89-
if err = r.ensureClusterObjects(ctx, instance); err != nil {
150+
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
90151
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
91152
}
92153

@@ -138,8 +199,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
138199
return r.updateStatus(ctx, instance)
139200
}
140201

141-
// ensureClusterObjects creates or updates all objects owned by cluster CR
142-
func (r *EtcdClusterReconciler) ensureClusterObjects(
202+
// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
203+
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
143204
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
144205

145206
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
@@ -148,30 +209,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
148209
}
149210
log.Debug(ctx, "cluster state configmap reconciled")
150211

151-
if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
152-
log.Error(ctx, err, "reconcile headless service failed")
153-
return err
154-
}
155-
log.Debug(ctx, "headless service reconciled")
156-
157212
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
158213
log.Error(ctx, err, "reconcile statefulset failed")
159214
return err
160215
}
161216
log.Debug(ctx, "statefulset reconciled")
162217

163-
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
164-
log.Error(ctx, err, "reconcile client service failed")
165-
return err
166-
}
167-
log.Debug(ctx, "client service reconciled")
168-
169-
if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
170-
log.Error(ctx, err, "reconcile pdb failed")
171-
return err
172-
}
173-
log.Debug(ctx, "pdb reconciled")
174-
175218
return nil
176219
}
177220

@@ -498,3 +541,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie
498541

499542
return nil
500543
}
544+
545+
// ensureUnconditionalObjects creates the two services and the PDB
546+
// which can be created at the start of the reconciliation loop
547+
// without any risk of disrupting the etcd cluster
548+
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
549+
const concurrentOperations = 3
550+
c := make(chan error)
551+
defer close(c)
552+
ctx, cancel := context.WithCancel(ctx)
553+
defer cancel()
554+
var wg sync.WaitGroup
555+
wg.Add(concurrentOperations)
556+
wrapWithMsg := func(err error, msg string) error {
557+
if err != nil {
558+
return fmt.Errorf(msg+": %w", err)
559+
}
560+
return nil
561+
}
562+
go func(chan<- error) {
563+
defer wg.Done()
564+
select {
565+
case <-ctx.Done():
566+
case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
567+
"couldn't ensure client service"):
568+
}
569+
}(c)
570+
go func(chan<- error) {
571+
defer wg.Done()
572+
select {
573+
case <-ctx.Done():
574+
case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
575+
"couldn't ensure headless service"):
576+
}
577+
}(c)
578+
go func(chan<- error) {
579+
defer wg.Done()
580+
select {
581+
case <-ctx.Done():
582+
case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
583+
"couldn't ensure pod disruption budget"):
584+
}
585+
}(c)
586+
587+
for i := 0; i < concurrentOperations; i++ {
588+
if err := <-c; err != nil {
589+
cancel()
590+
591+
// let all goroutines select the ctx.Done() case to avoid races on closed channels
592+
wg.Wait()
593+
return err
594+
}
595+
}
596+
return nil
597+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package factory
2+
3+
import (
	"context"
	"fmt"
	"sort"

	"github.com/aenix-io/etcd-operator/api/v1alpha1"
	clientv3 "go.etcd.io/etcd/client/v3"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)
13+
14+
func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) {
15+
cfg, err := configFromCluster(ctx, cluster, cli)
16+
if err != nil {
17+
return nil, nil, err
18+
}
19+
if len(cfg.Endpoints) == 0 {
20+
return nil, nil, nil
21+
}
22+
eps := cfg.Endpoints
23+
clusterClient, err := clientv3.New(cfg)
24+
if err != nil {
25+
return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
26+
}
27+
singleClients := make([]*clientv3.Client, len(eps))
28+
for i, ep := range eps {
29+
cfg.Endpoints = []string{ep}
30+
singleClients[i], err = clientv3.New(cfg)
31+
if err != nil {
32+
return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
33+
}
34+
}
35+
return clusterClient, singleClients, nil
36+
}
37+
38+
func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) {
39+
ep := v1.Endpoints{}
40+
err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
41+
if client.IgnoreNotFound(err) != nil {
42+
return clientv3.Config{}, err
43+
}
44+
if err != nil {
45+
return clientv3.Config{Endpoints: []string{}}, nil
46+
}
47+
48+
names := map[string]struct{}{}
49+
urls := make([]string, 0, 8)
50+
for _, v := range ep.Subsets {
51+
for _, addr := range v.Addresses {
52+
names[addr.Hostname] = struct{}{}
53+
}
54+
for _, addr := range v.NotReadyAddresses {
55+
names[addr.Hostname] = struct{}{}
56+
}
57+
}
58+
for name := range names {
59+
urls = append(urls, fmt.Sprintf("%s:%s", name, "2379"))
60+
}
61+
62+
return clientv3.Config{Endpoints: urls}, nil
63+
}

internal/controller/observables.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
clientv3 "go.etcd.io/etcd/client/v3"
8+
appsv1 "k8s.io/api/apps/v1"
9+
corev1 "k8s.io/api/core/v1"
10+
)
11+
12+
// etcdStatus holds the details of the status that an etcd endpoint
// can return about itself, i.e. its own status and its perceived
// member list, together with the error (if any) from fetching each.
type etcdStatus struct {
	endpointStatus      *clientv3.StatusResponse
	endpointStatusError error
	memberList          *clientv3.MemberListResponse
	memberListError     error
}

// observables stores observations that the operator can make about
// states of objects in kubernetes during one reconciliation pass.
type observables struct {
	statefulSet    appsv1.StatefulSet // fetched STS; zero value when none exists
	stsExists      bool               // true when the fetch above found a live object
	endpointsFound bool               // true when etcd clients could be built from endpoints
	etcdStatuses   []etcdStatus       // one entry per single-endpoint client
	clusterID      uint64             // cluster ID reported by the first responsive endpoint
	// Blank placeholders reserved for future observations
	// (presumably a member count and PVC list — TODO confirm);
	// kept as-is to preserve the struct layout.
	_ int
	_ []corev1.PersistentVolumeClaim
}
33+
34+
// setClusterID populates the clusterID field based on etcdStatuses
35+
func (o *observables) setClusterID() {
36+
for i := range o.etcdStatuses {
37+
if o.etcdStatuses[i].endpointStatus != nil {
38+
o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId
39+
return
40+
}
41+
}
42+
}
43+
44+
// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
45+
// If more than one unique ID is reported, cluster is in splitbrain.
46+
func (o *observables) inSplitbrain() bool {
47+
for i := range o.etcdStatuses {
48+
if o.etcdStatuses[i].endpointStatus != nil {
49+
if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
50+
return true
51+
}
52+
}
53+
}
54+
return false
55+
}
56+
57+
// fill takes a single-endpoint client and populates the fields of etcdStatus
58+
// with the endpoint's status and its perceived member list.
59+
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
60+
var wg sync.WaitGroup
61+
wg.Add(1)
62+
go func() {
63+
defer wg.Done()
64+
s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0])
65+
}()
66+
s.memberList, s.memberListError = c.MemberList(ctx)
67+
wg.Wait()
68+
}
69+
70+
// TODO: make a real function
71+
func (o *observables) _() int {
72+
if o.etcdStatuses != nil {
73+
for i := range o.etcdStatuses {
74+
if o.etcdStatuses[i].memberList != nil {
75+
return len(o.etcdStatuses[i].memberList.Members)
76+
}
77+
}
78+
}
79+
return 0
80+
}

0 commit comments

Comments
 (0)