Skip to content

Translate errors to PromQL errors by wrapping queryable. #2941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (a *API) RegisterQuerier(
) http.Handler {
api := v1.NewAPI(
engine,
queryable,
errorTranslateQueryable{queryable}, // Translate errors to errors expected by API.
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
func() config.Config { return config.Config{} },
Expand Down
156 changes: 156 additions & 0 deletions pkg/api/queryable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package api

import (
"context"

"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"

"github.com/cortexproject/cortex/pkg/chunk"
)

func translateError(err error) error {
if err == nil {
return err
}

// vendor/github.com/prometheus/prometheus/web/api/v1/api.go, respondError function only accepts
// *apiError types.
// Translation of error to *apiError happens in vendor/github.com/prometheus/prometheus/web/api/v1/api.go, returnAPIError method.
// It only supports:
// promql.ErrQueryCanceled, mapped to 503
// promql.ErrQueryTimeout, mapped to 503
// promql.ErrStorage mapped to 500
// anything else is mapped to 422

switch errors.Cause(err).(type) {
case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout:
// Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91.
// Don't translate those, just in case we use them internally.
return err
case chunk.QueryError:
// This will be returned with status code 422 by Prometheus API.
// vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393
return err
default:
s, ok := status.FromError(err)
if ok {
code := s.Code()

// Treat these as HTTP status codes, even though they are supposed to be grpc codes.
if code >= 400 && code < 500 {
// Return directly, will be mapped to 422
return err
} else if code >= 500 && code < 599 {
// Wrap into ErrStorage for mapping to 500
return promql.ErrStorage{Err: err}
}
}

// All other errors will be returned as 500.
return promql.ErrStorage{Err: err}
}
}

type errorTranslateQueryable struct {
q storage.SampleAndChunkQueryable
}

func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := e.q.Querier(ctx, mint, maxt)
return errorTranslateQuerier{q: q}, translateError(err)
}

func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := e.q.ChunkQuerier(ctx, mint, maxt)
return errorTranslateChunkQuerier{q: q}, translateError(err)
}

type errorTranslateQuerier struct {
q storage.Querier
}

func (e errorTranslateQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelValues(name)
return values, warnings, translateError(err)
}

func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
return values, warnings, translateError(err)
}

func (e errorTranslateQuerier) Close() error {
return translateError(e.q.Close())
}

func (e errorTranslateQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
s := e.q.Select(sortSeries, hints, matchers...)
return errorTranslateSeriesSet{s}
}

type errorTranslateChunkQuerier struct {
q storage.ChunkQuerier
}

func (e errorTranslateChunkQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelValues(name)
return values, warnings, translateError(err)
}

func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
return values, warnings, translateError(err)
}

func (e errorTranslateChunkQuerier) Close() error {
return translateError(e.q.Close())
}

func (e errorTranslateChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
s := e.q.Select(sortSeries, hints, matchers...)
return errorTranslateChunkSeriesSet{s}
}

type errorTranslateSeriesSet struct {
s storage.SeriesSet
}

func (e errorTranslateSeriesSet) Next() bool {
return e.s.Next()
}

func (e errorTranslateSeriesSet) At() storage.Series {
return e.s.At()
}

func (e errorTranslateSeriesSet) Err() error {
return translateError(e.s.Err())
}

func (e errorTranslateSeriesSet) Warnings() storage.Warnings {
return e.s.Warnings()
}

type errorTranslateChunkSeriesSet struct {
s storage.ChunkSeriesSet
}

func (e errorTranslateChunkSeriesSet) Next() bool {
return e.s.Next()
}

func (e errorTranslateChunkSeriesSet) At() storage.ChunkSeries {
return e.s.At()
}

func (e errorTranslateChunkSeriesSet) Err() error {
return translateError(e.s.Err())
}

func (e errorTranslateChunkSeriesSet) Warnings() storage.Warnings {
return e.s.Warnings()
}
199 changes: 199 additions & 0 deletions pkg/api/queryable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package api

import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"time"

"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/util"
)

