Skip to content

Commit 5b7c076

Browse files
authored
Small optimization to PrometheusResponse marshalling (#3964)
* Small optimization to PrometheusResponse marshalling Signed-off-by: Marco Pracucci <[email protected]> * Fixed TestResponse Signed-off-by: Marco Pracucci <[email protected]>
1 parent dd9de3b commit 5b7c076

File tree

5 files changed

+96
-25
lines changed

5 files changed

+96
-25
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* `cortex_ruler_clients`
1717
* `cortex_ruler_client_request_duration_seconds`
1818
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
19+
* [ENHANCEMENT] Query-frontend: reduced memory allocations when serializing query response. #3964
1920
* [ENHANCEMENT] Ingester: reduce CPU and memory when an high number of errors are returned by the ingester on the write path with the blocks storage. #3969 #3971 #3973
2021
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
2122
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959

pkg/frontend/transport/roundtripper.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
3636
}
3737

3838
httpResp := &http.Response{
39-
StatusCode: int(resp.Code),
40-
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
41-
Header: http.Header{},
39+
StatusCode: int(resp.Code),
40+
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
41+
Header: http.Header{},
42+
ContentLength: int64(len(resp.Body)),
4243
}
4344
for _, h := range resp.Headers {
4445
httpResp.Header[h.Key] = h.Values

pkg/querier/queryrange/marshaling_test.go

+70-11
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,88 @@ import (
44
"bytes"
55
"context"
66
"io/ioutil"
7+
"math/rand"
78
"net/http"
8-
"os"
99
"testing"
1010

1111
"github.com/stretchr/testify/require"
12+
13+
"github.com/cortexproject/cortex/pkg/cortexpb"
1214
)
1315

14-
func BenchmarkMarshalling(b *testing.B) {
15-
jsonfile := os.Getenv("JSON")
16-
buf, err := ioutil.ReadFile(jsonfile)
16+
func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) {
17+
const (
18+
numSeries = 1000
19+
numSamplesPerSeries = 1000
20+
)
21+
22+
// Generate a mocked response and marshal it.
23+
res := mockPrometheusResponse(numSeries, numSamplesPerSeries)
24+
encodedRes, err := json.Marshal(res)
1725
require.NoError(b, err)
26+
b.Log("test prometheus response size:", len(encodedRes))
27+
28+
b.ResetTimer()
29+
b.ReportAllocs()
30+
1831
for n := 0; n < b.N; n++ {
19-
apiResp, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
20-
StatusCode: 200,
21-
Body: ioutil.NopCloser(bytes.NewReader(buf)),
32+
_, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
33+
StatusCode: 200,
34+
Body: ioutil.NopCloser(bytes.NewReader(encodedRes)),
35+
ContentLength: int64(len(encodedRes)),
2236
}, nil)
2337
require.NoError(b, err)
38+
}
39+
}
2440

25-
resp, err := PrometheusCodec.EncodeResponse(context.Background(), apiResp)
26-
require.NoError(b, err)
41+
func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) {
42+
const (
43+
numSeries = 1000
44+
numSamplesPerSeries = 1000
45+
)
46+
47+
// Generate a mocked response and marshal it.
48+
res := mockPrometheusResponse(numSeries, numSamplesPerSeries)
49+
50+
b.ResetTimer()
51+
b.ReportAllocs()
2752

28-
buf2, err := ioutil.ReadAll(resp.Body)
53+
for n := 0; n < b.N; n++ {
54+
_, err := PrometheusCodec.EncodeResponse(context.Background(), res)
2955
require.NoError(b, err)
30-
require.Equal(b, string(buf), string(buf2))
56+
}
57+
}
58+
59+
func mockPrometheusResponse(numSeries, numSamplesPerSeries int) *PrometheusResponse {
60+
stream := make([]SampleStream, numSeries)
61+
for s := 0; s < numSeries; s++ {
62+
// Generate random samples.
63+
samples := make([]cortexpb.Sample, numSamplesPerSeries)
64+
for i := 0; i < numSamplesPerSeries; i++ {
65+
samples[i] = cortexpb.Sample{
66+
Value: rand.Float64(),
67+
TimestampMs: int64(i),
68+
}
69+
}
70+
71+
// Generate random labels.
72+
lbls := make([]cortexpb.LabelAdapter, 10)
73+
for i := range lbls {
74+
lbls[i].Name = "a_medium_size_label_name"
75+
lbls[i].Value = "a_medium_size_label_value_that_is_used_to_benchmark_marshalling"
76+
}
77+
78+
stream[s] = SampleStream{
79+
Labels: lbls,
80+
Samples: samples,
81+
}
82+
}
83+
84+
return &PrometheusResponse{
85+
Status: "success",
86+
Data: PrometheusData{
87+
ResultType: "vector",
88+
Result: stream,
89+
},
3190
}
3291
}

pkg/querier/queryrange/query_range.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@ import (
3131
const StatusSuccess = "success"
3232

3333
var (
34-
matrix = model.ValMatrix.String()
35-
json = jsoniter.ConfigCompatibleWithStandardLibrary
34+
matrix = model.ValMatrix.String()
35+
json = jsoniter.Config{
36+
EscapeHTML: false, // No HTML in our responses.
37+
SortMapKeys: true,
38+
ValidateJsonRawMessage: true,
39+
}.Froze()
3640
errEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "end timestamp must not be before start time")
3741
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
3842
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
@@ -262,16 +266,20 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
262266
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
263267
defer log.Finish()
264268

265-
buf, err := ioutil.ReadAll(r.Body)
266-
if err != nil {
269+
// Preallocate the buffer with the exact size so we don't waste allocations
270+
// while progressively growing an initial small buffer. The buffer capacity
271+
// is increased by MinRead to avoid extra allocations due to how ReadFrom()
272+
// internally works.
273+
buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead))
274+
if _, err := buf.ReadFrom(r.Body); err != nil {
267275
log.Error(err)
268276
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
269277
}
270278

271-
log.LogFields(otlog.Int("bytes", len(buf)))
279+
log.LogFields(otlog.Int("bytes", buf.Len()))
272280

273281
var resp PrometheusResponse
274-
if err := json.Unmarshal(buf, &resp); err != nil {
282+
if err := json.Unmarshal(buf.Bytes(), &resp); err != nil {
275283
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
276284
}
277285

@@ -303,8 +311,9 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
303311
Header: http.Header{
304312
"Content-Type": []string{"application/json"},
305313
},
306-
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
307-
StatusCode: http.StatusOK,
314+
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
315+
StatusCode: http.StatusOK,
316+
ContentLength: int64(len(b)),
308317
}
309318
return &resp, nil
310319
}

pkg/querier/queryrange/query_range_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,10 @@ func TestResponse(t *testing.T) {
9797

9898
// Reset response, as the above call will have consumed the body reader.
9999
response = &http.Response{
100-
StatusCode: 200,
101-
Header: http.Header{"Content-Type": []string{"application/json"}},
102-
Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))),
100+
StatusCode: 200,
101+
Header: http.Header{"Content-Type": []string{"application/json"}},
102+
Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))),
103+
ContentLength: int64(len(tc.body)),
103104
}
104105
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp)
105106
require.NoError(t, err)

0 commit comments

Comments
 (0)