diff --git a/go.mod b/go.mod index 87554f58f9..98185e7249 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/armon/go-metrics v0.3.3 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 + github.com/cespare/xxhash/v2 v2.1.1 github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0 github.com/chromedp/chromedp v0.5.3 github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17 diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 1070aaffab..17dac4e03c 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -4,10 +4,11 @@ package receive import ( + "bytes" "context" "crypto/tls" "fmt" - "io/ioutil" + "io" stdlog "log" "net" "net/http" @@ -282,13 +283,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { span, ctx := tracing.StartSpan(r.Context(), "receive_http") defer span.Finish() - compressed, err := ioutil.ReadAll(r.Body) + compressed := &bytes.Buffer{} + if r.ContentLength >= 0 { + compressed.Grow(int(r.ContentLength)) + } + _, err := io.Copy(compressed, r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - reqBuf, err := snappy.Decode(nil, compressed) + reqBuf, err := snappy.Decode(nil, compressed.Bytes()) if err != nil { level.Error(h.logger).Log("msg", "snappy decode error", "err", err) http.Error(w, err.Error(), http.StatusBadRequest) @@ -314,6 +319,10 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { if len(tenant) == 0 { tenant = h.options.DefaultTenantID } + if len(tenant) == 0 { + http.Error(w, "no tenant ID supplied", http.StatusBadRequest) + return + } err = h.handleRequest(ctx, rep, tenant, &wreq) switch err { @@ -401,9 +410,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma } }() - logger := log.With(h.logger, "tenant", tenant) + // Avoid log.With extra allocations for rare log lines. 
+ logTags := []interface{}{"tenant", tenant} if id, ok := middleware.RequestIDFromContext(pctx); ok { - logger = log.With(logger, "request-id", id) + logTags = append(logTags, "request-id", id) } ec := make(chan error) @@ -524,7 +534,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma b.attempt++ dur := h.expBackoff.ForAttempt(b.attempt) b.nextAllowed = time.Now().Add(dur) - level.Debug(h.logger).Log("msg", "target unavailable backing off", "for", dur) + level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur)...) } else { h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} } @@ -553,7 +563,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma go func() { for err := range ec { if err != nil { - level.Debug(logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) + level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err)...) 
} } }() diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 46409cd6d8..be461ee0bf 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -7,10 +7,14 @@ import ( "bytes" "context" "fmt" + "io/ioutil" + "math" "math/rand" "net/http" "net/http/httptest" + "os" "strconv" + "strings" "sync" "testing" "time" @@ -19,12 +23,16 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/testutil" "google.golang.org/grpc" - "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -140,7 +148,7 @@ func TestCountCause(t *testing.T) { } } -func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { cfg := []HashringConfig{ { Hashring: "test", @@ -166,7 +174,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - ForwardTimeout: 5 * time.Second, + ForwardTimeout: 10 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) @@ -190,7 +198,7 @@ func TestReceiveQuorum(t *testing.T) { wreq1 := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { - Labels: []labelpb.ZLabel{ + Labels: []labelpb.Label{ { Name: "foo", Value: "bar", @@ -476,7 +484,7 @@ func TestReceiveQuorum(t 
*testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -526,7 +534,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { wreq1 := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { - Labels: []labelpb.ZLabel{ + Labels: []labelpb.Label{ { Name: "foo", Value: "bar", @@ -815,7 +823,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { // to see all requests completing all the time, since we're using local // network we are not expecting anything to go wrong with these. t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -919,6 +927,248 @@ type fakeRemoteWriteGRPCServer struct { h storepb.WriteableStoreServer } -func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { +func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, _ ...grpc.CallOption) (*storepb.WriteResponse, error) { return f.h.RemoteWrite(ctx, in) } + +func serialize(t testing.TB, lbls []labelpb.Label, samples []prompb.Sample) []byte { + // Create significant number of samples to see the weight of it on profiles. 
+ r := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: lbls, + Samples: samples, + }, + }, + } + body, err := proto.Marshal(r) + testutil.Ok(t, err) + return snappy.Encode(nil, body) +} + +func BenchmarkHandlerReceiveHTTP(b *testing.B) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTBWithAlloc(b)) +} + +func TestHandlerReceiveHTTP(t *testing.T) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t)) +} + +type tsModifierTenantStorage struct { + TenantStorage + + modifier int64 +} + +func (s *tsModifierTenantStorage) TenantAppendable(tenant string) (Appendable, error) { + a, err := s.TenantStorage.TenantAppendable(tenant) + return &tsModifierAppendable{Appendable: a, modifier: s.modifier}, err +} + +type tsModifierAppendable struct { + Appendable + + modifier int64 +} + +func (a *tsModifierAppendable) Appender(ctx context.Context) (storage.Appender, error) { + ret, err := a.Appendable.Appender(ctx) + return &tsModifierAppender{Appender: ret, modifier: a.modifier}, err +} + +type tsModifierAppender struct { + storage.Appender + + modifier int64 +} + +var cnt int64 + +func (a *tsModifierAppender) Add(l labels.Labels, _ int64, v float64) (uint64, error) { + cnt += a.modifier + return a.Appender.Add(l, cnt, v) +} + +func (a *tsModifierAppender) AddFast(ref uint64, _ int64, v float64) error { + cnt += a.modifier + return a.Appender.AddFast(ref, cnt, v) +} + +func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { + dir, err := ioutil.TempDir("", "test_receive") + testutil.Ok(b, err) + defer func() { testutil.Ok(b, os.RemoveAll(dir)) }() + + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handler := handlers[0] + + reg := prometheus.NewRegistry() + + logger := log.NewNopLogger() + m := NewMultiTSDB( + dir, logger, reg, &tsdb.Options{ + MinBlockDuration: int64(2 * time.Hour / time.Millisecond), + MaxBlockDuration: int64(2 * time.Hour / time.Millisecond), + RetentionDuration: int64(6 * time.Hour / 
time.Millisecond), + NoLockfile: true, + StripeSize: 1, // Disable stripe pre allocation so we can clear profiles. + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + false, + ) + defer func() { testutil.Ok(b, m.Close()) }() + handler.writer = NewWriter(logger, m) + + testutil.Ok(b, m.Flush()) + testutil.Ok(b, m.Open()) + + // Create 2MB of samples payload. + manySamples := make([]prompb.Sample, 10e4) + for i := range manySamples { + manySamples[i] = prompb.Sample{ + Value: math.MaxFloat64, + Timestamp: math.MinInt64, // Timestamp does not matter, it will be overridden. + } + } + + for _, tcase := range []struct { + name string + writeRequest []byte + }{ + { + name: "typical labels under 1KB, single sample", + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. + }, + { + name: "typical labels under 1KB, 2MB of samples", + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), manySamples), + }, + { + name: "bigger labels over 1KB, single sample", + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) + for i := 0; i < len(lbls); i++ { + // Label ~50B name, 50B value. 
+ lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. + }, + { + name: "bigger labels over 1KB, 2MB of samples", + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) + for i := 0; i < len(lbls); i++ { + // Label ~50B name, 50B value. + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), manySamples), + }, + { + name: "extremely large label value 10MB, single sample", + writeRequest: serialize(b, func() []labelpb.Label { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + return []labelpb.Label{{Name: "__name__", Value: lbl.String()}} + }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. + }, + { + name: "extremely large label value 10MB, 2MB samples", + writeRequest: serialize(b, func() []labelpb.Label { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + return []labelpb.Label{{Name: "__name__", Value: lbl.String()}} + }(), manySamples), + }, + } { + b.Run(tcase.name, func(b testutil.TB) { + handler.options.DefaultTenantID = fmt.Sprintf("%v-ok", tcase.name) + handler.writer.multiTSDB = &tsModifierTenantStorage{TenantStorage: m, modifier: 1} + + // It takes time to create new tenant, wait for it. 
+ { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + + b.Run("OK", func(b testutil.TB) { + b.ReportAllocs() + + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + } + }) + + handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name) + handler.writer.multiTSDB = &tsModifierTenantStorage{TenantStorage: m, modifier: -1} // Timestamp can't go down + // which will cause conflict error. + + // It takes time to create new tenant, wait for it. + { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + // First request should be fine, since we don't change timestamp, rest is wrong. 
+ r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + + b.Run("conflict errors", func(b testutil.TB) { + b.ReportAllocs() + + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusConflict, r.Code, "%v", i) + } + }) + }) + } +} diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index ef6c94390a..76eaa7ee25 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -9,13 +9,11 @@ import ( "sort" "sync" - "github.com/cespare/xxhash" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) -const sep = '\xff' - // insufficientNodesError is returned when a hashring does not // have enough nodes to satisfy a request for a node. type insufficientNodesError struct { @@ -38,23 +36,6 @@ type Hashring interface { GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error) } -// hash returns a hash for the given tenant and time series. -func hash(tenant string, ts *prompb.TimeSeries) uint64 { - // Sort labelset to ensure a stable hash. - sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) - - b := make([]byte, 0, 1024) - b = append(b, []byte(tenant)...) - b = append(b, sep) - for _, v := range ts.Labels { - b = append(b, v.Name...) - b = append(b, sep) - b = append(b, v.Value...) - b = append(b, sep) - } - return xxhash.Sum64(b) -} - // SingleNodeHashring always returns the same node. 
type SingleNodeHashring string @@ -84,7 +65,11 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st if n >= uint64(len(s)) { return "", &insufficientNodesError{have: uint64(len(s)), want: n + 1} } - return s[(hash(tenant, ts)+n)%uint64(len(s))], nil + + // TODO(bwplotka): Labels should be sorted already, consider removing this. + sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) + + return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil } // multiHashring represents a set of hashrings. diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index d822bb0e21..52b68daddf 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -10,32 +10,9 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) -func TestHash(t *testing.T) { - ts := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - { - Name: "baz", - Value: "qux", - }, - }, - } - - ts2 := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ts.Labels[1], ts.Labels[0]}, - } - - if hash("", ts) != hash("", ts2) { - t.Errorf("expected hashes to be independent of label order") - } -} - func TestHashringGet(t *testing.T) { ts := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ + Labels: []labelpb.Label{ { Name: "foo", Value: "bar", diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 97b1755176..824c128c57 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -16,10 +16,10 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/store/labelpb" "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/runutil" - "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -48,6 +48,7 @@ 
func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Flush()) testutil.Ok(t, m.Open()) + // TODO: Ensure to not create new tenant accidentally. app, err := m.TenantAppendable("foo") testutil.Ok(t, err) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 2b1a1b01b7..4bb289b70b 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -60,45 +61,51 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR } var errs terrors.MultiError + var oooSamples, dupSamples, outOfBoundsSamples []prompb.Sample for _, t := range wreq.Timeseries { - lset := make(labels.Labels, len(t.Labels)) - for j := range t.Labels { - lset[j] = labels.Label{ - Name: t.Labels[j].Name, - Value: t.Labels[j].Value, - } - } + lset := labelpb.LabelsToPromLabels(t.Labels) + var ref uint64 // Append as many valid samples as possible, but keep track of the errors. 
for _, s := range t.Samples { - _, err = app.Add(lset, s.Timestamp, s.Value) + if ref == 0 { + ref, err = app.Add(lset, s.Timestamp, s.Value) + } else { + err = app.AddFast(ref, s.Timestamp, s.Value) + } + switch err { case nil: continue case storage.ErrOutOfOrderSample: - numOutOfOrder++ - level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset.String(), "sample", s.String()) + oooSamples = append(oooSamples, s) case storage.ErrDuplicateSampleForTimestamp: - numDuplicates++ - level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset.String(), "sample", s.String()) + dupSamples = append(dupSamples, s) case storage.ErrOutOfBounds: - numOutOfBounds++ - level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset.String(), "sample", s.String()) + outOfBoundsSamples = append(outOfBoundsSamples, s) } } + if len(oooSamples) > 0 || len(outOfBoundsSamples) > 0 || len(dupSamples) > 0 { + level.Warn(r.logger).Log("msg", "Skipped problematic samples", "outOfOrder", oooSamples, + "duplicates", dupSamples, "outOfBounds", outOfBoundsSamples, "lset", lset) + + numOutOfOrder += len(oooSamples) + numDuplicates += len(dupSamples) + numOutOfBounds += len(outOfBoundsSamples) + oooSamples = oooSamples[:0] + dupSamples = dupSamples[:0] + outOfBoundsSamples = outOfBoundsSamples[:0] + } } if numOutOfOrder > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder) - errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "failed to non-fast add %d samples", numOutOfOrder)) + errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder)) } if numDuplicates > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates) - errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "failed to non-fast add %d samples", numDuplicates)) + 
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates)) } if numOutOfBounds > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds) - errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "failed to non-fast add %d samples", numOutOfBounds)) + errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds)) } if err := app.Commit(); err != nil { diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 5638f69e5f..977cc2afd1 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -14,6 +14,7 @@ import ( "strings" "unsafe" + "github.com/cespare/xxhash/v2" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" ) @@ -295,3 +296,32 @@ func DeepCopy(lbls []ZLabel) []ZLabel { } return ret } + +var sep = []byte{'\xff'} + +// HashWithPrefix returns a hash for the given prefix and labels. +func HashWithPrefix(prefix string, lbls []Label) uint64 { + // Fast path: gather prefix and labels in one preallocated buffer and hash it with a single xxhash.Sum64(b) call. + b := make([]byte, 0, 1024) + b = append(b, prefix...) + + for i, v := range lbls { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // The next label would not fit into the preallocated buffer: do not grow it to hold the whole entry, stream the buffered bytes and the remaining labels through a hasher instead. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range lbls[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(sep) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(sep) + } + return h.Sum64() + } + b = append(b, v.Name...) + b = append(b, sep[0]) + b = append(b, v.Value...) 
+ b = append(b, sep[0]) + } + return xxhash.Sum64(b) +} diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 6656eea445..abab21c0b2 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -5,6 +5,7 @@ package labelpb import ( "fmt" + "strings" "testing" "github.com/prometheus/prometheus/pkg/labels" @@ -104,3 +105,71 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) { } }) } + +func TestHashWithPrefix(t *testing.T) { + lbls := []Label{ + {Name: "foo", Value: "bar"}, + {Name: "baz", Value: "qux"}, + } + testutil.Equals(t, HashWithPrefix("a", lbls), HashWithPrefix("a", lbls)) + testutil.Assert(t, HashWithPrefix("a", lbls) != HashWithPrefix("a", []Label{lbls[0]})) + testutil.Assert(t, HashWithPrefix("a", lbls) != HashWithPrefix("a", []Label{lbls[1], lbls[0]})) + testutil.Assert(t, HashWithPrefix("a", lbls) != HashWithPrefix("b", lbls)) +} + +var benchmarkLabelsResult uint64 + +func BenchmarkHasWithPrefix(b *testing.B) { + for _, tcase := range []struct { + name string + lbls []Label + }{ + { + name: "typical labels under 1KB", + lbls: func() []Label { + lbls := make([]Label, 10) + for i := 0; i < len(lbls); i++ { + // ZLabel ~20B name, 50B value. + lbls[i] = Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), + }, + { + name: "bigger labels over 1KB", + lbls: func() []Label { + lbls := make([]Label, 10) + for i := 0; i < len(lbls); i++ { + //ZLabel ~50B name, 50B value. + lbls[i] = Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), + }, + { + name: "extremely large label value 10MB", + lbls: func() []Label { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. 
+ word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + return []Label{{Name: "__name__", Value: lbl.String()}} + }(), + }, + } { + b.Run(tcase.name, func(b *testing.B) { + var h uint64 + + const prefix = "test-" + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + h = HashWithPrefix(prefix, tcase.lbls) + } + benchmarkLabelsResult = h + }) + } +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 0e4210084a..e601874ce6 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -232,7 +232,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series span.SetTag("series_count", len(resp.Results[0].Timeseries)) for _, e := range resp.Results[0].Timeseries { - lset := labelpb.ExtendLabels(labelpb.ZLabelsToPromLabels(e.Labels), externalLabels) + lset := labelpb.ExtendLabels(labelpb.LabelsToPromLabels(e.Labels), externalLabels) if len(e.Samples) == 0 { // As found in https://github.com/thanos-io/thanos/issues/381 // Prometheus can give us completely empty time series. Ignore these with log until we figure out that diff --git a/pkg/store/storepb/prompb/types.pb.go b/pkg/store/storepb/prompb/types.pb.go index f9b8bbc131..d97af30c4a 100644 --- a/pkg/store/storepb/prompb/types.pb.go +++ b/pkg/store/storepb/prompb/types.pb.go @@ -12,8 +12,8 @@ import ( _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" - _ "github.com/thanos-io/thanos/pkg/store/labelpb" github_com_thanos_io_thanos_pkg_store_labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" + labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" ) // Reference imports to suppress errors if they are not otherwise used. @@ -138,9 +138,10 @@ func (m *Sample) GetTimestamp() int64 { // TimeSeries represents samples and labels for a single time series. 
type TimeSeries struct { - // TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details. - Labels []github_com_thanos_io_thanos_pkg_store_labelpb.ZLabel `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` + // TODO(bwplotka): Don't use zero copy ZLabels. The message is to often to large and labels are sometimes referenced + // for longer than request time, causing long term over allocation. + Labels []labelpb.Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` } func (m *TimeSeries) Reset() { *m = TimeSeries{} } @@ -176,6 +177,13 @@ func (m *TimeSeries) XXX_DiscardUnknown() { var xxx_messageInfo_TimeSeries proto.InternalMessageInfo +func (m *TimeSeries) GetLabels() []labelpb.Label { + if m != nil { + return m.Labels + } + return nil +} + func (m *TimeSeries) GetSamples() []Sample { if m != nil { return m.Samples @@ -468,45 +476,45 @@ func init() { func init() { proto.RegisterFile("store/storepb/prompb/types.proto", fileDescriptor_166e07899dab7c14) } var fileDescriptor_166e07899dab7c14 = []byte{ - // 599 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x54, 0xcb, 0x6e, 0xd3, 0x4c, - 0x14, 0xce, 0xc4, 0x89, 0x93, 0x9c, 0x5e, 0xfe, 0x68, 0xd4, 0x9f, 0xa6, 0x15, 0x72, 0x2d, 0xaf, - 0xb2, 0xc1, 0x96, 0xda, 0x0a, 0x36, 0x5d, 0x15, 0x45, 0x42, 0x82, 0xa4, 0xea, 0xb4, 0x08, 0xd4, - 0x4d, 0x35, 0x76, 0x06, 0xc7, 0x6a, 0x3c, 0xb6, 0x3c, 0x13, 0xd4, 0xbc, 0x05, 0x6b, 0x1e, 0x81, - 0x1d, 0x3c, 0x45, 0x97, 0x5d, 0x22, 0x16, 0x15, 0x6a, 0x5f, 0x04, 0xcd, 0xb1, 0xd3, 0x50, 0xca, - 0x9a, 0x4d, 0x74, 0x2e, 0xdf, 0x7c, 0xe7, 0xf6, 0xc5, 0xe0, 0x2a, 0x9d, 0x15, 0x22, 0xc0, 0xdf, - 0x3c, 0x0c, 0xf2, 0x22, 0x4b, 0xf3, 0x30, 0xd0, 0xf3, 0x5c, 
0x28, 0x3f, 0x2f, 0x32, 0x9d, 0xd1, - 0xff, 0x4c, 0x4c, 0xe8, 0x89, 0x98, 0xa9, 0xf3, 0x28, 0xcb, 0xe7, 0xdb, 0x1b, 0x71, 0x16, 0x67, - 0x98, 0x0b, 0x8c, 0x55, 0xc2, 0xb6, 0xb7, 0x4a, 0xa2, 0x29, 0x0f, 0xc5, 0xf4, 0x21, 0x83, 0x77, - 0x00, 0xf6, 0x09, 0x4f, 0xf3, 0xa9, 0xa0, 0x1b, 0xd0, 0xfc, 0xc8, 0xa7, 0x33, 0xd1, 0x23, 0x2e, - 0xe9, 0x13, 0x56, 0x3a, 0xf4, 0x29, 0x74, 0x74, 0x92, 0x0a, 0xa5, 0x79, 0x9a, 0xf7, 0xea, 0x2e, - 0xe9, 0x5b, 0x6c, 0x19, 0xf0, 0xbe, 0x10, 0x80, 0xd3, 0x24, 0x15, 0x27, 0xa2, 0x48, 0x84, 0xa2, - 0x11, 0xd8, 0x58, 0x43, 0xf5, 0x88, 0x6b, 0xf5, 0x57, 0x76, 0xd7, 0x7c, 0x3d, 0xe1, 0x32, 0x53, - 0xfe, 0x1b, 0x13, 0x3d, 0x3c, 0xb8, 0xba, 0xd9, 0xa9, 0xfd, 0xb8, 0xd9, 0xd9, 0x8f, 0x13, 0x3d, - 0x99, 0x85, 0x7e, 0x94, 0xa5, 0x41, 0x09, 0x78, 0x96, 0x64, 0x95, 0x15, 0xe4, 0x17, 0x71, 0xf0, - 0xa0, 0x5d, 0xff, 0x0c, 0x5f, 0xb3, 0x8a, 0x9a, 0xbe, 0x80, 0x96, 0xc2, 0x8e, 0x55, 0xaf, 0x8e, - 0x55, 0x36, 0xfd, 0x3f, 0xb6, 0xe0, 0x97, 0x13, 0x1d, 0x36, 0x4c, 0x3d, 0xb6, 0x40, 0x7b, 0x9f, - 0x09, 0xac, 0x22, 0xd5, 0x90, 0xeb, 0x68, 0x22, 0x0a, 0xfa, 0x1c, 0x1a, 0x66, 0x15, 0x38, 0xf0, - 0xfa, 0xae, 0xf7, 0x88, 0xe6, 0x77, 0xb0, 0x7f, 0x3a, 0xcf, 0x05, 0x43, 0x3c, 0xa5, 0xd0, 0x90, - 0x3c, 0x15, 0xb8, 0x8e, 0x0e, 0x43, 0x7b, 0xb9, 0x3d, 0x0b, 0x83, 0xa5, 0xe3, 0xf5, 0xa1, 0x61, - 0xde, 0x51, 0x1b, 0xea, 0x83, 0xe3, 0x6e, 0x8d, 0xb6, 0xc0, 0x1a, 0x0d, 0x8e, 0xbb, 0xc4, 0x04, - 0xd8, 0xa0, 0x5b, 0xc7, 0x00, 0x1b, 0x74, 0x2d, 0xef, 0x2b, 0x81, 0x0e, 0x13, 0x7c, 0xfc, 0x2a, - 0x91, 0x5a, 0xd1, 0x4d, 0x68, 0x29, 0x2d, 0xf2, 0xf3, 0x54, 0x61, 0x73, 0x16, 0xb3, 0x8d, 0x3b, - 0x54, 0xa6, 0xf4, 0x87, 0x99, 0x8c, 0x16, 0xa5, 0x8d, 0x4d, 0xb7, 0xa0, 0xad, 0x34, 0x2f, 0xb4, - 0x41, 0x5b, 0x88, 0x6e, 0xa1, 0x3f, 0x54, 0xf4, 0x7f, 0xb0, 0x85, 0x1c, 0x9b, 0x44, 0x03, 0x13, - 0x4d, 0x21, 0xc7, 0x43, 0x45, 0xb7, 0xa1, 0x1d, 0x17, 0xd9, 0x2c, 0x4f, 0x64, 0xdc, 0x6b, 0xba, - 0x56, 0xbf, 0xc3, 0xee, 0x7d, 0xba, 0x0e, 0xf5, 0x70, 0xde, 0xb3, 0x5d, 0xd2, 0x6f, 0xb3, 0x7a, - 
0x38, 0x37, 0xec, 0x05, 0x97, 0xb1, 0x30, 0x24, 0xad, 0x92, 0x1d, 0xfd, 0xa1, 0xf2, 0xbe, 0x11, - 0x68, 0xbe, 0x9c, 0xcc, 0xe4, 0x05, 0x75, 0x60, 0x25, 0x4d, 0xe4, 0xb9, 0x11, 0xc6, 0xb2, 0xe7, - 0x4e, 0x9a, 0x48, 0x23, 0x8e, 0xa1, 0xc2, 0x3c, 0xbf, 0xbc, 0xcf, 0x57, 0x3a, 0x4a, 0xf9, 0x65, - 0x95, 0xdf, 0xab, 0x2e, 0x61, 0xe1, 0x25, 0x76, 0x1e, 0x5d, 0x02, 0xab, 0xf8, 0x03, 0x19, 0x65, - 0xe3, 0x44, 0xc6, 0xcb, 0x33, 0x8c, 0xb9, 0xe6, 0x38, 0xda, 0x2a, 0x43, 0xdb, 0x73, 0xa1, 0xbd, - 0x40, 0xd1, 0x15, 0x68, 0xbd, 0x1d, 0xbd, 0x1e, 0x1d, 0xbd, 0x1b, 0x95, 0x9b, 0x7f, 0x7f, 0xc4, - 0xba, 0xc4, 0x48, 0x76, 0x0d, 0xe9, 0xc4, 0xf8, 0x5f, 0xaa, 0x76, 0x1f, 0xec, 0xc8, 0x54, 0x5d, - 0x88, 0xf6, 0xc9, 0xdf, 0x67, 0xac, 0x34, 0x5b, 0x61, 0x0f, 0xdd, 0xab, 0x5b, 0x87, 0x5c, 0xdf, - 0x3a, 0xe4, 0xe7, 0xad, 0x43, 0x3e, 0xdd, 0x39, 0xb5, 0xeb, 0x3b, 0xa7, 0xf6, 0xfd, 0xce, 0xa9, - 0x9d, 0xd9, 0xe5, 0xd7, 0x20, 0xb4, 0xf1, 0x6f, 0xbc, 0xf7, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x4e, - 0x61, 0x3c, 0xe9, 0x2c, 0x04, 0x00, 0x00, + // 606 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x53, 0x4d, 0x6b, 0xdb, 0x4c, + 0x10, 0xf6, 0x5a, 0xb6, 0x6c, 0x4f, 0x3e, 0x5e, 0xb3, 0xe4, 0x7d, 0xe3, 0x84, 0x17, 0x45, 0xe8, + 0x64, 0x28, 0x95, 0x20, 0x09, 0xed, 0x25, 0xa7, 0x14, 0x43, 0xa1, 0xb5, 0x43, 0x36, 0x29, 0x2d, + 0xb9, 0x84, 0x95, 0xbc, 0x95, 0x45, 0xac, 0x95, 0xd0, 0xae, 0x4b, 0xfc, 0x2f, 0x7a, 0xee, 0x4f, + 0xe8, 0xad, 0xfd, 0x15, 0x39, 0xe6, 0x58, 0x7a, 0x08, 0x25, 0xf9, 0x23, 0x65, 0x47, 0x72, 0xdc, + 0x34, 0xa5, 0x17, 0x31, 0x1f, 0xcf, 0x3e, 0x33, 0xa3, 0x79, 0x06, 0x5c, 0xa5, 0xb3, 0x42, 0x04, + 0xf8, 0xcd, 0xc3, 0x20, 0x2f, 0xb2, 0x34, 0x0f, 0x03, 0x3d, 0xcf, 0x85, 0xf2, 0xf3, 0x22, 0xd3, + 0x19, 0xfd, 0xc7, 0xc4, 0x84, 0x9e, 0x88, 0x99, 0x3a, 0x8f, 0xb2, 0x7c, 0xbe, 0xbd, 0x11, 0x67, + 0x71, 0x86, 0xb9, 0xc0, 0x58, 0x25, 0x6c, 0x7b, 0xab, 0x24, 0x9a, 0xf2, 0x50, 0x4c, 0x1f, 0x32, + 0x78, 0x07, 0x60, 0x9f, 0xf0, 0x34, 0x9f, 
0x0a, 0xba, 0x01, 0xcd, 0x0f, 0x7c, 0x3a, 0x13, 0x3d, + 0xe2, 0x92, 0x3e, 0x61, 0xa5, 0x43, 0xff, 0x87, 0x8e, 0x4e, 0x52, 0xa1, 0x34, 0x4f, 0xf3, 0x5e, + 0xdd, 0x25, 0x7d, 0x8b, 0x2d, 0x03, 0x5e, 0x01, 0x70, 0x9a, 0xa4, 0xe2, 0x44, 0x14, 0x89, 0x50, + 0xf4, 0x09, 0xd8, 0x58, 0x42, 0xf5, 0x88, 0x6b, 0xf5, 0x57, 0x76, 0xd7, 0x7c, 0x3d, 0xe1, 0x32, + 0x53, 0xfe, 0x6b, 0x13, 0x3d, 0x6c, 0x5c, 0xdd, 0xec, 0xd4, 0x58, 0x05, 0xa1, 0xcf, 0xa1, 0xa5, + 0xb0, 0xb0, 0xea, 0xd5, 0x11, 0xbd, 0xe9, 0xff, 0x36, 0x8c, 0x5f, 0x36, 0x56, 0xbd, 0x5b, 0xa0, + 0xbd, 0x4f, 0x04, 0x56, 0x91, 0x70, 0xc8, 0x75, 0x34, 0x11, 0x05, 0x7d, 0x06, 0x0d, 0x33, 0x11, + 0xf6, 0xbd, 0xbe, 0xeb, 0x3d, 0xa2, 0xf9, 0x15, 0xec, 0x9f, 0xce, 0x73, 0xc1, 0x10, 0x4f, 0x29, + 0x34, 0x24, 0x4f, 0x05, 0x4e, 0xd5, 0x61, 0x68, 0x2f, 0x7f, 0x82, 0x85, 0xc1, 0xd2, 0xf1, 0xfa, + 0xd0, 0x30, 0xef, 0xa8, 0x0d, 0xf5, 0xc1, 0x71, 0xb7, 0x46, 0x5b, 0x60, 0x8d, 0x06, 0xc7, 0x5d, + 0x62, 0x02, 0x6c, 0xd0, 0xad, 0x63, 0x80, 0x0d, 0xba, 0x96, 0xf7, 0x85, 0x40, 0x87, 0x09, 0x3e, + 0x7e, 0x99, 0x48, 0xad, 0xe8, 0x26, 0xb4, 0x94, 0x16, 0xf9, 0x79, 0xaa, 0xb0, 0x39, 0x8b, 0xd9, + 0xc6, 0x1d, 0x2a, 0x53, 0xfa, 0xfd, 0x4c, 0x46, 0x8b, 0xd2, 0xc6, 0xa6, 0x5b, 0xd0, 0x56, 0x9a, + 0x17, 0xda, 0xa0, 0x2d, 0x44, 0xb7, 0xd0, 0x1f, 0x2a, 0xfa, 0x2f, 0xd8, 0x42, 0x8e, 0x4d, 0xa2, + 0x81, 0x89, 0xa6, 0x90, 0xe3, 0xa1, 0xa2, 0xdb, 0xd0, 0x8e, 0x8b, 0x6c, 0x96, 0x27, 0x32, 0xee, + 0x35, 0x5d, 0xab, 0xdf, 0x61, 0xf7, 0x3e, 0x5d, 0x87, 0x7a, 0x38, 0xef, 0xd9, 0x2e, 0xe9, 0xb7, + 0x59, 0x3d, 0x9c, 0x1b, 0xf6, 0x82, 0xcb, 0x58, 0x18, 0x92, 0x56, 0xc9, 0x8e, 0xfe, 0x50, 0x79, + 0x5f, 0x09, 0x34, 0x5f, 0x4c, 0x66, 0xf2, 0x82, 0x3a, 0xb0, 0x92, 0x26, 0xf2, 0xdc, 0xec, 0x77, + 0xd9, 0x73, 0x27, 0x4d, 0xa4, 0x59, 0xf2, 0x50, 0x61, 0x9e, 0x5f, 0xde, 0xe7, 0x2b, 0x39, 0xa4, + 0xfc, 0xb2, 0xca, 0xef, 0x55, 0x9b, 0xb0, 0x70, 0x13, 0x3b, 0x8f, 0x36, 0x81, 0x55, 0xfc, 0x81, + 0x8c, 0xb2, 0x71, 0x22, 0xe3, 0xe5, 0x1a, 0xc6, 0x5c, 0x73, 0x1c, 0x6d, 0x95, 
0xa1, 0xed, 0xb9, + 0xd0, 0x5e, 0xa0, 0xe8, 0x0a, 0xb4, 0xde, 0x8c, 0x5e, 0x8d, 0x8e, 0xde, 0x8e, 0xca, 0x3f, 0xff, + 0xee, 0x88, 0x75, 0x89, 0xf7, 0x99, 0xc0, 0x1a, 0xd2, 0x89, 0x71, 0xa5, 0xbe, 0xe8, 0xef, 0xea, + 0x3b, 0x30, 0x2a, 0xfa, 0x7e, 0xb3, 0xb3, 0x1f, 0x27, 0x7a, 0x32, 0x0b, 0xfd, 0x28, 0x4b, 0x83, + 0x12, 0xf0, 0x34, 0xc9, 0x2a, 0x2b, 0xc8, 0x2f, 0xe2, 0xe0, 0xc1, 0xad, 0xf8, 0x67, 0xf8, 0xfa, + 0x5e, 0xb5, 0xfb, 0x60, 0x47, 0xa6, 0xea, 0x42, 0xb4, 0xff, 0xfd, 0x79, 0xc6, 0x85, 0xd6, 0x4b, + 0xec, 0xa1, 0x7b, 0x75, 0xeb, 0x90, 0xeb, 0x5b, 0x87, 0xfc, 0xb8, 0x75, 0xc8, 0xc7, 0x3b, 0xa7, + 0x76, 0x7d, 0xe7, 0xd4, 0xbe, 0xdd, 0x39, 0xb5, 0x33, 0xbb, 0x3c, 0xea, 0xd0, 0xc6, 0x6b, 0xdc, + 0xfb, 0x19, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x60, 0xc8, 0x48, 0xf3, 0x03, 0x00, 0x00, } func (m *Sample) Marshal() (dAtA []byte, err error) { @@ -580,11 +588,11 @@ func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { if len(m.Labels) > 0 { for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { { - size := m.Labels[iNdEx].Size() - i -= size - if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { return 0, err } + i -= size i = encodeVarintTypes(dAtA, i, uint64(size)) } i-- @@ -1092,7 +1100,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Labels = append(m.Labels, github_com_thanos_io_thanos_pkg_store_labelpb.ZLabel{}) + m.Labels = append(m.Labels, labelpb.Label{}) if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/pkg/store/storepb/prompb/types.proto b/pkg/store/storepb/prompb/types.proto index 2b7ac25775..0257a30646 100644 --- a/pkg/store/storepb/prompb/types.proto +++ b/pkg/store/storepb/prompb/types.proto @@ -35,8 +35,9 @@ message Sample { // TimeSeries represents samples and labels for a single time series. 
message TimeSeries { - // TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details. - repeated thanos.Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"]; + // TODO(bwplotka): Don't use zero copy ZLabels. The message is too often too large and labels are sometimes referenced + // for longer than request time, causing long term over allocation. + repeated thanos.Label labels = 1 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false]; } diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 046a55e656..447d65425f 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -473,8 +473,7 @@ func createBlock( return id, errors.Errorf("nothing to write, asked for %d samples", numSamples) } - blockDir := filepath.Join(dir, id.String()) - if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ + if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, diff --git a/pkg/testutil/testorbench.go b/pkg/testutil/testorbench.go index 5f7dc15ea7..5d8bc6e320 100644 --- a/pkg/testutil/testorbench.go +++ b/pkg/testutil/testorbench.go @@ -28,6 +28,7 @@ type TB interface { IsBenchmark() bool Run(name string, f func(t TB)) bool + ReportAllocs() SetBytes(n int64) N() int ResetTimer() @@ -36,21 +37,36 @@ type TB interface { // tb implements TB as well as testing.TB interfaces. type tb struct { testing.TB + + reportAlloc bool } // NewTB creates tb from testing.TB. func NewTB(t testing.TB) TB { return &tb{TB: t} } +// NewTBWithAlloc creates tb from testing.TB that reports allocations when run as a benchmark (including sub-benchmarks).
+func NewTBWithAlloc(t testing.TB) TB { + if b, ok := t.(*testing.B); ok { + b.ReportAllocs() + } + return &tb{TB: t, reportAlloc: true} +} + // Run benchmarks/tests f as a subbenchmark/subtest with the given name. It reports // whether there were any failures. // // A subbenchmark/subtest is like any other benchmark/test. func (t *tb) Run(name string, f func(t TB)) bool { if b, ok := t.TB.(*testing.B); ok { - return b.Run(name, func(nested *testing.B) { f(&tb{TB: nested}) }) + return b.Run(name, func(nested *testing.B) { + if t.reportAlloc { + nested.ReportAllocs() + } + f(&tb{TB: nested, reportAlloc: t.reportAlloc}) + }) } - if t, ok := t.TB.(*testing.T); ok { - return t.Run(name, func(nested *testing.T) { f(&tb{TB: nested}) }) + if tt, ok := t.TB.(*testing.T); ok { + return tt.Run(name, func(nested *testing.T) { f(&tb{TB: nested}) }) } panic("not a benchmark and not a test") } @@ -78,6 +94,12 @@ func (t *tb) ResetTimer() { } } +func (t *tb) ReportAllocs() { + if b, ok := t.TB.(*testing.B); ok { + b.ReportAllocs() + } +} + // IsBenchmark returns true if it's a benchmark. func (t *tb) IsBenchmark() bool { _, ok := t.TB.(*testing.B)