Skip to content

Commit 1d7978e

Browse files
authored
fix(rw2.0): reject remote write 2.0 based on content type (#10423)
* fix(rw2.0): reject remote write 2.0 based on content type The current solution returns 2xx , but doesn't actually ingest the samples. Prometheus does detect this prometheus-1 | time=2025-01-13T13:01:35.028Z level=ERROR source=queue_manager.go:1670 msg="non-recoverable error" component=remote remote_name=150c10 url=http://mimir-1:8001/api/v1/push failedSampleCount=2000 failedHistogramCount=0 failedExemplarCount=0 err="sent v2 request with 2000 samples, 0 histograms and 0 exemplars; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted; assumining failure e.g. the target only supports PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly" But we can do better and also start working towards RW2.0 support. * update changelog * Copy integration test from POC From #10432 Signed-off-by: György Krajcsovits <[email protected]>
1 parent 9b67339 commit 1d7978e

File tree

5 files changed

+348
-34
lines changed

5 files changed

+348
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* [BUGFIX] MQE: Fix deriv with histograms #10383
3030
* [BUGFIX] PromQL: Fix <aggr_over_time> functions with histograms https://github.com/prometheus/prometheus/pull/15711 #10400
3131
* [BUGFIX] MQE: Fix <aggr_over_time> functions with histograms #10400
32+
* [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423
3233

3334
### Mixin
3435

integration/distributor_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ import (
1818
"github.com/prometheus/common/model"
1919
"github.com/prometheus/prometheus/model/labels"
2020
"github.com/prometheus/prometheus/prompb"
21+
promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2122
"github.com/stretchr/testify/require"
2223

2324
"github.com/grafana/mimir/integration/e2emimir"
25+
"github.com/grafana/mimir/pkg/distributor/rw2"
2426
)
2527

2628
func TestDistributor(t *testing.T) {
@@ -370,3 +372,128 @@ overrides:
370372
})
371373
}
372374
}
375+
376+
func TestDistributorRemoteWrite2(t *testing.T) {
377+
queryEnd := time.Now().Round(time.Second)
378+
queryStart := queryEnd.Add(-1 * time.Hour)
379+
380+
testCases := map[string]struct {
381+
inRemoteWrite []*promRW2.Request
382+
runtimeConfig string
383+
queries map[string]model.Matrix
384+
exemplarQueries map[string][]promv1.ExemplarQueryResult
385+
}{
386+
"no special features": {
387+
inRemoteWrite: []*promRW2.Request{
388+
rw2.AddFloatSeries(
389+
nil,
390+
labels.FromStrings("__name__", "foobar"),
391+
[]promRW2.Sample{{Timestamp: queryStart.UnixMilli(), Value: 100}},
392+
promRW2.Metadata_METRIC_TYPE_COUNTER,
393+
"some help",
394+
"someunit",
395+
0,
396+
nil),
397+
},
398+
queries: map[string]model.Matrix{
399+
"foobar": {{
400+
Metric: model.Metric{"__name__": "foobar"},
401+
Values: []model.SamplePair{{Timestamp: model.Time(queryStart.UnixMilli()), Value: model.SampleValue(100)}},
402+
}},
403+
},
404+
},
405+
}
406+
407+
s, err := e2e.NewScenario(networkName)
408+
require.NoError(t, err)
409+
defer s.Close()
410+
411+
previousRuntimeConfig := ""
412+
require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(previousRuntimeConfig)))
413+
414+
// Start dependencies.
415+
consul := e2edb.NewConsul()
416+
minio := e2edb.NewMinio(9000, blocksBucketName)
417+
require.NoError(t, s.StartAndWaitReady(consul, minio))
418+
419+
baseFlags := map[string]string{
420+
"-distributor.ingestion-tenant-shard-size": "0",
421+
"-ingester.ring.heartbeat-period": "1s",
422+
"-distributor.ha-tracker.enable": "true",
423+
"-distributor.ha-tracker.enable-for-all-users": "true",
424+
"-distributor.ha-tracker.store": "consul",
425+
"-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(),
426+
"-distributor.ha-tracker.prefix": "prom_ha/",
427+
"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false),
428+
}
429+
430+
flags := mergeFlags(
431+
BlocksStorageFlags(),
432+
BlocksStorageS3Flags(),
433+
baseFlags,
434+
)
435+
436+
// We want only distributor to be reloading runtime config.
437+
distributorFlags := mergeFlags(flags, map[string]string{
438+
"-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, "runtime.yaml"),
439+
"-runtime-config.reload-period": "100ms",
440+
// Set non-zero default for number of exemplars. That way our values used in the test (0 and 100) will show up in runtime config diff.
441+
"-ingester.max-global-exemplars-per-user": "3",
442+
})
443+
444+
// Ingester will not reload runtime config.
445+
ingesterFlags := mergeFlags(flags, map[string]string{
446+
// Ingester will always see exemplars enabled. We do this to avoid waiting for ingester to apply new setting to TSDB.
447+
"-ingester.max-global-exemplars-per-user": "100",
448+
})
449+
450+
// Start Mimir components.
451+
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), distributorFlags)
452+
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ingesterFlags)
453+
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
454+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
455+
456+
// Wait until distributor has updated the ring.
457+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
458+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
459+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
460+
461+
// Wait until querier has updated the ring.
462+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
463+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
464+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
465+
466+
client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
467+
require.NoError(t, err)
468+
469+
runtimeConfigURL := fmt.Sprintf("http://%s/runtime_config?mode=diff", distributor.HTTPEndpoint())
470+
471+
for testName, tc := range testCases {
472+
t.Run(testName, func(t *testing.T) {
473+
for _, ser := range tc.inRemoteWrite {
474+
if tc.runtimeConfig != previousRuntimeConfig {
475+
currentRuntimeConfig, err := getURL(runtimeConfigURL)
476+
require.NoError(t, err)
477+
478+
// Write new runtime config
479+
require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(tc.runtimeConfig)))
480+
481+
// Wait until distributor has reloaded runtime config.
482+
test.Poll(t, 1*time.Second, true, func() interface{} {
483+
newRuntimeConfig, err := getURL(runtimeConfigURL)
484+
require.NoError(t, err)
485+
return currentRuntimeConfig != newRuntimeConfig
486+
})
487+
488+
previousRuntimeConfig = tc.runtimeConfig
489+
}
490+
491+
res, err := client.PushRW2(ser)
492+
require.NoError(t, err)
493+
require.Equal(t, http.StatusUnsupportedMediaType, res.StatusCode)
494+
}
495+
496+
// Placeholder for actual query tests.
497+
})
498+
}
499+
}

