Skip to content

Commit e92f96f

Browse files
authored
Reuse the byte buffer from GRPC response in the frontend. (#4377)
* Reuse the byte buffer from GRPC response in the frontend. This PR allow to reuse the GRPC byte buffer casted as a `io.Reader` in the http rountripper of the frontend. This way we don't have to copy the content from the reader into another buffer when reading the http response. I've been testing this with in Loki and this shows good memory saving (around 1GB). Signed-off-by: Cyril Tovena <[email protected]> * Use safe cast Signed-off-by: Cyril Tovena <[email protected]> * Review feedback. Signed-off-by: Cyril Tovena <[email protected]>
1 parent a9a96cb commit e92f96f

File tree

2 files changed

+39
-12
lines changed

2 files changed

+39
-12
lines changed

pkg/frontend/transport/roundtripper.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package transport
33
import (
44
"bytes"
55
"context"
6+
"io"
67
"io/ioutil"
78
"net/http"
89

@@ -24,6 +25,15 @@ type grpcRoundTripperAdapter struct {
2425
roundTripper GrpcRoundTripper
2526
}
2627

28+
type buffer struct {
29+
buff []byte
30+
io.ReadCloser
31+
}
32+
33+
func (b *buffer) Bytes() []byte {
34+
return b.buff
35+
}
36+
2737
func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
2838
req, err := server.HTTPRequest(r)
2939
if err != nil {
@@ -37,7 +47,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
3747

3848
httpResp := &http.Response{
3949
StatusCode: int(resp.Code),
40-
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
50+
Body: &buffer{buff: resp.Body, ReadCloser: ioutil.NopCloser(bytes.NewReader(resp.Body))},
4151
Header: http.Header{},
4252
ContentLength: int64(len(resp.Body)),
4353
}

pkg/querier/queryrange/query_range.go

+28-11
Original file line numberDiff line numberDiff line change
@@ -266,20 +266,15 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
266266
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
267267
defer log.Finish()
268268

269-
// Preallocate the buffer with the exact size so we don't waste allocations
270-
// while progressively growing an initial small buffer. The buffer capacity
271-
// is increased by MinRead to avoid extra allocations due to how ReadFrom()
272-
// internally works.
273-
buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead))
274-
if _, err := buf.ReadFrom(r.Body); err != nil {
269+
buf, err := bodyBuffer(r)
270+
if err != nil {
275271
log.Error(err)
276-
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
272+
return nil, err
277273
}
278-
279-
log.LogFields(otlog.Int("bytes", buf.Len()))
274+
log.LogFields(otlog.Int("bytes", len(buf)))
280275

281276
var resp PrometheusResponse
282-
if err := json.Unmarshal(buf.Bytes(), &resp); err != nil {
277+
if err := json.Unmarshal(buf, &resp); err != nil {
283278
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
284279
}
285280

@@ -289,6 +284,29 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
289284
return &resp, nil
290285
}
291286

287+
// Buffer can be used to read a response body.
288+
// This allows to avoid reading the body multiple times from the `http.Response.Body`.
289+
type Buffer interface {
290+
Bytes() []byte
291+
}
292+
293+
func bodyBuffer(res *http.Response) ([]byte, error) {
294+
// Attempt to cast the response body to a Buffer and use it if possible.
295+
// This is because the frontend may have already read the body and buffered it.
296+
if buffer, ok := res.Body.(Buffer); ok {
297+
return buffer.Bytes(), nil
298+
}
299+
// Preallocate the buffer with the exact size so we don't waste allocations
300+
// while progressively growing an initial small buffer. The buffer capacity
301+
// is increased by MinRead to avoid extra allocations due to how ReadFrom()
302+
// internally works.
303+
buf := bytes.NewBuffer(make([]byte, 0, res.ContentLength+bytes.MinRead))
304+
if _, err := buf.ReadFrom(res.Body); err != nil {
305+
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
306+
}
307+
return buf.Bytes(), nil
308+
}
309+
292310
func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {
293311
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
294312
defer sp.Finish()
@@ -392,7 +410,6 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
392410
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
393411
// timestamps in samples.
394412
func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
395-
396413
if len(samples) <= 0 || minTs < samples[0].TimestampMs {
397414
return samples
398415
}

0 commit comments

Comments
 (0)