Skip to content

Commit 3ce844b

Browse files
authored
REPLICATOR: Add TimePartitionMetaFilter to thanos replicate. (#2979)
* Add TimePartitionMetaFilter to thanos replicate. This will allow time based replication. Signed-off-by: Kevin Hellemun <[email protected]> * Update docs and changelog. Signed-off-by: Kevin Hellemun <[email protected]> * Fix lint Signed-off-by: Kevin Hellemun <[email protected]> * Add posibility to delete blocks older than max time Signed-off-by: Kevin Hellemun <[email protected]> * Fix metric naming. Signed-off-by: Kevin Hellemun <[email protected]> * Fix order of arguments for max and min time. Signed-off-by: Kevin Hellemun <[email protected]> * Remove deletion of blocks from replicator. Signed-off-by: Kevin Hellemun <[email protected]> * Fix import formatting Signed-off-by: Kevin Hellemun <[email protected]> * Revert unneeded changes due to replicator no longer deleting blocs Signed-off-by: Kevin Hellemun <[email protected]> * Fix typo Signed-off-by: Kevin Hellemun <[email protected]>
1 parent 5abda52 commit 3ce844b

File tree

6 files changed

+38
-11
lines changed

6 files changed

+38
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2222
- [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility.
2323
- [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range.
2424
- [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`.
25+
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time
2526

2627
### Fixed
2728
- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.

cmd/thanos/tools_bucket.go

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/thanos-io/thanos/pkg/extprom"
3737
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
3838
"github.com/thanos-io/thanos/pkg/logging"
39+
"github.com/thanos-io/thanos/pkg/model"
3940
"github.com/thanos-io/thanos/pkg/objstore"
4041
"github.com/thanos-io/thanos/pkg/objstore/client"
4142
"github.com/thanos-io/thanos/pkg/prober"
@@ -441,6 +442,10 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
441442
compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints()
442443
matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()
443444
singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()
445+
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
446+
Default("0000-01-01T00:00:00Z"))
447+
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
448+
Default("9999-12-31T23:59:59Z"))
444449

445450
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
446451
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
@@ -466,6 +471,8 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
466471
objStoreConfig,
467472
toObjStoreConfig,
468473
*singleRun,
474+
minTime,
475+
maxTime,
469476
)
470477
})
471478
}

docs/components/tools.md

+16
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,22 @@ Flags:
472472
--matcher=key="value" ... Only blocks whose external labels exactly match
473473
this matcher will be replicated.
474474
--single-run Run replication only one time, then exit.
475+
--min-time=0000-01-01T00:00:00Z
476+
Start of time range limit to replicate. Thanos
477+
Replicate will replicate only metrics, which
478+
happened later than this value. Option can be a
479+
constant time in RFC3339 format or time
480+
duration relative to current time, such as -1d
481+
or 2h45m. Valid duration units are ms, s, m, h,
482+
d, w, y.
483+
--max-time=9999-12-31T23:59:59Z
484+
End of time range limit to replicate. Thanos
485+
Replicate will replicate only metrics, which
486+
happened earlier than this value. Option can be
487+
a constant time in RFC3339 format or time
488+
duration relative to current time, such as -1d
489+
or 2h45m. Valid duration units are ms, s, m, h,
490+
d, w, y.
475491
476492
```
477493

pkg/replicate/replicator.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"strings"
1111
"time"
1212

13+
thanosmodel "github.com/thanos-io/thanos/pkg/model"
14+
1315
"github.com/go-kit/kit/log"
1416
"github.com/go-kit/kit/log/level"
1517
"github.com/oklog/run"
@@ -80,6 +82,7 @@ func RunReplicate(
8082
fromObjStoreConfig *extflag.PathOrContent,
8183
toObjStoreConfig *extflag.PathOrContent,
8284
singleRun bool,
85+
minTime, maxTime *thanosmodel.TimeOrDurationValue,
8386
) error {
8487
logger = log.With(logger, "component", "replicate")
8588

@@ -161,7 +164,15 @@ func RunReplicate(
161164
replicationRunDuration.WithLabelValues(labelSuccess)
162165
replicationRunDuration.WithLabelValues(labelError)
163166

164-
fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil)
167+
fetcher, err := thanosblock.NewMetaFetcher(
168+
logger,
169+
32,
170+
fromBkt,
171+
"",
172+
reg,
173+
[]thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)},
174+
nil,
175+
)
165176
if err != nil {
166177
return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt)
167178
}

pkg/replicate/scheme.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
241241
// If the origin meta file content and target meta file content is
242242
// equal, we know we have already successfully replicated
243243
// previously.
244-
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String())
244+
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID)
245245
rs.metrics.blocksAlreadyReplicated.Inc()
246246

247247
return nil

pkg/replicate/scheme_test.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) {
315315
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
316316
testutil.Ok(t, err)
317317

318-
r := newReplicationScheme(
319-
logger,
320-
newReplicationMetrics(nil),
321-
filter,
322-
fetcher,
323-
objstore.WithNoopInstr(originBucket),
324-
targetBucket,
325-
nil,
326-
)
318+
r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)
327319

328320
err = r.execute(ctx)
329321
testutil.Ok(t, err)

0 commit comments

Comments
 (0)