Skip to content

Commit b8f2eb1

Browse files
committed
feat(rw2): unmarshal RW1 request from RW2
Implement unmarshal of Prometheus remote write request 1.0 data from Remote Write 2.0 input by converting the model. The conversion is lossy as RW1 does not have metadata per series, only per metric family. Signed-off-by: György Krajcsovits <[email protected]> Signed-off-by: Arve Knudsen <[email protected]>
1 parent 4fc7ccd commit b8f2eb1

10 files changed

+1189
-262
lines changed

integration/distributor_test.go

+28-3
Original file line numberDiff line numberDiff line change
@@ -374,14 +374,26 @@ overrides:
374374
}
375375

376376
func TestDistributorRemoteWrite2(t *testing.T) {
377+
t.Run("caching_unmarshal_data_enabled", func(t *testing.T) {
378+
testDistributorRemoteWrite2(t, true)
379+
})
380+
381+
t.Run("caching_unmarshal_data_disabled", func(t *testing.T) {
382+
testDistributorRemoteWrite2(t, false)
383+
})
384+
}
385+
386+
func testDistributorRemoteWrite2(t *testing.T, cachingUnmarshalDataEnabled bool) {
377387
queryEnd := time.Now().Round(time.Second)
378388
queryStart := queryEnd.Add(-1 * time.Hour)
389+
queryStep := 10 * time.Minute
379390

380391
testCases := map[string]struct {
381392
inRemoteWrite []*promRW2.Request
382393
runtimeConfig string
383394
queries map[string]model.Matrix
384395
exemplarQueries map[string][]promv1.ExemplarQueryResult
396+
expectedStatus int
385397
}{
386398
"no special features": {
387399
inRemoteWrite: []*promRW2.Request{
@@ -401,6 +413,7 @@ func TestDistributorRemoteWrite2(t *testing.T) {
401413
Values: []model.SamplePair{{Timestamp: model.Time(queryStart.UnixMilli()), Value: model.SampleValue(100)}},
402414
}},
403415
},
416+
expectedStatus: http.StatusOK,
404417
},
405418
}
406419

@@ -424,7 +437,7 @@ func TestDistributorRemoteWrite2(t *testing.T) {
424437
"-distributor.ha-tracker.store": "consul",
425438
"-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(),
426439
"-distributor.ha-tracker.prefix": "prom_ha/",
427-
"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false),
440+
"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(cachingUnmarshalDataEnabled),
428441
}
429442

430443
flags := mergeFlags(
@@ -490,10 +503,22 @@ func TestDistributorRemoteWrite2(t *testing.T) {
490503

491504
res, err := client.PushRW2(ser)
492505
require.NoError(t, err)
493-
require.Equal(t, http.StatusUnsupportedMediaType, res.StatusCode)
506+
require.Equal(t, tc.expectedStatus, res.StatusCode)
507+
}
508+
509+
for q, res := range tc.queries {
510+
result, err := client.QueryRange(q, queryStart, queryEnd, queryStep)
511+
require.NoError(t, err)
512+
513+
require.Equal(t, res.String(), result.String())
494514
}
495515

496-
// Placeholder for actual query tests.
516+
for q, expResult := range tc.exemplarQueries {
517+
result, err := client.QueryExemplars(q, queryStart, queryEnd)
518+
require.NoError(t, err)
519+
520+
require.Equal(t, expResult, result)
521+
}
497522
})
498523
}
499524
}

pkg/distributor/distributor.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -1645,7 +1645,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
16451645
return err
16461646
}
16471647

1648-
d.updateReceivedMetrics(req, userID)
1648+
d.updateReceivedMetrics(ctx, req, userID)
16491649

16501650
if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
16511651
return nil
@@ -1876,18 +1876,21 @@ func tokenForMetadata(userID string, metricName string) uint32 {
18761876
return mimirpb.ShardByMetricName(userID, metricName)
18771877
}
18781878

