Skip to content

Commit 0335149

Browse files
committed
Add iteration support to firestore
This builds on top of #52199 by updating the firestore backend to implement backend.BackendWithItems. Only GetRange, and not DeletRange, was refactored to use Items to retrieve a range. The iteration logic is slightly more complex here to ensure that documents in the collection are produced in the correct order no matter how the key was stored in the collection. Over the years the key has been stored in three different formats, and no single query will pull the items from all keys. There was a background migration task introduced in v17 to convert all keys to a single format. The iteration logic can be simplified once said migration is removed and all keys can be assumed to be of a single type.
1 parent edd55e9 commit 0335149

File tree

1 file changed

+228
-26
lines changed

1 file changed

+228
-26
lines changed

lib/backend/firestore/firestorebk.go

Lines changed: 228 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"context"
2525
"encoding/base64"
2626
"errors"
27+
"iter"
2728
"log/slog"
2829
"strconv"
2930
"strings"
@@ -34,6 +35,7 @@ import (
3435
"cloud.google.com/go/firestore/apiv1/admin/adminpb"
3536
"github.com/gravitational/trace"
3637
"github.com/jonboulle/clockwork"
38+
"google.golang.org/api/iterator"
3739
"google.golang.org/api/option"
3840
"google.golang.org/grpc"
3941
"google.golang.org/grpc/codes"
@@ -541,45 +543,245 @@ func (b *Backend) getRangeDocs(ctx context.Context, startKey, endKey backend.Key
541543
return allDocs, nil
542544
}
543545

544-
// GetRange returns range of elements
545-
func (b *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
546-
docSnaps, err := b.getRangeDocs(ctx, startKey, endKey, limit)
547-
if err != nil {
548-
return nil, trace.Wrap(err)
546+
func (b *Backend) Items(ctx context.Context, params backend.IterateParams) iter.Seq2[backend.Item, error] {
547+
if params.StartKey.IsZero() {
548+
err := trace.BadParameter("missing parameter startKey")
549+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
549550
}
550-
var values []backend.Item
551-
for _, docSnap := range docSnaps {
552-
r, err := newRecordFromDoc(docSnap)
553-
if err != nil {
554-
return nil, trace.Wrap(err)
551+
if params.EndKey.IsZero() {
552+
err := trace.BadParameter("missing parameter endKey")
553+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
554+
}
555+
556+
limit := params.Limit
557+
if limit <= 0 {
558+
limit = backend.DefaultRangeLimit
559+
}
560+
561+
sort := firestore.Asc
562+
if params.Descending {
563+
sort = firestore.Desc
564+
}
565+
566+
return func(yield func(backend.Item, error) bool) {
567+
count := 0
568+
defer func() {
569+
if count == backend.DefaultRangeLimit {
570+
b.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", params.StartKey, "limit", backend.DefaultRangeLimit)
571+
}
572+
}()
573+
574+
for docSnap, err := range b.documentSnapshots(ctx, params.StartKey.String(), params.EndKey.String(), limit, sort) {
575+
if err != nil {
576+
yield(backend.Item{}, trace.Wrap(err))
577+
return
578+
}
579+
580+
r, err := newRecordFromDoc(docSnap)
581+
if err != nil {
582+
yield(backend.Item{}, trace.Wrap(err))
583+
return
584+
}
585+
586+
if r.isExpired(b.clock.Now()) {
587+
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
588+
// If the document has been updated, then attempt one additional get to see if the
589+
// resource was updated and is no longer expired.
590+
docSnap, err := b.svc.Collection(b.CollectionName).
591+
Doc(docSnap.Ref.ID).
592+
Get(ctx)
593+
if err != nil {
594+
yield(backend.Item{}, trace.Wrap(err))
595+
return
596+
}
597+
598+
r, err = newRecordFromDoc(docSnap)
599+
if err != nil {
600+
yield(backend.Item{}, trace.Wrap(err))
601+
return
602+
}
603+
604+
if r.isExpired(b.clock.Now()) {
605+
continue
606+
}
607+
}
608+
}
609+
610+
if !yield(r.backendItem(), nil) {
611+
return
612+
}
613+
count++
614+
615+
if limit != backend.NoLimit && count >= limit {
616+
return
617+
}
555618
}
619+
}
620+
}
621+
622+
// documentSnapshots returns an iterator that aggregates all items in the collection
623+
// in the desired order. This is required because over the years the key has been
624+
// stored in three formats. In order to iterate over all keys in the collection in the
625+
// correct order, documents with keys of all formats need to be considered.
626+
//
627+
// TODO(tross|tigrato): DELETE IN V19.0.0 with the background migration
628+
func (b *Backend) documentSnapshots(ctx context.Context, startKey, endKey string, limit int, sort firestore.Direction) iter.Seq2[*firestore.DocumentSnapshot, error] {
629+
return func(yield func(*firestore.DocumentSnapshot, error) bool) {
630+
docsIter := newDocIter(b.svc.Collection(b.CollectionName).
631+
Where(keyDocProperty, ">=", []byte(startKey)).
632+
Where(keyDocProperty, "<=", []byte(endKey)).
633+
Limit(limit).
634+
OrderBy(keyDocProperty, sort).
635+
Documents(ctx))
636+
637+
legacyDocsIter := newDocIter(b.svc.Collection(b.CollectionName).
638+
Where(keyDocProperty, ">=", startKey).
639+
Where(keyDocProperty, "<=", endKey).
640+
Limit(limit).
641+
OrderBy(keyDocProperty, sort).
642+
Documents(ctx))
643+
644+
brokenDocsIter := newDocIter(b.svc.Collection(b.CollectionName).
645+
Where(keyDocProperty, ">=", brokenKey(startKey)).
646+
Where(keyDocProperty, "<=", brokenKey(endKey)).
647+
Limit(limit).
648+
OrderBy(keyDocProperty, sort).
649+
Documents(ctx))
650+
651+
defer func() {
652+
docsIter.stop()
653+
legacyDocsIter.stop()
654+
brokenDocsIter.stop()
655+
}()
656+
657+
for {
658+
docSnap, docSnapErr := docsIter.next()
659+
legacySnap, legacySnapErr := legacyDocsIter.next()
660+
brokenSnap, brokenSnapErr := brokenDocsIter.next()
661+
662+
// All items have been exhausted.
663+
if errors.Is(docSnapErr, iterator.Done) &&
664+
errors.Is(legacySnapErr, iterator.Done) &&
665+
errors.Is(brokenSnapErr, iterator.Done) {
666+
return
667+
}
556668

557-
if r.isExpired(b.clock.Now()) {
558-
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
559-
// If the document has been updated, then attempt one additional get to see if the
560-
// resource was updated and is no longer expired.
561-
docSnap, err := b.svc.Collection(b.CollectionName).
562-
Doc(docSnap.Ref.ID).
563-
Get(ctx)
669+
// All iterators failed.
670+
if docSnapErr != nil && legacySnapErr != nil && brokenSnapErr != nil {
671+
yield(nil, trace.NewAggregate(docSnapErr, legacySnapErr, brokenSnapErr))
672+
return
673+
}
674+
675+
// Find the iterator with the next key in the sequence.
676+
var docKey, legacyKey, brokenKey []byte
677+
if docSnap != nil {
678+
r, err := newRecordFromDoc(docSnap)
564679
if err != nil {
565-
return nil, ConvertGRPCError(err)
680+
yield(nil, err)
681+
return
566682
}
567-
r, err := newRecordFromDoc(docSnap)
683+
docKey = r.Key
684+
}
685+
686+
if legacySnap != nil {
687+
r, err := newRecordFromDoc(legacySnap)
568688
if err != nil {
569-
return nil, trace.Wrap(err)
689+
yield(nil, err)
690+
return
570691
}
692+
legacyKey = r.Key
693+
}
571694

572-
if !r.isExpired(b.clock.Now()) {
573-
values = append(values, r.backendItem())
695+
if brokenSnap != nil {
696+
r, err := newRecordFromDoc(brokenSnap)
697+
if err != nil {
698+
yield(nil, err)
699+
return
574700
}
701+
brokenKey = r.Key
702+
}
703+
704+
compareKeys := func(key, other1, other2 []byte) bool {
705+
expected := -1
706+
if sort == firestore.Desc {
707+
expected = 1
708+
}
709+
710+
switch {
711+
case len(key) == 0:
712+
return false
713+
case len(other1) == 0 && len(other2) == 0:
714+
return true
715+
case len(other1) == 0:
716+
return bytes.Compare(key, other2) == expected
717+
case len(other2) == 0:
718+
return bytes.Compare(key, other1) == expected
719+
default:
720+
return bytes.Compare(key, other1) == expected && bytes.Compare(key, other2) == expected
721+
}
722+
}
723+
724+
switch {
725+
case compareKeys(docKey, legacyKey, brokenKey):
726+
docsIter.consume()
727+
if !yield(docSnap, nil) {
728+
return
729+
}
730+
case compareKeys(legacyKey, docKey, brokenKey):
731+
legacyDocsIter.consume()
732+
if !yield(legacySnap, nil) {
733+
return
734+
}
735+
case compareKeys(brokenKey, legacyKey, docKey):
736+
brokenDocsIter.consume()
737+
if !yield(brokenSnap, nil) {
738+
return
739+
}
740+
default:
741+
yield(nil, errors.New("no valid snapshots found"))
742+
return
575743
}
576-
// Do not include this document in the results.
577-
continue
578744
}
745+
}
746+
}
747+
748+
type docIter struct {
749+
iter *firestore.DocumentIterator
750+
snap *firestore.DocumentSnapshot
751+
err error
752+
}
753+
754+
func newDocIter(iter *firestore.DocumentIterator) *docIter {
755+
return &docIter{iter: iter}
756+
}
757+
758+
func (d *docIter) next() (*firestore.DocumentSnapshot, error) {
759+
if d.snap == nil && d.err == nil {
760+
d.snap, d.err = d.iter.Next()
761+
}
762+
763+
return d.snap, d.err
764+
}
765+
766+
func (d *docIter) consume() {
767+
d.snap, d.err = nil, nil
768+
}
769+
770+
func (d *docIter) stop() {
771+
d.snap, d.err = nil, nil
772+
d.iter.Stop()
773+
}
579774

580-
values = append(values, r.backendItem())
775+
// GetRange returns range of elements
776+
func (b *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
777+
var result backend.GetResult
778+
for item, err := range b.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey, Limit: limit}) {
779+
if err != nil {
780+
return nil, trace.Wrap(err)
781+
}
782+
result.Items = append(result.Items, item)
581783
}
582-
return &backend.GetResult{Items: values}, nil
784+
return &result, nil
583785
}
584786

585787
// DeleteRange deletes range of items with keys between startKey and endKey

0 commit comments

Comments
 (0)