Skip to content

Commit 33f363e

Browse files
authored
Add metric and enhanced logging for query partial data (#6676)
* add metric

  Signed-off-by: Justin Jung <[email protected]>

* improve log to include failed instance address

  Signed-off-by: Justin Jung <[email protected]>

* more tests

  Signed-off-by: Justin Jung <[email protected]>

* changelog

  Signed-off-by: Justin Jung <[email protected]>

* include all partial errors in log

  Signed-off-by: Justin Jung <[email protected]>

---------

Signed-off-by: Justin Jung <[email protected]>
1 parent a582154 commit 33f363e

7 files changed

+139
-29
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* [ENHANCEMENT] Query Frontend: Add a `-frontend.enabled-ruler-query-stats` flag to configure whether to report the query stats log for queries coming from the Ruler. #6504
2020
* [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617
2121
* [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628
22+
* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676
2223
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
2324
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
2425
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/distributor/distributor.go

+6
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ type Distributor struct {
123123
ingesterAppendFailures *prometheus.CounterVec
124124
ingesterQueries *prometheus.CounterVec
125125
ingesterQueryFailures *prometheus.CounterVec
126+
ingesterPartialDataQueries prometheus.Counter
126127
replicationFactor prometheus.Gauge
127128
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
128129

@@ -375,6 +376,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
375376
Name: "distributor_ingester_query_failures_total",
376377
Help: "The total number of failed queries sent to ingesters.",
377378
}, []string{"ingester"}),
379+
ingesterPartialDataQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{
380+
Namespace: "cortex",
381+
Name: "distributor_ingester_partial_data_queries_total",
382+
Help: "The total number of queries sent to ingesters that may have returned partial data.",
383+
}),
378384
replicationFactor: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
379385
Namespace: "cortex",
380386
Name: "distributor_replication_factor",

pkg/distributor/query.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
330330
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))
331331

332332
if partialdata.IsPartialDataError(err) {
333-
level.Info(d.log).Log("msg", "returning partial data")
333+
level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error())
334+
d.ingesterPartialDataQueries.Inc()
334335
return resp, err
335336
}
336337

pkg/ring/replication_set.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ring
22

33
import (
44
"context"
5+
"fmt"
56
"sort"
67
"time"
78

@@ -70,39 +71,32 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
7071
}(i, &r.Instances[i])
7172
}
7273

73-
trackerFailed := false
74-
cnt := 0
75-
76-
track:
77-
for !tracker.succeeded() {
74+
for !tracker.succeeded() && !tracker.finished() {
7875
select {
7976
case res := <-ch:
8077
tracker.done(res.instance, res.res, res.err)
8178
if res.err != nil {
82-
if tracker.failed() {
83-
if !partialDataEnabled || tracker.failedInAllZones() {
84-
return nil, res.err
85-
}
86-
trackerFailed = true
79+
if tracker.failed() && (!partialDataEnabled || tracker.failedCompletely()) {
80+
return nil, res.err
8781
}
8882

8983
// force one of the delayed requests to start
9084
if delay > 0 && r.MaxUnavailableZones == 0 {
9185
forceStart <- struct{}{}
9286
}
9387
}
94-
cnt++
95-
if cnt == len(r.Instances) {
96-
break track
97-
}
9888

9989
case <-ctx.Done():
10090
return nil, ctx.Err()
10191
}
10292
}
10393

104-
if partialDataEnabled && trackerFailed {
105-
return tracker.getResults(), partialdata.ErrPartialData
94+
if partialDataEnabled && tracker.failed() {
95+
finalErr := partialdata.ErrPartialData
96+
for _, partialErr := range tracker.getErrors() {
97+
finalErr = fmt.Errorf("%w: %w", finalErr, partialErr)
98+
}
99+
return tracker.getResults(), finalErr
106100
}
107101

108102
return tracker.getResults(), nil

pkg/ring/replication_set_test.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func TestReplicationSet_Do(t *testing.T) {
124124
expectedError error
125125
zoneResultsQuorum bool
126126
queryPartialData bool
127+
errStrContains []string
127128
}{
128129
{
129130
name: "max errors = 0, no errors no delay",
@@ -196,12 +197,13 @@ func TestReplicationSet_Do(t *testing.T) {
196197
},
197198
{
198199
name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)",
199-
instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}},
200+
instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}},
200201
f: failingFunctionOnZones("zone1", "zone2"),
201202
maxUnavailableZones: 1,
202203
queryPartialData: true,
203204
want: []interface{}{1},
204205
expectedError: partialdata.ErrPartialData,
206+
errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"},
205207
},
206208
{
207209
name: "with partial data enabled, should fail on instances failing in all zones",
@@ -264,7 +266,10 @@ func TestReplicationSet_Do(t *testing.T) {
264266
}
265267
got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f)
266268
if tt.expectedError != nil {
267-
assert.Equal(t, tt.expectedError, err)
269+
assert.ErrorIs(t, err, tt.expectedError)
270+
for _, str := range tt.errStrContains {
271+
assert.ErrorContains(t, err, str)
272+
}
268273
} else {
269274
assert.NoError(t, err)
270275
}

