Skip to content

[WIP] Query the store for Labels APIs #3478

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
pkg/ruler/ruler.pb.go: pkg/ruler/rules/rules.proto
pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto
pkg/scheduler/schedulerpb/scheduler.pb.go: pkg/scheduler/schedulerpb/scheduler.proto
pkg/storegateway/storegatewaypb/gateway.pb.go: pkg/storegateway/storegatewaypb/gateway.proto
pkg/chunk/grpc/grpc.pb.go: pkg/chunk/grpc/grpc.proto
tools/blocksconvert/scheduler.pb.go: tools/blocksconvert/scheduler.proto

Expand Down
141 changes: 139 additions & 2 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -293,8 +294,7 @@ func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
}

func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
// Cortex doesn't use this. It will ask ingesters for metadata.
return nil, nil, errors.New("not implemented")
return q.labelValues(name)
}

func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
Expand All @@ -306,6 +306,134 @@ func (q *blocksStoreQuerier) Close() error {
return nil
}

func (q *blocksStoreQuerier) labelValues(name string) ([]string, storage.Warnings, error) {
spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.labelValues")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT

// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
// optimization is particularly important for the blocks storage because can be used to skip
// querying most recent not-compacted-yet blocks from the storage.
if q.queryStoreAfter > 0 {
now := time.Now()
origMaxT := maxT
maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))

if origMaxT != maxT {
level.Debug(spanLog).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
}

if maxT < minT {
q.metrics.storesHit.Observe(0)
level.Debug(spanLog).Log("msg", "empty query time range after max time manipulation")
return nil, nil, nil
}
}

// Find the list of blocks we need to query given the time range.
knownMetas, _, err := q.finder.GetBlocks(q.userID, minT, maxT)
if err != nil {
return nil, nil, err
}

if len(knownMetas) == 0 {
q.metrics.storesHit.Observe(0)
level.Debug(spanLog).Log("msg", "no blocks found")
return nil, nil, nil
}

level.Debug(spanLog).Log("msg", "found blocks to query", "expected", BlockMetas(knownMetas).String())

// At the beginning the list of blocks to query are all known blocks.
remainingBlocks := getULIDsFromBlockMetas(knownMetas)
// Find the set of store-gateway instances having the blocks. The exclude parameter is the
// map of blocks queried so far, with the list of store-gateway addresses for each block.
clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, nil)
if err != nil {
return nil, nil, err
}
level.Debug(spanLog).Log("msg", "found store-gateway instances to query", "num instances", len(clients))

// Fetch series from stores. If an error occur we do not retry because retries
// are only meant to cover missing blocks.
lnames, warnings, err := q.fetchLabelValuesFromStores(spanCtx, name, clients, minT, maxT)
if err != nil {
return nil, nil, err
}
level.Debug(spanLog).Log("msg", "received labels from all store-gateways")

return lnames, warnings, nil
}
func (q *blocksStoreQuerier) fetchLabelValuesFromStores(
ctx context.Context,
label string,
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
) ([]string, storage.Warnings, error) {
var (
reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
g, gCtx = errgroup.WithContext(reqCtx)
mtx = sync.Mutex{}
labelValues = map[string]struct{}{}
warnings = storage.Warnings(nil)
spanLog = spanlogger.FromContext(ctx)
)

// Concurrently fetch series from all clients.
for c, blockIDs := range clients {
// Change variables scope since it will be used in a goroutine.
c := c
blockIDs := blockIDs

g.Go(func() error {
valuesResp, err := c.LabelValues(gCtx, &storepb.LabelValuesRequest{
Label: label,
PartialResponseDisabled: true,
Start: minT,
End: maxT,
})
if err != nil {
return err
}

level.Debug(spanLog).Log("msg", "received label values from store-gateway",
"instance", c,
"num label values", len(valuesResp.Values),
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
)

// Store the result.
mtx.Lock()
for _, lval := range valuesResp.Values {
labelValues[lval] = struct{}{}
}

for _, warn := range valuesResp.Warnings {
warnings = append(warnings, errors.New(warn))
}
mtx.Unlock()

return nil
})
}

// Wait until all client requests complete.
if err := g.Wait(); err != nil {
return nil, nil, err
}

finalValues := make([]string, 0, len(labelValues))
for val := range labelValues {
finalValues = append(finalValues, val)
}
sort.Strings(finalValues)

return finalValues, warnings, nil
}

func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.selectSorted")
defer spanLog.Span.Finish()
Expand Down Expand Up @@ -580,6 +708,15 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip
}, nil
}

func createLabelValuesRequest(minT, maxT int64, name string) *storepb.LabelValuesRequest {
return &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: true,
Start: minT,
End: maxT,
}
}

func convertULIDsToString(ids []ulid.ULID) []string {
res := make([]string, len(ids))
for idx, id := range ids {
Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,14 @@ func (m *storeGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesR
return seriesClient, nil
}

func (m *storeGatewayClientMock) LabelNames(context.Context, *storepb.LabelNamesRequest, ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
return nil, nil
}

func (m *storeGatewayClientMock) LabelValues(context.Context, *storepb.LabelValuesRequest, ...grpc.CallOption) (*storepb.LabelValuesResponse, error) {
return nil, nil
}

func (m *storeGatewayClientMock) RemoteAddress() string {
return m.remoteAddr
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,16 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat

// LabelsValue implements storage.Querier.
func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) {
if q.queryStoreForLabels {
return q.metadataQuerier.LabelValues(name)
}
return q.metadataQuerier.LabelValues(name)
}

func (q querier) LabelNames() ([]string, storage.Warnings, error) {
if q.queryStoreForLabels {
return q.metadataQuerier.LabelNames()
}
return q.metadataQuerier.LabelNames()
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/store_gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ type mockStoreGatewayServer struct{}
func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
return nil
}

func (m *mockStoreGatewayServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, nil
}

func (m *mockStoreGatewayServer) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
return nil, nil
}
36 changes: 36 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,42 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
})
}

// LabelNames implements the Storegateway proto service.
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")
defer spanLog.Span.Finish()

userID := getUserIDFromGRPCContext(spanCtx)
if userID == "" {
return nil, fmt.Errorf("no userID")
}

store := u.getStore(userID)
if store == nil {
return nil, nil
}

return store.LabelNames(ctx, req)
}

// LabelValues implements the Storegateway proto service.
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")
defer spanLog.Span.Finish()

userID := getUserIDFromGRPCContext(spanCtx)
if userID == "" {
return nil, fmt.Errorf("no userID")
}

store := u.getStore(userID)
if store == nil {
return nil, nil
}

return store.LabelValues(ctx, req)
}

// scanUsers in the bucket and return the list of found users. If an error occurs while
// iterating the bucket, it may return both an error and a subset of the users in the bucket.
func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,16 @@ func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.Sto
return g.stores.Series(req, srv)
}

// LabelNames implements the Storegateway proto service.
func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return g.stores.LabelNames(ctx, req)
}

// LabelValues implements the Storegateway proto service.
func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
return g.stores.LabelValues(ctx, req)
}

func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens) {
// When we initialize the store-gateway instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
Expand Down
Loading