Skip to content

Commit 167863b

Browse files
committed
Add iteration support to memory backend
This builds on top of #52199 by updating the memory backend to implement backend.BackendWithItems. Only GetRange, and not DeletRange, was refactored to use Items to retrieve a range.
1 parent bc1edf1 commit 167863b

File tree

1 file changed

+101
-31
lines changed

1 file changed

+101
-31
lines changed

lib/backend/memory/memory.go

Lines changed: 101 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package memory
2121
import (
2222
"bytes"
2323
"context"
24+
"iter"
2425
"log/slog"
2526
"sync"
2627
"time"
@@ -276,8 +277,19 @@ func (m *Memory) DeleteRange(ctx context.Context, startKey, endKey backend.Key)
276277
m.Lock()
277278
defer m.Unlock()
278279
m.removeExpired()
279-
re := m.getRange(ctx, startKey, endKey, backend.NoLimit)
280-
for _, item := range re.Items {
280+
281+
var items []backend.Item
282+
m.tree.AscendGreaterOrEqual(&btreeItem{Item: backend.Item{Key: startKey}}, func(item *btreeItem) bool {
283+
if endKey.Compare(item.Key) < 0 {
284+
return false
285+
}
286+
287+
items = append(items, item.Item)
288+
289+
return true
290+
})
291+
292+
for _, item := range items {
281293
event := backend.Event{
282294
Type: types.OpDelete,
283295
Item: item,
@@ -290,25 +302,100 @@ func (m *Memory) DeleteRange(ctx context.Context, startKey, endKey backend.Key)
290302
return nil
291303
}
292304

293-
// GetRange returns query range
294-
func (m *Memory) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
295-
if startKey.IsZero() {
296-
return nil, trace.BadParameter("missing parameter startKey")
305+
func (m *Memory) Items(ctx context.Context, params backend.IterateParams) iter.Seq2[backend.Item, error] {
306+
if params.StartKey.IsZero() {
307+
err := trace.BadParameter("missing parameter startKey")
308+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
297309
}
298-
if endKey.IsZero() {
299-
return nil, trace.BadParameter("missing parameter endKey")
310+
if params.EndKey.IsZero() {
311+
err := trace.BadParameter("missing parameter endKey")
312+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
300313
}
314+
315+
limit := params.Limit
301316
if limit <= 0 {
302317
limit = backend.DefaultRangeLimit
303318
}
304-
m.Lock()
305-
defer m.Unlock()
306-
m.removeExpired()
307-
re := m.getRange(ctx, startKey, endKey, limit)
308-
if len(re.Items) == backend.DefaultRangeLimit {
319+
320+
const defaultPageSize = 1000
321+
return func(yield func(backend.Item, error) bool) {
322+
startKey := params.StartKey
323+
if params.Descending {
324+
startKey = params.EndKey
325+
}
326+
327+
var totalCount, pageCount int
328+
pageLimit := defaultPageSize
329+
items := make([]backend.Item, pageLimit)
330+
startItem := &btreeItem{Item: backend.Item{Key: startKey}}
331+
332+
treeIter := m.tree.AscendGreaterOrEqual
333+
if params.Descending {
334+
treeIter = m.tree.DescendLessOrEqual
335+
}
336+
337+
for {
338+
pageLimit = min(limit-totalCount, defaultPageSize)
339+
pageCount = 0
340+
341+
m.Lock()
342+
m.removeExpired()
343+
var moreItems bool
344+
treeIter(startItem, func(item *btreeItem) bool {
345+
if params.Descending && item.Key.Compare(params.StartKey) < 0 {
346+
startItem = item
347+
return false
348+
}
349+
350+
if !params.Descending && params.EndKey.Compare(item.Key) < 0 {
351+
startItem = item
352+
return false
353+
}
354+
355+
if pageCount == pageLimit {
356+
startItem = item
357+
moreItems = true
358+
return false
359+
}
360+
361+
items[pageCount] = item.Item
362+
pageCount++
363+
return true
364+
})
365+
m.Unlock()
366+
367+
for _, item := range items[:pageCount] {
368+
if !yield(item, nil) {
369+
return
370+
}
371+
372+
totalCount++
373+
if limit != backend.NoLimit && totalCount >= limit {
374+
return
375+
}
376+
}
377+
378+
if !moreItems || pageCount < pageLimit {
379+
return
380+
}
381+
}
382+
}
383+
}
384+
385+
// GetRange returns query range
386+
func (m *Memory) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
387+
var result backend.GetResult
388+
for item, err := range m.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey, Limit: limit}) {
389+
if err != nil {
390+
return nil, trace.Wrap(err)
391+
}
392+
result.Items = append(result.Items, item)
393+
}
394+
395+
if len(result.Items) == backend.DefaultRangeLimit {
309396
m.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit)
310397
}
311-
return &re, nil
398+
return &result, nil
312399
}
313400

314401
// KeepAlive updates TTL on the lease
@@ -439,23 +526,6 @@ func (m *Memory) NewWatcher(ctx context.Context, watch backend.Watch) (backend.W
439526
return m.buf.NewWatcher(ctx, watch)
440527
}
441528

442-
func (m *Memory) getRange(ctx context.Context, startKey, endKey backend.Key, limit int) backend.GetResult {
443-
var res backend.GetResult
444-
startItem := &btreeItem{Item: backend.Item{Key: startKey}}
445-
endItem := &btreeItem{Item: backend.Item{Key: endKey}}
446-
m.tree.AscendGreaterOrEqual(startItem, func(item *btreeItem) bool {
447-
if endItem.Less(item) {
448-
return false
449-
}
450-
res.Items = append(res.Items, item.Item)
451-
if limit > 0 && len(res.Items) >= limit {
452-
return false
453-
}
454-
return true
455-
})
456-
return res
457-
}
458-
459529
// removeExpired makes a pass through map and removes expired elements
460530
// returns the number of expired elements removed
461531
func (m *Memory) removeExpired() int {

0 commit comments

Comments
 (0)