Skip to content

Commit 6e9a036

Browse files
authored
Add iteration support to etcd (#52199)
Introduces a new `Items(context.Context, backend.IterateParams) iter.Seq2[backend.Item, error]` method to the etcd backend. The core logic for this function was moved from GetRange, and GetRange now defers to calling Items and collecting the results into a slice of items. The motivation for this change is to reduce the complexity of pagination for consumers of GetRange. While backend.IterateRange and backend.StreamRange already exist to serve the same purpose, they too have suffered from pagination bugs in the past. Additionally, this aims to unify iteration on the iter package instead of the various homegrown iteration mechanisms that exist throughout the code base. There is also one distinct difference in the iteration API: it allows retrieving items within the specified range in ascending or descending key order. While there may not be many use cases for this today, exposing this in the API from its inception is easier than adding functional options, or ItemsAscending/ItemsDescending, later.
1 parent ba65676 commit 6e9a036

File tree

3 files changed

+334
-26
lines changed

3 files changed

+334
-26
lines changed

lib/backend/backend.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"context"
2424
"fmt"
2525
"io"
26+
"iter"
2627
"sort"
2728
"time"
2829

@@ -113,6 +114,34 @@ type Backend interface {
113114
CloseWatchers()
114115
}
115116

117+
// IterateParams describes the key range, ordering, and size cap that
118+
// [BackendWithItems.Items] applies when iterating over backend items.
119+
type IterateParams struct {
120+
// StartKey is the inclusive lower bound of the key range yielded by the
121+
// iteration. This key will be included in the results if it exists.
122+
StartKey Key
123+
// EndKey is the inclusive upper bound of the key range yielded by the
124+
// iteration. This key will be included in the results if it exists.
125+
EndKey Key
126+
// Descending makes the iteration yield items from the biggest to the smallest
127+
// key (i.e. from EndKey to StartKey). If unset, the iteration will proceed in the
128+
// usual ascending order (i.e. from StartKey to EndKey).
129+
Descending bool
130+
// Limit is an optional maximum number of items to retrieve during iteration; NoLimit means no cap is applied.
131+
Limit int
132+
}
133+
134+
// BackendWithItems is a temporary interface that extends [Backend] with the Items
135+
// method; Items is meant to move into [Backend] once all concrete backend implementations provide it.
136+
// TODO(tross): REMEMBER TO DELETE THIS
137+
type BackendWithItems interface {
138+
Backend
139+
140+
// Items produces an iterator of backend items in the range, and order
141+
// described in the provided [IterateParams]. Errors are reported through the iterator's second value.
142+
Items(ctx context.Context, params IterateParams) iter.Seq2[Item, error]
143+
}
144+
116145
// New initializes a new [Backend] implementation based on the service config.
117146
func New(ctx context.Context, backend string, params Params) (Backend, error) {
118147
registryMu.RLock()

lib/backend/etcdbk/etcd.go

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import (
2525
"crypto/x509"
2626
"encoding/base64"
2727
"errors"
28+
"iter"
2829
"log/slog"
2930
"os"
30-
"sort"
3131
"strconv"
3232
"strings"
3333
"sync"
@@ -649,6 +649,84 @@ func (b *EtcdBackend) NewWatcher(ctx context.Context, watch backend.Watch) (back
649649
return b.buf.NewWatcher(ctx, watch)
650650
}
651651

652+
func (b *EtcdBackend) Items(ctx context.Context, params backend.IterateParams) iter.Seq2[backend.Item, error] {
653+
if params.StartKey.IsZero() {
654+
err := trace.BadParameter("missing parameter startKey")
655+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
656+
}
657+
if params.EndKey.IsZero() {
658+
err := trace.BadParameter("missing parameter endKey")
659+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
660+
}
661+
662+
sort := clientv3.SortAscend
663+
if params.Descending {
664+
sort = clientv3.SortDescend
665+
}
666+
667+
const defaultPageSize = 1000
668+
return func(yield func(backend.Item, error) bool) {
669+
inclusiveStartKey := b.prependPrefix(params.StartKey)
670+
// etcd's range query includes the start point and excludes the end point,
671+
// but Backend.GetRange is supposed to be inclusive at both ends, so we
672+
// query until the very next key in lexicographic order (i.e., the same key
673+
// followed by a 0 byte)
674+
endKey := b.prependPrefix(params.EndKey) + "\x00"
675+
count := 0
676+
677+
pageSize := defaultPageSize
678+
for {
679+
if params.Limit > backend.NoLimit {
680+
pageSize = min(params.Limit-count, defaultPageSize)
681+
}
682+
683+
start := b.clock.Now()
684+
re, err := b.clients.Next().Get(ctx, inclusiveStartKey,
685+
clientv3.WithRange(endKey),
686+
clientv3.WithSort(clientv3.SortByKey, sort),
687+
clientv3.WithLimit(int64(pageSize)),
688+
)
689+
batchReadLatencies.Observe(time.Since(start).Seconds())
690+
batchReadRequests.Inc()
691+
if err := convertErr(err); err != nil {
692+
yield(backend.Item{}, trace.Wrap(err))
693+
return
694+
}
695+
696+
if len(re.Kvs) == 0 {
697+
return
698+
}
699+
700+
for _, kv := range re.Kvs {
701+
value, err := unmarshal(kv.Value)
702+
if err != nil {
703+
yield(backend.Item{}, trace.Wrap(err))
704+
return
705+
}
706+
707+
if !yield(backend.Item{
708+
Key: b.trimPrefix(kv.Key),
709+
Value: value,
710+
Revision: toBackendRevision(kv.ModRevision),
711+
}, nil) {
712+
return
713+
}
714+
715+
count++
716+
if params.Limit != backend.NoLimit && count >= params.Limit {
717+
return
718+
}
719+
}
720+
721+
if params.Descending {
722+
endKey = string(re.Kvs[len(re.Kvs)-1].Key)
723+
} else {
724+
inclusiveStartKey = string(re.Kvs[len(re.Kvs)-1].Key) + "\x00"
725+
}
726+
}
727+
}
728+
}
729+
652730
// GetRange returns query range
653731
func (b *EtcdBackend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
654732
if startKey.IsZero() {
@@ -657,35 +735,18 @@ func (b *EtcdBackend) GetRange(ctx context.Context, startKey, endKey backend.Key
657735
if endKey.IsZero() {
658736
return nil, trace.BadParameter("missing parameter endKey")
659737
}
660-
// etcd's range query includes the start point and excludes the end point,
661-
// but Backend.GetRange is supposed to be inclusive at both ends, so we
662-
// query until the very next key in lexicographic order (i.e., the same key
663-
// followed by a 0 byte)
664-
opts := []clientv3.OpOption{clientv3.WithRange(b.prependPrefix(endKey) + "\x00")}
665-
if limit > 0 {
666-
opts = append(opts, clientv3.WithLimit(int64(limit)))
667-
}
668-
start := b.clock.Now()
669-
re, err := b.clients.Next().Get(ctx, b.prependPrefix(startKey), opts...)
670-
batchReadLatencies.Observe(time.Since(start).Seconds())
671-
batchReadRequests.Inc()
672-
if err := convertErr(err); err != nil {
673-
return nil, trace.Wrap(err)
674-
}
675-
items := make([]backend.Item, 0, len(re.Kvs))
676-
for _, kv := range re.Kvs {
677-
value, err := unmarshal(kv.Value)
738+
739+
var result backend.GetResult
740+
for item, err := range b.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey, Limit: limit}) {
678741
if err != nil {
679742
return nil, trace.Wrap(err)
680743
}
681-
items = append(items, backend.Item{
682-
Key: b.trimPrefix(kv.Key),
683-
Value: value,
684-
Revision: toBackendRevision(kv.ModRevision),
685-
})
744+
result.Items = append(result.Items, item)
745+
if limit != backend.NoLimit && len(result.Items) > limit {
746+
return nil, trace.BadParameter("item iterator produced more items than requested (this is a bug). limit=%d, recevied=%d", limit, len(result.Items))
747+
}
686748
}
687-
sort.Sort(backend.Items(items))
688-
return &backend.GetResult{Items: items}, nil
749+
return &result, nil
689750
}
690751

691752
func toBackendRevision(rev int64) string {

0 commit comments

Comments
 (0)