Skip to content

Commit 70ef976

Browse files
committed
fix(rw2.0): reject remote write 2.0 based on content type
The current solution returns 2xx , but doesn't actually ingest the samples. Prometheus does detect this prometheus-1 | time=2025-01-13T13:01:35.028Z level=ERROR source=queue_manager.go:1670 msg="non-recoverable error" component=remote remote_name=150c10 url=http://mimir-1:8001/api/v1/push failedSampleCount=2000 failedHistogramCount=0 failedExemplarCount=0 err="sent v2 request with 2000 samples, 0 histograms and 0 exemplars; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted; assumining failure e.g. the target only supports PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly" But we can do better and also start working towards RW2.0 support. Signed-off-by: György Krajcsovits <[email protected]>
1 parent dee581e commit 70ef976

File tree

1 file changed

+83
-34
lines changed

1 file changed

+83
-34
lines changed

pkg/distributor/push.go

Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"math/rand"
1414
"net/http"
1515
"strconv"
16+
"strings"
1617
"time"
1718

1819
"github.com/go-kit/log"
@@ -149,49 +150,63 @@ func handler(
149150
logger = utillog.WithSourceIPs(source, logger)
150151
}
151152
}
152-
supplier := func() (*mimirpb.WriteRequest, func(), error) {
153-
rb := util.NewRequestBuffers(requestBufferPool)
154-
var req mimirpb.PreallocWriteRequest
155153

156-
userID, err := tenant.TenantID(ctx)
157-
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
158-
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
159-
}
160-
161-
// userID might be empty if none was in the ctx, in this case just use the default setting.
162-
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
163-
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
164-
// Optimization to avoid the allocations required for unmarshaling exemplars.
165-
req.SkipUnmarshalingExemplars = true
154+
var supplier supplierFunc
155+
isRW2, err := isRemoteWrite2(r)
156+
if err != nil {
157+
http.Error(w, err.Error(), http.StatusBadRequest)
158+
}
159+
if isRW2 {
160+
supplier = func() (*mimirpb.WriteRequest, func(), error) {
161+
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable
162+
// unless the client switches to remote-write v1.
163+
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported")
166164
}
165+
} else {
166+
supplier = func() (*mimirpb.WriteRequest, func(), error) {
167+
rb := util.NewRequestBuffers(requestBufferPool)
168+
var req mimirpb.PreallocWriteRequest
169+
170+
userID, err := tenant.TenantID(ctx)
171+
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
172+
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
173+
}
167174

168-
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
169-
// Check for httpgrpc error, default to client error if parsing failed
170-
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
171-
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
175+
// userID might be empty if none was in the ctx, in this case just use the default setting.
176+
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
177+
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
178+
// Optimization to avoid the allocations required for unmarshaling exemplars.
179+
req.SkipUnmarshalingExemplars = true
172180
}
173181

174-
rb.CleanUp()
175-
return nil, nil, err
176-
}
182+
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
183+
// Check for httpgrpc error, default to client error if parsing failed
184+
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
185+
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
186+
}
177187

178-
if allowSkipLabelNameValidation {
179-
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
180-
} else {
181-
req.SkipLabelValidation = false
182-
}
188+
rb.CleanUp()
189+
return nil, nil, err
190+
}
183191

184-
if allowSkipLabelCountValidation {
185-
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
186-
} else {
187-
req.SkipLabelCountValidation = false
188-
}
192+
if allowSkipLabelNameValidation {
193+
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
194+
} else {
195+
req.SkipLabelValidation = false
196+
}
189197

190-
cleanup := func() {
191-
mimirpb.ReuseSlice(req.Timeseries)
192-
rb.CleanUp()
198+
if allowSkipLabelCountValidation {
199+
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
200+
} else {
201+
req.SkipLabelCountValidation = false
202+
}
203+
204+
cleanup := func() {
205+
mimirpb.ReuseSlice(req.Timeseries)
206+
rb.CleanUp()
207+
}
208+
return &req.WriteRequest, cleanup, nil
193209
}
194-
return &req.WriteRequest, cleanup, nil
195210
}
196211
req := newRequest(supplier)
197212
if err := push(ctx, req); err != nil {
@@ -226,6 +241,40 @@ func handler(
226241
})
227242
}
228243

244+
func isRemoteWrite2(r *http.Request) (bool, error) {
245+
const appProtoContentType = "application/x-protobuf"
246+
247+
contentType := r.Header.Get("Content-Type")
248+
if contentType == "" {
249+
// If the content type is not set, we assume it is remote write v1.
250+
return false, nil
251+
}
252+
parts := strings.Split(contentType, ";")
253+
if parts[0] != appProtoContentType {
254+
return false, fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType)
255+
}
256+
257+
// Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter
258+
for _, p := range parts[1:] {
259+
pair := strings.Split(p, "=")
260+
if len(pair) != 2 {
261+
return false, fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType)
262+
}
263+
if pair[0] == "proto" {
264+
switch pair[1] {
265+
case "prometheus.WriteRequest":
266+
return false, nil
267+
case "io.prometheus.write.v2.Request":
268+
return true, nil
269+
default:
270+
return false, fmt.Errorf("got %v content type; expected prometheus.WriteRequest or io.prometheus.write.v2.Request", contentType)
271+
}
272+
}
273+
}
274+
// No "proto=" parameter, assuming v1.
275+
return false, nil
276+
}
277+
229278
func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string {
230279
const jitterFactor = 0.5
231280

0 commit comments

Comments
 (0)