From e09c413db6f38eb3fc0e6d26072a09bcec2ad408 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 16 Oct 2020 14:10:55 +0200 Subject: [PATCH 1/5] receive: Added benchmark for writes. Signed-off-by: Bartlomiej Plotka --- pkg/receive/handler_test.go | 260 ++++++++++++++++++++++++++++- pkg/testutil/e2eutil/prometheus.go | 3 +- pkg/testutil/testorbench.go | 28 +++- 3 files changed, 281 insertions(+), 10 deletions(-) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 46409cd6d8..60247004d0 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -7,10 +7,14 @@ import ( "bytes" "context" "fmt" + "io/ioutil" + "math" "math/rand" "net/http" "net/http/httptest" + "os" "strconv" + "strings" "sync" "testing" "time" @@ -19,12 +23,16 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/testutil" "google.golang.org/grpc" - "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -140,7 +148,7 @@ func TestCountCause(t *testing.T) { } } -func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { cfg := []HashringConfig{ { Hashring: "test", @@ -166,7 +174,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - ForwardTimeout: 5 * time.Second, + ForwardTimeout: 10 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) @@ -476,7 +484,7 @@ func TestReceiveQuorum(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -815,7 +823,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { // to see all requests completing all the time, since we're using local // network we are not expecting anything to go wrong with these. t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -922,3 +930,245 @@ type fakeRemoteWriteGRPCServer struct { func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { return f.h.RemoteWrite(ctx, in) } + +func serialize(t testing.TB, lbls []labelpb.ZLabel, samples []prompb.Sample) []byte { + // Create significant number of samples to see the weight of it on profiles. 
+ r := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: lbls, + Samples: samples, + }, + }, + } + body, err := proto.Marshal(r) + testutil.Ok(t, err) + return snappy.Encode(nil, body) +} + +func BenchmarkHandlerReceiveHTTP(b *testing.B) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTBWithAlloc(b)) +} + +func TestHandlerReceiveHTTP(t *testing.T) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t)) +} + +type tsModifierTenantStorage struct { + TenantStorage + + modifier int64 +} + +func (s *tsModifierTenantStorage) TenantAppendable(tenant string) (Appendable, error) { + a, err := s.TenantStorage.TenantAppendable(tenant) + return &tsModifierAppendable{Appendable: a, modifier: s.modifier}, err +} + +type tsModifierAppendable struct { + Appendable + + modifier int64 +} + +func (a *tsModifierAppendable) Appender(ctx context.Context) (storage.Appender, error) { + ret, err := a.Appendable.Appender(ctx) + return &tsModifierAppender{Appender: ret, modifier: a.modifier}, err +} + +type tsModifierAppender struct { + storage.Appender + + modifier int64 +} + +var cnt int64 + +func (a *tsModifierAppender) Add(l labels.Labels, _ int64, v float64) (uint64, error) { + cnt += a.modifier + return a.Appender.Add(l, cnt, v) +} + +func (a *tsModifierAppender) AddFast(ref uint64, _ int64, v float64) error { + cnt += a.modifier + return a.Appender.AddFast(ref, cnt, v) +} + +func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { + dir, err := ioutil.TempDir("", "test_receive") + testutil.Ok(b, err) + defer func() { testutil.Ok(b, os.RemoveAll(dir)) }() + + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handler := handlers[0] + + reg := prometheus.NewRegistry() + + logger := log.NewNopLogger() + m := NewMultiTSDB( + dir, logger, reg, &tsdb.Options{ + MinBlockDuration: int64(2 * time.Hour / time.Millisecond), + MaxBlockDuration: int64(2 * time.Hour / time.Millisecond), + RetentionDuration: int64(6 * time.Hour / time.Millisecond), + NoLockfile: true, + StripeSize: 1, // Disable stripe pre allocation so we can clear profiles. + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + false, + ) + defer func() { testutil.Ok(b, m.Close()) }() + handler.writer = NewWriter(logger, m) + + testutil.Ok(b, m.Flush()) + testutil.Ok(b, m.Open()) + + // Create 2MB of samples payload. + manySamples := make([]prompb.Sample, 10e4) + for i := range manySamples { + manySamples[i] = prompb.Sample{ + Value: math.MaxFloat64, + Timestamp: math.MinInt64, // Timestamp does not matter, it will be overridden. + } + } + + for _, tcase := range []struct { + name string + writeRequest []byte + }{ + { + name: "typical labels under 1KB, single sample", + writeRequest: serialize(b, func() []labelpb.ZLabel { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. + lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. + }, + { + name: "typical labels under 1KB, 2MB of samples", + writeRequest: serialize(b, func() []labelpb.ZLabel { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. 
+ lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), manySamples), + }, + { + name: "bigger labels over 1KB, single sample", + writeRequest: serialize(b, func() []labelpb.ZLabel { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~50B name, 50B value. + lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. + }, + { + name: "bigger labels over 1KB, 2MB of samples", + writeRequest: serialize(b, func() []labelpb.ZLabel { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~50B name, 50B value. + lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), manySamples), + }, + { + name: "extremely large label value 10MB, single sample", + writeRequest: serialize(b, func() []labelpb.ZLabel { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + return []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}} + }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. + }, + { + name: "extremely large label value 10MB, 2MB samples", + writeRequest: serialize(b, func() []labelpb.ZLabel { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + return []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}} + }(), manySamples), + }, + } { + b.Run(tcase.name, func(b testutil.TB) { + handler.options.DefaultTenantID = fmt.Sprintf("%v-ok", tcase.name) + handler.writer.multiTSDB = &tsModifierTenantStorage{TenantStorage: m, modifier: 1} + + // It takes time to create new tenant, wait for it. + { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + + b.Run("OK", func(b testutil.TB) { + b.ReportAllocs() + + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + } + }) + + handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name) + handler.writer.multiTSDB = &tsModifierTenantStorage{TenantStorage: m, modifier: -1} // Timestamp can't go down + // which will cause conflict error. + + // It takes time to create new tenant, wait for it. 
+ { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + // First request should be fine, since we don't change timestamp, rest is wrong. + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + + b.Run("conflict errors", func(b testutil.TB) { + b.ReportAllocs() + + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusConflict, r.Code, "%v", i) + } + }) + }) + } +} diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 046a55e656..447d65425f 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -473,8 +473,7 @@ func createBlock( return id, errors.Errorf("nothing to write, asked for %d samples", numSamples) } - blockDir := filepath.Join(dir, id.String()) - if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ + if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, diff --git a/pkg/testutil/testorbench.go b/pkg/testutil/testorbench.go index 5f7dc15ea7..5d8bc6e320 100644 --- a/pkg/testutil/testorbench.go +++ b/pkg/testutil/testorbench.go @@ -28,6 +28,7 @@ type TB interface { IsBenchmark() bool Run(name string, f func(t TB)) bool + ReportAllocs() SetBytes(n int64) N() int ResetTimer() @@ -36,21 +37,36 @@ type TB interface { // tb implements TB as well as testing.TB interfaces. type tb struct { testing.TB + + reportAlloc bool } // NewTB creates tb from testing.TB. func NewTB(t testing.TB) TB { return &tb{TB: t} } +// NewTBWithAlloc creates tb from testing.TB. +func NewTBWithAlloc(t testing.TB) TB { + if b, ok := t.(*testing.B); ok { + b.ReportAllocs() + } + return &tb{TB: t, reportAlloc: true} +} + // Run benchmarks/tests f as a subbenchmark/subtest with the given name. It reports // whether there were any failures. // // A subbenchmark/subtest is like any other benchmark/test. func (t *tb) Run(name string, f func(t TB)) bool { if b, ok := t.TB.(*testing.B); ok { - return b.Run(name, func(nested *testing.B) { f(&tb{TB: nested}) }) + return b.Run(name, func(nested *testing.B) { + if t.reportAlloc { + nested.ReportAllocs() + } + f(&tb{TB: nested, reportAlloc: t.reportAlloc}) + }) } - if t, ok := t.TB.(*testing.T); ok { - return t.Run(name, func(nested *testing.T) { f(&tb{TB: nested}) }) + if tt, ok := t.TB.(*testing.T); ok { + return tt.Run(name, func(nested *testing.T) { f(&tb{TB: nested}) }) } panic("not a benchmark and not a test") } @@ -78,6 +94,12 @@ func (t *tb) ResetTimer() { } } +func (t *tb) ReportAllocs() { + if b, ok := t.TB.(*testing.B); ok { + b.ReportAllocs() + } +} + // IsBenchmark returns true if it's a benchmark. 
func (t *tb) IsBenchmark() bool { _, ok := t.TB.(*testing.B) From 65aa6c13638c7dc4bd39e9b70cfe8d760f1b96f0 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 16 Oct 2020 23:46:06 +0200 Subject: [PATCH 2/5] Optimized write: AddFast and debug prints. GOROOT=/home/bwplotka/.gvm/gos/go1.15 #gosetup GOPATH=/home/bwplotka/Repos/thanosgopath #gosetup /home/bwplotka/.gvm/gos/go1.15/bin/go test -c -o /tmp/___BenchmarkHandlerReceiveHTTP_in_github_com_thanos_io_thanos_pkg_receive github.com/thanos-io/thanos/pkg/receive #gosetup /tmp/___BenchmarkHandlerReceiveHTTP_in_github_com_thanos_io_thanos_pkg_receive -test.v -test.bench ^\QBenchmarkHandlerReceiveHTTP\E$ -test.run ^$ goos: linux goarch: amd64 pkg: github.com/thanos-io/thanos/pkg/receive BenchmarkHandlerReceiveHTTP BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_single_sample BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/OK BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/OK-12 66613 17432 ns/op 6382 B/op 47 allocs/op BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/conflict_errors BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/conflict_errors-12 65179 20922 ns/op 9392 B/op 90 allocs/op BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/OK BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/OK-12 51 22250679 ns/op 14286524 B/op 5118 allocs/op BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/conflict_errors BenchmarkHandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/conflict_errors-12 31 47359942 ns/op 45323821 B/op 500126 allocs/op BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/OK BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/OK-12 33147 30462 ns/op 10578 B/op 49 allocs/op BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/conflict_errors BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/conflict_errors-12 53385 24087 ns/op 13606 B/op 92 allocs/op BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/OK BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/OK-12 49 24593572 ns/op 15436524 B/op 5141 allocs/op BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/conflict_errors BenchmarkHandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/conflict_errors-12 25 52976766 ns/op 45329167 B/op 500129 allocs/op BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/OK BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/OK-12 60 27261861 ns/op 32538875 B/op 54 allocs/op BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/conflict_errors BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/conflict_errors-12 78 15315152 ns/op 32540624 B/op 97 allocs/op BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/OK BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/OK-12 33 35595450 ns/op 47171972 B/op 5100 allocs/op BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/conflict_errors 
BenchmarkHandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/conflict_errors-12 21 58917325 ns/op 78629564 B/op 500125 allocs/op PASS Process finished with exit code 0 Signed-off-by: Bartlomiej Plotka --- pkg/receive/hashring_test.go | 23 ----------------------- pkg/receive/multitsdb_test.go | 3 ++- pkg/receive/writer.go | 25 +++++++++++++------------ 3 files changed, 15 insertions(+), 36 deletions(-) diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index d822bb0e21..f5a1411a5b 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -10,29 +10,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) -func TestHash(t *testing.T) { - ts := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - { - Name: "baz", - Value: "qux", - }, - }, - } - - ts2 := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ts.Labels[1], ts.Labels[0]}, - } - - if hash("", ts) != hash("", ts2) { - t.Errorf("expected hashes to be independent of label order") - } -} - func TestHashringGet(t *testing.T) { ts := &prompb.TimeSeries{ Labels: []labelpb.ZLabel{ diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 97b1755176..824c128c57 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -16,10 +16,10 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/store/labelpb" "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/runutil" - "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -48,6 +48,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Flush()) testutil.Ok(t, m.Open()) + // TODO: Ensure to not create new tenant accidentally. app, err := m.TenantAppendable("foo") testutil.Ok(t, err) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 2b1a1b01b7..ab50b9a203 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -61,29 +62,29 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var errs terrors.MultiError for _, t := range wreq.Timeseries { - lset := make(labels.Labels, len(t.Labels)) - for j := range t.Labels { - lset[j] = labels.Label{ - Name: t.Labels[j].Name, - Value: t.Labels[j].Value, - } - } + lset := labelpb.ZLabelsToPromLabels(t.Labels) // Append as many valid samples as possible, but keep track of the errors. 
- for _, s := range t.Samples { - _, err = app.Add(lset, s.Timestamp, s.Value) + var ref uint64 + for i, s := range t.Samples { + if i == 0 { + ref, err = app.Add(lset, s.Timestamp, s.Value) + } else { + err = app.AddFast(ref, s.Timestamp, s.Value) + } + switch err { case nil: continue case storage.ErrOutOfOrderSample: numOutOfOrder++ - level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset.String(), "sample", s.String()) + level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset, "sample", &s) case storage.ErrDuplicateSampleForTimestamp: numDuplicates++ - level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset.String(), "sample", s.String()) + level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "sample", &s) case storage.ErrOutOfBounds: numOutOfBounds++ - level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset.String(), "sample", s.String()) + level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset, "sample", &s) } } } From 1db161529bdd4b25710138af3af65fad87ebca68 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 16 Oct 2020 23:57:10 +0200 Subject: [PATCH 3/5] Removed ZLabels from promb. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tried DeepCopy but not worth it: ``` benchstat -delta-test=none _dev/diffc.out _dev/diffd.out name old time/op new time/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/OK-12 18.5µs ± 0% 17.8µs ± 0% -3.89% HandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/conflict_errors-12 22.5µs ± 0% 20.9µs ± 0% -7.03% HandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/OK-12 25.0ms ± 0% 24.3ms ± 0% -2.62% HandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/conflict_errors-12 54.2ms ± 0% 52.3ms ± 0% -3.35% HandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/OK-12 33.9µs ± 0% 26.6µs ± 0% -21.46% HandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/conflict_errors-12 24.3µs ± 0% 24.6µs ± 0% +1.43% HandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/OK-12 24.4ms ± 0% 24.8ms ± 0% +1.36% HandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/conflict_errors-12 52.4ms ± 0% 48.8ms ± 0% -7.03% HandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/OK-12 15.2ms ± 0% 15.0ms ± 0% -1.37% HandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/conflict_errors-12 15.9ms ± 0% 15.2ms ± 0% -4.08% HandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/OK-12 38.7ms ± 0% 39.6ms ± 0% +2.28% HandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/conflict_errors-12 67.7ms ± 0% 64.8ms ± 0% -4.34% name old alloc/op new alloc/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/OK-12 7.35kB ± 0% 7.66kB ± 0% +4.25% HandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/conflict_errors-12 10.4kB ± 0% 10.7kB ± 0% +2.94% HandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/OK-12 14.9MB ± 0% 15.3MB ± 0% +2.56% HandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/conflict_errors-12 45.3MB ± 0% 45.3MB ± 0% -0.00% HandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/OK-12 11.9kB ± 0% 12.2kB ± 0% +2.79% HandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/conflict_errors-12 14.9kB ± 0% 15.2kB ± 0% +2.18% HandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/OK-12 15.9MB ± 0% 17.7MB ± 0% +11.38% HandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/conflict_errors-12 45.3MB ± 0% 45.3MB ± 0% -0.00% 
HandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/OK-12 43.0MB ± 0% 43.0MB ± 0% -0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/conflict_errors-12 43.0MB ± 0% 43.0MB ± 0% -0.01% HandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/OK-12 61.2MB ± 0% 60.8MB ± 0% -0.72% HandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/conflict_errors-12 89.1MB ± 0% 89.1MB ± 0% -0.00% name old allocs/op new allocs/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/OK-12 67.0 ± 0% 68.0 ± 0% +1.49% HandlerReceiveHTTP/typical_labels_under_1KB,_single_sample/conflict_errors-12 110 ± 0% 111 ± 0% +0.91% HandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/OK-12 5.14k ± 0% 5.14k ± 0% +0.02% HandlerReceiveHTTP/typical_labels_under_1KB,_2MB_of_samples/conflict_errors-12 500k ± 0% 500k ± 0% -0.00% HandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/OK-12 69.0 ± 0% 70.0 ± 0% +1.45% HandlerReceiveHTTP/bigger_labels_over_1KB,_single_sample/conflict_errors-12 112 ± 0% 113 ± 0% +0.89% HandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/OK-12 5.17k ± 0% 5.18k ± 0% +0.17% HandlerReceiveHTTP/bigger_labels_over_1KB,_2MB_of_samples/conflict_errors-12 500k ± 0% 500k ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/OK-12 57.0 ± 0% 57.0 ± 0% 0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_single_sample/conflict_errors-12 100 ± 0% 100 ± 0% 0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/OK-12 5.11k ± 0% 5.11k ± 0% -0.08% HandlerReceiveHTTP/extremely_large_label_value_10MB,_2MB_samples/conflict_errors-12 500k ± 0% 500k ± 0% 0.00% ``` Signed-off-by: Bartlomiej Plotka --- pkg/receive/handler_test.go | 40 +++++------ pkg/receive/hashring_test.go | 2 +- pkg/receive/writer.go | 2 +- pkg/store/prometheus.go | 2 +- pkg/store/storepb/prompb/types.pb.go | 102 +++++++++++++++------------ pkg/store/storepb/prompb/types.proto | 5 +- 6 files changed, 81 insertions(+), 72 deletions(-) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 60247004d0..be461ee0bf 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -198,7 +198,7 @@ func TestReceiveQuorum(t *testing.T) { wreq1 := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { - Labels: []labelpb.ZLabel{ + Labels: []labelpb.Label{ { Name: "foo", Value: "bar", @@ -534,7 +534,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { wreq1 := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ { - Labels: []labelpb.ZLabel{ + Labels: []labelpb.Label{ { Name: "foo", Value: "bar", @@ -927,11 +927,11 @@ type fakeRemoteWriteGRPCServer struct { h storepb.WriteableStoreServer } -func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { +func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, _ ...grpc.CallOption) (*storepb.WriteResponse, error) { return f.h.RemoteWrite(ctx, in) } -func serialize(t testing.TB, lbls []labelpb.ZLabel, samples []prompb.Sample) []byte { +func serialize(t testing.TB, lbls []labelpb.Label, samples []prompb.Sample) []byte { // Create significant number of samples to see the weight of it on profiles. 
r := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ @@ -1039,70 +1039,70 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { }{ { name: "typical labels under 1KB, single sample", - writeRequest: serialize(b, func() []labelpb.ZLabel { - lbls := make([]labelpb.ZLabel, 10) + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) for i := 0; i < len(lbls); i++ { // Label ~20B name, 50B value. - lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} } return lbls }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. }, { name: "typical labels under 1KB, 2MB of samples", - writeRequest: serialize(b, func() []labelpb.ZLabel { - lbls := make([]labelpb.ZLabel, 10) + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) for i := 0; i < len(lbls); i++ { // Label ~20B name, 50B value. - lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} } return lbls }(), manySamples), }, { name: "bigger labels over 1KB, single sample", - writeRequest: serialize(b, func() []labelpb.ZLabel { - lbls := make([]labelpb.ZLabel, 10) + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) for i := 0; i < len(lbls); i++ { // Label ~50B name, 50B value. - lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} } return lbls }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. }, { name: "bigger labels over 1KB, 2MB of samples", - writeRequest: serialize(b, func() []labelpb.ZLabel { - lbls := make([]labelpb.ZLabel, 10) + writeRequest: serialize(b, func() []labelpb.Label { + lbls := make([]labelpb.Label, 10) for i := 0; i < len(lbls); i++ { // Label ~50B name, 50B value. - lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + lbls[i] = labelpb.Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} } return lbls }(), manySamples), }, { name: "extremely large label value 10MB, single sample", - writeRequest: serialize(b, func() []labelpb.ZLabel { + writeRequest: serialize(b, func() []labelpb.Label { lbl := &strings.Builder{} lbl.Grow(1024 * 1024 * 10) // 10MB. 
word := "abcdefghij" for i := 0; i < lbl.Cap()/len(word); i++ { _, _ = lbl.WriteString(word) } - return []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}} + return []labelpb.Label{{Name: "__name__", Value: lbl.String()}} }(), []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}), // Timestamp does not matter, it will be overridden. }, { name: "extremely large label value 10MB, 2MB samples", - writeRequest: serialize(b, func() []labelpb.ZLabel { + writeRequest: serialize(b, func() []labelpb.Label { lbl := &strings.Builder{} lbl.Grow(1024 * 1024 * 10) // 10MB. word := "abcdefghij" for i := 0; i < lbl.Cap()/len(word); i++ { _, _ = lbl.WriteString(word) } - return []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}} + return []labelpb.Label{{Name: "__name__", Value: lbl.String()}} }(), manySamples), }, } { diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go index f5a1411a5b..52b68daddf 100644 --- a/pkg/receive/hashring_test.go +++ b/pkg/receive/hashring_test.go @@ -12,7 +12,7 @@ import ( func TestHashringGet(t *testing.T) { ts := &prompb.TimeSeries{ - Labels: []labelpb.ZLabel{ + Labels: []labelpb.Label{ { Name: "foo", Value: "bar", diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index ab50b9a203..8055d3e8c3 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -62,7 +62,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var errs terrors.MultiError for _, t := range wreq.Timeseries { - lset := labelpb.ZLabelsToPromLabels(t.Labels) + lset := labelpb.LabelsToPromLabels(t.Labels) // Append as many valid samples as possible, but keep track of the errors. var ref uint64 diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 0e4210084a..e601874ce6 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -232,7 +232,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series span.SetTag("series_count", len(resp.Results[0].Timeseries)) for _, e := range resp.Results[0].Timeseries { - lset := labelpb.ExtendLabels(labelpb.ZLabelsToPromLabels(e.Labels), externalLabels) + lset := labelpb.ExtendLabels(labelpb.LabelsToPromLabels(e.Labels), externalLabels) if len(e.Samples) == 0 { // As found in https://github.com/thanos-io/thanos/issues/381 // Prometheus can give us completely empty time series. Ignore these with log until we figure out that diff --git a/pkg/store/storepb/prompb/types.pb.go b/pkg/store/storepb/prompb/types.pb.go index f9b8bbc131..d97af30c4a 100644 --- a/pkg/store/storepb/prompb/types.pb.go +++ b/pkg/store/storepb/prompb/types.pb.go @@ -12,8 +12,8 @@ import ( _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" - _ "github.com/thanos-io/thanos/pkg/store/labelpb" github_com_thanos_io_thanos_pkg_store_labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" + labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" ) // Reference imports to suppress errors if they are not otherwise used. @@ -138,9 +138,10 @@ func (m *Sample) GetTimestamp() int64 { // TimeSeries represents samples and labels for a single time series. type TimeSeries struct { - // TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details. 
- Labels []github_com_thanos_io_thanos_pkg_store_labelpb.ZLabel `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` + // TODO(bwplotka): Don't use zero copy ZLabels. The message is to often to large and labels are sometimes referenced + // for longer than request time, causing long term over allocation. + Labels []labelpb.Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` } func (m *TimeSeries) Reset() { *m = TimeSeries{} } @@ -176,6 +177,13 @@ func (m *TimeSeries) XXX_DiscardUnknown() { var xxx_messageInfo_TimeSeries proto.InternalMessageInfo +func (m *TimeSeries) GetLabels() []labelpb.Label { + if m != nil { + return m.Labels + } + return nil +} + func (m *TimeSeries) GetSamples() []Sample { if m != nil { return m.Samples @@ -468,45 +476,45 @@ func init() { func init() { proto.RegisterFile("store/storepb/prompb/types.proto", fileDescriptor_166e07899dab7c14) } var fileDescriptor_166e07899dab7c14 = []byte{ - // 599 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x54, 0xcb, 0x6e, 0xd3, 0x4c, - 0x14, 0xce, 0xc4, 0x89, 0x93, 0x9c, 0x5e, 0xfe, 0x68, 0xd4, 0x9f, 0xa6, 0x15, 0x72, 0x2d, 0xaf, - 0xb2, 0xc1, 0x96, 0xda, 0x0a, 0x36, 0x5d, 0x15, 0x45, 0x42, 0x82, 0xa4, 0xea, 0xb4, 0x08, 0xd4, - 0x4d, 0x35, 0x76, 0x06, 0xc7, 0x6a, 0x3c, 0xb6, 0x3c, 0x13, 0xd4, 0xbc, 0x05, 0x6b, 0x1e, 0x81, - 0x1d, 0x3c, 0x45, 0x97, 0x5d, 0x22, 0x16, 0x15, 0x6a, 0x5f, 0x04, 0xcd, 0xb1, 0xd3, 0x50, 0xca, - 0x9a, 0x4d, 0x74, 0x2e, 0xdf, 0x7c, 0xe7, 0xf6, 0xc5, 0xe0, 0x2a, 0x9d, 0x15, 0x22, 0xc0, 0xdf, - 0x3c, 0x0c, 0xf2, 0x22, 0x4b, 0xf3, 0x30, 0xd0, 0xf3, 0x5c, 0x28, 0x3f, 0x2f, 0x32, 0x9d, 0xd1, - 0xff, 0x4c, 0x4c, 0xe8, 0x89, 0x98, 0xa9, 0xf3, 0x28, 0xcb, 0xe7, 0xdb, 0x1b, 0x71, 0x16, 0x67, - 0x98, 0x0b, 0x8c, 0x55, 0xc2, 0xb6, 0xb7, 0x4a, 0xa2, 0x29, 0x0f, 0xc5, 0xf4, 0x21, 0x83, 0x77, - 0x00, 0xf6, 0x09, 0x4f, 0xf3, 0xa9, 0xa0, 0x1b, 0xd0, 0xfc, 0xc8, 0xa7, 0x33, 0xd1, 0x23, 0x2e, - 0xe9, 0x13, 0x56, 0x3a, 0xf4, 0x29, 0x74, 0x74, 0x92, 0x0a, 0xa5, 0x79, 0x9a, 0xf7, 0xea, 0x2e, - 0xe9, 0x5b, 0x6c, 0x19, 0xf0, 0xbe, 0x10, 0x80, 0xd3, 0x24, 0x15, 0x27, 0xa2, 0x48, 0x84, 0xa2, - 0x11, 0xd8, 0x58, 0x43, 0xf5, 0x88, 0x6b, 0xf5, 0x57, 0x76, 0xd7, 0x7c, 0x3d, 0xe1, 0x32, 0x53, - 0xfe, 0x1b, 0x13, 0x3d, 0x3c, 0xb8, 0xba, 0xd9, 0xa9, 0xfd, 0xb8, 0xd9, 0xd9, 0x8f, 0x13, 0x3d, - 0x99, 0x85, 0x7e, 0x94, 0xa5, 0x41, 0x09, 0x78, 0x96, 0x64, 0x95, 0x15, 0xe4, 0x17, 0x71, 0xf0, - 0xa0, 0x5d, 0xff, 0x0c, 0x5f, 0xb3, 0x8a, 0x9a, 0xbe, 0x80, 0x96, 0xc2, 0x8e, 0x55, 0xaf, 0x8e, - 0x55, 0x36, 0xfd, 0x3f, 0xb6, 0xe0, 0x97, 0x13, 0x1d, 0x36, 0x4c, 0x3d, 0xb6, 0x40, 0x7b, 0x9f, - 0x09, 0xac, 0x22, 0xd5, 0x90, 0xeb, 0x68, 0x22, 0x0a, 0xfa, 0x1c, 0x1a, 0x66, 0x15, 0x38, 0xf0, - 0xfa, 0xae, 0xf7, 0x88, 0xe6, 0x77, 0xb0, 0x7f, 0x3a, 0xcf, 0x05, 0x43, 0x3c, 0xa5, 0xd0, 0x90, - 0x3c, 0x15, 0xb8, 0x8e, 0x0e, 0x43, 0x7b, 0xb9, 0x3d, 0x0b, 0x83, 0xa5, 0xe3, 0xf5, 0xa1, 0x61, - 0xde, 0x51, 0x1b, 0xea, 0x83, 0xe3, 0x6e, 0x8d, 0xb6, 0xc0, 0x1a, 0x0d, 0x8e, 0xbb, 0xc4, 0x04, - 0xd8, 0xa0, 0x5b, 0xc7, 0x00, 0x1b, 0x74, 0x2d, 0xef, 0x2b, 0x81, 0x0e, 0x13, 0x7c, 0xfc, 0x2a, - 0x91, 0x5a, 0xd1, 0x4d, 0x68, 0x29, 0x2d, 0xf2, 0xf3, 0x54, 0x61, 0x73, 0x16, 0xb3, 0x8d, 0x3b, - 0x54, 0xa6, 0xf4, 0x87, 0x99, 0x8c, 0x16, 0xa5, 0x8d, 0x4d, 0xb7, 0xa0, 0xad, 0x34, 
0x2f, 0xb4, - 0x41, 0x5b, 0x88, 0x6e, 0xa1, 0x3f, 0x54, 0xf4, 0x7f, 0xb0, 0x85, 0x1c, 0x9b, 0x44, 0x03, 0x13, - 0x4d, 0x21, 0xc7, 0x43, 0x45, 0xb7, 0xa1, 0x1d, 0x17, 0xd9, 0x2c, 0x4f, 0x64, 0xdc, 0x6b, 0xba, - 0x56, 0xbf, 0xc3, 0xee, 0x7d, 0xba, 0x0e, 0xf5, 0x70, 0xde, 0xb3, 0x5d, 0xd2, 0x6f, 0xb3, 0x7a, - 0x38, 0x37, 0xec, 0x05, 0x97, 0xb1, 0x30, 0x24, 0xad, 0x92, 0x1d, 0xfd, 0xa1, 0xf2, 0xbe, 0x11, - 0x68, 0xbe, 0x9c, 0xcc, 0xe4, 0x05, 0x75, 0x60, 0x25, 0x4d, 0xe4, 0xb9, 0x11, 0xc6, 0xb2, 0xe7, - 0x4e, 0x9a, 0x48, 0x23, 0x8e, 0xa1, 0xc2, 0x3c, 0xbf, 0xbc, 0xcf, 0x57, 0x3a, 0x4a, 0xf9, 0x65, - 0x95, 0xdf, 0xab, 0x2e, 0x61, 0xe1, 0x25, 0x76, 0x1e, 0x5d, 0x02, 0xab, 0xf8, 0x03, 0x19, 0x65, - 0xe3, 0x44, 0xc6, 0xcb, 0x33, 0x8c, 0xb9, 0xe6, 0x38, 0xda, 0x2a, 0x43, 0xdb, 0x73, 0xa1, 0xbd, - 0x40, 0xd1, 0x15, 0x68, 0xbd, 0x1d, 0xbd, 0x1e, 0x1d, 0xbd, 0x1b, 0x95, 0x9b, 0x7f, 0x7f, 0xc4, - 0xba, 0xc4, 0x48, 0x76, 0x0d, 0xe9, 0xc4, 0xf8, 0x5f, 0xaa, 0x76, 0x1f, 0xec, 0xc8, 0x54, 0x5d, - 0x88, 0xf6, 0xc9, 0xdf, 0x67, 0xac, 0x34, 0x5b, 0x61, 0x0f, 0xdd, 0xab, 0x5b, 0x87, 0x5c, 0xdf, - 0x3a, 0xe4, 0xe7, 0xad, 0x43, 0x3e, 0xdd, 0x39, 0xb5, 0xeb, 0x3b, 0xa7, 0xf6, 0xfd, 0xce, 0xa9, - 0x9d, 0xd9, 0xe5, 0xd7, 0x20, 0xb4, 0xf1, 0x6f, 0xbc, 0xf7, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x4e, - 0x61, 0x3c, 0xe9, 0x2c, 0x04, 0x00, 0x00, + // 606 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x53, 0x4d, 0x6b, 0xdb, 0x4c, + 0x10, 0xf6, 0x5a, 0xb6, 0x6c, 0x4f, 0x3e, 0x5e, 0xb3, 0xe4, 0x7d, 0xe3, 0x84, 0x17, 0x45, 0xe8, + 0x64, 0x28, 0x95, 0x20, 0x09, 0xed, 0x25, 0xa7, 0x14, 0x43, 0xa1, 0xb5, 0x43, 0x36, 0x29, 0x2d, + 0xb9, 0x84, 0x95, 0xbc, 0x95, 0x45, 0xac, 0x95, 0xd0, 0xae, 0x4b, 0xfc, 0x2f, 0x7a, 0xee, 0x4f, + 0xe8, 0xad, 0xfd, 0x15, 0x39, 0xe6, 0x58, 0x7a, 0x08, 0x25, 0xf9, 0x23, 0x65, 0x47, 0x72, 0xdc, + 0x34, 0xa5, 0x17, 0x31, 0x1f, 0xcf, 0x3e, 0x33, 0xa3, 0x79, 0x06, 0x5c, 0xa5, 0xb3, 0x42, 0x04, + 0xf8, 0xcd, 0xc3, 0x20, 0x2f, 0xb2, 0x34, 0x0f, 0x03, 0x3d, 0xcf, 0x85, 0xf2, 0xf3, 0x22, 0xd3, + 0x19, 0xfd, 0xc7, 0xc4, 0x84, 0x9e, 0x88, 0x99, 0x3a, 0x8f, 0xb2, 0x7c, 0xbe, 0xbd, 0x11, 0x67, + 0x71, 0x86, 0xb9, 0xc0, 0x58, 0x25, 0x6c, 0x7b, 0xab, 0x24, 0x9a, 0xf2, 0x50, 0x4c, 0x1f, 0x32, + 0x78, 0x07, 0x60, 0x9f, 0xf0, 0x34, 0x9f, 0x0a, 0xba, 0x01, 0xcd, 0x0f, 0x7c, 0x3a, 0x13, 0x3d, + 0xe2, 0x92, 0x3e, 0x61, 0xa5, 0x43, 0xff, 0x87, 0x8e, 0x4e, 0x52, 0xa1, 0x34, 0x4f, 0xf3, 0x5e, + 0xdd, 0x25, 0x7d, 0x8b, 0x2d, 0x03, 0x5e, 0x01, 0x70, 0x9a, 0xa4, 0xe2, 0x44, 0x14, 0x89, 0x50, + 0xf4, 0x09, 0xd8, 0x58, 0x42, 0xf5, 0x88, 0x6b, 0xf5, 0x57, 0x76, 0xd7, 0x7c, 0x3d, 0xe1, 0x32, + 0x53, 0xfe, 0x6b, 0x13, 0x3d, 0x6c, 0x5c, 0xdd, 0xec, 0xd4, 0x58, 0x05, 0xa1, 0xcf, 0xa1, 0xa5, + 0xb0, 0xb0, 0xea, 0xd5, 0x11, 0xbd, 0xe9, 0xff, 0x36, 0x8c, 0x5f, 0x36, 0x56, 0xbd, 0x5b, 0xa0, + 0xbd, 0x4f, 0x04, 0x56, 0x91, 0x70, 0xc8, 0x75, 0x34, 0x11, 0x05, 0x7d, 0x06, 0x0d, 0x33, 0x11, + 0xf6, 0xbd, 0xbe, 0xeb, 0x3d, 0xa2, 0xf9, 0x15, 0xec, 0x9f, 0xce, 0x73, 0xc1, 0x10, 0x4f, 0x29, + 0x34, 0x24, 0x4f, 0x05, 0x4e, 0xd5, 0x61, 0x68, 0x2f, 0x7f, 0x82, 0x85, 0xc1, 0xd2, 0xf1, 0xfa, + 0xd0, 0x30, 0xef, 0xa8, 0x0d, 0xf5, 0xc1, 0x71, 0xb7, 0x46, 0x5b, 0x60, 0x8d, 0x06, 0xc7, 0x5d, + 0x62, 0x02, 0x6c, 0xd0, 0xad, 0x63, 0x80, 0x0d, 0xba, 0x96, 0xf7, 0x85, 0x40, 0x87, 0x09, 0x3e, + 0x7e, 0x99, 0x48, 0xad, 0xe8, 0x26, 0xb4, 0x94, 0x16, 0xf9, 0x79, 0xaa, 0xb0, 0x39, 0x8b, 0xd9, + 0xc6, 0x1d, 0x2a, 0x53, 0xfa, 0xfd, 0x4c, 0x46, 0x8b, 0xd2, 0xc6, 0xa6, 0x5b, 0xd0, 0x56, 0x9a, + 0x17, 0xda, 0xa0, 
0x2d, 0x44, 0xb7, 0xd0, 0x1f, 0x2a, 0xfa, 0x2f, 0xd8, 0x42, 0x8e, 0x4d, 0xa2, + 0x81, 0x89, 0xa6, 0x90, 0xe3, 0xa1, 0xa2, 0xdb, 0xd0, 0x8e, 0x8b, 0x6c, 0x96, 0x27, 0x32, 0xee, + 0x35, 0x5d, 0xab, 0xdf, 0x61, 0xf7, 0x3e, 0x5d, 0x87, 0x7a, 0x38, 0xef, 0xd9, 0x2e, 0xe9, 0xb7, + 0x59, 0x3d, 0x9c, 0x1b, 0xf6, 0x82, 0xcb, 0x58, 0x18, 0x92, 0x56, 0xc9, 0x8e, 0xfe, 0x50, 0x79, + 0x5f, 0x09, 0x34, 0x5f, 0x4c, 0x66, 0xf2, 0x82, 0x3a, 0xb0, 0x92, 0x26, 0xf2, 0xdc, 0xec, 0x77, + 0xd9, 0x73, 0x27, 0x4d, 0xa4, 0x59, 0xf2, 0x50, 0x61, 0x9e, 0x5f, 0xde, 0xe7, 0x2b, 0x39, 0xa4, + 0xfc, 0xb2, 0xca, 0xef, 0x55, 0x9b, 0xb0, 0x70, 0x13, 0x3b, 0x8f, 0x36, 0x81, 0x55, 0xfc, 0x81, + 0x8c, 0xb2, 0x71, 0x22, 0xe3, 0xe5, 0x1a, 0xc6, 0x5c, 0x73, 0x1c, 0x6d, 0x95, 0xa1, 0xed, 0xb9, + 0xd0, 0x5e, 0xa0, 0xe8, 0x0a, 0xb4, 0xde, 0x8c, 0x5e, 0x8d, 0x8e, 0xde, 0x8e, 0xca, 0x3f, 0xff, + 0xee, 0x88, 0x75, 0x89, 0xf7, 0x99, 0xc0, 0x1a, 0xd2, 0x89, 0x71, 0xa5, 0xbe, 0xe8, 0xef, 0xea, + 0x3b, 0x30, 0x2a, 0xfa, 0x7e, 0xb3, 0xb3, 0x1f, 0x27, 0x7a, 0x32, 0x0b, 0xfd, 0x28, 0x4b, 0x83, + 0x12, 0xf0, 0x34, 0xc9, 0x2a, 0x2b, 0xc8, 0x2f, 0xe2, 0xe0, 0xc1, 0xad, 0xf8, 0x67, 0xf8, 0xfa, + 0x5e, 0xb5, 0xfb, 0x60, 0x47, 0xa6, 0xea, 0x42, 0xb4, 0xff, 0xfd, 0x79, 0xc6, 0x85, 0xd6, 0x4b, + 0xec, 0xa1, 0x7b, 0x75, 0xeb, 0x90, 0xeb, 0x5b, 0x87, 0xfc, 0xb8, 0x75, 0xc8, 0xc7, 0x3b, 0xa7, + 0x76, 0x7d, 0xe7, 0xd4, 0xbe, 0xdd, 0x39, 0xb5, 0x33, 0xbb, 0x3c, 0xea, 0xd0, 0xc6, 0x6b, 0xdc, + 0xfb, 0x19, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x60, 0xc8, 0x48, 0xf3, 0x03, 0x00, 0x00, } func (m *Sample) Marshal() (dAtA []byte, err error) { @@ -580,11 +588,11 @@ func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { if len(m.Labels) > 0 { for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { { - size := m.Labels[iNdEx].Size() - i -= size - if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { return 0, err } + i -= size i = encodeVarintTypes(dAtA, i, uint64(size)) } i-- @@ -1092,7 +1100,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Labels = append(m.Labels, github_com_thanos_io_thanos_pkg_store_labelpb.ZLabel{}) + m.Labels = append(m.Labels, labelpb.Label{}) if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/pkg/store/storepb/prompb/types.proto b/pkg/store/storepb/prompb/types.proto index 2b7ac25775..0257a30646 100644 --- a/pkg/store/storepb/prompb/types.proto +++ b/pkg/store/storepb/prompb/types.proto @@ -35,8 +35,9 @@ message Sample { // TimeSeries represents samples and labels for a single time series. message TimeSeries { - // TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details. - repeated thanos.Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"]; + // TODO(bwplotka): Don't use zero copy ZLabels. The message is to often to large and labels are sometimes referenced + // for longer than request time, causing long term over allocation. + repeated thanos.Label labels = 1 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false]; } From 153636f5547b065d4f865554beae9308727eee9c Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Sat, 17 Oct 2020 00:08:46 +0200 Subject: [PATCH 4/5] Optimized hash. 
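The receive-local hash() built a 1KB scratch slice per series and appended the
whole label set before calling xxhash.Sum64, so a series with a very large
label value forced the slice to regrow into one big allocation per hash. The
hashing now lives in labelpb.HashWithPrefix on top of xxhash/v2: small label
sets keep the single-buffer fast path, and a label set that would outgrow the
1KB buffer is finished with the streaming digest instead of reallocating.
A rough, self-contained sketch of that idea (the Label type and names below
are illustrative, not the exact Thanos code):

```
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// Label is an illustrative stand-in for labelpb.Label.
type Label struct{ Name, Value string }

var sep = []byte{'\xff'}

// hashWithPrefix hashes a prefix (e.g. the tenant) plus a sorted label set.
func hashWithPrefix(prefix string, lbls []Label) uint64 {
	b := make([]byte, 0, 1024)
	b = append(b, prefix...)
	for i, l := range lbls {
		if len(b)+len(l.Name)+len(l.Value)+2 >= cap(b) {
			// The label set no longer fits the 1KB buffer: finish with the
			// streaming digest instead of regrowing the slice.
			h := xxhash.New()
			_, _ = h.Write(b)
			for _, l := range lbls[i:] {
				_, _ = h.WriteString(l.Name)
				_, _ = h.Write(sep)
				_, _ = h.WriteString(l.Value)
				_, _ = h.Write(sep)
			}
			return h.Sum64()
		}
		b = append(b, l.Name...)
		b = append(b, sep...)
		b = append(b, l.Value...)
		b = append(b, sep...)
	}
	return xxhash.Sum64(b)
}

func main() {
	lbls := []Label{{Name: "__name__", Value: "up"}, {Name: "job", Value: "thanos-receive"}}
	fmt.Println(hashWithPrefix("tenant-a", lbls))
}
```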
Signed-off-by: Bartlomiej Plotka --- go.mod | 1 + pkg/receive/handler.go | 24 ++++++++---- pkg/receive/hashring.go | 27 +++---------- pkg/receive/writer.go | 2 +- pkg/store/labelpb/label.go | 30 ++++++++++++++ pkg/store/labelpb/label_test.go | 69 +++++++++++++++++++++++++++++++++ 6 files changed, 124 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 87554f58f9..98185e7249 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/armon/go-metrics v0.3.3 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 + github.com/cespare/xxhash/v2 v2.1.1 github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0 github.com/chromedp/chromedp v0.5.3 github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17 diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 1070aaffab..17dac4e03c 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -4,10 +4,11 @@ package receive import ( + "bytes" "context" "crypto/tls" "fmt" - "io/ioutil" + "io" stdlog "log" "net" "net/http" @@ -282,13 +283,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { span, ctx := tracing.StartSpan(r.Context(), "receive_http") defer span.Finish() - compressed, err := ioutil.ReadAll(r.Body) + compressed := &bytes.Buffer{} + if r.ContentLength >= 0 { + compressed.Grow(int(r.ContentLength)) + } + _, err := io.Copy(compressed, r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - reqBuf, err := snappy.Decode(nil, compressed) + reqBuf, err := snappy.Decode(nil, compressed.Bytes()) if err != nil { level.Error(h.logger).Log("msg", "snappy decode error", "err", err) http.Error(w, err.Error(), http.StatusBadRequest) @@ -314,6 +319,10 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { if len(tenant) == 0 { tenant = h.options.DefaultTenantID } + if len(tenant) == 0 { + http.Error(w, "no tenant ID supplied", http.StatusBadRequest) + return + } err = h.handleRequest(ctx, rep, tenant, &wreq) switch err { @@ -401,9 +410,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma } }() - logger := log.With(h.logger, "tenant", tenant) + // Avoid log.With extra allocations for rare log lines. + logTags := []interface{}{"tenant", tenant} if id, ok := middleware.RequestIDFromContext(pctx); ok { - logger = log.With(logger, "request-id", id) + logTags = append(logTags, "request-id", id) } ec := make(chan error) @@ -524,7 +534,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma b.attempt++ dur := h.expBackoff.ForAttempt(b.attempt) b.nextAllowed = time.Now().Add(dur) - level.Debug(h.logger).Log("msg", "target unavailable backing off", "for", dur) + level.Debug(h.logger).Log(append(logTags, "msg", "msg", "target unavailable backing off", "for", dur)...) } else { h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} } @@ -553,7 +563,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma go func() { for err := range ec { if err != nil { - level.Debug(logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) + level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err)...) 
} } }() diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index ef6c94390a..76eaa7ee25 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -9,13 +9,11 @@ import ( "sort" "sync" - "github.com/cespare/xxhash" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) -const sep = '\xff' - // insufficientNodesError is returned when a hashring does not // have enough nodes to satisfy a request for a node. type insufficientNodesError struct { @@ -38,23 +36,6 @@ type Hashring interface { GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error) } -// hash returns a hash for the given tenant and time series. -func hash(tenant string, ts *prompb.TimeSeries) uint64 { - // Sort labelset to ensure a stable hash. - sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) - - b := make([]byte, 0, 1024) - b = append(b, []byte(tenant)...) - b = append(b, sep) - for _, v := range ts.Labels { - b = append(b, v.Name...) - b = append(b, sep) - b = append(b, v.Value...) - b = append(b, sep) - } - return xxhash.Sum64(b) -} - // SingleNodeHashring always returns the same node. type SingleNodeHashring string @@ -84,7 +65,11 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st if n >= uint64(len(s)) { return "", &insufficientNodesError{have: uint64(len(s)), want: n + 1} } - return s[(hash(tenant, ts)+n)%uint64(len(s))], nil + + // TODO(bwplotka): Labels should be sorted already, consider removing this. + sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) + + return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil } // multiHashring represents a set of hashrings. diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 8055d3e8c3..eaeb98adba 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -64,8 +64,8 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR for _, t := range wreq.Timeseries { lset := labelpb.LabelsToPromLabels(t.Labels) - // Append as many valid samples as possible, but keep track of the errors. var ref uint64 + // Append as many valid samples as possible, but keep track of the errors. for i, s := range t.Samples { if i == 0 { ref, err = app.Add(lset, s.Timestamp, s.Value) diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 5638f69e5f..977cc2afd1 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -14,6 +14,7 @@ import ( "strings" "unsafe" + "github.com/cespare/xxhash/v2" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" ) @@ -295,3 +296,32 @@ func DeepCopy(lbls []ZLabel) []ZLabel { } return ret } + +var sep = []byte{'\xff'} + +// HashWithPrefix returns a hash for the given prefix and labels. +func HashWithPrefix(prefix string, lbls []Label) uint64 { + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + b = append(b, prefix...) + + for i, v := range lbls { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB allocate do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range lbls[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(sep) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(sep) + } + return h.Sum64() + } + b = append(b, v.Name...) + b = append(b, sep[0]) + b = append(b, v.Value...) 
+ b = append(b, sep[0]) + } + return xxhash.Sum64(b) +} diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 6656eea445..abab21c0b2 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -5,6 +5,7 @@ package labelpb import ( "fmt" + "strings" "testing" "github.com/prometheus/prometheus/pkg/labels" @@ -104,3 +105,71 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) { } }) } + +func TestHashWithPrefix(t *testing.T) { + lbls := []Label{ + {Name: "foo", Value: "bar"}, + {Name: "baz", Value: "qux"}, + } + testutil.Equals(t, HashWithPrefix("a", lbls), HashWithPrefix("a", lbls)) + testutil.Assert(t, HashWithPrefix("a", lbls) != HashWithPrefix("a", []Label{lbls[0]})) + testutil.Assert(t, HashWithPrefix("a", lbls) != HashWithPrefix("a", []Label{lbls[1], lbls[0]})) + testutil.Assert(t, HashWithPrefix("a", lbls) != HashWithPrefix("b", lbls)) +} + +var benchmarkLabelsResult uint64 + +func BenchmarkHasWithPrefix(b *testing.B) { + for _, tcase := range []struct { + name string + lbls []Label + }{ + { + name: "typical labels under 1KB", + lbls: func() []Label { + lbls := make([]Label, 10) + for i := 0; i < len(lbls); i++ { + // ZLabel ~20B name, 50B value. + lbls[i] = Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), + }, + { + name: "bigger labels over 1KB", + lbls: func() []Label { + lbls := make([]Label, 10) + for i := 0; i < len(lbls); i++ { + //ZLabel ~50B name, 50B value. + lbls[i] = Label{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + return lbls + }(), + }, + { + name: "extremely large label value 10MB", + lbls: func() []Label { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + return []Label{{Name: "__name__", Value: lbl.String()}} + }(), + }, + } { + b.Run(tcase.name, func(b *testing.B) { + var h uint64 + + const prefix = "test-" + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + h = HashWithPrefix(prefix, tcase.lbls) + } + benchmarkLabelsResult = h + }) + } +} From 9f0ec8715d949d50b8d6c4d1c018fb174893e21e Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Sat, 17 Oct 2020 00:54:50 +0200 Subject: [PATCH 5/5] Reduced allocs on write append errors. 
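Instead of handling every rejected sample individually (a debug log call per
sample, plus per-category warnings and wrapped errors at the end of the
request), Writer.Write now batches rejected samples per series into reusable
scratch slices, logs them once per series at warn level, and only wraps the
final counters into the returned MultiError. A small self-contained sketch of
that batching pattern (the sample type, appendSample helper and sentinel error
below are invented for illustration):

```
package main

import (
	"errors"
	"fmt"
	"log"
)

// sample is an illustrative stand-in for prompb.Sample.
type sample struct {
	ts  int64
	val float64
}

var errOutOfOrder = errors.New("out of order sample")

// appendSample stands in for storage.Appender.Add/AddFast: it rejects samples
// whose timestamp does not increase.
func appendSample(lastTS int64, s sample) (int64, error) {
	if s.ts <= lastTS {
		return lastTS, errOutOfOrder
	}
	return s.ts, nil
}

func main() {
	series := [][]sample{
		{{ts: 1, val: 1}, {ts: 2, val: 2}, {ts: 2, val: 3}}, // last sample conflicts
		{{ts: 5, val: 1}, {ts: 6, val: 2}},
	}

	numOutOfOrder := 0
	var ooo []sample // scratch slice, reused for every series in the request
	for i, samples := range series {
		lastTS := int64(-1)
		for _, s := range samples {
			var err error
			lastTS, err = appendSample(lastTS, s)
			if errors.Is(err, errOutOfOrder) {
				ooo = append(ooo, s) // collect instead of logging immediately
			}
		}
		if len(ooo) > 0 {
			// One log line per series instead of one per rejected sample.
			log.Printf("skipped problematic samples: series=%d outOfOrder=%v", i, ooo)
			numOutOfOrder += len(ooo)
			ooo = ooo[:0]
		}
	}
	if numOutOfOrder > 0 {
		// Wrap the counter once at the end, as the real writer does with its MultiError.
		fmt.Println(fmt.Errorf("%w: add %d samples", errOutOfOrder, numOutOfOrder))
	}
}
```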
Signed-off-by: Bartlomiej Plotka --- pkg/receive/writer.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index eaeb98adba..4bb289b70b 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -61,6 +61,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR } var errs terrors.MultiError + var oooSamples, dupSamples, outOfBoundsSamples []prompb.Sample for _, t := range wreq.Timeseries { lset := labelpb.LabelsToPromLabels(t.Labels) @@ -77,29 +78,34 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR case nil: continue case storage.ErrOutOfOrderSample: - numOutOfOrder++ - level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset, "sample", &s) + oooSamples = append(oooSamples, s) case storage.ErrDuplicateSampleForTimestamp: - numDuplicates++ - level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "sample", &s) + dupSamples = append(dupSamples, s) case storage.ErrOutOfBounds: - numOutOfBounds++ - level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset, "sample", &s) + outOfBoundsSamples = append(outOfBoundsSamples, s) } } + if len(oooSamples) > 0 || len(outOfBoundsSamples) > 0 || len(dupSamples) > 0 { + level.Warn(r.logger).Log("msg", "Skipped problematic samples", "outOfOrder", oooSamples, + "duplicates", dupSamples, "outOfBounds", outOfBoundsSamples, "lset", lset) + + numOutOfOrder += len(oooSamples) + numDuplicates += len(dupSamples) + numOutOfBounds += len(outOfBoundsSamples) + oooSamples = oooSamples[:0] + dupSamples = dupSamples[:0] + outOfBoundsSamples = outOfBoundsSamples[:0] + } } if numOutOfOrder > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder) - errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "failed to non-fast add %d samples", numOutOfOrder)) + errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, " add %d samples", numOutOfOrder)) } if numDuplicates > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates) - errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "failed to non-fast add %d samples", numDuplicates)) + errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates)) } if numOutOfBounds > 0 { - level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds) - errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "failed to non-fast add %d samples", numOutOfBounds)) + errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds)) } if err := app.Commit(); err != nil {