integration/e2emimir/client.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
promConfig "github.com/prometheus/prometheus/config"
3232
"github.com/prometheus/prometheus/model/rulefmt"
3333
"github.com/prometheus/prometheus/prompb" // OTLP protos are not compatible with gogo
34+
promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
3435
"github.com/prometheus/prometheus/storage/remote"
3536
yaml "gopkg.in/yaml.v3"
3637

@@ -186,6 +187,38 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
186187
return res, nil
187188
}
188189

190+
func (c *Client) PushRW2(writeRequest *promRW2.Request) (*http.Response, error) {
191+
// Create write request
192+
data, err := proto.Marshal(writeRequest)
193+
if err != nil {
194+
return nil, err
195+
}
196+
197+
// Create HTTP request
198+
compressed := snappy.Encode(nil, data)
199+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/push", c.distributorAddress), bytes.NewReader(compressed))
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
req.Header.Add("Content-Encoding", "snappy")
205+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
206+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
207+
req.Header.Set("X-Scope-OrgID", c.orgID)
208+
209+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
210+
defer cancel()
211+
212+
// Execute HTTP request
213+
res, err := c.httpClient.Do(req.WithContext(ctx))
214+
if err != nil {
215+
return nil, err
216+
}
217+
218+
defer res.Body.Close()
219+
return res, nil
220+
}
221+
189222
// PushOTLP the input timeseries to the remote endpoint in OTLP format
190223
func (c *Client) PushOTLP(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) (*http.Response, error) {
191224
// Create write request

pkg/distributor/push.go

Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"math/rand"
1414
"net/http"
1515
"strconv"
16+
"strings"
1617
"time"
1718

1819
"github.com/go-kit/log"
@@ -149,49 +150,63 @@ func handler(
149150
logger = utillog.WithSourceIPs(source, logger)
150151
}
151152
}
152-
supplier := func() (*mimirpb.WriteRequest, func(), error) {
153-
rb := util.NewRequestBuffers(requestBufferPool)
154-
var req mimirpb.PreallocWriteRequest
155153

156-
userID, err := tenant.TenantID(ctx)
157-
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
158-
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
159-
}
160-
161-
// userID might be empty if none was in the ctx, in this case just use the default setting.
162-
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
163-
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
164-
// Optimization to avoid the allocations required for unmarshaling exemplars.
165-
req.SkipUnmarshalingExemplars = true
154+
var supplier supplierFunc
155+
isRW2, err := isRemoteWrite2(r)
156+
if err != nil {
157+
http.Error(w, err.Error(), http.StatusBadRequest)
158+
}
159+
if isRW2 {
160+
supplier = func() (*mimirpb.WriteRequest, func(), error) {
161+
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable
162+
// unless the client switches to remote-write v1.
163+
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported")
166164
}
165+
} else {
166+
supplier = func() (*mimirpb.WriteRequest, func(), error) {
167+
rb := util.NewRequestBuffers(requestBufferPool)
168+
var req mimirpb.PreallocWriteRequest
169+
170+
userID, err := tenant.TenantID(ctx)
171+
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
172+
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
173+
}
167174

168-
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
169-
// Check for httpgrpc error, default to client error if parsing failed
170-
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
171-
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
175+
// userID might be empty if none was in the ctx, in this case just use the default setting.
176+
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
177+
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
178+
// Optimization to avoid the allocations required for unmarshaling exemplars.
179+
req.SkipUnmarshalingExemplars = true
172180
}
173181

