Skip to content

Commit 3a3cb4b

Browse files
1. added logic to add tag `tenant_id` to tracing spans if tenant is defined in context. 2. added additional tracing spans to merge_querier's methods Signed-off-by: Vladyslav Diachenko <[email protected]>
1 parent d25f6c7 commit 3a3cb4b

File tree

5 files changed

+116
-2
lines changed

5 files changed

+116
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151
2121
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
2222
* [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176
23+
* [ENHANCEMENT] Added `tenant_id` tag to tracing spans
2324

2425
## Blocksconvert
2526

pkg/querier/tenantfederation/merge_queryable.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/cortexproject/cortex/pkg/tenant"
1717
"github.com/cortexproject/cortex/pkg/util/concurrency"
18+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
1819
)
1920

2021
const (
@@ -96,6 +97,9 @@ type mergeQueryable struct {
9697
// Querier returns a new mergeQuerier, which aggregates results from multiple
9798
// underlying queriers into a single result.
9899
func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
100+
// TODO: it's necessary to think how to override context inside querier
101+
// to mark spans created inside querier as child of a span created inside
102+
// methods of merged querier.
99103
ids, queriers, err := m.callback(ctx, mint, maxt)
100104
if err != nil {
101105
return nil, err
@@ -133,6 +137,8 @@ type mergeQuerier struct {
133137
// For the label "original_" + `idLabelName it will return all the values
134138
// of the underlying queriers for `idLabelName`.
135139
func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
140+
log, _ := spanlogger.New(m.ctx, "mergeQuerier.labelValues")
141+
defer log.Span.Finish()
136142
if name == m.idLabelName {
137143
return m.ids, nil, nil
138144
}
@@ -152,6 +158,8 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
152158
// queriers. It also adds the `idLabelName` and if present in the original
153159
// results the original `idLabelName`.
154160
func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
161+
log, _ := spanlogger.New(m.ctx, "mergeQuerier.labelNames")
162+
defer log.Span.Finish()
155163
labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
156164
return q.LabelNames()
157165
})
@@ -272,6 +280,8 @@ type selectJob struct {
272280
// matching. The forwarded labelSelector is not containing those that operate
273281
// on `idLabelName`.
274282
func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
283+
log, ctx := spanlogger.New(m.ctx, "mergeQuerier.select")
284+
defer log.Span.Finish()
275285
matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...)
276286
var jobs = make([]interface{}, len(matchedValues))
277287
var seriesSets = make([]storage.SeriesSet, len(matchedValues))
@@ -305,7 +315,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
305315
return nil
306316
}
307317

308-
err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
318+
err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
309319
if err != nil {
310320
return storage.ErrSeriesSet(err)
311321
}

pkg/querier/tenantfederation/merge_queryable_test.go

+72-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"testing"
1010
"time"
1111

12+
sl "github.com/cortexproject/cortex/pkg/util/spanlogger"
13+
"github.com/opentracing/opentracing-go"
14+
"github.com/opentracing/opentracing-go/mocktracer"
1215
"github.com/prometheus/common/model"
1316
"github.com/prometheus/prometheus/pkg/labels"
1417
"github.com/prometheus/prometheus/storage"
@@ -37,7 +40,11 @@ func (m *mockTenantQueryableWithFilter) Querier(ctx context.Context, _, _ int64)
3740
return nil, err
3841
}
3942

40-
q := mockTenantQuerier{tenant: tenantIDs[0], extraLabels: m.extraLabels}
43+
q := mockTenantQuerier{
44+
tenant: tenantIDs[0],
45+
extraLabels: m.extraLabels,
46+
ctx: ctx,
47+
}
4148

