Skip to content

Commit d958c37

Browse files
committed
[prometheusremotewriteexporter] reduce allocations when serializing protobufs (open-telemetry#58)
1 parent 40fa8b8 commit d958c37

File tree

1 file changed

+30
-3
lines changed

1 file changed

+30
-3
lines changed

exporter/prometheusremotewriteexporter/exporter.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
5353
p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
5454
}
5555

56+
type buffer struct {
57+
protobuf *proto.Buffer
58+
snappy []byte
59+
}
60+
61+
// A reusable buffer pool for serializing protobufs and compressing them with Snappy.
62+
var bufferPool = sync.Pool{
63+
New: func() any {
64+
return &buffer{
65+
protobuf: proto.NewBuffer(nil),
66+
snappy: nil,
67+
}
68+
},
69+
}
70+
5671
// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
5772
type prwExporter struct {
5873
endpointURL *url.URL
@@ -271,14 +286,26 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
271286
}
272287

273288
func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
289+
buf := bufferPool.Get().(*buffer)
290+
buf.protobuf.Reset()
291+
defer bufferPool.Put(buf)
292+
274293
// Uses proto.Marshal to convert the WriteRequest into bytes array
275-
data, errMarshal := proto.Marshal(writeReq)
294+
errMarshal := buf.protobuf.Marshal(writeReq)
276295
if errMarshal != nil {
277296
return consumererror.NewPermanent(errMarshal)
278297
}
279298
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
280-
// Therefore we always let Snappy decide the size of the buffer.
281-
compressedData := snappy.Encode(nil, data)
299+
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
300+
maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
301+
if maxCompressedLen > len(buf.snappy) {
302+
if cap(buf.snappy) < maxCompressedLen {
303+
buf.snappy = make([]byte, maxCompressedLen)
304+
} else {
305+
buf.snappy = buf.snappy[:maxCompressedLen]
306+
}
307+
}
308+
compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes())
282309

283310
// executeFunc can be used for backoff and non backoff scenarios.
284311
executeFunc := func() error {

0 commit comments

Comments
 (0)