1879-
func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
1880-
var receivedSamples, receivedExemplars, receivedMetadata int
1879+
func (d *Distributor) updateReceivedMetrics(ctx context.Context, req *mimirpb.WriteRequest, userID string) {
1880+
var receivedSamples, receivedHistograms, receivedExemplars, receivedMetadata int
18811881
for _, ts := range req.Timeseries {
1882-
receivedSamples += len(ts.Samples) + len(ts.Histograms)
1882+
receivedSamples += len(ts.Samples)
1883+
receivedHistograms += len(ts.Histograms)
18831884
receivedExemplars += len(ts.Exemplars)
18841885
}
18851886
d.costAttributionMgr.SampleTracker(userID).IncrementReceivedSamples(req, mtime.Now())
18861887
receivedMetadata = len(req.Metadata)
18871888

1888-
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
1889+
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples + receivedHistograms))
18891890
d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
18901891
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
1892+
1893+
updateWriteResponseStatsCtx(ctx, receivedSamples, receivedHistograms, receivedExemplars)
18911894
}
18921895

18931896
// forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets.

pkg/distributor/push.go

+84-44
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/grafana/dskit/user"
2626
"github.com/opentracing/opentracing-go"
2727
"github.com/pkg/errors"
28+
promRemote "github.com/prometheus/prometheus/storage/remote"
2829

2930
"github.com/grafana/mimir/pkg/mimirpb"
3031
"github.com/grafana/mimir/pkg/util"
@@ -36,6 +37,13 @@ import (
3637
// PushFunc defines the type of the push. It is similar to http.HandlerFunc.
3738
type PushFunc func(ctx context.Context, req *Request) error
3839

40+
// The PushFunc might store promRemote.WriteResponseStats in the context.
41+
type pushResponseStatsContextMarker struct{}
42+
43+
var (
44+
PushResponseStatsContextKey = &pushResponseStatsContextMarker{}
45+
)
46+
3947
// parserFunc defines how to read the body the request from an HTTP request. It takes an optional RequestBuffers.
4048
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error
4149

@@ -151,65 +159,68 @@ func handler(
151159
}
152160
}
153161

154-
var supplier supplierFunc
155162
isRW2, err := isRemoteWrite2(r)
156163
if err != nil {
157164
http.Error(w, err.Error(), http.StatusBadRequest)
158165
}
159-
if isRW2 {
160-
supplier = func() (*mimirpb.WriteRequest, func(), error) {
161-
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable
162-
// unless the client switches to remote-write v1.
163-
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported")
164-
}
165-
} else {
166-
supplier = func() (*mimirpb.WriteRequest, func(), error) {
167-
rb := util.NewRequestBuffers(requestBufferPool)
168-
var req mimirpb.PreallocWriteRequest
166+
supplier := func() (*mimirpb.WriteRequest, func(), error) {
167+
rb := util.NewRequestBuffers(requestBufferPool)
168+
var req mimirpb.PreallocWriteRequest
169169

170-
userID, err := tenant.TenantID(ctx)
171-
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
172-
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
173-
}
170+
req.UnmarshalFromRW2 = isRW2
174171

175-
// userID might be empty if none was in the ctx, in this case just use the default setting.
176-
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
177-
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
178-
// Optimization to avoid the allocations required for unmarshaling exemplars.
179-
req.SkipUnmarshalingExemplars = true
180-
}
172+
userID, err := tenant.TenantID(ctx)
173+
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
174+
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
175+
}
181176

182-
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
183-
// Check for httpgrpc error, default to client error if parsing failed
184-
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
185-
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
186-
}
177+
// userID might be empty if none was in the ctx, in this case just use the default setting.
178+
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
179+
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
180+
// Optimization to avoid the allocations required for unmarshaling exemplars.
181+
req.SkipUnmarshalingExemplars = true
182+
}
187183

188-
rb.CleanUp()
189-
return nil, nil, err
184+
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
185+
// Check for httpgrpc error, default to client error if parsing failed
186+
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
187+
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
190188
}
191189

