Skip to content

Commit cf5c552

Browse files
committed
fix replicate duplicate metrics
Signed-off-by: yeya24 <[email protected]>
1 parent f6921dc commit cf5c552

File tree

4 files changed

+25
-11
lines changed

4 files changed

+25
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1515

1616
- [#2238](https://github.com/thanos-io/thanos/pull/2238) Ruler: Fixed Issue #2204 bug in alert queue signalling filled up queue and alerts were dropped
1717
- [#2231](https://github.com/thanos-io/thanos/pull/2231) Bucket Web - Sort chunks by thanos.downsample.resolution for better grouping
18+
- [#2254](https://github.com/thanos-io/thanos/pull/2254) Bucket: Fix metrics registered multiple times in bucket replicate
1819

1920
### Added
2021

pkg/replicate/replicater.go renamed to pkg/replicate/replicator.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/client_golang/prometheus/promauto"
2121
"github.com/prometheus/common/model"
2222
"github.com/prometheus/prometheus/pkg/labels"
23+
thanosblock "github.com/thanos-io/thanos/pkg/block"
2324
"github.com/thanos-io/thanos/pkg/compact"
2425
"github.com/thanos-io/thanos/pkg/component"
2526
"github.com/thanos-io/thanos/pkg/extflag"
@@ -149,6 +150,11 @@ func RunReplicate(
149150
Help: "The Duration of replication runs split by success and error.",
150151
}, []string{"result"})
151152

153+
fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg)
154+
if err != nil {
155+
return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt)
156+
}
157+
152158
blockFilter := NewBlockFilter(
153159
logger,
154160
labelSelector,
@@ -170,7 +176,7 @@ func RunReplicate(
170176
logger := log.With(logger, "replication-run-id", ulid.String())
171177
level.Info(logger).Log("msg", "running replication attempt")
172178

173-
if err := newReplicationScheme(logger, metrics, blockFilter, fromBkt, toBkt, reg).execute(ctx); err != nil {
179+
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
174180
return errors.Wrap(err, "replication execute")
175181
}
176182

pkg/replicate/scheme.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ type replicationScheme struct {
105105
toBkt objstore.Bucket
106106

107107
blockFilter blockFilterFunc
108+
fetcher thanosblock.MetadataFetcher
108109

109110
logger log.Logger
110111
metrics *replicationMetrics
@@ -152,14 +153,23 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
152153
return m
153154
}
154155

155-
func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, from objstore.BucketReader, to objstore.Bucket, reg prometheus.Registerer) *replicationScheme {
156+
func newReplicationScheme(
157+
logger log.Logger,
158+
metrics *replicationMetrics,
159+
blockFilter blockFilterFunc,
160+
fetcher thanosblock.MetadataFetcher,
161+
from objstore.BucketReader,
162+
to objstore.Bucket,
163+
reg prometheus.Registerer,
164+
) *replicationScheme {
156165
if logger == nil {
157166
logger = log.NewNopLogger()
158167
}
159168

160169
return &replicationScheme{
161170
logger: logger,
162171
blockFilter: blockFilter,
172+
fetcher: fetcher,
163173
fromBkt: from,
164174
toBkt: to,
165175
metrics: metrics,
@@ -350,12 +360,7 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN
350360
// partial, this is just a temporary failure, as the block is still being
351361
// uploaded to the origin bucket.
352362
func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) {
353-
fetcher, err := thanosblock.NewMetaFetcher(rs.logger, 32, rs.fromBkt, "", rs.reg)
354-
if err != nil {
355-
return nil, false, errors.Wrapf(err, "create meta fetcher with buecket %v", rs.fromBkt)
356-
}
357-
358-
metas, _, err := fetcher.Fetch(ctx)
363+
metas, _, err := rs.fetcher.Fetch(ctx)
359364
if err != nil {
360365
switch errors.Cause(err) {
361366
default:

pkg/replicate/scheme_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/oklog/ulid"
2020
"github.com/prometheus/prometheus/pkg/labels"
2121
"github.com/prometheus/prometheus/tsdb"
22+
"github.com/thanos-io/thanos/pkg/block"
2223
"github.com/thanos-io/thanos/pkg/block/metadata"
2324
"github.com/thanos-io/thanos/pkg/compact"
2425
"github.com/thanos-io/thanos/pkg/objstore"
@@ -302,9 +303,7 @@ func TestReplicationSchemeAll(t *testing.T) {
302303
c.prepare(ctx, t, originBucket, targetBucket)
303304

304305
matcher, err := labels.NewMatcher(labels.MatchEqual, "test-labelname", "test-labelvalue")
305-
if err != nil {
306-
t.Fatal("Failed to create a matcher.")
307-
}
306+
testutil.Ok(t, err)
308307

309308
selector := labels.Selector{
310309
matcher,
@@ -314,11 +313,14 @@ func TestReplicationSchemeAll(t *testing.T) {
314313
}
315314

316315
filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter
316+
fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil)
317+
testutil.Ok(t, err)
317318

318319
r := newReplicationScheme(
319320
logger,
320321
newReplicationMetrics(nil),
321322
filter,
323+
fetcher,
322324
originBucket,
323325
targetBucket,
324326
nil,

0 commit comments

Comments
 (0)