4249
// set warning if exists
4350
if m.warningsByTenant != nil {
@@ -66,6 +73,7 @@ type mockTenantQuerier struct {
6673

6774
warnings storage.Warnings
6875
queryErr error
76+
ctx context.Context
6977
}
7078

7179
func (m mockTenantQuerier) matrix() model.Matrix {
@@ -135,6 +143,8 @@ func (m *mockSeriesSet) Warnings() storage.Warnings {
135143
}
136144

137145
func (m mockTenantQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
146+
log, _ := sl.New(m.ctx, "mockTenantQuerier.select")
147+
defer log.Span.Finish()
138148
var matrix model.Matrix
139149

140150
for _, s := range m.matrix() {
@@ -478,3 +488,64 @@ func TestSetLabelsRetainExisting(t *testing.T) {
478488
assert.Equal(t, tc.expected, setLabelsRetainExisting(tc.labels, tc.additionalLabels...))
479489
}
480490
}
491+
492+
func TestTracingMergeQueryable(t *testing.T) {
493+
mockTracer := mocktracer.New()
494+
opentracing.SetGlobalTracer(mockTracer)
495+
ctx := user.InjectOrgID(context.Background(), "team-a|team-b")
496+
497+
// set a multi tenant resolver
498+
tenant.WithDefaultResolver(tenant.NewMultiResolver())
499+
filter := mockTenantQueryableWithFilter{}
500+
q := NewQueryable(&filter, false)
501+
// retrieve querier if set
502+
querier, err := q.Querier(ctx, mint, maxt)
503+
require.NoError(t, err)
504+
505+
seriesSet := querier.Select(true, &storage.SelectHints{Start: mint,
506+
End: maxt})
507+
508+
require.NoError(t, seriesSet.Err())
509+
spans := mockTracer.FinishedSpans()
510+
assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{sl.TenantIdTagName: "team-a|team-b"})
511+
assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIdTagName: "team-a"})
512+
assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIdTagName: "team-b"})
513+
}
514+
515+
func assertSpanExist(t *testing.T,
516+
actualSpans []*mocktracer.MockSpan,
517+
name string,
518+
expectedTags map[string]string) {
519+
for _, span := range actualSpans {
520+
if span.OperationName == name && containsTags(span, expectedTags) {
521+
return
522+
}
523+
}
524+
require.FailNow(t, "can not find span matching params",
525+
"expected span with name `%v` and with "+
526+
"tags %v to be present but it was not. actual spans: %+v",
527+
name, expectedTags, extractNameWithTags(actualSpans))
528+
}
529+
530+
type spanWithTags struct {
531+
name string
532+
tags map[string]interface{}
533+
}
534+
535+
func extractNameWithTags(actualSpans []*mocktracer.MockSpan) []spanWithTags {
536+
result := make([]spanWithTags, len(actualSpans))
537+
for i, span := range actualSpans {
538+
result[i] = spanWithTags{span.OperationName, span.Tags()}
539+
}
540+
return result
541+
}
542+
543+
func containsTags(span *mocktracer.MockSpan,
544+
expectedTags map[string]string) bool {
545+
for k, expectedVal := range expectedTags {
546+
if span.Tag(k) == expectedVal {
547+
return true
548+
}
549+
}
550+
return false
551+
}

pkg/util/spanlogger/spanlogger.go

+8
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ import (
88
opentracing "github.com/opentracing/opentracing-go"
99
"github.com/opentracing/opentracing-go/ext"
1010
otlog "github.com/opentracing/opentracing-go/log"
11+
"github.com/weaveworks/common/user"
1112

1213
util_log "github.com/cortexproject/cortex/pkg/util/log"
1314
)
1415

1516
type loggerCtxMarker struct{}
1617

18+
const (
19+
TenantIdTagName = "tenant_id"
20+
)
21+
1722
var (
1823
loggerCtxKey = &loggerCtxMarker{}
1924
)
@@ -34,6 +39,9 @@ func New(ctx context.Context, method string, kvps ...interface{}) (*SpanLogger,
3439
// retrieved with FromContext or FromContextWithFallback.
3540
func NewWithLogger(ctx context.Context, l log.Logger, method string, kvps ...interface{}) (*SpanLogger, context.Context) {
3641
span, ctx := opentracing.StartSpanFromContext(ctx, method)
42+
if orgId, err := user.ExtractOrgID(ctx); err == nil {
43+
span.SetTag(TenantIdTagName, orgId)
44+
}
3745
logger := &SpanLogger{
3846
Logger: log.With(util_log.WithContext(ctx, l), "method", method),
3947
Span: span,

pkg/util/spanlogger/spanlogger_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"testing"
66

77
"github.com/go-kit/kit/log"
8+
"github.com/opentracing/opentracing-go"
9+
"github.com/opentracing/opentracing-go/mocktracer"
810
"github.com/pkg/errors"
911
"github.com/stretchr/testify/require"
12+
"github.com/weaveworks/common/user"
1013
)
1114

1215
func TestSpanLogger_Log(t *testing.T) {
@@ -44,6 +47,27 @@ func TestSpanLogger_CustomLogger(t *testing.T) {
4447
require.Equal(t, expect, logged)
4548
}
4649

50+
func TestSpanCreatedWithTenantTag(t *testing.T) {
51+
mockSpan := createSpan(user.InjectOrgID(context.Background(), "team-a"))
52+
53+
require.Equal(t, "team-a", mockSpan.Tag(TenantIdTagName))
54+
}
55+
56+
func TestSpanCreatedWithoutTenantTag(t *testing.T) {
57+
mockSpan := createSpan(context.Background())
58+
59+
_, exist := mockSpan.Tags()[TenantIdTagName]
60+
require.False(t, exist)
61+
}
62+
63+
func createSpan(ctx context.Context) *mocktracer.MockSpan {
64+
mockTracer := mocktracer.New()
65+
opentracing.SetGlobalTracer(mockTracer)
66+
67+
logger, _ := New(ctx, "name")
68+
return logger.Span.(*mocktracer.MockSpan)
69+
}
70+
4771
type funcLogger func(keyvals ...interface{}) error
4872

4973
func (f funcLogger) Log(keyvals ...interface{}) error {

0 commit comments

Comments
 (0)