Skip to content

Commit 265b768

Browse files
bwplotkaclyang82
authored andcommitted
receive: Improved efficiency of multitsdb appends, upgraded Prometheus deps. (thanos-io#4078)
* receive: Improved efficiency of multitsdb appends. Release vs current main looks the same: ``` benchstat -delta-test=none ../_dev/thanos/2021/receive/5.txt ../_dev/thanos/2021/receive2/main-go1.15.txt name old time/op new time/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.56ms ± 0% 1.45ms ± 0% -7.12% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 6.49ms ± 0% 7.14ms ± 0% +9.92% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 16.0ms ± 0% 16.4ms ± 0% +2.79% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 71.7ms ± 0% 69.4ms ± 0% -3.20% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 138ms ± 0% 131ms ± 0% -4.79% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 1.58s ± 0% 1.68s ± 0% +6.11% name old alloc/op new alloc/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.70MB ± 0% 1.70MB ± 0% +0.12% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 4.84MB ± 0% 4.84MB ± 0% +0.04% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 18.3MB ± 0% 18.2MB ± 0% -0.19% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 49.6MB ± 0% 49.6MB ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 331MB ± 0% 331MB ± 0% -0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 804MB ± 0% 804MB ± 0% +0.00% name old allocs/op new allocs/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 15.6k ± 0% 15.6k ± 0% +0.04% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 35.6k ± 0% 35.6k ± 0% +0.01% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 155k ± 0% 155k ± 0% -0.08% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 355k ± 0% 355k ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 147 ± 0% 145 ± 0% -1.36% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 417 ± 0% 421 ± 0% +0.96% ``` Unfortunately go1.16 introduces more allocs overall (not that much more): ``` benchstat -delta-test=none ../_dev/thanos/2021/receive2/main-go1.15.txt ../_dev/thanos/2021/receive2/main-go1.16.3.txt name old time/op new time/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.45ms ± 0% 1.62ms ± 0% +11.87% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 7.14ms ± 0% 6.47ms ± 0% -9.40% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 16.4ms ± 0% 15.8ms ± 0% -3.87% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 69.4ms ± 0% 66.4ms ± 0% -4.35% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 131ms ± 0% 141ms ± 0% +7.59% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 1.68s ± 0% 1.67s ± 0% -0.49% name old alloc/op new alloc/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.70MB ± 0% 1.75MB ± 0% +2.50% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 4.84MB ± 0% 4.89MB ± 0% +0.88% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 18.2MB ± 0% 18.8MB ± 0% +3.07% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 49.6MB ± 0% 50.1MB ± 0% +1.09% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 331MB ± 0% 343MB ± 0% +3.63% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 804MB ± 0% 816MB ± 0% +1.50% name old allocs/op new allocs/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 15.6k ± 0% 15.6k ± 0% -0.01% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 35.6k ± 0% 35.6k ± 0% +0.01% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 155k ± 0% 155k ± 0% +0.08% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 355k ± 0% 355k ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 145 ± 0% 166 ± 0% +14.48% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 421 ± 0% 440 ± 0% +4.51% ``` Signed-off-by: Bartlomiej Plotka <[email protected]> * Prometheus upgrade. No difference. ``` benchstat -delta-test=none ../_dev/thanos/2021/receive2/main-go1.16.3.txt ../_dev/thanos/2021/receive2/impr-go1.16.3-promup.txt name old time/op new time/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.62ms ± 0% 1.77ms ± 0% +9.57% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 6.47ms ± 0% 5.71ms ± 0% -11.76% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 15.8ms ± 0% 15.2ms ± 0% -3.83% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 66.4ms ± 0% 59.5ms ± 0% -10.37% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 141ms ± 0% 129ms ± 0% -8.60% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 1.67s ± 0% 1.41s ± 0% -15.58% name old alloc/op new alloc/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.75MB ± 0% 1.75MB ± 0% +0.04% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 4.89MB ± 0% 4.89MB ± 0% +0.02% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 18.8MB ± 0% 18.8MB ± 0% -0.05% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 50.1MB ± 0% 50.1MB ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 343MB ± 0% 344MB ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 816MB ± 0% 816MB ± 0% -0.00% name old allocs/op new allocs/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 15.6k ± 0% 15.6k ± 0% +0.01% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 35.6k ± 0% 35.6k ± 0% +0.00% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 155k ± 0% 155k ± 0% -0.06% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 355k ± 0% 355k ± 0% +0.00% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 166 ± 0% 169 ± 0% +1.81% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 440 ± 0% 435 ± 0% -1.14% ``` Signed-off-by: Bartlomiej Plotka <[email protected]> * ReadAll to Grow + Copy. Signed-off-by: Bartlomiej Plotka <[email protected]> * Moved hashring to optimized hash function. ``` benchstat -delta-test=none ../_dev/thanos/2021/receive2/impr1-go1.16.3.txt ../_dev/thanos/2021/receive2/impr3-go1.16.3.txt name old time/op new time/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.54ms ± 0% 1.64ms ± 0% +6.54% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 6.96ms ± 0% 8.02ms ± 0% +15.23% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 16.1ms ± 0% 16.5ms ± 0% +2.77% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 65.4ms ± 0% 65.1ms ± 0% -0.49% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 168ms ± 0% 119ms ± 0% -29.49% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 1.69s ± 0% 1.37s ± 0% -19.05% name old alloc/op new alloc/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.75MB ± 0% 1.63MB ± 0% -6.43% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 4.89MB ± 0% 4.77MB ± 0% -2.50% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 18.8MB ± 0% 17.6MB ± 0% -6.55% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 50.1MB ± 0% 48.9MB ± 0% -2.55% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 344MB ± 0% 225MB ± 0% -34.63% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 816MB ± 0% 697MB ± 0% -14.59% name old allocs/op new allocs/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 15.6k ± 0% 13.6k ± 0% -12.85% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 35.6k ± 0% 33.6k ± 0% -5.64% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 155k ± 0% 135k ± 0% -12.95% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 355k ± 0% 335k ± 0% -5.64% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 170 ± 0% 101 ± 0% -40.59% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 439 ± 0% 372 ± 0% -15.26% ``` Signed-off-by: Bartlomiej Plotka <[email protected]> * Used Prometheus GetRef to avoid reallocating the same series. ``` benchstat -delta-test=none ../_dev/thanos/2021/receive2/impr3-go1.16.3.txt ../_dev/thanos/2021/receive2/impr4-go1.16.3.txt name old time/op new time/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.64ms ± 0% 1.15ms ± 0% -30.02% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 8.02ms ± 0% 5.57ms ± 0% -30.53% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 16.5ms ± 0% 11.5ms ± 0% -30.28% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 65.1ms ± 0% 58.8ms ± 0% -9.66% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 119ms ± 0% 114ms ± 0% -3.56% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 1.37s ± 0% 1.43s ± 0% +4.58% name old alloc/op new alloc/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 1.63MB ± 0% 1.15MB ± 0% -29.48% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 4.77MB ± 0% 4.29MB ± 0% -10.07% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 17.6MB ± 0% 12.8MB ± 0% -27.20% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 48.9MB ± 0% 44.1MB ± 0% -9.82% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 225MB ± 0% 120MB ± 0% -46.70% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 697MB ± 0% 592MB ± 0% -15.05% name old allocs/op new allocs/op delta HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12 13.6k ± 0% 3.6k ± 0% -73.58% HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12 33.6k ± 0% 23.6k ± 0% -29.75% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12 135k ± 0% 35k ± 0% -73.84% HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12 335k ± 0% 235k ± 0% -29.84% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12 101 ± 0% 79 ± 0% -21.78% HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12 372 ± 0% 360 ± 0% -3.23% ``` Signed-off-by: Bartlomiej Plotka <[email protected]> * Build fixes. Signed-off-by: Bartlomiej Plotka <[email protected]> * Fixes. Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent b0bca31 commit 265b768

File tree

10 files changed

+383
-216
lines changed

10 files changed

+383
-216
lines changed

go.mod

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module github.com/thanos-io/thanos
22

33
require (
4-
cloud.google.com/go v0.74.0
4+
cloud.google.com/go v0.79.0
55
cloud.google.com/go/storage v1.10.0
66
github.com/Azure/azure-pipeline-go v0.2.2
77
github.com/Azure/azure-storage-blob-go v0.8.0
@@ -10,6 +10,7 @@ require (
1010
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
1111
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
1212
github.com/cespare/xxhash v1.1.0
13+
github.com/cespare/xxhash/v2 v2.1.1
1314
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
1415
github.com/chromedp/chromedp v0.5.3
1516
github.com/cortexproject/cortex v1.7.1-0.20210224085859-66d6fb5b0d42
@@ -32,7 +33,7 @@ require (
3233
github.com/leanovate/gopter v0.2.4
3334
github.com/lightstep/lightstep-tracer-go v0.18.1
3435
github.com/lovoo/gcloud-opentracing v0.3.0
35-
github.com/miekg/dns v1.1.38
36+
github.com/miekg/dns v1.1.41
3637
github.com/minio/minio-go/v7 v7.0.10
3738
github.com/mozillazg/go-cos v0.13.0
3839
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
@@ -47,8 +48,8 @@ require (
4748
github.com/prometheus/alertmanager v0.21.1-0.20201106142418-c39b78780054
4849
github.com/prometheus/client_golang v1.9.0
4950
github.com/prometheus/client_model v0.2.0
50-
github.com/prometheus/common v0.15.0
51-
github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4
51+
github.com/prometheus/common v0.20.0
52+
github.com/prometheus/prometheus v1.8.2-0.20210413124018-62afcabd01ea
5253
github.com/uber/jaeger-client-go v2.25.0+incompatible
5354
github.com/uber/jaeger-lib v2.4.0+incompatible
5455
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
@@ -57,13 +58,13 @@ require (
5758
go.uber.org/atomic v1.7.0
5859
go.uber.org/automaxprocs v1.2.0
5960
go.uber.org/goleak v1.1.10
60-
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
61-
golang.org/x/oauth2 v0.0.0-20210210192628-66670185b0cd
62-
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
61+
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
62+
golang.org/x/oauth2 v0.0.0-20210323180902-22b0adad7558
63+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
6364
golang.org/x/text v0.3.5
64-
google.golang.org/api v0.39.0
65-
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d
66-
google.golang.org/grpc v1.34.0
65+
google.golang.org/api v0.42.0
66+
google.golang.org/genproto v0.0.0-20210312152112-fc591d9ea70f
67+
google.golang.org/grpc v1.36.0
6768
gopkg.in/alecthomas/kingpin.v2 v2.2.6
6869
gopkg.in/fsnotify.v1 v1.4.7
6970
gopkg.in/yaml.v2 v2.4.0
@@ -80,7 +81,7 @@ replace (
8081
// TODO: Remove this: https://github.com/thanos-io/thanos/issues/3967.
8182
github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6
8283
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
83-
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4
84+
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20210413124018-62afcabd01ea
8485
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
8586
google.golang.org/grpc => google.golang.org/grpc v1.29.1
8687

go.sum

+109-33
Large diffs are not rendered by default.

pkg/receive/handler.go

+20-12
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
package receive
55

66
import (
7+
"bytes"
78
"context"
89
"crypto/tls"
910
"fmt"
10-
"io/ioutil"
11+
"io"
1112
stdlog "log"
1213
"net"
1314
"net/http"
@@ -262,7 +263,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
262263
replicated: rep != 0,
263264
}
264265

265-
// on-the-wire format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
266+
// On the wire, format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
266267
if r.replicated {
267268
r.n--
268269
}
@@ -277,17 +278,24 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
277278
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
278279
defer span.Finish()
279280

280-
// TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files.
281-
compressed, err := ioutil.ReadAll(r.Body)
281+
// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
282+
// Since this is receive hot path, grow upfront saving allocations and CPU time.
283+
compressed := bytes.Buffer{}
284+
if r.ContentLength >= 0 {
285+
compressed.Grow(int(r.ContentLength))
286+
} else {
287+
compressed.Grow(512)
288+
}
289+
_, err := io.Copy(&compressed, r.Body)
282290
if err != nil {
283-
http.Error(w, err.Error(), http.StatusInternalServerError)
291+
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
284292
return
285293
}
286294

287-
reqBuf, err := snappy.Decode(nil, compressed)
295+
reqBuf, err := snappy.Decode(nil, compressed.Bytes())
288296
if err != nil {
289297
level.Error(h.logger).Log("msg", "snappy decode error", "err", err)
290-
http.Error(w, err.Error(), http.StatusBadRequest)
298+
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
291299
return
292300
}
293301

@@ -410,9 +418,9 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
410418
}
411419
}()
412420

413-
logger := log.With(h.logger, "tenant", tenant)
421+
logTags := []interface{}{"tenant", tenant}
414422
if id, ok := middleware.RequestIDFromContext(pctx); ok {
415-
logger = log.With(logger, "request-id", id)
423+
logTags = append(logTags, "request-id", id)
416424
}
417425

418426
ec := make(chan error)
@@ -462,7 +470,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
462470
if err != nil {
463471
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
464472
// To avoid breaking the counting logic, we need to flatten the error.
465-
level.Debug(h.logger).Log("msg", "local tsdb write failed", "err", err.Error())
473+
level.Debug(h.logger).Log(append(logTags, "msg", "local tsdb write failed", "err", err.Error()))
466474
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
467475
return
468476
}
@@ -525,7 +533,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
525533
b.attempt++
526534
dur := h.expBackoff.ForAttempt(b.attempt)
527535
b.nextAllowed = time.Now().Add(dur)
528-
level.Debug(h.logger).Log("msg", "target unavailable backing off", "for", dur)
536+
level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur))
529537
} else {
530538
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
531539
}
@@ -554,7 +562,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
554562
go func() {
555563
for err := range ec {
556564
if err != nil {
557-
level.Debug(logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
565+
level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err))
558566
}
559567
}
560568
}()

pkg/receive/handler_test.go

+113-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/golang/snappy"
2727
"github.com/pkg/errors"
2828
"github.com/prometheus/client_golang/prometheus"
29+
"github.com/prometheus/prometheus/pkg/exemplar"
2930
"github.com/prometheus/prometheus/pkg/labels"
3031
"github.com/prometheus/prometheus/storage"
3132
"github.com/prometheus/prometheus/tsdb"
@@ -185,6 +186,108 @@ func TestDetermineWriteErrorCause(t *testing.T) {
185186
}
186187
}
187188