192-
if allowSkipLabelNameValidation {
193-
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
194-
} else {
195-
req.SkipLabelValidation = false
196-
}
190+
rb.CleanUp()
191+
return nil, nil, err
192+
}
197193

198-
if allowSkipLabelCountValidation {
199-
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
200-
} else {
201-
req.SkipLabelCountValidation = false
202-
}
194+
if allowSkipLabelNameValidation {
195+
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
196+
} else {
197+
req.SkipLabelValidation = false
198+
}
203199

204-
cleanup := func() {
205-
mimirpb.ReuseSlice(req.Timeseries)
206-
rb.CleanUp()
207-
}
208-
return &req.WriteRequest, cleanup, nil
200+
if allowSkipLabelCountValidation {
201+
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
202+
} else {
203+
req.SkipLabelCountValidation = false
204+
}
205+
206+
cleanup := func() {
207+
mimirpb.ReuseSlice(req.Timeseries)
208+
rb.CleanUp()
209209
}
210+
return &req.WriteRequest, cleanup, nil
210211
}
211212
req := newRequest(supplier)
212-
if err := push(ctx, req); err != nil {
213+
ctx = contextWithWriteResponseStats(ctx)
214+
err = push(ctx, req)
215+
rsValue := ctx.Value(PushResponseStatsContextKey)
216+
if rsValue != nil {
217+
rs := rsValue.(*promRemote.WriteResponseStats)
218+
addWriteResponseStats(w, rs)
219+
} else {
220+
// This should not happen, but if it does, we should not panic.
221+
addWriteResponseStats(w, &promRemote.WriteResponseStats{})
222+
}
223+
if err != nil {
213224
if errors.Is(err, context.Canceled) {
214225
http.Error(w, err.Error(), statusClientClosedRequest)
215226
level.Warn(logger).Log("msg", "push request canceled", "err", err)
@@ -277,6 +288,35 @@ func isRemoteWrite2(r *http.Request) (bool, error) {
277288
return false, nil
278289
}
279290

291+
// Consts from https://github.com/prometheus/prometheus/blob/main/storage/remote/stats.go
292+
const (
293+
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
294+
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
295+
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
296+
)
297+
298+
func contextWithWriteResponseStats(ctx context.Context) context.Context {
299+
return context.WithValue(ctx, PushResponseStatsContextKey, &promRemote.WriteResponseStats{})
300+
}
301+
302+
func addWriteResponseStats(w http.ResponseWriter, rs *promRemote.WriteResponseStats) {
303+
headers := w.Header()
304+
headers.Set(rw20WrittenSamplesHeader, strconv.Itoa(rs.Samples))
305+
headers.Set(rw20WrittenHistogramsHeader, strconv.Itoa(rs.Histograms))
306+
headers.Set(rw20WrittenExemplarsHeader, strconv.Itoa(rs.Exemplars))
307+
}
308+
309+
func updateWriteResponseStatsCtx(ctx context.Context, samples, histograms, exemplars int) {
310+
prs := ctx.Value(PushResponseStatsContextKey)
311+
if prs == nil {
312+
// Should not happen, but we should not panic anyway.
313+
return
314+
}
315+
prs.(*promRemote.WriteResponseStats).Samples += samples
316+
prs.(*promRemote.WriteResponseStats).Histograms += histograms
317+
prs.(*promRemote.WriteResponseStats).Exemplars += exemplars
318+
}
319+
280320
func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string {
281321
const jitterFactor = 0.5
282322

pkg/mimirpb/compat_rw2.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
3+
package mimirpb
4+
5+
import (
6+
"errors"
7+
"fmt"
8+
"sync"
9+
)
10+
11+
// Remote Write 2.0 related variables and functions.
12+
var (
13+
errorUnexpectedRW1Timeseries = errors.New("proto: Remote Write 1.0 field Timeseries in non-Remote Write 1.0 message")
14+
errorUnexpectedRW1Metadata = errors.New("proto: Remote Write 1.0 field Metadata in non-Remote Write 1.0 message")
15+
errorUnexpectedRW2Timeseries = errors.New("proto: Remote Write 2.0 field Timeseries in non-Remote Write 2.0 message")
16+
errorUnexpectedRW2Symbols = errors.New("proto: Remote Write 2.0 field Symbols in non-Remote Write 2.0 message")
17+
errorOddNumberOfLabelRefs = errors.New("proto: Remote Write 2.0 odd number of label references")
18+
errorOddNumberOfExemplarLabelRefs = errors.New("proto: Remote Write 2.0 odd number of exemplar label references")
19+
errorInvalidLabelRef = errors.New("proto: Remote Write 2.0 invalid label reference")
20+
errorInvalidExemplarLabelRef = errors.New("proto: Remote Write 2.0 invalid exemplar label reference")
21+
errorInternalRW2 = errors.New("proto: Remote Write 2.0 internal error")
22+
errorInvalidHelpRef = errors.New("proto: Remote Write 2.0 invalid help reference")
23+
errorInvalidUnitRef = errors.New("proto: Remote Write 2.0 invalid unit reference")
24+
)
25+
26+
// rw2SymbolPageSize is the base-2 logarithm of the number of symbols per page:
// every page holds up to 1<<rw2SymbolPageSize symbols, and the low
// rw2SymbolPageSize bits of a symbol reference select the slot inside its page.
const rw2SymbolPageSize = 16

// rw2PagedSymbols is a structure that holds symbols in pages.
// The problem this solves is that protobuf doesn't tell us
// how many symbols there are in advance. Without this paging
// mechanism, we would have to allocate a large amount of memory
// or do reallocation. This is a compromise between the two.
type rw2PagedSymbols struct {
	// count is the total number of symbols appended so far.
	count uint32
	// pages holds the symbols; each page comes from rw2PagedSymbolsPool.
	pages [][]string
}

// append stores symbol under the next free reference (the current count),
// fetching a new page from the pool when the current one is full.
func (ps *rw2PagedSymbols) append(symbol string) {
	nextPage := ps.count >> rw2SymbolPageSize
	if int(nextPage) >= len(ps.pages) {
		ps.pages = append(ps.pages, rw2PagedSymbolsPool.Get().([]string))
	}
	ps.pages[nextPage] = append(ps.pages[nextPage], symbol)
	ps.count++
}

// releasePages returns every page to the pool and resets ps to its empty
// state, so that ps no longer references pooled memory and can be reused.
func (ps *rw2PagedSymbols) releasePages() {
	for i, page := range ps.pages {
		// Drop the string references so a pooled page does not keep the
		// previous request's data reachable.
		for j := range page {
			page[j] = ""
		}
		rw2PagedSymbolsPool.Put(page[:0]) //nolint:staticcheck
		ps.pages[i] = nil
	}
	// Forget the released pages: a later get must not read memory that now
	// belongs to the pool (and possibly to another user of it).
	ps.pages = ps.pages[:0]
	ps.count = 0
}

// get returns the symbol stored under ref, or an error when ref does not
// refer to a symbol that has been appended.
func (ps *rw2PagedSymbols) get(ref uint32) (string, error) {
	if ref >= ps.count {
		return "", fmt.Errorf("symbol reference %d is out of bounds", ref)
	}
	page := ps.pages[ref>>rw2SymbolPageSize]
	return page[ref&((1<<rw2SymbolPageSize)-1)], nil
}

// rw2PagedSymbolsPool recycles symbol pages; new pages are created empty with
// capacity for one full page of symbols.
var (
	rw2PagedSymbolsPool = sync.Pool{
		New: func() interface{} {
			return make([]string, 0, 1<<rw2SymbolPageSize)
		},
	}
)

0 commit comments

Comments
 (0)