Skip to content

Do not prefix Thanos objstore client metrics and fix instrumentation in querier #2568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
* [CHANGE] Slow query log has a different output now. Previously used `url` field has been replaced with `host` and `path`, and query parameters are logged as individual log fields with `qs_` prefix. #2520
* [CHANGE] Experimental WAL: WAL and checkpoint compression is now disabled. #2436
* [CHANGE] Update in dependency `go-kit/kit` from `v0.9.0` to `v0.10.0`. HTML escaping disabled in JSON Logger. #2535
* [CHANGE] Experimental TSDB: Removed `cortex_<service>_` prefix from Thanos objstore metrics and added `component` label to distinguish which Cortex component is doing API calls to the object storage when running in single-binary mode: #2568
- `cortex_<service>_thanos_objstore_bucket_operations_total` renamed to `thanos_objstore_bucket_operations_total{component="<name>"}`
- `cortex_<service>_thanos_objstore_bucket_operation_failures_total` renamed to `thanos_objstore_bucket_operation_failures_total{component="<name>"}`
- `cortex_<service>_thanos_objstore_bucket_operation_duration_seconds` renamed to `thanos_objstore_bucket_operation_duration_seconds{component="<name>"}`
- `cortex_<service>_thanos_objstore_bucket_last_successful_upload_time` renamed to `thanos_objstore_bucket_last_successful_upload_time{component="<name>"}`
* [FEATURE] Ruler: The `-ruler.evaluation-delay` flag was added to allow users to configure a default evaluation delay for all rules in cortex. The default value is 0 which is the current behavior. #2423
* [FEATURE] Experimental: Added a new object storage client for OpenStack Swift. #2440
* [FEATURE] Update in dependency `weaveworks/common`. TLS config options added to the Server. #2535
Expand Down
10 changes: 10 additions & 0 deletions development/tsdb-blocks-storage-s3/config/grafana-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ prometheus:
- targets: ['query-frontend:8007']
labels:
container: 'query-frontend'
- job_name: cortex-store-gateway-1
static_configs:
- targets: ['store-gateway-1:8008']
labels:
container: 'store-gateway-1'
- job_name: cortex-store-gateway-2
static_configs:
- targets: ['store-gateway-2:8009']
labels:
container: 'store-gateway-2'

remote_write:
- url: http://distributor:8001/api/prom/push
10 changes: 10 additions & 0 deletions development/tsdb-blocks-storage-s3/config/prometheus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ scrape_configs:
- targets: ['query-frontend:8007']
labels:
container: 'query-frontend'
- job_name: cortex-store-gateway-1
static_configs:
- targets: ['store-gateway-1:8008']
labels:
container: 'store-gateway-1'
- job_name: cortex-store-gateway-2
static_configs:
- targets: ['store-gateway-2:8009']
labels:
container: 'store-gateway-2'

remote_write:
- url: http://distributor:8001/api/prom/push
4 changes: 2 additions & 2 deletions integration/e2ecortex/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ type S3Client struct {
}

func NewS3Client(cfg s3.Config) (*S3Client, error) {
writer, err := s3.NewBucketClient(cfg, "test", log.NewNopLogger())
writer, err := s3.NewBucketClient(cfg, "test", log.NewNopLogger(), nil)
if err != nil {
return nil, err
}

reader, err := s3.NewBucketReaderClient(cfg, "test", log.NewNopLogger())
reader, err := s3.NewBucketReaderClient(cfg, "test", log.NewNopLogger(), nil)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,11 @@ type Compactor struct {
// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.Config, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
createBucketClientAndTsdbCompactor := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg, "compactor", logger)
bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg, "compactor", logger, registerer)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create the bucket client")
}

if registerer != nil {
bucketClient = objstore.BucketWithMetrics( /* bucket label value */ "", bucketClient, prometheus.WrapRegistererWithPrefix("cortex_compactor_", registerer))
}

compactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool())
return bucketClient, compactor, err
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,11 @@ type TSDBState struct {
// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
util.WarnExperimentalUse("Blocks storage engine")
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger)
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "ingester", util.Logger, registerer)
if err != nil {
return nil, errors.Wrap(err, "failed to create the bucket client")
}

if registerer != nil {
bucketClient = objstore.BucketWithMetrics( /* bucket label value */ "", bucketClient, prometheus.WrapRegistererWithPrefix("cortex_ingester_", registerer))
}

i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
Expand Down
7 changes: 1 addition & 6 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/user"
Expand All @@ -33,15 +32,11 @@ type BlockQueryable struct {
// NewBlockQueryable returns a client to query a block store
func NewBlockQueryable(cfg tsdb.Config, logLevel logging.Level, registerer prometheus.Registerer) (*BlockQueryable, error) {
util.WarnExperimentalUse("Blocks storage engine")
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "cortex-bucket-stores", util.Logger)
bucketClient, err := tsdb.NewBucketClient(context.Background(), cfg, "querier", util.Logger, registerer)
if err != nil {
return nil, err
}

if registerer != nil {
bucketClient = objstore.BucketWithMetrics( /* bucket label value */ "", bucketClient, prometheus.WrapRegistererWithPrefix("cortex_querier_", registerer))
}

us, err := NewBucketStoresService(cfg, bucketClient, logLevel, util.Logger, registerer)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func prepareBlocksScanner(t *testing.T, cfg BlocksScannerConfig) (*BlocksScanner
storageDir, err := ioutil.TempDir(os.TempDir(), "blocks-scanner-test-storage")
require.NoError(t, err)

bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}, "test", nil)
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, reg pro
func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.Config, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
var stores BlocksStoreSet

bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg, "querier", logger)
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg, "querier", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create bucket client")
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/storage/backend/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package azure

import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/azure"
yaml "gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/storage/backend/instrumentation"
)

func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketConfig := azure.Config{
StorageAccountName: cfg.StorageAccountName,
StorageAccountKey: cfg.StorageAccountKey.Value,
Expand All @@ -23,5 +26,10 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
return nil, err
}

return azure.NewBucket(logger, serialized, name)
client, err := azure.NewBucket(logger, serialized, name)
if err != nil {
return nil, err
}

return instrumentation.BucketWithMetrics(client, name, reg), nil
}
12 changes: 10 additions & 2 deletions pkg/storage/backend/filesystem/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package filesystem

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/filesystem"

"github.com/cortexproject/cortex/pkg/storage/backend/instrumentation"
)

// NewBucketClient creates a new filesystem bucket client
func NewBucketClient(cfg Config) (objstore.Bucket, error) {
return filesystem.NewBucket(cfg.Directory)
func NewBucketClient(cfg Config, name string, reg prometheus.Registerer) (objstore.Bucket, error) {
client, err := filesystem.NewBucket(cfg.Directory)
if err != nil {
return nil, err
}

return instrumentation.BucketWithMetrics(client, name, reg), nil
}
12 changes: 10 additions & 2 deletions pkg/storage/backend/gcs/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/gcs"
yaml "gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/storage/backend/instrumentation"
)

// NewBucketClient creates a new GCS bucket client
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketConfig := gcs.Config{
Bucket: cfg.BucketName,
ServiceAccount: cfg.ServiceAccount.Value,
Expand All @@ -23,5 +26,10 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
return nil, err
}

return gcs.NewBucket(ctx, logger, serialized, name)
client, err := gcs.NewBucket(ctx, logger, serialized, name)
if err != nil {
return nil, err
}

return instrumentation.BucketWithMetrics(client, name, reg), nil
}
17 changes: 17 additions & 0 deletions pkg/storage/backend/instrumentation/bucket_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package instrumentation

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"
)

func BucketWithMetrics(bucketClient objstore.Bucket, component string, reg prometheus.Registerer) objstore.Bucket {
if reg == nil {
return bucketClient
}

return objstore.BucketWithMetrics(
"", // bucket label value
bucketClient,
prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg))
}
21 changes: 17 additions & 4 deletions pkg/storage/backend/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,31 @@ package s3

import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/s3"

"github.com/cortexproject/cortex/pkg/storage/backend/instrumentation"
)

// NewBucketClient creates a new S3 bucket client
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
return s3.NewBucketWithConfig(logger, newS3Config(cfg), name)
func NewBucketClient(cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
client, err := s3.NewBucketWithConfig(logger, newS3Config(cfg), name)
if err != nil {
return nil, err
}

return instrumentation.BucketWithMetrics(client, name, reg), nil
}

// NewBucketReaderClient creates a new S3 bucket client
func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore.BucketReader, error) {
return s3.NewBucketWithConfig(logger, newS3Config(cfg), name)
func NewBucketReaderClient(cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.BucketReader, error) {
client, err := s3.NewBucketWithConfig(logger, newS3Config(cfg), name)
if err != nil {
return nil, err
}

return instrumentation.BucketWithMetrics(client, name, reg), nil
}

func newS3Config(cfg Config) s3.Config {
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/tsdb/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/backend/azure"
Expand All @@ -13,16 +14,16 @@ import (
)

// NewBucketClient creates a new bucket client based on the configured backend
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
switch cfg.Backend {
case BackendS3:
return s3.NewBucketClient(cfg.S3, name, logger)
return s3.NewBucketClient(cfg.S3, name, logger, reg)
case BackendGCS:
return gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
return gcs.NewBucketClient(ctx, cfg.GCS, name, logger, reg)
case BackendAzure:
return azure.NewBucketClient(cfg.Azure, name, logger)
return azure.NewBucketClient(cfg.Azure, name, logger, reg)
case BackendFilesystem:
return filesystem.NewBucketClient(cfg.Filesystem)
return filesystem.NewBucketClient(cfg.Filesystem, name, reg)
default:
return nil, errUnsupportedStorageBackend
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestNewBucketClient(t *testing.T) {
require.NoError(t, err)

// Instance a new bucket client from the config
bucketClient, err := NewBucketClient(context.Background(), cfg, "test", util.Logger)
bucketClient, err := NewBucketClient(context.Background(), cfg, "test", util.Logger, nil)
require.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestBucketStores_InitialSync(t *testing.T) {
generateStorageBlock(t, storageDir, userID, metricName, 10, 100)
}

bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}, "test", nil)
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
storageDir, err := ioutil.TempDir(os.TempDir(), "storage-*")
require.NoError(t, err)

bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}, "test", nil)
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
Expand Down
6 changes: 1 addition & 5 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,10 @@ func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.
}

func createBucketClient(cfg cortex_tsdb.Config, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg, "cortex-bucket-stores", logger)
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg, "store-gateway", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
}

if reg != nil {
bucketClient = objstore.BucketWithMetrics( /* bucket label value */ "", bucketClient, prometheus.WrapRegistererWithPrefix("cortex_storegateway_", reg))
}

return bucketClient, nil
}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {
require.NoError(t, mockTSDB(path.Join(storageDir, "user-1"), 24, now.Add(-24*time.Hour).Unix()*1000, now.Unix()*1000))
require.NoError(t, mockTSDB(path.Join(storageDir, "user-2"), 24, now.Add(-24*time.Hour).Unix()*1000, now.Unix()*1000))

bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}, "test", nil)
require.NoError(t, err)

tests := map[string]struct {
Expand Down