-
Notifications
You must be signed in to change notification settings - Fork 612
Add RW2 support #11100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add RW2 support #11100
Changes from 4 commits
4fc7ccd
5523a3f
f029c1d
705b352
0ca712c
bbb59f6
264adb1
6da844c
bda00c1
e3c3275
cce29a8
944ed6e
747361b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import ( | |
"github.com/grafana/dskit/user" | ||
"github.com/opentracing/opentracing-go" | ||
"github.com/pkg/errors" | ||
promRemote "github.com/prometheus/prometheus/storage/remote" | ||
|
||
"github.com/grafana/mimir/pkg/mimirpb" | ||
"github.com/grafana/mimir/pkg/util" | ||
|
@@ -36,6 +37,13 @@ import ( | |
// PushFunc defines the type of the push: it receives the request context and the | ||
// push Request, and returns a non-nil error when the push fails. | ||
// It is similar to http.HandlerFunc. | ||
type PushFunc func(ctx context.Context, req *Request) error | ||
|
||
// pushResponseStatsContextMarker is the unexported type of the context key
// declared below.
type pushResponseStatsContextMarker struct{}

// PushResponseStatsContextKey is the context key under which the PushFunc might
// store a *promRemote.WriteResponseStats.
var PushResponseStatsContextKey = &pushResponseStatsContextMarker{}
|
||
// parserFunc defines how to read the body of the request from an HTTP request. It takes an optional RequestBuffers. | ||
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error | ||
|
||
|
@@ -151,65 +159,68 @@ func handler( | |
} | ||
} | ||
|
||
var supplier supplierFunc | ||
isRW2, err := isRemoteWrite2(r) | ||
if err != nil { | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
} | ||
if isRW2 { | ||
supplier = func() (*mimirpb.WriteRequest, func(), error) { | ||
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable | ||
// unless the client switches to remote-write v1. | ||
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported") | ||
} | ||
} else { | ||
supplier = func() (*mimirpb.WriteRequest, func(), error) { | ||
rb := util.NewRequestBuffers(requestBufferPool) | ||
var req mimirpb.PreallocWriteRequest | ||
supplier := func() (*mimirpb.WriteRequest, func(), error) { | ||
rb := util.NewRequestBuffers(requestBufferPool) | ||
var req mimirpb.PreallocWriteRequest | ||
|
||
userID, err := tenant.TenantID(ctx) | ||
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID | ||
return nil, nil, errors.Wrap(err, "failed to get tenant ID") | ||
} | ||
req.UnmarshalFromRW2 = isRW2 | ||
|
||
// userID might be empty if none was in the ctx, in this case just use the default setting. | ||
if limits.MaxGlobalExemplarsPerUser(userID) == 0 { | ||
// The user is not allowed to send exemplars, so there is no need to unmarshal them. | ||
// Optimization to avoid the allocations required for unmarshaling exemplars. | ||
req.SkipUnmarshalingExemplars = true | ||
} | ||
userID, err := tenant.TenantID(ctx) | ||
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID | ||
return nil, nil, errors.Wrap(err, "failed to get tenant ID") | ||
} | ||
|
||
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil { | ||
// Check for httpgrpc error, default to client error if parsing failed | ||
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok { | ||
err = httpgrpc.Error(http.StatusBadRequest, err.Error()) | ||
} | ||
// userID might be empty if none was in the ctx, in this case just use the default setting. | ||
if limits.MaxGlobalExemplarsPerUser(userID) == 0 { | ||
// The user is not allowed to send exemplars, so there is no need to unmarshal them. | ||
// Optimization to avoid the allocations required for unmarshaling exemplars. | ||
req.SkipUnmarshalingExemplars = true | ||
} | ||
|
||
rb.CleanUp() | ||
return nil, nil, err | ||
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil { | ||
// Check for httpgrpc error, default to client error if parsing failed | ||
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok { | ||
err = httpgrpc.Error(http.StatusBadRequest, err.Error()) | ||
} | ||
|
||
if allowSkipLabelNameValidation { | ||
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true" | ||
} else { | ||
req.SkipLabelValidation = false | ||
} | ||
rb.CleanUp() | ||
return nil, nil, err | ||
} | ||
|
||
if allowSkipLabelCountValidation { | ||
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true" | ||
} else { | ||
req.SkipLabelCountValidation = false | ||
} | ||
if allowSkipLabelNameValidation { | ||
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true" | ||
} else { | ||
req.SkipLabelValidation = false | ||
} | ||
|
||
cleanup := func() { | ||
mimirpb.ReuseSlice(req.Timeseries) | ||
rb.CleanUp() | ||
} | ||
return &req.WriteRequest, cleanup, nil | ||
if allowSkipLabelCountValidation { | ||
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true" | ||
} else { | ||
req.SkipLabelCountValidation = false | ||
} | ||
|
||
cleanup := func() { | ||
mimirpb.ReuseSlice(req.Timeseries) | ||
rb.CleanUp() | ||
} | ||
return &req.WriteRequest, cleanup, nil | ||
} | ||
req := newRequest(supplier) | ||
if err := push(ctx, req); err != nil { | ||
ctx = contextWithWriteResponseStats(ctx) | ||
err = push(ctx, req) | ||
rsValue := ctx.Value(PushResponseStatsContextKey) | ||
if rsValue != nil { | ||
rs := rsValue.(*promRemote.WriteResponseStats) | ||
addWriteResponseStats(w, rs) | ||
} else { | ||
// This should not happen, but if it does, we should not panic. | ||
addWriteResponseStats(w, &promRemote.WriteResponseStats{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Isn't There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. hmm, you're right, I've ended up doing it for RW1 as well, let me see how I could avoid that. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fixed, only do stats for RW2 and also add check to the integration test |
||
} | ||
if err != nil { | ||
if errors.Is(err, context.Canceled) { | ||
http.Error(w, err.Error(), statusClientClosedRequest) | ||
level.Warn(logger).Log("msg", "push request canceled", "err", err) | ||
|
@@ -277,6 +288,35 @@ func isRemoteWrite2(r *http.Request) (bool, error) { | |
return false, nil | ||
} | ||
|
||
// Names of the response headers that report how many samples, histograms and | ||
// exemplars were written. | ||
// Consts from https://github.com/prometheus/prometheus/blob/main/storage/remote/stats.go | ||
const ( | ||
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written" | ||
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written" | ||
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written" | ||
) | ||
|
||
func contextWithWriteResponseStats(ctx context.Context) context.Context { | ||
return context.WithValue(ctx, PushResponseStatsContextKey, &promRemote.WriteResponseStats{}) | ||
} | ||
|
||
func addWriteResponseStats(w http.ResponseWriter, rs *promRemote.WriteResponseStats) { | ||
headers := w.Header() | ||
headers.Set(rw20WrittenSamplesHeader, strconv.Itoa(rs.Samples)) | ||
headers.Set(rw20WrittenHistogramsHeader, strconv.Itoa(rs.Histograms)) | ||
headers.Set(rw20WrittenExemplarsHeader, strconv.Itoa(rs.Exemplars)) | ||
} | ||
|
||
func updateWriteResponseStatsCtx(ctx context.Context, samples, histograms, exemplars int) { | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
prs := ctx.Value(PushResponseStatsContextKey) | ||
if prs == nil { | ||
// Should not happen, but we should not panic anyway. | ||
return | ||
} | ||
prs.(*promRemote.WriteResponseStats).Samples += samples | ||
prs.(*promRemote.WriteResponseStats).Histograms += histograms | ||
prs.(*promRemote.WriteResponseStats).Exemplars += exemplars | ||
} | ||
|
||
func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string { | ||
const jitterFactor = 0.5 | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package mimirpb | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"sync" | ||
) | ||
|
||
// Remote Write 2.0 related variables and functions. | ||
// | ||
// The errors below are package-level values created once with errors.New; each | ||
// message names the Remote Write 1.0/2.0 field or symbol reference it concerns. | ||
var ( | ||
errorUnexpectedRW1Timeseries = errors.New("proto: Remote Write 1.0 field Timeseries in non-Remote Write 1.0 message") | ||
errorUnexpectedRW1Metadata = errors.New("proto: Remote Write 1.0 field Metadata in non-Remote Write 1.0 message") | ||
errorUnexpectedRW2Timeseries = errors.New("proto: Remote Write 2.0 field Timeseries in non-Remote Write 2.0 message") | ||
errorUnexpectedRW2Symbols = errors.New("proto: Remote Write 2.0 field Symbols in non-Remote Write 2.0 message") | ||
errorOddNumberOfLabelRefs = errors.New("proto: Remote Write 2.0 odd number of label references") | ||
errorOddNumberOfExemplarLabelRefs = errors.New("proto: Remote Write 2.0 odd number of exemplar label references") | ||
errorInvalidLabelRef = errors.New("proto: Remote Write 2.0 invalid label reference") | ||
errorInvalidExemplarLabelRef = errors.New("proto: Remote Write 2.0 invalid exemplar label reference") | ||
errorInternalRW2 = errors.New("proto: Remote Write 2.0 internal error") | ||
errorInvalidHelpRef = errors.New("proto: Remote Write 2.0 invalid help reference") | ||
errorInvalidUnitRef = errors.New("proto: Remote Write 2.0 invalid unit reference") | ||
) | ||
|
||
// rw2SymbolPageSize is the page size expressed as a number of bits: each page | ||
// holds up to 1<<rw2SymbolPageSize symbols. A symbol reference is split into a | ||
// page index (ref>>rw2SymbolPageSize) and an offset within that page (the | ||
// remaining low bits). | ||
const rw2SymbolPageSize = 16 | ||
|
||
// rw2PagedSymbols is a structure that holds symbols in pages. | ||
// The problem this solves is that protobuf doesn't tell us | ||
// how many symbols there are in advance. Without this paging | ||
// mechanism, we would have to allocate a large amount of memory | ||
// or do reallocation. This is a compromise between the two. | ||
type rw2PagedSymbols struct { | ||
// count is the number of symbols appended so far. | ||
count uint32 | ||
// pages holds the symbols in append order; pages are obtained from | ||
// rw2PagedSymbolsPool. | ||
pages [][]string | ||
} | ||
|
||
func (ps *rw2PagedSymbols) append(symbol string) { | ||
nextPage := ps.count >> rw2SymbolPageSize | ||
if int(nextPage) >= len(ps.pages) { | ||
ps.pages = append(ps.pages, rw2PagedSymbolsPool.Get().([]string)) | ||
} | ||
ps.pages[nextPage] = append(ps.pages[nextPage], symbol) | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ps.count++ | ||
} | ||
|
||
func (ps *rw2PagedSymbols) releasePages() { | ||
for _, page := range ps.pages { | ||
page = page[:0] | ||
rw2PagedSymbolsPool.Put(page) //nolint:staticcheck | ||
krajorama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
func (ps *rw2PagedSymbols) get(ref uint32) (string, error) { | ||
if ref < ps.count { | ||
page := ps.pages[ref>>rw2SymbolPageSize] | ||
return page[ref&((1<<rw2SymbolPageSize)-1)], nil | ||
} | ||
return "", fmt.Errorf("symbol reference %d is out of bounds", ref) | ||
} | ||
|
||
var ( | ||
rw2PagedSymbolsPool = sync.Pool{ | ||
New: func() interface{} { | ||
return make([]string, 0, 1<<rw2SymbolPageSize) | ||
}, | ||
} | ||
) |
Uh oh!
There was an error while loading. Please reload this page.