Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#4147 added tenant_id tag to tracing spans #4186

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* `cortex_alertmanager_state_persist_failed_total`
* [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=<n>` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124
* [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151
* [ENHANCEMENT] Added `tenant_ids` tag to tracing spans #4147
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
* [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176

Expand Down
15 changes: 13 additions & 2 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

const (
Expand Down Expand Up @@ -96,6 +97,9 @@ type mergeQueryable struct {
// Querier returns a new mergeQuerier, which aggregates results from multiple
// underlying queriers into a single result.
func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
// TODO: it's necessary to think how to override context inside querier
// to mark spans created inside querier as child of a span created inside
// methods of merged querier.
Comment on lines +100 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to create a span during the creation of the queriers and the finish the span once Close() of the resulting mergeQuerier is called.

I have played with that here:

6ebd273

That's how that looks like:
scrn-2021-05-17-12-46-02

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels at least like it can group the same tenants nicely together. Obvisouly for a real fix we need context propagation as a part of LabelValues,LabelName, Select.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it will help to group spans by tenant_id . thanks, i will implement it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

however, it might confuse a little bit because all spans mergeQuerier.NewQuerier will take almost similar time for each tenant. but inside this span we will see real picture.
in current implementation it looks less confusing, i think.
image

what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I agree partly with your point, but I am not too sure, how to fix this properly.

I guess part of the problem is that the a query is only finished once Close() is called, which happens at the same time for underlying queriers.

I think the most confusing part of not grouping spans by tenant_id is seeing all (and even unrelated) spans together. I think until we implement context passing in those methods, we won't be able to solve that properly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could instantiate the queriers (call the callback inside LabelValues, etc.) and the right context will be applied.

ids, queriers, err := m.callback(ctx, mint, maxt)
if err != nil {
return nil, err
Expand Down Expand Up @@ -133,6 +137,8 @@ type mergeQuerier struct {
// For the label "original_" + `idLabelName it will return all the values
// of the underlying queriers for `idLabelName`.
func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelValues")
defer log.Span.Finish()
if name == m.idLabelName {
return m.ids, nil, nil
}
Expand All @@ -143,7 +149,8 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
name = m.idLabelName
}

