-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathredis_client.go
316 lines (281 loc) · 9.57 KB
/
redis_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package cacheutil
import (
"context"
"fmt"
"sync"
"time"
"unsafe"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
)
var (
// DefaultRedisClientConfig is default redis config.
DefaultRedisClientConfig = RedisClientConfig{
DialTimeout: time.Second * 5,
ReadTimeout: time.Second * 3,
WriteTimeout: time.Second * 3,
PoolSize: 100,
MinIdleConns: 10,
IdleTimeout: time.Minute * 5,
MaxGetMultiConcurrency: 100,
GetMultiBatchSize: 100,
MaxSetMultiConcurrency: 100,
SetMultiBatchSize: 100,
}
)
// RedisClientConfig is the config accepted by RedisClient.
type RedisClientConfig struct {
// Addr specifies the addresses of redis server.
Addr string `yaml:"addr"`
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Username string `yaml:"username"`
// Optional password. Must match the password specified in the
// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
// or the User Password when connecting to a Redis 6.0 instance, or greater,
// that is using the Redis ACL system.
Password string `yaml:"password"`
// DB Database to be selected after connecting to the server.
DB int `yaml:"db"`
// DialTimeout specifies the client dial timeout.
DialTimeout time.Duration `yaml:"dial_timeout"`
// ReadTimeout specifies the client read timeout.
ReadTimeout time.Duration `yaml:"read_timeout"`
// WriteTimeout specifies the client write timeout.
WriteTimeout time.Duration `yaml:"write_timeout"`
// Maximum number of socket connections.
PoolSize int `yaml:"pool_size"`
// MinIdleConns specifies the minimum number of idle connections which is useful when establishing
// new connection is slow.
MinIdleConns int `yaml:"min_idle_conns"`
// Amount of time after which client closes idle connections.
// Should be less than server's timeout.
// -1 disables idle timeout check.
IdleTimeout time.Duration `yaml:"idle_timeout"`
// Connection age at which client retires (closes) the connection.
// Default 0 is to not close aged connections.
MaxConnAge time.Duration `yaml:"max_conn_age"`
// MaxGetMultiConcurrency specifies the maximum number of concurrent GetMulti() operations.
// If set to 0, concurrency is unlimited.
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`
// GetMultiBatchSize specifies the maximum size per batch for mget.
GetMultiBatchSize int `yaml:"get_multi_batch_size"`
// MaxSetMultiConcurrency specifies the maximum number of concurrent SetMulti() operations.
// If set to 0, concurrency is unlimited.
MaxSetMultiConcurrency int `yaml:"max_set_multi_concurrency"`
// SetMultiBatchSize specifies the maximum size per batch for pipeline set.
SetMultiBatchSize int `yaml:"set_multi_batch_size"`
}
func (c *RedisClientConfig) validate() error {
if c.Addr == "" {
return errors.New("no redis addr provided")
}
return nil
}
// RedisClient is a wrap of redis.Client.
type RedisClient struct {
*redis.Client
config RedisClientConfig
// getMultiGate used to enforce the max number of concurrent GetMulti() operations.
getMultiGate gate.Gate
// setMultiGate used to enforce the max number of concurrent SetMulti() operations.
setMultiGate gate.Gate
logger log.Logger
durationSet prometheus.Observer
durationSetMulti prometheus.Observer
durationGetMulti prometheus.Observer
}
// NewRedisClient makes a new RedisClient.
func NewRedisClient(logger log.Logger, name string, conf []byte, reg prometheus.Registerer) (*RedisClient, error) {
config, err := parseRedisClientConfig(conf)
if err != nil {
return nil, err
}
return NewRedisClientWithConfig(logger, name, config, reg)
}
// NewRedisClientWithConfig makes a new RedisClient.
func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClientConfig,
reg prometheus.Registerer) (*RedisClient, error) {
if err := config.validate(); err != nil {
return nil, err
}
redisClient := redis.NewClient(&redis.Options{
Addr: config.Addr,
Username: config.Username,
Password: config.Password,
DB: config.DB,
DialTimeout: config.DialTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MinIdleConns: config.MinIdleConns,
MaxConnAge: config.MaxConnAge,
IdleTimeout: config.IdleTimeout,
})
if reg != nil {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg)
}
c := &RedisClient{
Client: redisClient,
config: config,
logger: logger,
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg),
config.MaxGetMultiConcurrency,
),
setMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_setmulti_", reg),
config.MaxSetMultiConcurrency,
),
}
duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_redis_operation_duration_seconds",
Help: "Duration of operations against memcached.",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 3, 6, 10},
}, []string{"operation"})
c.durationSet = duration.WithLabelValues(opSet)
c.durationSetMulti = duration.WithLabelValues(opSetMulti)
c.durationGetMulti = duration.WithLabelValues(opGetMulti)
return c, nil
}
// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error {
start := time.Now()
if _, err := c.Set(ctx, key, value, ttl).Result(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key,
"value_size", len(value))
return nil
}
c.durationSet.Observe(time.Since(start).Seconds())
return nil
}
// SetMulti set multiple keys and value.
func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl time.Duration) {
if len(data) == 0 {
return
}
start := time.Now()
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error {
_, err := c.Pipelined(ctx, func(p redis.Pipeliner) error {
for _, key := range keys {
p.SetEX(ctx, key, data[key], ttl)
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis",
"err", err, "items", len(data))
return nil
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err,
"items", len(data))
return
}
c.durationSetMulti.Observe(time.Since(start).Seconds())
}
// GetMulti implement RemoteCacheClient.
func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][]byte {
if len(keys) == 0 {
return nil
}
start := time.Now()
results := make(map[string][]byte, len(keys))
var mu sync.Mutex
err := doWithBatch(ctx, len(keys), c.config.GetMultiBatchSize, c.getMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
resp, err := c.MGet(ctx, currentKeys...).Result()
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resp))
return nil
}
mu.Lock()
defer mu.Unlock()
for i := 0; i < len(resp); i++ {
key := currentKeys[i]
switch val := resp[i].(type) {
case string:
results[key] = stringToBytes(val)
case nil: // miss
default:
level.Warn(c.logger).Log("msg",
fmt.Sprintf("unexpected redis mget result type:%T %v", resp[i], resp[i]))
}
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(keys))
return nil
}
c.durationGetMulti.Observe(time.Since(start).Seconds())
return results
}
// Stop implement RemoteCacheClient.
func (c *RedisClient) Stop() {
if err := c.Close(); err != nil {
level.Error(c.logger).Log("msg", "redis close err")
}
}
// stringToBytes converts string to byte slice (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go).
func stringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}
// doWithBatch do func with batch and gate. batchSize==0 means one batch. gate==nil means no gate.
func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error {
if totalSize == 0 {
return nil
}
if batchSize <= 0 {
return f(0, totalSize)
}
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < totalSize; i += batchSize {
j := i + batchSize
if j > totalSize {
j = totalSize
}
if ga != nil {
if err := ga.Start(ctx); err != nil {
return nil
}
}
startIndex, endIndex := i, j
g.Go(func() error {
if ga != nil {
defer ga.Done()
}
return f(startIndex, endIndex)
})
}
return g.Wait()
}
// parseRedisClientConfig unmarshals a buffer into a RedisClientConfig with default values.
func parseRedisClientConfig(conf []byte) (RedisClientConfig, error) {
config := DefaultRedisClientConfig
if err := yaml.Unmarshal(conf, &config); err != nil {
return RedisClientConfig{}, err
}
return config, nil
}