189+
type fakeTenantAppendable struct {
190+
f *fakeAppendable
191+
}
192+
193+
func newFakeTenantAppendable(f *fakeAppendable) *fakeTenantAppendable {
194+
return &fakeTenantAppendable{f: f}
195+
}
196+
197+
func (t *fakeTenantAppendable) TenantAppendable(_ string) (Appendable, error) {
198+
return t.f, nil
199+
}
200+
201+
type fakeAppendable struct {
202+
appender storage.Appender
203+
appenderErr func() error
204+
}
205+
206+
var _ Appendable = &fakeAppendable{}
207+
208+
func nilErrFn() error {
209+
return nil
210+
}
211+
212+
func (f *fakeAppendable) Appender(_ context.Context) (storage.Appender, error) {
213+
errf := f.appenderErr
214+
if errf == nil {
215+
errf = nilErrFn
216+
}
217+
return f.appender, errf()
218+
}
219+
220+
type fakeAppender struct {
221+
sync.Mutex
222+
samples map[uint64][]prompb.Sample
223+
exemplars map[uint64][]exemplar.Exemplar
224+
appendErr func() error
225+
commitErr func() error
226+
rollbackErr func() error
227+
}
228+
229+
var _ storage.Appender = &fakeAppender{}
230+
var _ storage.GetRef = &fakeAppender{}
231+
232+
func newFakeAppender(appendErr, commitErr, rollbackErr func() error) *fakeAppender { //nolint:unparam
233+
if appendErr == nil {
234+
appendErr = nilErrFn
235+
}
236+
if commitErr == nil {
237+
commitErr = nilErrFn
238+
}
239+
if rollbackErr == nil {
240+
rollbackErr = nilErrFn
241+
}
242+
return &fakeAppender{
243+
samples: make(map[uint64][]prompb.Sample),
244+
appendErr: appendErr,
245+
commitErr: commitErr,
246+
rollbackErr: rollbackErr,
247+
}
248+
}
249+
250+
func (f *fakeAppender) Get(l labels.Labels) []prompb.Sample {
251+
f.Lock()
252+
defer f.Unlock()
253+
s := f.samples[l.Hash()]
254+
res := make([]prompb.Sample, len(s))
255+
copy(res, s)
256+
return res
257+
}
258+
259+
func (f *fakeAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
260+
f.Lock()
261+
defer f.Unlock()
262+
if ref == 0 {
263+
ref = l.Hash()
264+
}
265+
f.samples[ref] = append(f.samples[ref], prompb.Sample{Timestamp: t, Value: v})
266+
return ref, f.appendErr()
267+
}
268+
269+
func (f *fakeAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
270+
f.Lock()
271+
defer f.Unlock()
272+
if ref == 0 {
273+
ref = l.Hash()
274+
}
275+
f.exemplars[ref] = append(f.exemplars[ref], e)
276+
return ref, f.appendErr()
277+
}
278+
279+
func (f *fakeAppender) GetRef(l labels.Labels) (uint64, labels.Labels) {
280+
return l.Hash(), l
281+
}
282+
283+
func (f *fakeAppender) Commit() error {
284+
return f.commitErr()
285+
}
286+
287+
func (f *fakeAppender) Rollback() error {
288+
return f.rollbackErr()
289+
}
290+
188291
func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) {
189292
var (
190293
cfg = []HashringConfig{{Hashring: "test"}}
@@ -1015,6 +1118,10 @@ func (a *tsOverrideAppender) AddFast(ref uint64, _ int64, v float64) error {
10151118
return a.Appender.AddFast(ref, cnt, v)
10161119
}
10171120

1121+
func (a *tsOverrideAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
1122+
return a.Appender.(storage.GetRef).GetRef(lset)
1123+
}
1124+
10181125
// serializeSeriesWithOneSample returns marshaled and compressed remote write requests like it would
10191126
// be send to Thanos receive.
10201127
// It has one sample and allow passing multiple series, in same manner as typical Prometheus would batch it.
@@ -1171,15 +1278,18 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
11711278
for i := 0; i < n; i++ {
11721279
r := httptest.NewRecorder()
11731280
handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))})
1174-
testutil.Equals(b, http.StatusConflict, r.Code, "%v", i)
1281+
testutil.Equals(b, http.StatusConflict, r.Code, "%v-%s", i, func() string {
1282+
b, _ := ioutil.ReadAll(r.Body)
1283+
return string(b)
1284+
}())
11751285
}
11761286
})
11771287
})
11781288
}
11791289