return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
return m.mergeDistinctStringSlice(func(ctx context.Context,
q storage.Querier) ([]string, storage.Warnings, error) {
return q.LabelValues(name, matchers...)
})
}
Expand All @@ -152,6 +159,8 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
// queriers. It also adds the `idLabelName` and if present in the original
// results the original `idLabelName`.
func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelNames")
defer log.Span.Finish()
labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
return q.LabelNames()
})
Expand Down Expand Up @@ -272,6 +281,8 @@ type selectJob struct {
// matching. The forwarded labelSelector is not containing those that operate
// on `idLabelName`.
func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
log, ctx := spanlogger.New(m.ctx, "mergeQuerier.Select")
defer log.Span.Finish()
matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...)
var jobs = make([]interface{}, len(matchedValues))
var seriesSets = make([]storage.SeriesSet, len(matchedValues))
Expand Down Expand Up @@ -305,7 +316,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
76 changes: 75 additions & 1 deletion pkg/querier/tenantfederation/merge_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
Expand All @@ -18,6 +21,7 @@ import (

"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

const (
Expand All @@ -37,7 +41,11 @@ func (m *mockTenantQueryableWithFilter) Querier(ctx context.Context, _, _ int64)
return nil, err
}

q := mockTenantQuerier{tenant: tenantIDs[0], extraLabels: m.extraLabels}
q := mockTenantQuerier{
tenant: tenantIDs[0],
extraLabels: m.extraLabels,
ctx: ctx,
}

// set warning if exists
if m.warningsByTenant != nil {
Expand Down Expand Up @@ -66,6 +74,7 @@ type mockTenantQuerier struct {

warnings storage.Warnings
queryErr error
ctx context.Context
}

func (m mockTenantQuerier) matrix() model.Matrix {
Expand Down Expand Up @@ -135,6 +144,8 @@ func (m *mockSeriesSet) Warnings() storage.Warnings {
}

func (m mockTenantQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
log, _ := spanlogger.New(m.ctx, "mockTenantQuerier.select")
defer log.Span.Finish()
var matrix model.Matrix

for _, s := range m.matrix() {
Expand Down Expand Up @@ -478,3 +489,66 @@ func TestSetLabelsRetainExisting(t *testing.T) {
assert.Equal(t, tc.expected, setLabelsRetainExisting(tc.labels, tc.additionalLabels...))
}
}

func TestTracingMergeQueryable(t *testing.T) {
mockTracer := mocktracer.New()
opentracing.SetGlobalTracer(mockTracer)
ctx := user.InjectOrgID(context.Background(), "team-a|team-b")

// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())
filter := mockTenantQueryableWithFilter{}
q := NewQueryable(&filter, false)
// retrieve querier if set
querier, err := q.Querier(ctx, mint, maxt)
require.NoError(t, err)

seriesSet := querier.Select(true, &storage.SelectHints{Start: mint,
End: maxt})

require.NoError(t, seriesSet.Err())
spans := mockTracer.FinishedSpans()
assertSpanExist(t, spans, "mergeQuerier.Select", expectedTag{spanlogger.TenantIDTagName,
[]string{"team-a", "team-b"}})
assertSpanExist(t, spans, "mockTenantQuerier.select", expectedTag{spanlogger.TenantIDTagName,
[]string{"team-a"}})
assertSpanExist(t, spans, "mockTenantQuerier.select", expectedTag{spanlogger.TenantIDTagName,
[]string{"team-b"}})
}

func assertSpanExist(t *testing.T,
actualSpans []*mocktracer.MockSpan,
name string,
tag expectedTag) {
for _, span := range actualSpans {
if span.OperationName == name && containsTags(span, tag) {
return
}
}
require.FailNow(t, "can not find span matching params",
"expected span with name `%v` and with "+
"tags %v to be present but it was not. actual spans: %+v",
name, tag, extractNameWithTags(actualSpans))
}

func extractNameWithTags(actualSpans []*mocktracer.MockSpan) []spanWithTags {
result := make([]spanWithTags, len(actualSpans))
for i, span := range actualSpans {
result[i] = spanWithTags{span.OperationName, span.Tags()}
}
return result
}

func containsTags(span *mocktracer.MockSpan, expectedTag expectedTag) bool {
return reflect.DeepEqual(span.Tag(expectedTag.key), expectedTag.values)
}

type spanWithTags struct {
name string
tags map[string]interface{}
}

type expectedTag struct {
key string
values []string
}
8 changes: 8 additions & 0 deletions pkg/util/spanlogger/spanlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ import (
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"

"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

type loggerCtxMarker struct{}

const (
TenantIDTagName = "tenant_ids"
)

var (
loggerCtxKey = &loggerCtxMarker{}
)
Expand All @@ -34,6 +39,9 @@ func New(ctx context.Context, method string, kvps ...interface{}) (*SpanLogger,
// retrieved with FromContext or FromContextWithFallback.
func NewWithLogger(ctx context.Context, l log.Logger, method string, kvps ...interface{}) (*SpanLogger, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, method)
if ids, _ := tenant.TenantIDs(ctx); len(ids) > 0 {
span.SetTag(TenantIDTagName, ids)
}
logger := &SpanLogger{
Logger: log.With(util_log.WithContext(ctx, l), "method", method),
Span: span,
Expand Down
24 changes: 24 additions & 0 deletions pkg/util/spanlogger/spanlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"testing"

"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
)

func TestSpanLogger_Log(t *testing.T) {
Expand Down Expand Up @@ -44,6 +47,27 @@ func TestSpanLogger_CustomLogger(t *testing.T) {
require.Equal(t, expect, logged)
}

func TestSpanCreatedWithTenantTag(t *testing.T) {
mockSpan := createSpan(user.InjectOrgID(context.Background(), "team-a"))

require.Equal(t, []string{"team-a"}, mockSpan.Tag(TenantIDTagName))
}

func TestSpanCreatedWithoutTenantTag(t *testing.T) {
mockSpan := createSpan(context.Background())

_, exist := mockSpan.Tags()[TenantIDTagName]
require.False(t, exist)
}

func createSpan(ctx context.Context) *mocktracer.MockSpan {
mockTracer := mocktracer.New()
opentracing.SetGlobalTracer(mockTracer)

logger, _ := New(ctx, "name")
return logger.Span.(*mocktracer.MockSpan)
}

type funcLogger func(keyvals ...interface{}) error

func (f funcLogger) Log(keyvals ...interface{}) error {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading