fix panic when using cassandra in multiple periodic configs or as a store for both index and delete requests #2774

Merged
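
The panic described in the title appears to come from duplicate Prometheus collector registration: when Cortex builds more than one storage client (for example, Cassandra serving both the index and the delete store, or one client per periodic config), the second client re-registers the same package-level metrics and the registry panics. The diff below therefore threads a `prometheus.Registerer` through the DynamoDB client constructors and moves the metrics from package globals registered in `init()` onto a per-client `dynamoDBMetrics` struct. The sketch below is a minimal, hypothetical reproduction of that failure mode and of the per-instance pattern; `storeMetrics`, `newStoreMetrics`, and the `store` label are illustrative names, not Cortex APIs, and the wrapped-registerer trick is only one way to keep two instances apart, not necessarily what Cortex's storage factory does.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// storeMetrics is a hypothetical stand-in for a storage client's metrics.
type storeMetrics struct {
	requests prometheus.Counter
}

// newStoreMetrics registers on whatever Registerer is injected, mirroring
// the newMetrics(r) pattern introduced in this PR.
func newStoreMetrics(r prometheus.Registerer) *storeMetrics {
	return &storeMetrics{
		requests: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: "example_store_requests_total",
			Help: "Requests issued by one storage client.",
		}),
	}
}

func main() {
	// Old pattern, in spirit: two clients register identical collectors on
	// one registry; the second registration panics inside promauto with
	// "duplicate metrics collector registration attempted".
	shared := prometheus.NewRegistry()
	_ = newStoreMetrics(shared)
	func() {
		defer func() { fmt.Println("second registration panicked:", recover() != nil) }()
		_ = newStoreMetrics(shared) // same names, same registry -> panic
	}()

	// One way to let two instances coexist is to give each a Registerer that
	// distinguishes its series, e.g. by wrapping with a constant label
	// (illustrative label name; not necessarily what Cortex's factory does).
	reg := prometheus.NewRegistry()
	_ = newStoreMetrics(prometheus.WrapRegistererWith(prometheus.Labels{"store": "index"}, reg))
	_ = newStoreMetrics(prometheus.WrapRegistererWith(prometheus.Labels{"store": "deletes"}, reg))
	fmt.Println("wrapped registrations succeeded")
}
```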
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@
* [BUGFIX] Ingester: Flushing chunks via `/flush` endpoint could previously lead to panic, if chunks were already flushed before and then removed from memory during the flush caused by `/flush` handler. Immediate flush now doesn't cause chunks to be flushed again. Samples received during flush triggered via `/flush` handler are no longer discarded. #2778
* [BUGFIX] Prometheus upgraded. #2849
* Fixed unknown symbol error during head compaction
* [BUGFIX] Fix panic when using Cassandra as a store for both index and delete requests. #2774

## 1.2.0 / 2020-07-01

60 changes: 60 additions & 0 deletions pkg/chunk/aws/dynamodb_metrics.go
@@ -0,0 +1,60 @@
package aws

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/weaveworks/common/instrument"
)

type dynamoDBMetrics struct {
	dynamoRequestDuration  *instrument.HistogramCollector
	dynamoConsumedCapacity *prometheus.CounterVec
	dynamoThrottled        *prometheus.CounterVec
	dynamoFailures         *prometheus.CounterVec
	dynamoDroppedRequests  *prometheus.CounterVec
	dynamoQueryPagesCount  prometheus.Histogram
}

func newMetrics(r prometheus.Registerer) *dynamoDBMetrics {
	m := dynamoDBMetrics{}

	m.dynamoRequestDuration = instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "dynamo_request_duration_seconds",
		Help:      "Time spent doing DynamoDB requests.",

		// DynamoDB latency seems to range from a few ms to several seconds and is
		// important. So use 9 buckets from 1ms to just over 1 minute (65s).
		Buckets: prometheus.ExponentialBuckets(0.001, 4, 9),
	}, []string{"operation", "status_code"}))
	m.dynamoConsumedCapacity = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_consumed_capacity_total",
		Help:      "The capacity units consumed by operation.",
	}, []string{"operation", tableNameLabel})
	m.dynamoThrottled = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_throttled_total",
		Help:      "The total number of throttled events.",
	}, []string{"operation", tableNameLabel})
	m.dynamoFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_failures_total",
		Help:      "The total number of errors while storing chunks to the chunk store.",
	}, []string{tableNameLabel, errorReasonLabel, "operation"})
	m.dynamoDroppedRequests = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_dropped_requests_total",
		Help:      "The total number of requests which were dropped due to errors encountered from dynamo.",
	}, []string{tableNameLabel, errorReasonLabel, "operation"})
	m.dynamoQueryPagesCount = promauto.With(r).NewHistogram(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "dynamo_query_pages_count",
		Help:      "Number of pages per query.",
		// Most queries will have one page, however this may increase with fuzzy
		// metric names.
		Buckets: prometheus.ExponentialBuckets(1, 4, 6),
	})

	return &m
}
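
As a quick sanity check of the new file above — not part of the PR — a hypothetical test could construct two `dynamoDBMetrics` values on two scratch registries, something the old package-level globals registered in `init()` could not support:

```go
package aws

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Hypothetical test sketched against dynamodb_metrics.go above: each client
// gets its own metrics struct, registered on whatever Registerer it is given.
func TestNewMetricsIsPerInstance(t *testing.T) {
	regA, regB := prometheus.NewRegistry(), prometheus.NewRegistry()
	a, b := newMetrics(regA), newMetrics(regB)

	// Observations on one instance must not leak into the other registry.
	a.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", "index").Inc()

	if got := testutil.ToFloat64(a.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", "index")); got != 1 {
		t.Fatalf("registry A: expected 1 throttled event, got %v", got)
	}
	if got := testutil.ToFloat64(b.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", "index")); got != 0 {
		t.Fatalf("registry B: expected 0 throttled events, got %v", got)
	}
}
```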
104 changes: 29 additions & 75 deletions pkg/chunk/aws/dynamodb_storage_client.go
@@ -50,55 +50,6 @@ const (
validationException = "ValidationException"
)

var (
dynamoRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "dynamo_request_duration_seconds",
Help: "Time spent doing DynamoDB requests.",

// DynamoDB latency seems to range from a few ms to a several seconds and is
// important. So use 9 buckets from 1ms to just over 1 minute (65s).
Buckets: prometheus.ExponentialBuckets(0.001, 4, 9),
}, []string{"operation", "status_code"}))
dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_consumed_capacity_total",
Help: "The capacity units consumed by operation.",
}, []string{"operation", tableNameLabel})
dynamoThrottled = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_throttled_total",
Help: "The total number of throttled events.",
}, []string{"operation", tableNameLabel})
dynamoFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_failures_total",
Help: "The total number of errors while storing chunks to the chunk store.",
}, []string{tableNameLabel, errorReasonLabel, "operation"})
dynamoDroppedRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_dropped_requests_total",
Help: "The total number of requests which were dropped due to errors encountered from dynamo.",
}, []string{tableNameLabel, errorReasonLabel, "operation"})
dynamoQueryPagesCount = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "dynamo_query_pages_count",
Help: "Number of pages per query.",
// Most queries will have one page, however this may increase with fuzzy
// metric names.
Buckets: prometheus.ExponentialBuckets(1, 4, 6),
})
)

func init() {
dynamoRequestDuration.Register()
prometheus.MustRegister(dynamoConsumedCapacity)
prometheus.MustRegister(dynamoThrottled)
prometheus.MustRegister(dynamoFailures)
prometheus.MustRegister(dynamoQueryPagesCount)
prometheus.MustRegister(dynamoDroppedRequests)
}

// DynamoDBConfig specifies config for a DynamoDB database.
type DynamoDBConfig struct {
DynamoDB flagext.URLValue `yaml:"dynamodb_url"`
@@ -148,20 +99,22 @@ type dynamoDBStorageClient struct {
// of boilerplate.
batchGetItemRequestFn func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest
batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest

metrics *dynamoDBMetrics
}

// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
return newDynamoDBStorageClient(cfg, schemaCfg)
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) {
return newDynamoDBStorageClient(cfg, schemaCfg, reg)
}

// NewDynamoDBChunkClient makes a new DynamoDB-backed chunk.Client.
func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.Client, error) {
return newDynamoDBStorageClient(cfg, schemaCfg)
func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.Client, error) {
return newDynamoDBStorageClient(cfg, schemaCfg, reg)
}

// newDynamoDBStorageClient makes a new DynamoDB-backed IndexClient and chunk.Client.
func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (*dynamoDBStorageClient, error) {
func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (*dynamoDBStorageClient, error) {
dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL)
if err != nil {
return nil, err
@@ -172,6 +125,7 @@ func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig)
schemaCfg: schemaCfg,
DynamoDB: dynamoDB,
writeThrottle: rate.NewLimiter(rate.Limit(cfg.ThrottleLimit), dynamoDBMaxWriteBatchSize),
metrics: newMetrics(reg),
}
client.batchGetItemRequestFn = client.batchGetItemRequest
client.batchWriteItemRequestFn = client.batchWriteItemRequest
@@ -187,9 +141,9 @@ func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch {
return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
}

func logWriteRetry(ctx context.Context, unprocessed dynamoDBWriteBatch) {
func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) {
for table, reqs := range unprocessed {
dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
for _, req := range reqs {
item := req.PutRequest.Item
var hash, rnge string
@@ -225,25 +179,25 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})

err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return request.Send()
})
resp := request.Data().(*dynamodb.BatchWriteItemOutput)

for _, cc := range resp.ConsumedCapacity {
dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName).
a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName).
Add(float64(*cc.CapacityUnits))
}

if err != nil {
for tableName := range requests {
recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem")
recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics)
}

// If we get provisionedThroughputExceededException, then no items were processed,
// so back off and retry all.
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
logWriteRetry(ctx, requests)
logWriteRetry(requests, a.metrics)
unprocessed.TakeReqs(requests, -1)
_ = a.writeThrottle.WaitN(ctx, len(requests))
backoff.Wait()
@@ -256,7 +210,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
// recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context
// to determine if a request was dropped (or not)
for tableName := range requests {
dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc()
a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc()
}
continue
}
@@ -268,7 +222,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
// If there are unprocessed items, retry those items.
unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems)
if len(unprocessedItems) > 0 {
logWriteRetry(ctx, unprocessedItems)
logWriteRetry(unprocessedItems, a.metrics)
_ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len())
unprocessed.TakeReqs(unprocessedItems, -1)
}
@@ -329,11 +283,11 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery

pageCount := 0
defer func() {
dynamoQueryPagesCount.Observe(float64(pageCount))
a.metrics.dynamoQueryPagesCount.Observe(float64(pageCount))
}()

retryer := newRetryer(ctx, a.cfg.backoffConfig)
err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error {
err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error {
if sp := ot.SpanFromContext(innerCtx); sp != nil {
sp.SetTag("tableName", query.TableName)
sp.SetTag("hashValue", query.HashValue)
@@ -345,12 +299,12 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery
}

if cc := output.ConsumedCapacity; cc != nil {
dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName).
a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName).
Add(float64(*cc.CapacityUnits))
}

return callback(query, &dynamoDBReadResponse{items: output.Items})
}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages"))
}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics))
})
if err != nil {
return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName)
@@ -481,19 +435,19 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})

err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return request.Send()
})
response := request.Data().(*dynamodb.BatchGetItemOutput)

for _, cc := range response.ConsumedCapacity {
dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName).
a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName).
Add(float64(*cc.CapacityUnits))
}

if err != nil {
for tableName := range requests {
recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages")
recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics)
}

// If we get provisionedThroughputExceededException, then no items were processed,
@@ -509,7 +463,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
// recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context
// to determine if a request was dropped (or not)
for tableName := range requests {
dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc()
a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc()
}
continue
}
@@ -792,21 +746,21 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
}
}

func withErrorHandler(tableName, operation string) func(req *request.Request) {
func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) {
return func(req *request.Request) {
req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) {
if req.Error != nil {
recordDynamoError(tableName, req.Error, operation)
recordDynamoError(tableName, req.Error, operation, metrics)
}
})
}
}

func recordDynamoError(tableName string, err error, operation string) {
func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) {
if awsErr, ok := err.(awserr.Error); ok {
dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
} else {
dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
}
}
