Skip to content

Commit 8a3e8ac

Browse files
changes suggested from PR review
Signed-off-by: Sandeep Sukhani <[email protected]>
1 parent b69195a commit 8a3e8ac

File tree

6 files changed

+40
-35
lines changed

6 files changed

+40
-35
lines changed

pkg/chunk/aws/metrics.go renamed to pkg/chunk/aws/dynamodb_metrics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/weaveworks/common/instrument"
77
)
88

9-
type metrics struct {
9+
type dynamoDBMetrics struct {
1010
dynamoRequestDuration *instrument.HistogramCollector
1111
dynamoConsumedCapacity *prometheus.CounterVec
1212
dynamoThrottled *prometheus.CounterVec
@@ -15,8 +15,8 @@ type metrics struct {
1515
dynamoQueryPagesCount prometheus.Histogram
1616
}
1717

18-
func newMetrics(r prometheus.Registerer) *metrics {
19-
m := metrics{}
18+
func newMetrics(r prometheus.Registerer) *dynamoDBMetrics {
19+
m := dynamoDBMetrics{}
2020

2121
m.dynamoRequestDuration = instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
2222
Namespace: "cortex",

pkg/chunk/aws/dynamodb_storage_client.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ type dynamoDBStorageClient struct {
100100
batchGetItemRequestFn func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest
101101
batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest
102102

103-
metrics *metrics
103+
metrics *dynamoDBMetrics
104104
}
105105

106106
// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
@@ -141,9 +141,9 @@ func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch {
141141
return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
142142
}
143143

144-
func logWriteRetry(unprocessed dynamoDBWriteBatch, dynamoThrottled *prometheus.CounterVec) {
144+
func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) {
145145
for table, reqs := range unprocessed {
146-
dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
146+
metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
147147
for _, req := range reqs {
148148
item := req.PutRequest.Item
149149
var hash, rnge string
@@ -191,13 +191,13 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
191191

192192
if err != nil {
193193
for tableName := range requests {
194-
recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics.dynamoFailures)
194+
recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics)
195195
}
196196

197197
// If we get provisionedThroughputExceededException, then no items were processed,
198198
// so back off and retry all.
199199
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
200-
logWriteRetry(requests, a.metrics.dynamoThrottled)
200+
logWriteRetry(requests, a.metrics)
201201
unprocessed.TakeReqs(requests, -1)
202202
_ = a.writeThrottle.WaitN(ctx, len(requests))
203203
backoff.Wait()
@@ -222,7 +222,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
222222
// If there are unprocessed items, retry those items.
223223
unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems)
224224
if len(unprocessedItems) > 0 {
225-
logWriteRetry(unprocessedItems, a.metrics.dynamoThrottled)
225+
logWriteRetry(unprocessedItems, a.metrics)
226226
_ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len())
227227
unprocessed.TakeReqs(unprocessedItems, -1)
228228
}
@@ -304,7 +304,7 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery
304304
}
305305

306306
return callback(query, &dynamoDBReadResponse{items: output.Items})
307-
}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics.dynamoFailures))
307+
}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics))
308308
})
309309
if err != nil {
310310
return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName)
@@ -447,7 +447,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
447447

448448
if err != nil {
449449
for tableName := range requests {
450-
recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics.dynamoFailures)
450+
recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics)
451451
}
452452

453453
// If we get provisionedThroughputExceededException, then no items were processed,
@@ -746,21 +746,21 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
746746
}
747747
}
748748

749-
func withErrorHandler(tableName, operation string, dynamoFailures *prometheus.CounterVec) func(req *request.Request) {
749+
func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) {
750750
return func(req *request.Request) {
751751
req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) {
752752
if req.Error != nil {
753-
recordDynamoError(tableName, req.Error, operation, dynamoFailures)
753+
recordDynamoError(tableName, req.Error, operation, metrics)
754754
}
755755
})
756756
}
757757
}
758758

759-
func recordDynamoError(tableName string, err error, operation string, dynamoFailures *prometheus.CounterVec) {
759+
func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) {
760760
if awsErr, ok := err.(awserr.Error); ok {
761-
dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
761+
metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
762762
} else {
763-
dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
763+
metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
764764
}
765765
}
766766

pkg/chunk/aws/dynamodb_table_client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type dynamoTableClient struct {
3636
DynamoDB dynamodbiface.DynamoDBAPI
3737
callManager callManager
3838
autoscale autoscale
39-
metrics *metrics
39+
metrics *dynamoDBMetrics
4040
}
4141