11801290
runtime.GC()
11811291
// Take snapshot at the end to reveal how much memory we keep in TSDB.
1182-
testutil.Ok(b, Heap("../../"))
1292+
testutil.Ok(b, Heap("../../../_dev/thanos/2021/receive2"))
11831293

11841294
}
11851295

@@ -1188,7 +1298,7 @@ func Heap(dir string) (err error) {
11881298
return err
11891299
}
11901300

1191-
f, err := os.Create(filepath.Join(dir, "mem.pprof"))
1301+
f, err := os.Create(filepath.Join(dir, "impr5-go1.16.3.pprof"))
11921302
if err != nil {
11931303
return err
11941304
}

pkg/receive/hashring.go

+6-21
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@ import (
99
"sort"
1010
"sync"
1111

12-
"github.com/cespare/xxhash"
1312
"github.com/pkg/errors"
13+
"github.com/thanos-io/thanos/pkg/store/labelpb"
1414

1515
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
1616
)
1717

18-
const sep = '\xff'
19-
2018
// insufficientNodesError is returned when a hashring does not
2119
// have enough nodes to satisfy a request for a node.
2220
type insufficientNodesError struct {
@@ -39,23 +37,6 @@ type Hashring interface {
3937
GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error)
4038
}
4139

42-
// hash returns a hash for the given tenant and time series.
43-
func hash(tenant string, ts *prompb.TimeSeries) uint64 {
44-
// Sort labelset to ensure a stable hash.
45-
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })
46-
47-
b := make([]byte, 0, 1024)
48-
b = append(b, []byte(tenant)...)
49-
b = append(b, sep)
50-
for _, v := range ts.Labels {
51-
b = append(b, v.Name...)
52-
b = append(b, sep)
53-
b = append(b, v.Value...)
54-
b = append(b, sep)
55-
}
56-
return xxhash.Sum64(b)
57-
}
58-
5940
// SingleNodeHashring always returns the same node.
6041
type SingleNodeHashring string
6142