pkg/ring/replication_set_tracker.go

+40-6
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,30 @@
11
package ring
22

3+
import "fmt"
4+
35
type replicationSetResultTracker interface {
46
// Signals an instance has done the execution, either successful (no error)
57
// or failed (with error). If successful, result will be recorded and can
68
// be accessed via getResults.
79
done(instance *InstanceDesc, result interface{}, err error)
810

11+
// Returns true if all instances are done executing
12+
finished() bool
13+
914
// Returns true if the minimum number of successful results have been received.
1015
succeeded() bool
1116

1217
// Returns true if the maximum number of failed executions have been reached.
1318
failed() bool
1419

15-
// Returns true if executions failed in all zones. Only relevant for zoneAwareResultTracker.
16-
failedInAllZones() bool
20+
// Returns true if executions failed in all instances or all zones.
21+
failedCompletely() bool
1722

1823
// Returns recorded results.
1924
getResults() []interface{}
25+
26+
// Returns errors
27+
getErrors() []error
2028
}
2129

2230
type defaultResultTracker struct {
@@ -25,6 +33,8 @@ type defaultResultTracker struct {
2533
numErrors int
2634
maxErrors int
2735
results []interface{}
36+
numInstances int
37+
errors []error
2838
}
2939

3040
func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker {
@@ -33,19 +43,26 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe
3343
numSucceeded: 0,
3444
numErrors: 0,
3545
maxErrors: maxErrors,
46+
errors: make([]error, 0, len(instances)),
3647
results: make([]interface{}, 0, len(instances)),
48+
numInstances: len(instances),
3749
}
3850
}
3951

40-
func (t *defaultResultTracker) done(_ *InstanceDesc, result interface{}, err error) {
52+
func (t *defaultResultTracker) done(instance *InstanceDesc, result interface{}, err error) {
4153
if err == nil {
4254
t.numSucceeded++
4355
t.results = append(t.results, result)
4456
} else {
57+
t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err))
4558
t.numErrors++
4659
}
4760
}
4861

62+
func (t *defaultResultTracker) finished() bool {
63+
return t.numSucceeded+t.numErrors == t.numInstances
64+
}
65+
4966
func (t *defaultResultTracker) succeeded() bool {
5067
return t.numSucceeded >= t.minSucceeded
5168
}
@@ -54,14 +71,18 @@ func (t *defaultResultTracker) failed() bool {
5471
return t.numErrors > t.maxErrors
5572
}
5673

57-
func (t *defaultResultTracker) failedInAllZones() bool {
58-
return false
74+
func (t *defaultResultTracker) failedCompletely() bool {
75+
return t.numInstances == t.numErrors
5976
}
6077

6178
func (t *defaultResultTracker) getResults() []interface{} {
6279
return t.results
6380
}
6481

82+
func (t *defaultResultTracker) getErrors() []error {
83+
return t.errors
84+
}
85+
6586
// zoneAwareResultTracker tracks the results per zone.
6687
// All instances in a zone must succeed in order for the zone to succeed.
6788
type zoneAwareResultTracker struct {
@@ -73,6 +94,8 @@ type zoneAwareResultTracker struct {
7394
numInstances int
7495
zoneResultsQuorum bool
7596
zoneCount int
97+
doneCount int
98+
errors []error
7699
}
77100

78101
func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareResultTracker {
@@ -82,6 +105,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
82105
maxUnavailableZones: maxUnavailableZones,
83106
numInstances: len(instances),
84107
zoneResultsQuorum: zoneResultsQuorum,
108+
errors: make([]error, 0, len(instances)),
85109
}
86110

87111
for _, instance := range instances {
@@ -97,6 +121,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
97121
func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) {
98122
if err != nil {
99123
t.failuresByZone[instance.Zone]++
124+
t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err))
100125
} else {
101126
if _, ok := t.resultsPerZone[instance.Zone]; !ok {
102127
// If it is the first result in the zone, then total number of instances
@@ -107,6 +132,11 @@ func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}
107132
}
108133

109134
t.waitingByZone[instance.Zone]--
135+
t.doneCount++
136+
}
137+
138+
func (t *zoneAwareResultTracker) finished() bool {
139+
return t.doneCount == t.numInstances
110140
}
111141

