Skip to content

Add RW2 support #11100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 9, 2025
1,068 changes: 823 additions & 245 deletions integration/distributor_test.go

Large diffs are not rendered by default.

17 changes: 14 additions & 3 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,14 @@ func (c *Client) SetTimeout(t time.Duration) {
// Push the input timeseries to the remote endpoint
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries})
wreq := &prompb.WriteRequest{
Timeseries: timeseries,
}
return c.PushRW1(wreq)
}

func (c *Client) PushRW1(wreq *prompb.WriteRequest) (*http.Response, error) {
data, err := proto.Marshal(wreq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -707,8 +714,12 @@ func (c *Client) ActiveNativeHistogramMetrics(selector string, options ...Active
}

// GetPrometheusMetadata fetches the metadata from the Prometheus endpoint /api/v1/metadata.
func (c *Client) GetPrometheusMetadata() (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/prometheus/api/v1/metadata", c.querierAddress), nil)
func (c *Client) GetPrometheusMetadata(metric string) (*http.Response, error) {
metricParam := ""
if metric != "" {
metricParam = fmt.Sprintf("?metric=%s", metric)
}
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/prometheus/api/v1/metadata%s", c.querierAddress, metricParam), nil)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion integration/influx_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ func TestInfluxIngestion(t *testing.T) {
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))

// No metadata to query, but we do the query anyway.
_, err = c.GetPrometheusMetadata()
_, err = c.GetPrometheusMetadata("")
require.NoError(t, err)
}
4 changes: 2 additions & 2 deletions integration/otlp_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func testOTLPIngestion(t *testing.T, enableSuffixes bool) {
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))

// Query the metadata
metadataResult, err := c.GetPrometheusMetadata()
metadataResult, err := c.GetPrometheusMetadata("")
require.NoError(t, err)
require.Equal(t, 200, metadataResult.StatusCode)