174-
rb.CleanUp()
175-
return nil, nil, err
176-
}
182+
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
183+
// Check for httpgrpc error, default to client error if parsing failed
184+
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
185+
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
186+
}
177187

178-
if allowSkipLabelNameValidation {
179-
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
180-
} else {
181-
req.SkipLabelValidation = false
182-
}
188+
rb.CleanUp()
189+
return nil, nil, err
190+
}
183191

184-
if allowSkipLabelCountValidation {
185-
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
186-
} else {
187-
req.SkipLabelCountValidation = false
188-
}
192+
if allowSkipLabelNameValidation {
193+
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
194+
} else {
195+
req.SkipLabelValidation = false
196+
}
189197

190-
cleanup := func() {
191-
mimirpb.ReuseSlice(req.Timeseries)
192-
rb.CleanUp()
198+
if allowSkipLabelCountValidation {
199+
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
200+
} else {
201+
req.SkipLabelCountValidation = false
202+
}
203+
204+
cleanup := func() {
205+
mimirpb.ReuseSlice(req.Timeseries)
206+
rb.CleanUp()
207+
}
208+
return &req.WriteRequest, cleanup, nil
193209
}
194-
return &req.WriteRequest, cleanup, nil
195210
}
196211
req := newRequest(supplier)
197212
if err := push(ctx, req); err != nil {
@@ -226,6 +241,40 @@ func handler(
226241
})
227242
}
228243

244+
func isRemoteWrite2(r *http.Request) (bool, error) {
245+
const appProtoContentType = "application/x-protobuf"
246+
247+
contentType := r.Header.Get("Content-Type")
248+
if contentType == "" {
249+
// If the content type is not set, we assume it is remote write v1.
250+
return false, nil
251+
}
252+
parts := strings.Split(contentType, ";")
253+
if parts[0] != appProtoContentType {
254+
return false, fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType)
255+
}
256+
257+
// Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter
258+
for _, p := range parts[1:] {
259+
pair := strings.Split(p, "=")
260+
if len(pair) != 2 {
261+
return false, fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType)
262+
}
263+
if pair[0] == "proto" {
264+
switch pair[1] {
265+
case "prometheus.WriteRequest":
266+
return false, nil
267+
case "io.prometheus.write.v2.Request":
268+
return true, nil
269+
default:
270+
return false, fmt.Errorf("got %v content type; expected prometheus.WriteRequest or io.prometheus.write.v2.Request", contentType)
271+
}
272+
}
273+
}
274+
// No "proto=" parameter, assuming v1.
275+
return false, nil
276+
}
277+
229278
func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string {
230279
const jitterFactor = 0.5
231280

0 commit comments

Comments
 (0)