@@ -101,7 +101,6 @@ const (
101
101
type userTSDB struct {
102
102
db * tsdb.DB
103
103
userID string
104
- refCache * cortex_tsdb.RefCache
105
104
activeSeries * ActiveSeries
106
105
seriesInMetric * metricCounter
107
106
limiter * Limiter
@@ -185,7 +184,8 @@ func (u *userTSDB) compactHead(blockDuration int64) error {
185
184
186
185
defer u .casState (forceCompacting , active )
187
186
188
- // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks.
187
+ // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks,
188
+ // and possible invalidation of the references returned from Appender.GetRef().
189
189
// So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over.
190
190
u .pushesInFlight .Wait ()
191
191
@@ -383,7 +383,6 @@ type TSDBState struct {
383
383
walReplayTime prometheus.Histogram
384
384
appenderAddDuration prometheus.Histogram
385
385
appenderCommitDuration prometheus.Histogram
386
- refCachePurgeDuration prometheus.Histogram
387
386
idleTsdbChecks * prometheus.CounterVec
388
387
}
389
388
@@ -435,11 +434,6 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
435
434
Help : "The total time it takes for a push request to commit samples appended to TSDB." ,
436
435
Buckets : []float64 {.001 , .005 , .01 , .025 , .05 , .1 , .25 , .5 , 1 , 2.5 , 5 , 10 },
437
436
}),
438
- refCachePurgeDuration : promauto .With (registerer ).NewHistogram (prometheus.HistogramOpts {
439
- Name : "cortex_ingester_tsdb_refcache_purge_duration_seconds" ,
440
- Help : "The total time it takes to purge the TSDB series reference cache for a single tenant." ,
441
- Buckets : prometheus .DefBuckets ,
442
- }),
443
437
444
438
idleTsdbChecks : idleTsdbChecks ,
445
439
}
@@ -619,11 +613,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
619
613
rateUpdateTicker := time .NewTicker (i .cfg .RateUpdatePeriod )
620
614
defer rateUpdateTicker .Stop ()
621
615
622
- // We use an hardcoded value for this ticker because there should be no
623
- // real value in customizing it.
624
- refCachePurgeTicker := time .NewTicker (5 * time .Minute )
625
- defer refCachePurgeTicker .Stop ()
626
-
627
616
var activeSeriesTickerChan <- chan time.Time
628
617
if i .cfg .ActiveSeriesMetricsEnabled {
629
618
t := time .NewTicker (i .cfg .ActiveSeriesMetricsUpdatePeriod )
@@ -646,17 +635,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
646
635
db .ingestedRuleSamples .tick ()
647
636
}
648
637
i .userStatesMtx .RUnlock ()
649
- case <- refCachePurgeTicker .C :
650
- for _ , userID := range i .getTSDBUsers () {
651
- userDB := i .getTSDB (userID )
652
- if userDB == nil {
653
- continue
654
- }
655
-
656
- startTime := time .Now ()
657
- userDB .refCache .Purge (startTime .Add (- cortex_tsdb .DefaultRefCacheTTL ))
658
- i .TSDBState .refCachePurgeDuration .Observe (time .Since (startTime ).Seconds ())
659
- }
660
638
661
639
case <- activeSeriesTickerChan :
662
640
i .v2UpdateActiveSeries ()
@@ -683,6 +661,12 @@ func (i *Ingester) v2UpdateActiveSeries() {
683
661
}
684
662
}
685
663
664
+ // extendedAppender is storage.Appender plus GetRef(), an extra method added to TSDB to let Cortex look up a series reference before calling Append().
665
+ type extendedAppender interface {
666
+ storage.Appender
667
+ storage.GetRef
668
+ }
669
+
686
670
// v2Push adds metrics to a block
687
671
func (i * Ingester ) v2Push (ctx context.Context , req * cortexpb.WriteRequest ) (* cortexpb.WriteResponse , error ) {
688
672
var firstPartialErr error
@@ -738,13 +722,17 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
738
722
)
739
723
740
724
// Walk the samples, appending them to the users database
741
- app := db .Appender (ctx )
725
+ app := db .Appender (ctx ).( extendedAppender )
742
726
for _ , ts := range req .Timeseries {
743
- // Check if we already have a cached reference for this series. Be aware
744
- // that even if we have a reference it's not guaranteed to be still valid.
727
+ // Keeps a reference to labels copy, if it was needed. This is to avoid making a copy twice,
728
+ // once for TSDB, and second time for activeSeries map.
729
+ var copiedLabels []labels.Label
730
+
745
731
// The labels must be sorted (in our case, it's guaranteed a write request
746
732
// has sorted labels once hit the ingester).
747
- cachedRef , copiedLabels , cachedRefExists := db .refCache .Ref (startAppend , cortexpb .FromLabelAdaptersToLabels (ts .Labels ))
733
+
734
+ // Look up a reference for this series. Holding the appendLock ensures that no compaction will happen while we use it.
735
+ ref := app .GetRef (cortexpb .FromLabelAdaptersToLabels (ts .Labels ))
748
736
749
737
// To find out if any sample was added to this series, we keep old value.
750
738
oldSucceededSamplesCount := succeededSamplesCount
@@ -753,30 +741,19 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
753
741
var err error
754
742
755
743
// If a reference for this series was found (non-zero), we try to use it.
756
- if cachedRefExists {
757
- var ref uint64
758
- if ref , err = app .Append (cachedRef , copiedLabels , s .TimestampMs , s .Value ); err == nil {
744
+ if ref != 0 {
745
+ labels := cortexpb . FromLabelAdaptersToLabels ( ts . Labels )
746
+ if _ , err = app .Append (ref , labels , s .TimestampMs , s .Value ); err == nil {
759
747
succeededSamplesCount ++
760
- // This means the reference changes which means we need to update our cache.
761
- if ref != cachedRef {
762
- db .refCache .SetRef (startAppend , copiedLabels , ref )
763
- }
764
748
continue
765
749
}
766
750
767
751
} else {
768
- var ref uint64
769
-
770
752
// Copy the label set because both TSDB and the activeSeries map may retain it.
771
753
copiedLabels = cortexpb .FromLabelAdaptersToLabelsWithCopy (ts .Labels )
772
754
755
+ // Retain the reference in case there are multiple samples for the series.
773
756
if ref , err = app .Append (0 , copiedLabels , s .TimestampMs , s .Value ); err == nil {
774
- db .refCache .SetRef (startAppend , copiedLabels , ref )
775
-
776
- // Set these in case there are multiple samples for the series.
777
- cachedRef = ref
778
- cachedRefExists = true
779
-
780
757
succeededSamplesCount ++
781
758
continue
782
759
}
@@ -812,6 +789,9 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
812
789
case errMaxSeriesPerMetricLimitExceeded :
813
790
perMetricSeriesLimitCount ++
814
791
updateFirstPartial (func () error {
792
+ if copiedLabels == nil {
793
+ copiedLabels = cortexpb .FromLabelAdaptersToLabelsWithCopy (ts .Labels )
794
+ }
815
795
return makeMetricLimitError (perMetricSeriesLimit , copiedLabels , i .limiter .FormatError (userID , cause ))
816
796
})
817
797
continue
@@ -1435,7 +1415,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
1435
1415
1436
1416
userDB := & userTSDB {
1437
1417
userID : userID ,
1438
- refCache : cortex_tsdb .NewRefCache (),
1439
1418
activeSeries : NewActiveSeries (),
1440
1419
seriesInMetric : newMetricCounter (i .limiter ),
1441
1420
ingestedAPISamples : newEWMARate (0.2 , i .cfg .RateUpdatePeriod ),
0 commit comments