@@ -26,6 +26,7 @@ import (
26
26
"github.com/improbable-eng/thanos/pkg/objstore"
27
27
"github.com/improbable-eng/thanos/pkg/pool"
28
28
"github.com/improbable-eng/thanos/pkg/runutil"
29
+ storecache "github.com/improbable-eng/thanos/pkg/store/cache"
29
30
"github.com/improbable-eng/thanos/pkg/store/storepb"
30
31
"github.com/improbable-eng/thanos/pkg/strutil"
31
32
"github.com/improbable-eng/thanos/pkg/tracing"
@@ -182,7 +183,7 @@ type BucketStore struct {
182
183
metrics * bucketStoreMetrics
183
184
bucket objstore.BucketReader
184
185
dir string
185
- indexCache * indexCache
186
+ indexCache * storecache. IndexCache
186
187
chunkPool * pool.BytesPool
187
188
188
189
// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
@@ -225,10 +226,17 @@ func NewBucketStore(
225
226
return nil , errors .Errorf ("max concurrency value cannot be lower than 0 (got %v)" , maxConcurrent )
226
227
}
227
228
228
- indexCache , err := newIndexCache (reg , indexCacheSizeBytes )
229
+ // TODO(bwplotka): Add as a flag?
230
+ maxItemSizeBytes := indexCacheSizeBytes / 2
231
+
232
+ indexCache , err := storecache .NewIndexCache (logger , reg , storecache.Opts {
233
+ MaxSizeBytes : indexCacheSizeBytes ,
234
+ MaxItemSizeBytes : maxItemSizeBytes ,
235
+ })
229
236
if err != nil {
230
237
return nil , errors .Wrap (err , "create index cache" )
231
238
}
239
+
232
240
chunkPool , err := pool .NewBytesPool (2e5 , 50e6 , 2 , maxChunkPoolBytes )
233
241
if err != nil {
234
242
return nil , errors .Wrap (err , "create chunk pool" )
@@ -1058,7 +1066,7 @@ type bucketBlock struct {
1058
1066
bucket objstore.BucketReader
1059
1067
meta * metadata.Meta
1060
1068
dir string
1061
- indexCache * indexCache
1069
+ indexCache * storecache. IndexCache
1062
1070
chunkPool * pool.BytesPool
1063
1071
1064
1072
indexVersion int
@@ -1081,7 +1089,7 @@ func newBucketBlock(
1081
1089
bkt objstore.BucketReader ,
1082
1090
id ulid.ULID ,
1083
1091
dir string ,
1084
- indexCache * indexCache ,
1092
+ indexCache * storecache. IndexCache ,
1085
1093
chunkPool * pool.BytesPool ,
1086
1094
p partitioner ,
1087
1095
) (b * bucketBlock , err error ) {
@@ -1241,13 +1249,13 @@ type bucketIndexReader struct {
1241
1249
block * bucketBlock
1242
1250
dec * index.Decoder
1243
1251
stats * queryStats
1244
- cache * indexCache
1252
+ cache * storecache. IndexCache
1245
1253
1246
1254
mtx sync.Mutex
1247
1255
loadedSeries map [uint64 ][]byte
1248
1256
}
1249
1257
1250
- func newBucketIndexReader (ctx context.Context , logger log.Logger , block * bucketBlock , cache * indexCache ) * bucketIndexReader {
1258
+ func newBucketIndexReader (ctx context.Context , logger log.Logger , block * bucketBlock , cache * storecache. IndexCache ) * bucketIndexReader {
1251
1259
r := & bucketIndexReader {
1252
1260
logger : logger ,
1253
1261
ctx : ctx ,
@@ -1415,7 +1423,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
1415
1423
for i , g := range groups {
1416
1424
for j , key := range g .keys {
1417
1425
// Get postings for the given key from cache first.
1418
- if b , ok := r .cache .postings (r .block .meta .ULID , key ); ok {
1426
+ if b , ok := r .cache .Postings (r .block .meta .ULID , key ); ok {
1419
1427
r .stats .postingsTouched ++
1420
1428
r .stats .postingsTouchedSizeSum += len (b )
1421
1429
@@ -1487,7 +1495,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
1487
1495
1488
1496
// Return postings and fill LRU cache.
1489
1497
groups [p .groupID ].Fill (p .keyID , fetchedPostings )
1490
- r .cache .setPostings (r .block .meta .ULID , groups [p .groupID ].keys [p .keyID ], c )
1498
+ r .cache .SetPostings (r .block .meta .ULID , groups [p .groupID ].keys [p .keyID ], c )
1491
1499
1492
1500
// If we just fetched it we still have to update the stats for touched postings.
1493
1501
r .stats .postingsTouched ++
@@ -1510,7 +1518,7 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
1510
1518
var newIDs []uint64
1511
1519
1512
1520
for _ , id := range ids {
1513
- if b , ok := r .cache .series (r .block .meta .ULID , id ); ok {
1521
+ if b , ok := r .cache .Series (r .block .meta .ULID , id ); ok {
1514
1522
r .loadedSeries [id ] = b
1515
1523
continue
1516
1524
}
@@ -1567,7 +1575,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
1567
1575
}
1568
1576
c = c [n : n + int (l )]
1569
1577
r .loadedSeries [id ] = c
1570
- r .cache .setSeries (r .block .meta .ULID , id , c )
1578
+ r .cache .SetSeries (r .block .meta .ULID , id , c )
1571
1579
}
1572
1580
return nil
1573
1581
}
0 commit comments