From ffb83317628a1b44ad45a4e5f7cf344f3e66ddf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 27 Jul 2020 18:06:32 +0200 Subject: [PATCH 1/6] Initial version. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/api/queryable.go | 116 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 pkg/api/queryable.go diff --git a/pkg/api/queryable.go b/pkg/api/queryable.go new file mode 100644 index 00000000000..23751df0220 --- /dev/null +++ b/pkg/api/queryable.go @@ -0,0 +1,116 @@ +package api + +import ( + "context" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +type errorTranslateQueryable struct { + q storage.SampleAndChunkQueryable +} + +func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := e.q.Querier(ctx, mint, maxt) + return errorTranslateQuerier{q: q}, err +} + +func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + q, err := e.q.ChunkQuerier(ctx, mint, maxt) + return errorTranslateChunkQuerier{q: q}, err +} + +func translateError(err error) error { + if err == nil { + return err + } + + // TODO:... +} + +type errorTranslateQuerier struct { + q storage.Querier +} + +func (e errorTranslateQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + values, warnings, err := e.q.LabelValues(name) + return values, warnings, translateError(err) +} + +func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) { + values, warnings, err := e.q.LabelNames() + return values, warnings, translateError(err) +} + +func (e errorTranslateQuerier) Close() error { + return translateError(e.q.Close()) +} + +func (e errorTranslateQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + s := e.q.Select(sortSeries, hints, matchers...) + return errorTranslateSeriesSet{s} +} + +type errorTranslateChunkQuerier struct { + q storage.ChunkQuerier +} + +func (e errorTranslateChunkQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + values, warnings, err := e.q.LabelValues(name) + return values, warnings, translateError(err) +} + +func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) { + values, warnings, err := e.q.LabelNames() + return values, warnings, translateError(err) +} + +func (e errorTranslateChunkQuerier) Close() error { + return translateError(e.q.Close()) +} + +func (e errorTranslateChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + s := e.q.Select(sortSeries, hints, matchers...) + return errorTranslateChunkSeriesSet{s} +} + +type errorTranslateSeriesSet struct { + s storage.SeriesSet +} + +func (e errorTranslateSeriesSet) Next() bool { + return e.s.Next() +} + +func (e errorTranslateSeriesSet) At() storage.Series { + return e.s.At() +} + +func (e errorTranslateSeriesSet) Err() error { + return translateError(e.s.Err()) +} + +func (e errorTranslateSeriesSet) Warnings() storage.Warnings { + return e.s.Warnings() +} + +type errorTranslateChunkSeriesSet struct { + s storage.ChunkSeriesSet +} + +func (e errorTranslateChunkSeriesSet) Next() bool { + return e.s.Next() +} + +func (e errorTranslateChunkSeriesSet) At() storage.ChunkSeries { + return e.s.At() +} + +func (e errorTranslateChunkSeriesSet) Err() error { + return translateError(e.s.Err()) +} + +func (e errorTranslateChunkSeriesSet) Warnings() storage.Warnings { + return e.s.Warnings() +} From b8fa53fb8bfe1058bd9989c2b4fb914f8ba40a9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 28 Jul 2020 14:44:43 +0200 Subject: [PATCH 2/6] Move PromQL API error translation to querier registration. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/api/api.go | 2 +- pkg/api/queryable.go | 60 +++++-- pkg/api/queryable_test.go | 199 ++++++++++++++++++++++ pkg/querier/chunk_store_queryable.go | 15 +- pkg/querier/chunk_store_queryable_test.go | 115 ------------- 5 files changed, 251 insertions(+), 140 deletions(-) create mode 100644 pkg/api/queryable_test.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 4238dcad5d3..45db017daf4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -269,7 +269,7 @@ func (a *API) RegisterQuerier( ) http.Handler { api := v1.NewAPI( engine, - queryable, + errorTranslateQueryable{queryable}, // Translate errors to errors expected by API. func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} }, func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} }, func() config.Config { return config.Config{} }, diff --git a/pkg/api/queryable.go b/pkg/api/queryable.go index 23751df0220..6c0d7513200 100644 --- a/pkg/api/queryable.go +++ b/pkg/api/queryable.go @@ -3,30 +3,70 @@ package api import ( "context" + "github.com/gogo/status" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + + "github.com/cortexproject/cortex/pkg/chunk" ) +func translateError(err error) error { + if err == nil { + return err + } + + // vendor/github.com/prometheus/prometheus/web/api/v1/api.go, respondError function only accepts + // *apiError types. + // Translation of error to *apiError happens in vendor/github.com/prometheus/prometheus/web/api/v1/api.go, returnAPIError method. + // It only supports: + // promql.ErrQueryCanceled, mapped to 503 + // promql.ErrQueryTimeout, mapped to 503 + // promql.ErrStorage mapped to 500 + // anything else is mapped to 422 + + switch errors.Cause(err).(type) { + case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout: + // Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91. + // Don't translate those, just in case we use them internally. + return err + case chunk.QueryError: + // This will be returned with status code 422 by Prometheus API. + // vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393 + return err + default: + s, ok := status.FromError(err) + if ok { + code := s.Code() + + // Treat these as HTTP status codes, even though they are supposed to be grpc codes. + if code >= 400 && code < 500 { + // Return directly, will be mapped to 422 + return err + } else if code >= 500 && code < 599 { + // Wrap into ErrStorage for mapping to 500 + return promql.ErrStorage{Err: err} + } + } + + // All other errors will be returned as 500. + return promql.ErrStorage{Err: err} + } +} + type errorTranslateQueryable struct { q storage.SampleAndChunkQueryable } func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { q, err := e.q.Querier(ctx, mint, maxt) - return errorTranslateQuerier{q: q}, err + return errorTranslateQuerier{q: q}, translateError(err) } func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { q, err := e.q.ChunkQuerier(ctx, mint, maxt) - return errorTranslateChunkQuerier{q: q}, err -} - -func translateError(err error) error { - if err == nil { - return err - } - - // TODO:... + return errorTranslateChunkQuerier{q: q}, translateError(err) } type errorTranslateQuerier struct { diff --git a/pkg/api/queryable_test.go b/pkg/api/queryable_test.go new file mode 100644 index 00000000000..71f65ef53e7 --- /dev/null +++ b/pkg/api/queryable_test.go @@ -0,0 +1,199 @@ +package api + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "regexp" + "testing" + "time" + + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/util" +) + +func TestApiStatusCodes(t *testing.T) { + for ix, tc := range []struct { + err error + expectedString string + expectedCode int + }{ + { + err: errors.New("some random error"), + expectedString: "some random error", + expectedCode: 500, + }, + + { + err: chunk.QueryError("special handling"), // handled specially by chunk_store_queryable + expectedString: "special handling", + expectedCode: 422, + }, + + { + err: promql.ErrTooManySamples("query execution"), + expectedString: "too many samples", + expectedCode: 422, + }, + + { + err: promql.ErrQueryCanceled("query execution"), + expectedString: "query was canceled", + expectedCode: 503, + }, + + { + err: promql.ErrQueryTimeout("query execution"), + expectedString: "query timed out", + expectedCode: 503, + }, + + // Status code 400 is remapped to 422 (only choice we have) + { + err: httpgrpc.Errorf(http.StatusBadRequest, "test string"), + expectedString: "test string", + expectedCode: 422, + }, + + // 404 is also translated to 422 + { + err: httpgrpc.Errorf(http.StatusNotFound, "not found"), + expectedString: "not found", + expectedCode: 422, + }, + + // 505 is translated to 500 + { + err: httpgrpc.Errorf(http.StatusHTTPVersionNotSupported, "test"), + expectedString: "test", + expectedCode: 500, + }, + } { + for k, q := range map[string]storage.SampleAndChunkQueryable{ + "error from queryable": testQueryable{err: tc.err}, + "error from querier": testQueryable{q: testQuerier{err: tc.err}}, + "error from seriesset": testQueryable{q: testQuerier{s: testSeriesSet{err: tc.err}}}, + } { + t.Run(fmt.Sprintf("%s/%d", k, ix), func(t *testing.T) { + r := createPrometheusAPI(errorTranslateQueryable{q: q}) + rec := httptest.NewRecorder() + + req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil) + req = req.WithContext(user.InjectOrgID(context.Background(), "test org")) + + r.ServeHTTP(rec, req) + + require.Equal(t, tc.expectedCode, rec.Code) + require.Contains(t, rec.Body.String(), tc.expectedString) + }) + } + } +} + +func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { + engine := promql.NewEngine(promql.EngineOpts{ + Logger: util.Logger, + Reg: nil, + ActiveQueryTracker: nil, + MaxSamples: 100, + Timeout: 5 * time.Second, + }) + + api := v1.NewAPI( + engine, + q, + func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} }, + func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} }, + func() config.Config { return config.Config{} }, + map[string]string{}, // TODO: include configuration flags + v1.GlobalURLOptions{}, + func(f http.HandlerFunc) http.HandlerFunc { return f }, + nil, // Only needed for admin APIs. + "", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty. + false, // Disable admin APIs. + util.Logger, + func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} }, + 0, 0, 0, // Remote read samples and concurrency limit. + regexp.MustCompile(".*"), + func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") }, + &v1.PrometheusVersion{}, + ) + + promRouter := route.New().WithPrefix("/api/v1") + api.Register(promRouter) + + return promRouter +} + +type testQueryable struct { + q storage.Querier + err error +} + +func (t testQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + return nil, t.err +} + +func (t testQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + if t.q != nil { + return t.q, nil + } + return nil, t.err +} + +type testQuerier struct { + s storage.SeriesSet + err error +} + +func (t testQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + return nil, nil, t.err +} + +func (t testQuerier) LabelNames() ([]string, storage.Warnings, error) { + return nil, nil, t.err +} + +func (t testQuerier) Close() error { + return nil +} + +func (t testQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + if t.s != nil { + return t.s + } + return storage.ErrSeriesSet(t.err) +} + +type testSeriesSet struct { + err error +} + +func (t testSeriesSet) Next() bool { + return false +} + +func (t testSeriesSet) At() storage.Series { + return nil +} + +func (t testSeriesSet) Err() error { + return t.err +} + +func (t testSeriesSet) Warnings() storage.Warnings { + return nil +} diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index 50fa8573fb6..b36db0a2429 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -5,7 +5,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/weaveworks/common/user" @@ -46,19 +45,7 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ... } chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...) if err != nil { - switch err.(type) { - case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout: - // Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91. - // Don't translate those, just in case we use them internally. - return storage.ErrSeriesSet(err) - case chunk.QueryError: - // This will be returned with status code 422 by Prometheus API. - // vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393 - return storage.ErrSeriesSet(err) - default: - // All other errors will be returned as 500. - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) - } + return storage.ErrSeriesSet(err) } return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc) diff --git a/pkg/querier/chunk_store_queryable_test.go b/pkg/querier/chunk_store_queryable_test.go index b5f51786a49..76cc8074243 100644 --- a/pkg/querier/chunk_store_queryable_test.go +++ b/pkg/querier/chunk_store_queryable_test.go @@ -2,29 +2,17 @@ package querier import ( "context" - "errors" "fmt" - "net/http" - "net/http/httptest" - "regexp" "sort" "testing" "time" "github.com/prometheus/common/model" - "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" - v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" - "github.com/cortexproject/cortex/pkg/querier/chunkstore" - "github.com/cortexproject/cortex/pkg/util" ) // Make sure that chunkSeries implements SeriesWithChunks @@ -112,106 +100,3 @@ type sortedByLabels []labels.Labels func (b sortedByLabels) Len() int { return len(b) } func (b sortedByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b sortedByLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 } - -func TestApiStatusCodes(t *testing.T) { - for ix, tc := range []struct { - err error - expectedString string - expectedCode int - }{ - { - err: errors.New("some random error"), - expectedString: "some random error", - expectedCode: 500, - }, - - { - err: chunk.QueryError("special handling"), // handled specially by chunk_store_queryable - expectedString: "special handling", - expectedCode: 422, - }, - - { - err: promql.ErrTooManySamples("query execution"), - expectedString: "too many samples", - expectedCode: 422, - }, - - { - err: promql.ErrQueryCanceled("query execution"), - expectedString: "query was canceled", - expectedCode: 503, - }, - - { - err: promql.ErrQueryTimeout("query execution"), - expectedString: "query timed out", - expectedCode: 503, - }, - - // Unfortunately, queryable cannot return anything else than 500 or 422. - { - err: httpgrpc.Errorf(http.StatusBadRequest, "test string"), - expectedString: "test string", - expectedCode: 500, - }, - } { - t.Run(fmt.Sprintf("%d", ix), func(t *testing.T) { - r := createPrometheusAPI(testStore{err: tc.err}) - rec := httptest.NewRecorder() - - req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil) - req = req.WithContext(user.InjectOrgID(context.Background(), "test org")) - - r.ServeHTTP(rec, req) - - require.Equal(t, tc.expectedCode, rec.Code) - require.Contains(t, rec.Body.String(), tc.expectedString) - }) - } -} - -func createPrometheusAPI(store chunkstore.ChunkStore) *route.Router { - engine := promql.NewEngine(promql.EngineOpts{ - Logger: util.Logger, - Reg: nil, - ActiveQueryTracker: nil, - MaxSamples: 100, - Timeout: 5 * time.Second, - }) - - queryable := newChunkStoreQueryable(store, mergeChunks) - - api := v1.NewAPI( - engine, - &sampleAndChunkQueryable{queryable}, - func(context.Context) v1.TargetRetriever { return &DummyTargetRetriever{} }, - func(context.Context) v1.AlertmanagerRetriever { return &DummyAlertmanagerRetriever{} }, - func() config.Config { return config.Config{} }, - map[string]string{}, // TODO: include configuration flags - v1.GlobalURLOptions{}, - func(f http.HandlerFunc) http.HandlerFunc { return f }, - nil, // Only needed for admin APIs. - "", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty. - false, // Disable admin APIs. - util.Logger, - func(context.Context) v1.RulesRetriever { return &DummyRulesRetriever{} }, - 0, 0, 0, // Remote read samples and concurrency limit. - regexp.MustCompile(".*"), - func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") }, - &v1.PrometheusVersion{}, - ) - - promRouter := route.New().WithPrefix("/api/v1") - api.Register(promRouter) - - return promRouter -} - -type testStore struct { - err error -} - -func (t testStore) Get(context.Context, string, model.Time, model.Time, ...*labels.Matcher) ([]chunk.Chunk, error) { - return nil, t.err -} From 0e560eb5d59e21a1177d484637f2ce109fd8d503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Jul 2020 13:29:47 +0200 Subject: [PATCH 3/6] Remove extra use of promql.ErrStorage. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All unknown errors are translated to promql.ErrStorage already at API level of the querier. Signed-off-by: Peter Štibraný --- pkg/api/queryable.go | 2 -- pkg/chunk/chunk_store_utils.go | 1 + pkg/distributor/distributor_test.go | 7 +++---- pkg/distributor/query.go | 9 ++++----- pkg/querier/block.go | 3 +-- pkg/querier/blocks_store_queryable.go | 14 +++----------- pkg/querier/distributor_queryable.go | 9 ++++----- pkg/querier/querier.go | 4 ++-- 8 files changed, 18 insertions(+), 31 deletions(-) diff --git a/pkg/api/queryable.go b/pkg/api/queryable.go index 6c0d7513200..b6b486f03fb 100644 --- a/pkg/api/queryable.go +++ b/pkg/api/queryable.go @@ -28,12 +28,10 @@ func translateError(err error) error { switch errors.Cause(err).(type) { case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout: - // Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91. // Don't translate those, just in case we use them internally. return err case chunk.QueryError: // This will be returned with status code 422 by Prometheus API. - // vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393 return err default: s, ok := status.FromError(err) diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go index c18a38a1672..061a9b1c638 100644 --- a/pkg/chunk/chunk_store_utils.go +++ b/pkg/chunk/chunk_store_utils.go @@ -162,6 +162,7 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string } if err != nil { + // Don't rely on Cortex error translation here. return nil, promql.ErrStorage{Err: err} } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 8aa2ed240cf..62cd1da6caf 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -18,7 +18,6 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -430,7 +429,7 @@ func TestDistributor_PushQuery(t *testing.T) { numIngesters: numIngesters, happyIngesters: happyIngesters, matchers: []*labels.Matcher{nameMatcher, barMatcher}, - expectedError: promql.ErrStorage{Err: errFail}, + expectedError: errFail, shardByAllLabels: shardByAllLabels, }) continue @@ -444,7 +443,7 @@ func TestDistributor_PushQuery(t *testing.T) { numIngesters: numIngesters, happyIngesters: happyIngesters, matchers: []*labels.Matcher{nameMatcher, barMatcher}, - expectedError: promql.ErrStorage{Err: errFail}, + expectedError: errFail, shardByAllLabels: shardByAllLabels, }) continue @@ -734,7 +733,7 @@ func TestSlowQueries(t *testing.T) { t.Run(fmt.Sprintf("%t/%d", shardByAllLabels, happy), func(t *testing.T) { var expectedErr error if nIngesters-happy > 1 { - expectedErr = promql.ErrStorage{Err: errFail} + expectedErr = errFail } ds, _, r := prepare(t, prepConfig{ diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index fed48e51af9..a7813c6453f 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -7,7 +7,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" @@ -25,12 +24,12 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . err := instrument.CollectedRequest(ctx, "Distributor.Query", queryDuration, instrument.ErrorCode, func(ctx context.Context) error { replicationSet, req, err := d.queryPrep(ctx, from, to, matchers...) if err != nil { - return promql.ErrStorage{Err: err} + return err } matrix, err = d.queryIngesters(ctx, replicationSet, req) if err != nil { - return promql.ErrStorage{Err: err} + return err } if s := opentracing.SpanFromContext(ctx); s != nil { @@ -47,12 +46,12 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", queryDuration, instrument.ErrorCode, func(ctx context.Context) error { replicationSet, req, err := d.queryPrep(ctx, from, to, matchers...) if err != nil { - return promql.ErrStorage{Err: err} + return err } result, err = d.queryIngesterStream(ctx, replicationSet, req) if err != nil { - return promql.ErrStorage{Err: err} + return err } if s := opentracing.SpanFromContext(ctx); s != nil { diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 5cb7648b030..cfc90fcdb83 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -6,7 +6,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -207,7 +206,7 @@ func (it *blockQuerierSeriesIterator) Err() error { err := it.iterators[it.i].Err() if err != nil { - return promql.ErrStorage{Err: errors.Wrapf(err, "cannot iterate chunk for series: %v", it.labels)} + return errors.Wrapf(err, "cannot iterate chunk for series: %v", it.labels) } return nil } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 6c8e392cacb..2d9b3c4de9b 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -249,12 +248,12 @@ func (q *BlocksStoreQueryable) stopping(_ error) error { // Querier returns a new Querier on the storage. func (q *BlocksStoreQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { if s := q.State(); s != services.Running { - return nil, promql.ErrStorage{Err: errors.Errorf("BlocksStoreQueryable is not running: %v", s)} + return nil, errors.Errorf("BlocksStoreQueryable is not running: %v", s) } userID, err := user.ExtractOrgID(ctx) if err != nil { - return nil, promql.ErrStorage{Err: err} + return nil, err } return &blocksStoreQuerier{ @@ -291,14 +290,7 @@ type blocksStoreQuerier struct { // Select implements storage.Querier interface. // The bool passed is ignored because the series is always sorted. func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - set := q.selectSorted(sp, matchers...) - - // We need to wrap the error in order to have Prometheus returning a 5xx error. - if err := set.Err(); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { - set = storage.ErrSeriesSet(promql.ErrStorage{Err: err}) - } - - return set + return q.selectSorted(sp, matchers...) } func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 675946f0e4c..e2e5f0461d8 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -7,7 +7,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/weaveworks/common/user" @@ -95,7 +94,7 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. matrix, err := q.distributor.Query(ctx, model.Time(mint), model.Time(maxt), matchers...) if err != nil { - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) + return storage.ErrSeriesSet(err) } // Using MatrixToSeriesSet (and in turn NewConcreteSeriesSet), sorts the series. @@ -105,14 +104,14 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. func (q *distributorQuerier) streamingSelect(sp storage.SelectHints, matchers []*labels.Matcher) storage.SeriesSet { userID, err := user.ExtractOrgID(q.ctx) if err != nil { - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) + return storage.ErrSeriesSet(err) } mint, maxt := sp.Start, sp.End results, err := q.distributor.QueryStream(q.ctx, model.Time(mint), model.Time(maxt), matchers...) if err != nil { - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) + return storage.ErrSeriesSet(err) } if len(results.Timeseries) != 0 { @@ -131,7 +130,7 @@ func (q *distributorQuerier) streamingSelect(sp storage.SelectHints, matchers [] chunks, err := chunkcompat.FromChunks(userID, ls, result.Chunks) if err != nil { - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) + return storage.ErrSeriesSet(err) } series := &chunkSeries{ diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 46e5b1e8dc1..1fb6915bbb2 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -296,7 +296,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat userID, err := user.ExtractOrgID(ctx) if err != nil { - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) + return storage.ErrSeriesSet(err) } // Validate query time range. @@ -308,7 +308,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat tombstones, err := q.tombstonesLoader.GetPendingTombstonesForInterval(userID, startTime, endTime) if err != nil { - return storage.ErrSeriesSet(promql.ErrStorage{Err: err}) + return storage.ErrSeriesSet(err) } if len(q.queriers) == 1 { From d8626641450aa106b33c7e5deaa34bd7a17b2de3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Jul 2020 13:36:37 +0200 Subject: [PATCH 4/6] Updated CHANGELOG.md entry. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0a02526c72..77655e816a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [CHANGE] Experimental TSDB: compact head when opening TSDB. This should only affect ingester startup after it was unable to compact head in previous run. #2870 * [CHANGE] Metric `cortex_overrides_last_reload_successful` has been renamed to `cortex_runtime_config_last_reload_successful`. #2874 * [CHANGE] HipChat support has been removed from the alertmanager (because removed from the Prometheus upstream too). #2902 +* [CHANGE] Limit errors reported by ingester during query-time now return HTTP status code 422. #2941 * [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783 * [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783 * [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783 From e83aa7d2b5c0ef9b0f4563184edd6a24dc48da9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Jul 2020 14:32:15 +0200 Subject: [PATCH 5/6] Treat context.Canceled as user error and return 422. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/api/queryable.go | 4 ++++ pkg/api/queryable_test.go | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/pkg/api/queryable.go b/pkg/api/queryable.go index b6b486f03fb..8682872a47e 100644 --- a/pkg/api/queryable.go +++ b/pkg/api/queryable.go @@ -34,6 +34,10 @@ func translateError(err error) error { // This will be returned with status code 422 by Prometheus API. return err default: + if errors.Is(err, context.Canceled) { + return err // 422 + } + s, ok := status.FromError(err) if ok { code := s.Code() diff --git a/pkg/api/queryable_test.go b/pkg/api/queryable_test.go index 71f65ef53e7..3543a63b7ed 100644 --- a/pkg/api/queryable_test.go +++ b/pkg/api/queryable_test.go @@ -81,6 +81,12 @@ func TestApiStatusCodes(t *testing.T) { expectedString: "test", expectedCode: 500, }, + + { + err: context.Canceled, + expectedString: "context canceled", + expectedCode: 422, + }, } { for k, q := range map[string]storage.SampleAndChunkQueryable{ "error from queryable": testQueryable{err: tc.err}, From b62c19e1fca3f5b3375519c57a160559bcdc695e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Jul 2020 19:20:48 +0200 Subject: [PATCH 6/6] Added test for context.DeadlineExceeded. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/api/queryable_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/api/queryable_test.go b/pkg/api/queryable_test.go index 3543a63b7ed..79ba4d9d31c 100644 --- a/pkg/api/queryable_test.go +++ b/pkg/api/queryable_test.go @@ -82,6 +82,12 @@ func TestApiStatusCodes(t *testing.T) { expectedCode: 500, }, + { + err: context.DeadlineExceeded, + expectedString: "context deadline exceeded", + expectedCode: 500, + }, + { err: context.Canceled, expectedString: "context canceled",