func TestApiStatusCodes(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be worth adding a test for context.DeadlineExceeded (we agreed to return 500).

for ix, tc := range []struct {
err error
expectedString string
expectedCode int
}{
{
err: errors.New("some random error"),
expectedString: "some random error",
expectedCode: 500,
},

{
err: chunk.QueryError("special handling"), // handled specially by chunk_store_queryable
expectedString: "special handling",
expectedCode: 422,
},

{
err: promql.ErrTooManySamples("query execution"),
expectedString: "too many samples",
expectedCode: 422,
},

{
err: promql.ErrQueryCanceled("query execution"),
expectedString: "query was canceled",
expectedCode: 503,
},

{
err: promql.ErrQueryTimeout("query execution"),
expectedString: "query timed out",
expectedCode: 503,
},

// Status code 400 is remapped to 422 (only choice we have)
{
err: httpgrpc.Errorf(http.StatusBadRequest, "test string"),
expectedString: "test string",
expectedCode: 422,
},

// 404 is also translated to 422
{
err: httpgrpc.Errorf(http.StatusNotFound, "not found"),
expectedString: "not found",
expectedCode: 422,
},

// 505 is translated to 500
{
err: httpgrpc.Errorf(http.StatusHTTPVersionNotSupported, "test"),
expectedString: "test",
expectedCode: 500,
},
} {
for k, q := range map[string]storage.SampleAndChunkQueryable{
"error from queryable": testQueryable{err: tc.err},
"error from querier": testQueryable{q: testQuerier{err: tc.err}},
"error from seriesset": testQueryable{q: testQuerier{s: testSeriesSet{err: tc.err}}},
} {
t.Run(fmt.Sprintf("%s/%d", k, ix), func(t *testing.T) {
r := createPrometheusAPI(errorTranslateQueryable{q: q})
rec := httptest.NewRecorder()

req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil)
req = req.WithContext(user.InjectOrgID(context.Background(), "test org"))

r.ServeHTTP(rec, req)

require.Equal(t, tc.expectedCode, rec.Code)
require.Contains(t, rec.Body.String(), tc.expectedString)
})
}
}
}

func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
engine := promql.NewEngine(promql.EngineOpts{
Logger: util.Logger,
Reg: nil,
ActiveQueryTracker: nil,
MaxSamples: 100,
Timeout: 5 * time.Second,
})

api := v1.NewAPI(
engine,
q,
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
v1.GlobalURLOptions{},
func(f http.HandlerFunc) http.HandlerFunc { return f },
nil, // Only needed for admin APIs.
"", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty.
false, // Disable admin APIs.
util.Logger,
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
0, 0, 0, // Remote read samples and concurrency limit.
regexp.MustCompile(".*"),
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
&v1.PrometheusVersion{},
)

promRouter := route.New().WithPrefix("/api/v1")
api.Register(promRouter)

return promRouter
}

type testQueryable struct {
q storage.Querier
err error
}

func (t testQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
return nil, t.err
}

func (t testQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
if t.q != nil {
return t.q, nil
}
return nil, t.err
}

type testQuerier struct {
s storage.SeriesSet
err error
}

func (t testQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
return nil, nil, t.err
}

func (t testQuerier) LabelNames() ([]string, storage.Warnings, error) {
return nil, nil, t.err
}

func (t testQuerier) Close() error {
return nil
}

func (t testQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if t.s != nil {
return t.s
}
return storage.ErrSeriesSet(t.err)
}

type testSeriesSet struct {
err error
}

func (t testSeriesSet) Next() bool {
return false
}

func (t testSeriesSet) At() storage.Series {
return nil
}

func (t testSeriesSet) Err() error {
return t.err
}

func (t testSeriesSet) Warnings() storage.Warnings {
return nil
}
15 changes: 1 addition & 14 deletions pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -46,19 +45,7 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...
}
chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...)
if err != nil {
switch err.(type) {
case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout:
// Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91.
// Don't translate those, just in case we use them internally.
return storage.ErrSeriesSet(err)
case chunk.QueryError:
// This will be returned with status code 422 by Prometheus API.
// vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393
return storage.ErrSeriesSet(err)
default:
// All other errors will be returned as 500.
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
}
return storage.ErrSeriesSet(err)
}

return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc)
Expand Down
Loading