-
Notifications
You must be signed in to change notification settings - Fork 814
/
Copy pathindex_cache.go
113 lines (89 loc) · 3.85 KB
/
index_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package tsdb
import (
"flag"
"fmt"
"strings"
"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/model"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/cortexproject/cortex/pkg/util"
)
const (
// IndexCacheBackendInMemory is the value for the in-memory index cache backend.
IndexCacheBackendInMemory = "inmemory"
// IndexCacheBackendMemcached is the value for the memcached index cache backend.
IndexCacheBackendMemcached = "memcached"
// IndexCacheBackendDefault is the value for the default index cache backend.
IndexCacheBackendDefault = IndexCacheBackendInMemory
// defaultMaxItemSize is the default limit (128 MiB) on the size of a single
// item in the in-memory index cache. newInMemoryIndexCache lowers the limit
// to the configured cache size when the cache itself is smaller than this.
defaultMaxItemSize = model.Bytes(128 * units.MiB)
)
var (
// supportedIndexCacheBackends lists the backend names accepted by
// IndexCacheConfig.Validate and shown in the help text of the backend flag.
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached}
// errUnsupportedIndexCacheBackend is returned by Validate and NewIndexCache
// when the configured backend is not one of the supported values.
errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
// errNoIndexCacheAddresses is not referenced by the code visible in this file.
// NOTE(review): presumably returned when a remote backend is configured
// without any addresses — confirm against its users elsewhere in the package.
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
)
// IndexCacheConfig holds the index cache settings: the selected backend and
// one configuration section per supported backend.
type IndexCacheConfig struct {
// Backend selects the cache implementation; Validate rejects any value not
// listed in supportedIndexCacheBackends.
Backend string `yaml:"backend"`
// InMemory is the configuration passed to the in-memory backend.
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
// Memcached is the configuration passed to the memcached backend. It is only
// validated when Backend is IndexCacheBackendMemcached.
Memcached MemcachedClientConfig `yaml:"memcached"`
}
// RegisterFlags registers the index cache flags on f under the default
// "blocks-storage.bucket-store.index-cache." prefix.
func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
	const defaultPrefix = "blocks-storage.bucket-store.index-cache."

	cfg.RegisterFlagsWithPrefix(f, defaultPrefix)
}
// RegisterFlagsWithPrefix registers the index cache flags on f, prepending
// prefix to every flag name. It registers the "backend" selector itself and
// delegates the "inmemory." and "memcached." sub-sections to the respective
// backend configurations.
func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	// Build the help text from the list of supported backends so the two
	// cannot drift apart.
	backends := strings.Join(supportedIndexCacheBackends, ", ")
	usage := fmt.Sprintf("The index cache backend type. Supported values: %s.", backends)
	f.StringVar(&cfg.Backend, prefix+"backend", IndexCacheBackendDefault, usage)

	cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
}
// Validate checks the index cache configuration. It returns
// errUnsupportedIndexCacheBackend when Backend is not a supported value and,
// for the memcached backend, any error reported by the memcached client
// configuration. It returns nil otherwise.
func (cfg *IndexCacheConfig) Validate() error {
	if supported := util.StringsContain(supportedIndexCacheBackends, cfg.Backend); !supported {
		return errUnsupportedIndexCacheBackend
	}

	// Only the memcached backend has nested settings that are validated here.
	if cfg.Backend != IndexCacheBackendMemcached {
		return nil
	}

	if err := cfg.Memcached.Validate(); err != nil {
		return err
	}
	return nil
}
// InMemoryIndexCacheConfig holds the settings of the in-memory index cache
// backend.
type InMemoryIndexCacheConfig struct {
// MaxSizeBytes is the maximum size, in bytes, of the in-memory index cache.
MaxSizeBytes uint64 `yaml:"max_size_bytes"`
}
// RegisterFlagsWithPrefix registers the in-memory index cache flags on f,
// prepending prefix to every flag name. The cache size defaults to 1 GiB.
func (cfg *InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	const usage = "Maximum size in bytes of in-memory index cache used to speed up blocks index lookups (shared between all tenants)."

	defaultSize := uint64(units.Gibibyte)
	f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", defaultSize, usage)
}
// NewIndexCache builds the index cache selected by cfg.Backend, handing the
// matching backend section of cfg to the backend-specific constructor. It
// returns errUnsupportedIndexCacheBackend for any other backend value.
func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
	if cfg.Backend == IndexCacheBackendInMemory {
		return newInMemoryIndexCache(cfg.InMemory, logger, registerer)
	}
	if cfg.Backend == IndexCacheBackendMemcached {
		return newMemcachedIndexCache(cfg.Memcached, logger, registerer)
	}

	return nil, errUnsupportedIndexCacheBackend
}
// newInMemoryIndexCache builds the in-memory index cache. The cache is capped
// at cfg.MaxSizeBytes, and a single item at defaultMaxItemSize or at the cache
// size itself when that is smaller.
func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
	cacheSize := model.Bytes(cfg.MaxSizeBytes)

	// The per-item limit must not exceed the cache size: start from the cache
	// size and adopt the default limit only when it fits.
	itemSize := cacheSize
	if defaultMaxItemSize <= cacheSize {
		itemSize = defaultMaxItemSize
	}

	cacheCfg := storecache.InMemoryIndexCacheConfig{
		MaxSize:     cacheSize,
		MaxItemSize: itemSize,
	}
	return storecache.NewInMemoryIndexCacheWithConfig(logger, registerer, cacheCfg)
}
// newMemcachedIndexCache builds a memcached-backed index cache. It first
// creates a memcached client named "index-cache" from cfg and then wraps it in
// the index cache. A client creation failure is returned wrapped with context.
func newMemcachedIndexCache(cfg MemcachedClientConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
	clientCfg := cfg.ToMemcachedClientConfig()

	memcached, err := cacheutil.NewMemcachedClientWithConfig(logger, "index-cache", clientCfg, registerer)
	if err != nil {
		// The message has no format verbs, so plain Wrap is equivalent to Wrapf.
		return nil, errors.Wrap(err, "create index cache memcached client")
	}

	return storecache.NewMemcachedIndexCache(logger, memcached, registerer)
}