Expand Down Expand Up @@ -178,7 +178,7 @@ func testOTLPIngestion(t *testing.T, enableSuffixes bool) {
}
`, sfx, sfx)

metadataResult, err = c.GetPrometheusMetadata()
metadataResult, err = c.GetPrometheusMetadata("")
require.NoError(t, err)
require.Equal(t, 200, metadataResult.StatusCode)

Expand Down
13 changes: 8 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID)
d.updateReceivedMetrics(ctx, req, userID)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
Expand Down Expand Up @@ -1876,18 +1876,21 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedExemplars, receivedMetadata int
func (d *Distributor) updateReceivedMetrics(ctx context.Context, req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedHistograms, receivedExemplars, receivedMetadata int
for _, ts := range req.Timeseries {
receivedSamples += len(ts.Samples) + len(ts.Histograms)
receivedSamples += len(ts.Samples)
receivedHistograms += len(ts.Histograms)
receivedExemplars += len(ts.Exemplars)
}
d.costAttributionMgr.SampleTracker(userID).IncrementReceivedSamples(req, mtime.Now())
receivedMetadata = len(req.Metadata)

d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples + receivedHistograms))
d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))

updateWriteResponseStatsCtx(ctx, receivedSamples, receivedHistograms, receivedExemplars)
}

// forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets.
Expand Down
128 changes: 84 additions & 44 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
promRemote "github.com/prometheus/prometheus/storage/remote"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand All @@ -36,6 +37,13 @@ import (
// PushFunc defines the type of the push. It is similar to http.HandlerFunc.
type PushFunc func(ctx context.Context, req *Request) error

// The PushFunc might store promRemote.WriteResponseStats in the context.
type pushResponseStatsContextMarker struct{}

var (
PushResponseStatsContextKey = &pushResponseStatsContextMarker{}
)

// parserFunc defines how to read the body the request from an HTTP request. It takes an optional RequestBuffers.
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error

Expand Down Expand Up @@ -151,65 +159,68 @@ func handler(
}
}

var supplier supplierFunc
isRW2, err := isRemoteWrite2(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
if isRW2 {
supplier = func() (*mimirpb.WriteRequest, func(), error) {
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable
// unless the client switches to remote-write v1.
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported")
}
} else {
supplier = func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest

userID, err := tenant.TenantID(ctx)
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
}
req.UnmarshalFromRW2 = isRW2

// userID might be empty if none was in the ctx, in this case just use the default setting.
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
// Optimization to avoid the allocations required for unmarshaling exemplars.
req.SkipUnmarshalingExemplars = true
}
userID, err := tenant.TenantID(ctx)
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
}

if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}
// userID might be empty if none was in the ctx, in this case just use the default setting.
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
// Optimization to avoid the allocations required for unmarshaling exemplars.
req.SkipUnmarshalingExemplars = true
}

rb.CleanUp()
return nil, nil, err
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}

if allowSkipLabelNameValidation {
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
} else {
req.SkipLabelValidation = false
}
rb.CleanUp()
return nil, nil, err
}

if allowSkipLabelCountValidation {
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
} else {
req.SkipLabelCountValidation = false
}
if allowSkipLabelNameValidation {
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
} else {
req.SkipLabelValidation = false
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
if allowSkipLabelCountValidation {
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
} else {
req.SkipLabelCountValidation = false
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
}
req := newRequest(supplier)
if err := push(ctx, req); err != nil {
ctx = contextWithWriteResponseStats(ctx)
err = push(ctx, req)
rsValue := ctx.Value(PushResponseStatsContextKey)
if rsValue != nil {
rs := rsValue.(*promRemote.WriteResponseStats)
addWriteResponseStats(w, rs)
} else {
// This should not happen, but if it does, we should not panic.
addWriteResponseStats(w, &promRemote.WriteResponseStats{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't addWriteResponseStats only relevant for RW v2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, you're right, I've ended up doing it for RW1 as well, let me see how I could avoid that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, only do stats for RW2 and also add check to the integration test

}
if err != nil {
if errors.Is(err, context.Canceled) {
http.Error(w, err.Error(), statusClientClosedRequest)
level.Warn(logger).Log("msg", "push request canceled", "err", err)
Expand Down Expand Up @@ -277,6 +288,35 @@ func isRemoteWrite2(r *http.Request) (bool, error) {
return false, nil
}

// Consts from https://github.com/prometheus/prometheus/blob/main/storage/remote/stats.go
const (
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
)

func contextWithWriteResponseStats(ctx context.Context) context.Context {
return context.WithValue(ctx, PushResponseStatsContextKey, &promRemote.WriteResponseStats{})
}

func addWriteResponseStats(w http.ResponseWriter, rs *promRemote.WriteResponseStats) {
headers := w.Header()
headers.Set(rw20WrittenSamplesHeader, strconv.Itoa(rs.Samples))
headers.Set(rw20WrittenHistogramsHeader, strconv.Itoa(rs.Histograms))
headers.Set(rw20WrittenExemplarsHeader, strconv.Itoa(rs.Exemplars))
}

func updateWriteResponseStatsCtx(ctx context.Context, samples, histograms, exemplars int) {
prs := ctx.Value(PushResponseStatsContextKey)
if prs == nil {
// Should not happen, but we should not panic anyway.
return
}
prs.(*promRemote.WriteResponseStats).Samples += samples
prs.(*promRemote.WriteResponseStats).Histograms += histograms
prs.(*promRemote.WriteResponseStats).Exemplars += exemplars
}

func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string {
const jitterFactor = 0.5

Expand Down
69 changes: 69 additions & 0 deletions pkg/mimirpb/compat_rw2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// SPDX-License-Identifier: AGPL-3.0-only

package mimirpb

import (
"errors"
"fmt"
"sync"
)

// Remote Write 2.0 related variables and functions.
var (
errorUnexpectedRW1Timeseries = errors.New("proto: Remote Write 1.0 field Timeseries in non-Remote Write 1.0 message")
errorUnexpectedRW1Metadata = errors.New("proto: Remote Write 1.0 field Metadata in non-Remote Write 1.0 message")
errorUnexpectedRW2Timeseries = errors.New("proto: Remote Write 2.0 field Timeseries in non-Remote Write 2.0 message")
errorUnexpectedRW2Symbols = errors.New("proto: Remote Write 2.0 field Symbols in non-Remote Write 2.0 message")
errorOddNumberOfLabelRefs = errors.New("proto: Remote Write 2.0 odd number of label references")
errorOddNumberOfExemplarLabelRefs = errors.New("proto: Remote Write 2.0 odd number of exemplar label references")
errorInvalidLabelRef = errors.New("proto: Remote Write 2.0 invalid label reference")
errorInvalidExemplarLabelRef = errors.New("proto: Remote Write 2.0 invalid exemplar label reference")
errorInternalRW2 = errors.New("proto: Remote Write 2.0 internal error")
errorInvalidHelpRef = errors.New("proto: Remote Write 2.0 invalid help reference")
errorInvalidUnitRef = errors.New("proto: Remote Write 2.0 invalid unit reference")
)

// rw2SymbolPageSize is the size of each page in bits.
const rw2SymbolPageSize = 16

// rw2PagedSymbols is a structure that holds symbols in pages.
// The problem this solves is that protobuf doesn't tell us
// how many symbols there are in advance. Without this paging
// mechanism, we would have to allocate a large amount of memory
// or do reallocation. This is a compromise between the two.
type rw2PagedSymbols struct {
count uint32
pages [][]string
}

func (ps *rw2PagedSymbols) append(symbol string) {
nextPage := ps.count >> rw2SymbolPageSize
if int(nextPage) >= len(ps.pages) {
ps.pages = append(ps.pages, rw2PagedSymbolsPool.Get().([]string))
}
ps.pages[nextPage] = append(ps.pages[nextPage], symbol)
ps.count++
}

func (ps *rw2PagedSymbols) releasePages() {
for _, page := range ps.pages {
page = page[:0]
rw2PagedSymbolsPool.Put(page) //nolint:staticcheck
}
}

func (ps *rw2PagedSymbols) get(ref uint32) (string, error) {
if ref < ps.count {
page := ps.pages[ref>>rw2SymbolPageSize]
return page[ref&((1<<rw2SymbolPageSize)-1)], nil
}
return "", fmt.Errorf("symbol reference %d is out of bounds", ref)
}

var (
rw2PagedSymbolsPool = sync.Pool{
New: func() interface{} {
return make([]string, 0, 1<<rw2SymbolPageSize)
},
}
)
Loading