Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using global instant query codec #6328

Merged
merged 2 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
Expand All @@ -490,7 +491,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
if err != nil {
return nil, err
}
Expand All @@ -501,7 +502,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryRangeMiddlewares,
instantQueryMiddlewares,
prometheusCodec,
instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec),
instantQueryCodec,
t.Overrides,
queryAnalyzer,
t.Cfg.Querier.DefaultEvaluationInterval,
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
)

var (
InstantQueryCodec tripperware.Codec = NewInstantQueryCodec("", "protobuf")

json = jsoniter.Config{
EscapeHTML: false, // No HTML in our responses.
SortMapKeys: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
func Middlewares(
log log.Logger,
limits tripperware.Limits,
merger tripperware.Merger,
queryAnalyzer querysharding.Analyzer,
lookbackDelta time.Duration,
) ([]tripperware.Middleware, error) {
m := []tripperware.Middleware{
NewLimitsMiddleware(limits, lookbackDelta),
tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer),
tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer),
}
return m, nil
}
26 changes: 14 additions & 12 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType))

const testHistogramResponse = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"prometheus_http_request_duration_seconds","handler":"/metrics","instance":"localhost:9090","job":"prometheus"},"histogram":[1719528871.898,{"count":"6342","sum":"43.31319875499995","buckets":[[0,"0.0013810679320049755","0.0015060652591874421","1"],[0,"0.0015060652591874421","0.001642375811042411","7"],[0,"0.001642375811042411","0.0017910235218841233","5"],[0,"0.0017910235218841233","0.001953125","13"],[0,"0.001953125","0.0021298979153618314","19"],[0,"0.0021298979153618314","0.0023226701464896895","13"],[0,"0.0023226701464896895","0.002532889755177753","13"],[0,"0.002532889755177753","0.002762135864009951","15"],[0,"0.002762135864009951","0.0030121305183748843","12"],[0,"0.0030121305183748843","0.003284751622084822","34"],[0,"0.003284751622084822","0.0035820470437682465","188"],[0,"0.0035820470437682465","0.00390625","372"],[0,"0.00390625","0.004259795830723663","400"],[0,"0.004259795830723663","0.004645340292979379","411"],[0,"0.004645340292979379","0.005065779510355506","425"],[0,"0.005065779510355506","0.005524271728019902","425"],[0,"0.005524271728019902","0.0060242610367497685","521"],[0,"0.0060242610367497685","0.006569503244169644","621"],[0,"0.006569503244169644","0.007164094087536493","593"],[0,"0.007164094087536493","0.0078125","506"],[0,"0.0078125","0.008519591661447326","458"],[0,"0.008519591661447326","0.009290680585958758","346"],[0,"0.009290680585958758","0.010131559020711013","285"],[0,"0.010131559020711013","0.011048543456039804","196"],[0,"0.011048543456039804","0.012048522073499537","129"],[0,"0.012048522073499537","0.013139006488339287","85"],[0,"0.013139006488339287","0.014328188175072986","65"],[0,"0.014328188175072986","0.015625","54"],[0,"0.015625","0.01703918332289465","53"],[0,"0.01703918332289465","0.018581361171917516","20"],[0,"0.018581361171917516","0.020263118041422026","21"],[0,"0.020263118041422026","0.022097086912079608","15"],[0,"0.022097086912079608","0.024097044146999074","11"],[0,"0.024097044146999074","0.026278012976678575","2"],[0,"0.026278012976678575","0.028656376350145972","3"],[0,"0.028656376350145972","0.03125","3"],[0,"0.04052623608284405","0.044194173824159216","2"]]}]}]}}`

func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeader) {
Expand All @@ -35,7 +37,7 @@ func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeade

func TestRequest(t *testing.T) {
t.Parallel()
codec := InstantQueryCodec
codec := testInstantQueryCodec

for _, tc := range []struct {
url string
Expand Down Expand Up @@ -182,7 +184,7 @@ func TestCompressedResponse(t *testing.T) {
Header: h,
Body: io.NopCloser(responseBody),
}
resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.Equal(t, tc.err, err)

if err == nil {
Expand Down Expand Up @@ -376,7 +378,7 @@ func TestResponse(t *testing.T) {
}
}

resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.NoError(t, err)

// Reset response, as the above call will have consumed the body reader.
Expand All @@ -386,7 +388,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))),
ContentLength: int64(len(tc.jsonBody)),
}
resp2, err := InstantQueryCodec.EncodeResponse(context.Background(), resp)
resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})
Expand Down Expand Up @@ -645,7 +647,7 @@ func TestMergeResponse(t *testing.T) {
if tc.cancelBeforeDecode {
cancelCtx()
}
dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil)
dr, err := testInstantQueryCodec.DecodeResponse(ctx, hr, nil)
assert.Equal(t, tc.expectedDecodeErr, err)
if err != nil {
cancelCtx()
Expand All @@ -657,13 +659,13 @@ func TestMergeResponse(t *testing.T) {
if tc.cancelBeforeMerge {
cancelCtx()
}
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
resp, err := testInstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
return
}
dr, err := InstantQueryCodec.EncodeResponse(ctx, resp)
dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp)
assert.Equal(t, tc.expectedErr, err)
contents, err := io.ReadAll(dr.Body)
assert.Equal(t, tc.expectedErr, err)
Expand Down Expand Up @@ -1660,7 +1662,7 @@ func TestMergeResponseProtobuf(t *testing.T) {
if tc.cancelBeforeDecode {
cancelCtx()
}
dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil)
dr, err := testInstantQueryCodec.DecodeResponse(ctx, hr, nil)
assert.Equal(t, tc.expectedDecodeErr, err)
if err != nil {
cancelCtx()
Expand All @@ -1672,13 +1674,13 @@ func TestMergeResponseProtobuf(t *testing.T) {
if tc.cancelBeforeMerge {
cancelCtx()
}
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
resp, err := testInstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
return
}
dr, err := InstantQueryCodec.EncodeResponse(ctx, resp)
dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp)
assert.Equal(t, tc.expectedErr, err)
contents, err := io.ReadAll(dr.Body)
assert.Equal(t, tc.expectedErr, err)
Expand Down Expand Up @@ -1743,7 +1745,7 @@ func Benchmark_Decode(b *testing.B) {
StatusCode: 200,
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.NoError(b, err)
}
})
Expand Down Expand Up @@ -1806,7 +1808,7 @@ func Benchmark_Decode_Protobuf(b *testing.B) {
Header: http.Header{"Content-Type": []string{"application/x-protobuf"}},
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.NoError(b, err)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf"))
tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf"))
}
Loading