Skip to content

Commit 73cb98d

Browse files
alanprotCharlieTLe
authored andcommitted
Fix: PostingCache promise should fetch data only once (cortexproject#6314)
* Fix: PostingCache promise should fetch data only once Signed-off-by: alanprot <[email protected]> * fix Signed-off-by: alanprot <[email protected]> * fix test using default prometheus registerer Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent b1a54ff commit 73cb98d

File tree

2 files changed

+54
-33
lines changed

2 files changed

+54
-33
lines changed

pkg/storage/tsdb/expanded_postings_cache.go

+16-19
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,17 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
197197
return c.result(promise)
198198
}
199199

200-
func (c *BlocksPostingsForMatchersCache) result(promise *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
200+
func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
201201
return func(ctx context.Context) (index.Postings, error) {
202-
ids, err := promise.result(ctx)
203-
return index.NewListPostings(ids), err
202+
select {
203+
case <-ctx.Done():
204+
return nil, ctx.Err()
205+
case <-ce.done:
206+
if ctx.Err() != nil {
207+
return nil, ctx.Err()
208+
}
209+
return index.NewListPostings(ce.v), ce.err
210+
}
204211
}
205212
}
206213

@@ -327,9 +334,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
327334
c.expire()
328335
}
329336

330-
// If is cached but is expired, lets try to replace the cache value
331-
if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) {
332-
if c.cachedValues.CompareAndSwap(k, loaded, r) {
337+
if ok {
338+
// If the promise is already in the cache, lets wait it to fetch the data.
339+
<-loaded.(*cacheEntryPromise[V]).done
340+
341+
// If is cached but is expired, lets try to replace the cache value.
342+
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
333343
r.v, r.sizeBytes, r.err = fetch()
334344
r.sizeBytes += int64(len(k))
335345
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
@@ -404,19 +414,6 @@ type cacheEntryPromise[V any] struct {
404414
err error
405415
}
406416

407-
func (ce *cacheEntryPromise[V]) result(ctx context.Context) (V, error) {
408-
select {
409-
case <-ctx.Done():
410-
return ce.v, ctx.Err()
411-
case <-ce.done:
412-
if ctx.Err() != nil {
413-
return ce.v, ctx.Err()
414-
}
415-
416-
return ce.v, ce.err
417-
}
418-
}
419-
420417
func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
421418
ts := ce.ts
422419
r := now.Sub(ts)

pkg/storage/tsdb/expanded_postings_cache_test.go

+38-14
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,59 @@
11
package tsdb
22

33
import (
4-
"context"
54
"fmt"
65
"strings"
6+
"sync"
77
"testing"
88
"time"
99

1010
"github.com/prometheus/client_golang/prometheus"
1111
"github.com/stretchr/testify/require"
12+
"go.uber.org/atomic"
1213
)
1314

15+
func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
16+
cfg := PostingsCacheConfig{
17+
Enabled: true,
18+
Ttl: time.Hour,
19+
MaxBytes: 10 << 20,
20+
}
21+
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
22+
cache := newFifoCache[int](cfg, "test", m, time.Now)
23+
calls := atomic.Int64{}
24+
concurrency := 100
25+
wg := sync.WaitGroup{}
26+
wg.Add(concurrency)
27+
28+
fetchFunc := func() (int, int64, error) {
29+
calls.Inc()
30+
time.Sleep(100 * time.Millisecond)
31+
return 0, 0, nil
32+
}
33+
34+
for i := 0; i < 100; i++ {
35+
go func() {
36+
defer wg.Done()
37+
cache.getPromiseForKey("key1", fetchFunc)
38+
}()
39+
}
40+
41+
wg.Wait()
42+
require.Equal(t, int64(1), calls.Load())
43+
44+
}
45+
1446
func TestFifoCacheDisabled(t *testing.T) {
1547
cfg := PostingsCacheConfig{}
1648
cfg.Enabled = false
17-
m := NewPostingCacheMetrics(prometheus.DefaultRegisterer)
49+
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
1850
timeNow := time.Now
1951
cache := newFifoCache[int](cfg, "test", m, timeNow)
2052
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
2153
return 1, 0, nil
2254
})
2355
require.False(t, loaded)
24-
v, err := old.result(context.Background())
25-
require.NoError(t, err)
26-
require.Equal(t, 1, v)
56+
require.Equal(t, 1, old.v)
2757
require.False(t, cache.contains("key1"))
2858
}
2959

@@ -68,17 +98,13 @@ func TestFifoCacheExpire(t *testing.T) {
6898
return 1, 8, nil
6999
})
70100
require.False(t, loaded)
71-
v, err := p.result(context.Background())
72-
require.NoError(t, err)
73-
require.Equal(t, 1, v)
101+
require.Equal(t, 1, p.v)
74102
require.True(t, cache.contains(key))
75103
p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) {
76104
return 1, 0, nil
77105
})
78106
require.True(t, loaded)
79-
v, err = p.result(context.Background())
80-
require.NoError(t, err)
81-
require.Equal(t, 1, v)
107+
require.Equal(t, 1, p.v)
82108
}
83109

84110
totalCacheSize := 0
@@ -104,10 +130,8 @@ func TestFifoCacheExpire(t *testing.T) {
104130
return 2, 18, nil
105131
})
106132
require.False(t, loaded)
107-
v, err := p.result(context.Background())
108-
require.NoError(t, err)
109133
// New value
110-
require.Equal(t, 2, v)
134+
require.Equal(t, 2, p.v)
111135
// Total Size Updated
112136
require.Equal(t, originalSize+10, cache.cachedBytes)
113137
}

0 commit comments

Comments
 (0)