@@ -85,7 +66,11 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
8566
if n >= uint64(len(s)) {
8667
return "", &insufficientNodesError{have: uint64(len(s)), want: n + 1}
8768
}
88-
return s[(hash(tenant, ts)+n)%uint64(len(s))], nil
69+
70+
// TODO(bwplotka): This might be not needed, double check.
71+
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })
72+
73+
return s[(labelpb.HashWithPrefix(tenant, ts.Labels)+n)%uint64(len(s))], nil
8974
}
9075

9176
// multiHashring represents a set of hashrings.

pkg/receive/hashring_test.go

-23
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,6 @@ import (
1010
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
1111
)
1212

13-
func TestHash(t *testing.T) {
14-
ts := &prompb.TimeSeries{
15-
Labels: []labelpb.ZLabel{
16-
{
17-
Name: "foo",
18-
Value: "bar",
19-
},
20-
{
21-
Name: "baz",
22-
Value: "qux",
23-
},
24-
},
25-
}
26-
27-
ts2 := &prompb.TimeSeries{
28-
Labels: []labelpb.ZLabel{ts.Labels[1], ts.Labels[0]},
29-
}
30-
31-
if hash("", ts) != hash("", ts2) {
32-
t.Errorf("expected hashes to be independent of label order")
33-
}
34-
}
35-
3613
func TestHashringGet(t *testing.T) {
3714
ts := &prompb.TimeSeries{
3815
Labels: []labelpb.ZLabel{

0 commit comments

Comments
 (0)