4242
// NewDynamoDBTableClient makes a new DynamoTableClient.
@@ -53,7 +53,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig, reg prometheus.Registerer) (chun
5353

5454
var autoscale autoscale
5555
if cfg.Metrics.URL != "" {
56-
autoscale, err = newAutoScale(cfg)
56+
autoscale, err = newMetricsAutoScaling(cfg)
5757
if err != nil {
5858
return nil, err
5959
}
@@ -323,7 +323,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch
323323
return err
324324
})
325325
}); err != nil {
326-
recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics.dynamoFailures)
326+
recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics)
327327
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" {
328328
level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err)
329329
} else {
@@ -341,7 +341,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch
341341
return err
342342
})
343343
}); err != nil {
344-
recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics.dynamoFailures)
344+
recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics)
345345
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" {
346346
level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err)
347347
} else {

pkg/chunk/aws/metrics_autoscaling.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type metricsData struct {
7878
readErrorRates map[string]float64
7979
}
8080

81-
func newAutoScale(cfg DynamoDBConfig) (*metricsData, error) {
81+
func newMetricsAutoScaling(cfg DynamoDBConfig) (*metricsData, error) {
8282
client, err := promApi.NewClient(promApi.Config{Address: cfg.Metrics.URL})
8383
if err != nil {
8484
return nil, err

pkg/chunk/storage/factory.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
153153
stores := chunk.NewCompositeStore(cacheGenNumLoader)
154154

155155
for _, s := range schemaCfg.Configs {
156-
reg := prometheus.WrapRegistererWith(
157-
prometheus.Labels{"purpose": s.From.String()}, reg)
156+
indexClientReg := prometheus.WrapRegistererWith(
157+
prometheus.Labels{"component": "index-store-" + s.From.String()}, reg)
158158

159-
index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, reg)
159+
index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, indexClientReg)
160160
if err != nil {
161161
return nil, errors.Wrap(err, "error creating index client")
162162
}
@@ -166,7 +166,11 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
166166
if objectStoreType == "" {
167167
objectStoreType = s.IndexType
168168
}
169-
chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, reg)
169+
170+
chunkClientReg := prometheus.WrapRegistererWith(
171+
prometheus.Labels{"component": "chunk-store-" + s.From.String()}, reg)
172+
173+
chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, chunkClientReg)
170174
if err != nil {
171175
return nil, errors.Wrap(err, "error creating object client")
172176
}

pkg/cortex/modules.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
330330

331331
var indexClient chunk.IndexClient
332332
reg := prometheus.WrapRegistererWith(
333-
prometheus.Labels{"purpose": "delete-requests"}, prometheus.DefaultRegisterer)
333+
prometheus.Labels{"component": DeleteRequestsStore}, prometheus.DefaultRegisterer)
334334
indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema, reg)
335335
if err != nil {
336336
return
@@ -427,7 +427,10 @@ func (t *Cortex) initTableManager() (services.Service, error) {
427427
os.Exit(1)
428428
}
429429

430-
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, prometheus.DefaultRegisterer)
430+
reg := prometheus.WrapRegistererWith(
431+
prometheus.Labels{"component": "table-manager-index-chunk-" + lastConfig.From.String()}, prometheus.DefaultRegisterer)
432+
433+
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, reg)
431434
if err != nil {
432435
return nil, err
433436
}
@@ -437,14 +440,12 @@ func (t *Cortex) initTableManager() (services.Service, error) {
437440

438441
var extraTables []chunk.ExtraTables
439442
if t.Cfg.PurgerConfig.Enable {
440-
var deleteStoreTableClient chunk.TableClient
441-
if lastConfig.IndexType == t.Cfg.Storage.DeleteStoreConfig.Store {
442-
deleteStoreTableClient = tableClient
443-
} else {
444-
deleteStoreTableClient, err = storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, prometheus.DefaultRegisterer)
445-
if err != nil {
446-
return nil, err
447-
}
443+
reg := prometheus.WrapRegistererWith(
444+
prometheus.Labels{"component": "table-manager-" + DeleteRequestsStore}, prometheus.DefaultRegisterer)
445+
446+
deleteStoreTableClient, err := storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, reg)
447+
if err != nil {
448+
return nil, err
448449
}
449450

450451
extraTables = append(extraTables, chunk.ExtraTables{TableClient: deleteStoreTableClient, Tables: t.Cfg.Storage.DeleteStoreConfig.GetTables()})

0 commit comments

Comments
 (0)