Reduce ingester memory by using TSDB's index #3951

Merged 5 commits on Mar 31, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
   * `-alertmanager.cluster.peers` instead of `-cluster.peer`
   * `-alertmanager.cluster.peer-timeout` instead of `-cluster.peer-timeout`
 * [FEATURE] Ruler: added `local` backend support to the ruler storage configuration under the `-ruler-storage.` flag prefix. #3932
+* [ENHANCEMENT] Blocks storage: reduce ingester memory by eliminating series reference cache. #3951
 * [ENHANCEMENT] Ruler: optimized `<prefix>/api/v1/rules` and `<prefix>/api/v1/alerts` when ruler sharding is enabled. #3916
 * [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
   * `cortex_ruler_clients`
33 changes: 24 additions & 9 deletions go.mod
@@ -12,7 +12,7 @@ require (
     github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
     github.com/alicebob/miniredis v2.5.0+incompatible
     github.com/armon/go-metrics v0.3.6
-    github.com/aws/aws-sdk-go v1.37.8
+    github.com/aws/aws-sdk-go v1.38.3
     github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
     github.com/cespare/xxhash v1.1.0
     github.com/dustin/go-humanize v1.0.0
@@ -44,10 +44,10 @@ require (
     github.com/opentracing/opentracing-go v1.2.0
     github.com/pkg/errors v0.9.1
     github.com/prometheus/alertmanager v0.21.1-0.20210310093010-0f9cab6991e6
-    github.com/prometheus/client_golang v1.9.0
+    github.com/prometheus/client_golang v1.10.0
     github.com/prometheus/client_model v0.2.0
-    github.com/prometheus/common v0.18.0
-    github.com/prometheus/prometheus v1.8.2-0.20210321183757-31a518faab18
+    github.com/prometheus/common v0.20.0
+    github.com/prometheus/prometheus v1.8.2-0.20210324152458-c7a62b95cea0
     github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
     github.com/sony/gobreaker v0.4.1
     github.com/spf13/afero v1.2.2
@@ -60,11 +60,11 @@
     go.etcd.io/etcd/client/v3 v3.5.0-alpha.0.0.20210225194612-fa82d11a958a
     go.etcd.io/etcd/server/v3 v3.5.0-alpha.0.0.20210225194612-fa82d11a958a
     go.uber.org/atomic v1.7.0
-    golang.org/x/net v0.0.0-20210119194325-5f4716e94777
-    golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
-    golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
-    google.golang.org/api v0.39.0
-    google.golang.org/grpc v1.34.0
+    golang.org/x/net v0.0.0-20210324051636-2c4c8ecb7826
+    golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+    golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
+    google.golang.org/api v0.42.0
+    google.golang.org/grpc v1.36.0
     gopkg.in/yaml.v2 v2.4.0
     gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
     sigs.k8s.io/yaml v1.2.0
@@ -101,3 +101,18 @@ replace github.com/go-openapi/strfmt => github.com/go-openapi/strfmt v0.19.5
 replace github.com/go-openapi/swag => github.com/go-openapi/swag v0.19.9
 
 replace github.com/go-openapi/validate => github.com/go-openapi/validate v0.19.8
+
+// Pin these, which are updated as dependencies in Prometheus; we will take those updates separately and carefully
+replace (
+    github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.37.8
+    github.com/google/pprof => github.com/google/pprof v0.0.0-20210208152844-1612e9be7af6
+    github.com/miekg/dns => github.com/miekg/dns v1.1.38
+    github.com/prometheus/client_golang => github.com/prometheus/client_golang v1.9.0
+    golang.org/x/crypto => golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9
+    golang.org/x/net => golang.org/x/net v0.0.0-20210119194325-5f4716e94777
+    golang.org/x/oauth2 => golang.org/x/oauth2 v0.0.0-20210210192628-66670185b0cd
+    golang.org/x/sync => golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
+    golang.org/x/sys => golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
+    google.golang.org/api => google.golang.org/api v0.39.0
+    google.golang.org/grpc => google.golang.org/grpc v1.34.0
+)
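
A note on the pinning mechanics above: a top-level `replace` directive overrides whatever version module resolution selects through the `require` graph, which is why the `require` block can advance `aws-sdk-go` to v1.38.3 (dragged in by the Prometheus bump) while builds keep compiling against v1.37.8. A minimal, hypothetical go.mod illustrating just that interaction (the module name is made up):

```
module example.com/pin-sketch

go 1.15

// Module resolution selects v1.38.3 through the require graph...
require github.com/aws/aws-sdk-go v1.38.3

// ...but the replace directive wins, so the build actually uses v1.37.8.
replace github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.37.8
```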
369 changes: 20 additions & 349 deletions go.sum

Large diffs are not rendered by default.

68 changes: 18 additions & 50 deletions pkg/ingester/ingester_v2.go
@@ -101,7 +101,6 @@ const (
 type userTSDB struct {
     db             *tsdb.DB
     userID         string
-    refCache       *cortex_tsdb.RefCache
     activeSeries   *ActiveSeries
     seriesInMetric *metricCounter
     limiter        *Limiter
@@ -185,7 +184,8 @@ func (u *userTSDB) compactHead(blockDuration int64) error {

     defer u.casState(forceCompacting, active)
 
-    // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks.
+    // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks,
+    // and possible invalidation of the references returned from Appender.GetRef().
     // So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over.
     u.pushesInFlight.Wait()
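
The waiting logic above is the crux of why relying on GetRef stays safe: a forced head compaction may invalidate references previously handed out, so compaction first flips the state (making new pushes fail fast) and then drains pushes already in flight. A minimal sketch of that coordination, loosely modeled on the casState/pushesInFlight pattern the diff suggests; `pushGate` and everything except the `pushesInFlight`, `active`, and `forceCompacting` names is illustrative, and the real ingester's state handling is more involved:

```go
package sketch

import (
	"errors"
	"sync"
	"sync/atomic"
)

// Illustrative states; the real ingester tracks more of them.
const (
	active int32 = iota
	forceCompacting
)

// pushGate is a hypothetical stand-in for the userTSDB fields involved here.
type pushGate struct {
	state          int32
	pushesInFlight sync.WaitGroup
}

// startPush registers an in-flight push. The state check after Add ensures a
// concurrent compaction either sees this push in the WaitGroup or rejects it.
func (g *pushGate) startPush() error {
	g.pushesInFlight.Add(1)
	if atomic.LoadInt32(&g.state) != active {
		g.pushesInFlight.Done()
		return errors.New("forced compaction in progress")
	}
	return nil
}

// finishPush must be called once a successful startPush's work is done.
func (g *pushGate) finishPush() { g.pushesInFlight.Done() }

// compactHead mirrors the guarded section in the diff: flip the state first so
// future pushes fail fast, then wait out pushes already in flight, because
// compacting the head can invalidate references returned by GetRef().
func (g *pushGate) compactHead(compact func()) {
	if !atomic.CompareAndSwapInt32(&g.state, active, forceCompacting) {
		return // another forced compaction is already running
	}
	defer atomic.StoreInt32(&g.state, active)

	g.pushesInFlight.Wait()
	compact()
}
```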

Expand Down Expand Up @@ -383,7 +383,6 @@ type TSDBState struct {
walReplayTime prometheus.Histogram
appenderAddDuration prometheus.Histogram
appenderCommitDuration prometheus.Histogram
refCachePurgeDuration prometheus.Histogram
idleTsdbChecks *prometheus.CounterVec
}

Expand Down Expand Up @@ -435,11 +434,6 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
             Help:    "The total time it takes for a push request to commit samples appended to TSDB.",
             Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
         }),
-        refCachePurgeDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
-            Name:    "cortex_ingester_tsdb_refcache_purge_duration_seconds",
-            Help:    "The total time it takes to purge the TSDB series reference cache for a single tenant.",
-            Buckets: prometheus.DefBuckets,
-        }),
 
         idleTsdbChecks: idleTsdbChecks,
     }
@@ -619,11 +613,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
     rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
     defer rateUpdateTicker.Stop()
 
-    // We use an hardcoded value for this ticker because there should be no
-    // real value in customizing it.
-    refCachePurgeTicker := time.NewTicker(5 * time.Minute)
-    defer refCachePurgeTicker.Stop()
-
     var activeSeriesTickerChan <-chan time.Time
     if i.cfg.ActiveSeriesMetricsEnabled {
         t := time.NewTicker(i.cfg.ActiveSeriesMetricsUpdatePeriod)
@@ -646,17 +635,6 @@
                 db.ingestedRuleSamples.tick()
             }
             i.userStatesMtx.RUnlock()
-        case <-refCachePurgeTicker.C:
-            for _, userID := range i.getTSDBUsers() {
-                userDB := i.getTSDB(userID)
-                if userDB == nil {
-                    continue
-                }
-
-                startTime := time.Now()
-                userDB.refCache.Purge(startTime.Add(-cortex_tsdb.DefaultRefCacheTTL))
-                i.TSDBState.refCachePurgeDuration.Observe(time.Since(startTime).Seconds())
-            }
 
         case <-activeSeriesTickerChan:
             i.v2UpdateActiveSeries()
@@ -683,6 +661,12 @@ func (i *Ingester) v2UpdateActiveSeries() {
     }
 }
 
+// extendedAppender adds GetRef(), an extra method exposed by TSDB's appender, so Cortex can look up a series reference before calling Append().
+type extendedAppender interface {
+    storage.Appender
+    storage.GetRef
+}
+
 // v2Push adds metrics to a block
 func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
     var firstPartialErr error
@@ -738,13 +722,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
     )
 
     // Walk the samples, appending them to the users database
-    app := db.Appender(ctx)
+    app := db.Appender(ctx).(extendedAppender)
     for _, ts := range req.Timeseries {
-        // Check if we already have a cached reference for this series. Be aware
-        // that even if we have a reference it's not guaranteed to be still valid.
-        // The labels must be sorted (in our case, it's guaranteed a write request
-        // has sorted labels once hit the ingester).
-        cachedRef, copiedLabels, cachedRefExists := db.refCache.Ref(startAppend, cortexpb.FromLabelAdaptersToLabels(ts.Labels))
+        // Look up a reference for this series.
+        ref, copiedLabels := app.GetRef(cortexpb.FromLabelAdaptersToLabels(ts.Labels))
 
         // To find out if any sample was added to this series, we keep old value.
         oldSucceededSamplesCount := succeededSamplesCount
@@ -753,30 +737,18 @@
             var err error
 
             // If the cached reference exists, we try to use it.
-            if cachedRefExists {
-                var ref uint64
-                if ref, err = app.Append(cachedRef, copiedLabels, s.TimestampMs, s.Value); err == nil {
+            if ref != 0 {
+                if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil {
                     succeededSamplesCount++
-                    // This means the reference changes which means we need to update our cache.
-                    if ref != cachedRef {
-                        db.refCache.SetRef(startAppend, copiedLabels, ref)
-                    }
                     continue
                 }
 
             } else {
-                var ref uint64
-
-                // Copy the label set because both TSDB and the cache may retain it.
+                // Copy the label set because both TSDB and the active series tracker may retain it.
                 copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
 
                 // Retain the reference in case there are multiple samples for the series.
                 if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
-                    db.refCache.SetRef(startAppend, copiedLabels, ref)
-
-                    // Set these in case there are multiple samples for the series.
-                    cachedRef = ref
-                    cachedRefExists = true
-
                     succeededSamplesCount++
                     continue
                 }
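
Condensing the new push path: the separate refCache is gone, and the ingester instead asks TSDB's own head index for a reference via GetRef(), appending with it when present and appending with ref 0 (which creates the series) otherwise. Below is a self-contained sketch of that pattern. The extendedAppender shape and the GetRef/Append signatures come from this diff; the `appendSample` helper itself and its simplified error handling are illustrative, not Cortex code (import paths follow the repo's rendering above):

```go
package sketch

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// Same shape as the extendedAppender declared earlier in this diff.
type extendedAppender interface {
	storage.Appender
	storage.GetRef
}

// appendSample shows the GetRef/Append pattern in isolation.
func appendSample(app extendedAppender, lbls labels.Labels, ts int64, val float64) error {
	// Ask the TSDB head index for an existing series reference. When the
	// series is already in the head, GetRef also returns the label set TSDB
	// retains, so the fast path needs no copy at all.
	ref, retained := app.GetRef(lbls)

	if ref != 0 {
		// Fast path: append against the known series. (In the real code a
		// failure here flows into per-sample error handling instead of
		// returning immediately.)
		_, err := app.Append(ref, retained, ts, val)
		return err
	}

	// Slow path: unknown series. Copy the label slice first, because TSDB
	// will retain it; Cortex deep-copies via FromLabelAdaptersToLabelsWithCopy
	// since the incoming labels may alias the request buffer.
	copied := make(labels.Labels, len(lbls))
	copy(copied, lbls)

	// Appending with ref 0 makes TSDB create the series; from then on the
	// head index answers GetRef lookups, which is what made the dedicated
	// refCache (and its periodic purge) removable.
	_, err := app.Append(0, copied, ts, val)
	return err
}
```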
@@ -827,11 +799,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {

         if i.cfg.ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
             db.activeSeries.UpdateSeries(cortexpb.FromLabelAdaptersToLabels(ts.Labels), startAppend, func(l labels.Labels) labels.Labels {
-                // If we have already made a copy during this push, no need to create new one.
-                if copiedLabels != nil {
-                    return copiedLabels
-                }
-                return cortexpb.CopyLabels(l)
+                // We must already have copied the labels if succeededSamplesCount has been incremented.
+                return copiedLabels
             })
         }
     }
@@ -1435,7 +1404,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {

     userDB := &userTSDB{
         userID:             userID,
-        refCache:           cortex_tsdb.NewRefCache(),
         activeSeries:       NewActiveSeries(),
         seriesInMetric:     newMetricCounter(i.limiter),
         ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),