112142
func (t *zoneAwareResultTracker) succeeded() bool {
@@ -128,7 +158,7 @@ func (t *zoneAwareResultTracker) failed() bool {
128158
return failedZones > t.maxUnavailableZones
129159
}
130160

131-
func (t *zoneAwareResultTracker) failedInAllZones() bool {
161+
func (t *zoneAwareResultTracker) failedCompletely() bool {
132162
failedZones := len(t.failuresByZone)
133163
return failedZones == t.zoneCount
134164
}
@@ -150,3 +180,7 @@ func (t *zoneAwareResultTracker) getResults() []interface{} {
150180
}
151181
return results
152182
}
183+
184+
func (t *zoneAwareResultTracker) getErrors() []error {
185+
return t.errors
186+
}

pkg/ring/replication_set_tracker_test.go

+73-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ring
22

33
import (
44
"errors"
5+
"fmt"
56
"testing"
67

78
"github.com/stretchr/testify/assert"
@@ -154,6 +155,50 @@ func TestDefaultResultTracker(t *testing.T) {
154155
assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults())
155156
},
156157
},
158+
"failedCompletely() should return true only if all instances have failed, regardless of max errors": {
159+
instances: []InstanceDesc{instance1, instance2, instance3},
160+
maxErrors: 1,
161+
run: func(t *testing.T, tracker *defaultResultTracker) {
162+
tracker.done(&instance1, nil, errors.New("test"))
163+
assert.False(t, tracker.succeeded())
164+
assert.False(t, tracker.failed())
165+
assert.False(t, tracker.failedCompletely())
166+
167+
tracker.done(&instance2, nil, errors.New("test"))
168+
assert.False(t, tracker.succeeded())
169+
assert.True(t, tracker.failed())
170+
assert.False(t, tracker.failedCompletely())
171+
172+
tracker.done(&instance3, nil, errors.New("test"))
173+
assert.False(t, tracker.succeeded())
174+
assert.True(t, tracker.failed())
175+
assert.True(t, tracker.failedCompletely())
176+
},
177+
},
178+
"finished() should return true only if all instances are done": {
179+
instances: []InstanceDesc{instance1, instance2},
180+
maxErrors: 1,
181+
run: func(t *testing.T, tracker *defaultResultTracker) {
182+
tracker.done(&instance1, nil, errors.New("test"))
183+
assert.False(t, tracker.finished())
184+
185+
tracker.done(&instance2, nil, errors.New("test"))
186+
assert.True(t, tracker.finished())
187+
},
188+
},
189+
"getErrors() should return list of all errors": {
190+
instances: []InstanceDesc{instance1, instance2},
191+
maxErrors: 1,
192+
run: func(t *testing.T, tracker *defaultResultTracker) {
193+
tracker.done(&instance1, nil, errors.New("test1"))
194+
err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1"))
195+
assert.ElementsMatch(t, []error{err1}, tracker.getErrors())
196+
197+
tracker.done(&instance2, nil, errors.New("test2"))
198+
err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2"))
199+
assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors())
200+
},
201+
},
157202
}
158203

159204
for testName, testCase := range tests {
@@ -399,27 +444,51 @@ func TestZoneAwareResultTracker(t *testing.T) {
399444
assert.False(t, tracker.failed())
400445
},
401446
},
402-
"failInAllZones should return true only if all zones have failed, regardless of max unavailable zones": {
447+
"failedCompletely() should return true only if all zones have failed, regardless of max unavailable zones": {
403448
instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6},
404449
maxUnavailableZones: 1,
405450
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
406451
// Zone-a
407452
tracker.done(&instance1, nil, errors.New("test"))
408453
assert.False(t, tracker.succeeded())
409454
assert.False(t, tracker.failed())
410-
assert.False(t, tracker.failedInAllZones())
455+
assert.False(t, tracker.failedCompletely())
411456

412457
// Zone-b
413458
tracker.done(&instance3, nil, errors.New("test"))
414459
assert.False(t, tracker.succeeded())
415460
assert.True(t, tracker.failed())
416-
assert.False(t, tracker.failedInAllZones())
461+
assert.False(t, tracker.failedCompletely())
417462

418463
// Zone-c
419464
tracker.done(&instance5, nil, errors.New("test"))
420465
assert.False(t, tracker.succeeded())
421466
assert.True(t, tracker.failed())
422-
assert.True(t, tracker.failedInAllZones())
467+
assert.True(t, tracker.failedCompletely())
468+
},
469+
},
470+
"finished() should return true only if all instances are done": {
471+
instances: []InstanceDesc{instance1, instance2},
472+
maxUnavailableZones: 1,
473+
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
474+
tracker.done(&instance1, nil, errors.New("test"))
475+
assert.False(t, tracker.finished())
476+
477+
tracker.done(&instance2, nil, errors.New("test"))
478+
assert.True(t, tracker.finished())
479+
},
480+
},
481+
"getErrors() should return list of all errors": {
482+
instances: []InstanceDesc{instance1, instance2},
483+
maxUnavailableZones: 1,
484+
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
485+
tracker.done(&instance1, nil, errors.New("test1"))
486+
err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1"))
487+
assert.ElementsMatch(t, []error{err1}, tracker.getErrors())
488+
489+
tracker.done(&instance2, nil, errors.New("test2"))
490+
err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2"))
491+
assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors())
423492
},
424493
},
425494
}

0 commit comments

Comments
 (0)