Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small optimization to PrometheusResponse marshalling #3964

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* `cortex_ruler_clients`
* `cortex_ruler_client_request_duration_seconds`
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [ENHANCEMENT] Query-frontend: reduced memory allocations when serializing query response. #3964
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948

## 1.8.0 in progress
Expand Down
7 changes: 4 additions & 3 deletions pkg/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
}

httpResp := &http.Response{
StatusCode: int(resp.Code),
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
Header: http.Header{},
StatusCode: int(resp.Code),
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
Header: http.Header{},
ContentLength: int64(len(resp.Body)),
}
for _, h := range resp.Headers {
httpResp.Header[h.Key] = h.Values
Expand Down
81 changes: 70 additions & 11 deletions pkg/querier/queryrange/marshaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,88 @@ import (
"bytes"
"context"
"io/ioutil"
"math/rand"
"net/http"
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/cortexpb"
)

func BenchmarkMarshalling(b *testing.B) {
jsonfile := os.Getenv("JSON")
buf, err := ioutil.ReadFile(jsonfile)
func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) {
const (
numSeries = 1000
numSamplesPerSeries = 1000
)

// Generate a mocked response and marshal it.
res := mockPrometheusResponse(numSeries, numSamplesPerSeries)
encodedRes, err := json.Marshal(res)
require.NoError(b, err)
b.Log("test prometheus response size:", len(encodedRes))

b.ResetTimer()
b.ReportAllocs()

for n := 0; n < b.N; n++ {
apiResp, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(buf)),
_, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(encodedRes)),
ContentLength: int64(len(encodedRes)),
}, nil)
require.NoError(b, err)
}
}

resp, err := PrometheusCodec.EncodeResponse(context.Background(), apiResp)
require.NoError(b, err)
func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) {
const (
numSeries = 1000
numSamplesPerSeries = 1000
)

// Generate a mocked response and marshal it.
res := mockPrometheusResponse(numSeries, numSamplesPerSeries)

b.ResetTimer()
b.ReportAllocs()

buf2, err := ioutil.ReadAll(resp.Body)
for n := 0; n < b.N; n++ {
_, err := PrometheusCodec.EncodeResponse(context.Background(), res)
require.NoError(b, err)
require.Equal(b, string(buf), string(buf2))
}
}

func mockPrometheusResponse(numSeries, numSamplesPerSeries int) *PrometheusResponse {
stream := make([]SampleStream, numSeries)
for s := 0; s < numSeries; s++ {
// Generate random samples.
samples := make([]cortexpb.Sample, numSamplesPerSeries)
for i := 0; i < numSamplesPerSeries; i++ {
samples[i] = cortexpb.Sample{
Value: rand.Float64(),
TimestampMs: int64(i),
}
}

// Generate random labels.
lbls := make([]cortexpb.LabelAdapter, 10)
for i := range lbls {
lbls[i].Name = "a_medium_size_label_name"
lbls[i].Value = "a_medium_size_label_value_that_is_used_to_benchmark_marshalling"
}

stream[s] = SampleStream{
Labels: lbls,
Samples: samples,
}
}

return &PrometheusResponse{
Status: "success",
Data: PrometheusData{
ResultType: "vector",
Result: stream,
},
}
}
25 changes: 17 additions & 8 deletions pkg/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ import (
const StatusSuccess = "success"

var (
matrix = model.ValMatrix.String()
json = jsoniter.ConfigCompatibleWithStandardLibrary
matrix = model.ValMatrix.String()
json = jsoniter.Config{
EscapeHTML: false, // No HTML in our responses.
SortMapKeys: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's required, but I wasn't able to figure out if we may introduce any regression disabling it (the default config has it enabled). Any thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect some clients are relying on it whether they realise it or not.
I.e. the ordering of series here maps directly to what you see in your dashboard, in at least some cases.

However we should try disabling it (in a separate change) and see if anyone notices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue to follow up about it:
#3997

ValidateJsonRawMessage: true,
}.Froze()
errEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "end timestamp must not be before start time")
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
Expand Down Expand Up @@ -262,16 +266,20 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
// Preallocate the buffer with the exact size so we don't waste allocations
// while progressively growing an initial small buffer. The buffer capacity
// is increased by MinRead to avoid extra allocations due to how ReadFrom()
// internally works.
buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead))
if _, err := buf.ReadFrom(r.Body); err != nil {
log.Error(err)
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

log.LogFields(otlog.Int("bytes", len(buf)))
log.LogFields(otlog.Int("bytes", buf.Len()))

var resp PrometheusResponse
if err := json.Unmarshal(buf, &resp); err != nil {
if err := json.Unmarshal(buf.Bytes(), &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

Expand Down Expand Up @@ -303,8 +311,9 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
ContentLength: int64(len(b)),
}
return &resp, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ func TestResponse(t *testing.T) {

// Reset response, as the above call will have consumed the body reader.
response = &http.Response{
StatusCode: 200,
Header: http.Header{"Content-Type": []string{"application/json"}},
Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))),
StatusCode: 200,
Header: http.Header{"Content-Type": []string{"application/json"}},
Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))),
ContentLength: int64(len(tc.body)),
}
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp)
require.NoError(t, err)
Expand Down