@@ -101,7 +101,6 @@ const (
101
101
type userTSDB struct {
102
102
db * tsdb.DB
103
103
userID string
104
- refCache * cortex_tsdb.RefCache
105
104
activeSeries * ActiveSeries
106
105
seriesInMetric * metricCounter
107
106
limiter * Limiter
@@ -185,7 +184,8 @@ func (u *userTSDB) compactHead(blockDuration int64) error {
185
184
186
185
defer u .casState (forceCompacting , active )
187
186
188
- // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks.
187
+ // Ingestion of samples in parallel with forced compaction can lead to overlapping blocks,
188
+ // and possible invalidation of the references returned from Appender.GetRef().
189
189
// So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over.
190
190
u .pushesInFlight .Wait ()
191
191
@@ -383,7 +383,6 @@ type TSDBState struct {
383
383
walReplayTime prometheus.Histogram
384
384
appenderAddDuration prometheus.Histogram
385
385
appenderCommitDuration prometheus.Histogram
386
- refCachePurgeDuration prometheus.Histogram
387
386
idleTsdbChecks * prometheus.CounterVec
388
387
}
389
388
@@ -435,11 +434,6 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
435
434
Help : "The total time it takes for a push request to commit samples appended to TSDB." ,
436
435
Buckets : []float64 {.001 , .005 , .01 , .025 , .05 , .1 , .25 , .5 , 1 , 2.5 , 5 , 10 },
437
436
}),
438
- refCachePurgeDuration : promauto .With (registerer ).NewHistogram (prometheus.HistogramOpts {
439
- Name : "cortex_ingester_tsdb_refcache_purge_duration_seconds" ,
440
- Help : "The total time it takes to purge the TSDB series reference cache for a single tenant." ,
441
- Buckets : prometheus .DefBuckets ,
442
- }),
443
437
444
438
idleTsdbChecks : idleTsdbChecks ,
445
439
}
@@ -619,11 +613,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
619
613
rateUpdateTicker := time .NewTicker (i .cfg .RateUpdatePeriod )
620
614
defer rateUpdateTicker .Stop ()
621
615
622
- // We use an hardcoded value for this ticker because there should be no
623
- // real value in customizing it.
624
- refCachePurgeTicker := time .NewTicker (5 * time .Minute )
625
- defer refCachePurgeTicker .Stop ()
626
-
627
616
var activeSeriesTickerChan <- chan time.Time
628
617
if i .cfg .ActiveSeriesMetricsEnabled {
629
618
t := time .NewTicker (i .cfg .ActiveSeriesMetricsUpdatePeriod )
@@ -646,17 +635,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
646
635
db .ingestedRuleSamples .tick ()
647
636
}
648
637
i .userStatesMtx .RUnlock ()
649
- case <- refCachePurgeTicker .C :
650
- for _ , userID := range i .getTSDBUsers () {
651
- userDB := i .getTSDB (userID )
652
- if userDB == nil {
653
- continue
654
- }
655
-
656
- startTime := time .Now ()
657
- userDB .refCache .Purge (startTime .Add (- cortex_tsdb .DefaultRefCacheTTL ))
658
- i .TSDBState .refCachePurgeDuration .Observe (time .Since (startTime ).Seconds ())
659
- }
660
638
661
639
case <- activeSeriesTickerChan :
662
640
i .v2UpdateActiveSeries ()
@@ -683,6 +661,12 @@ func (i *Ingester) v2UpdateActiveSeries() {
683
661
}
684
662
}
685
663
664
+ // GetRef() is an extra method added to TSDB to let Cortex check before calling Add()
665
+ type extendedAppender interface {
666
+ storage.Appender
667
+ storage.GetRef
668
+ }
669
+
686
670
// v2Push adds metrics to a block
687
671
func (i * Ingester ) v2Push (ctx context.Context , req * cortexpb.WriteRequest ) (* cortexpb.WriteResponse , error ) {
688
672
var firstPartialErr error
@@ -738,13 +722,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
738
722
)
739
723
740
724
// Walk the samples, appending them to the users database
741
- app := db .Appender (ctx )
725
+ app := db .Appender (ctx ).( extendedAppender )
742
726
for _ , ts := range req .Timeseries {
743
- // Check if we already have a cached reference for this series. Be aware
744
- // that even if we have a reference it's not guaranteed to be still valid.
745
727
// The labels must be sorted (in our case, it's guaranteed a write request
746
728
// has sorted labels once hit the ingester).
747
- cachedRef , copiedLabels , cachedRefExists := db .refCache .Ref (startAppend , cortexpb .FromLabelAdaptersToLabels (ts .Labels ))
729
+
730
+ // Look up a reference for this series.
731
+ ref , copiedLabels := app .GetRef (cortexpb .FromLabelAdaptersToLabels (ts .Labels ))
748
732
749
733
// To find out if any sample was added to this series, we keep old value.
750
734
oldSucceededSamplesCount := succeededSamplesCount
@@ -753,30 +737,18 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
753
737
var err error
754
738
755
739
// If the cached reference exists, we try to use it.
756
- if cachedRefExists {
757
- var ref uint64
758
- if ref , err = app .Append (cachedRef , copiedLabels , s .TimestampMs , s .Value ); err == nil {
740
+ if ref != 0 {
741
+ if _ , err = app .Append (ref , copiedLabels , s .TimestampMs , s .Value ); err == nil {
759
742
succeededSamplesCount ++
760
- // This means the reference changes which means we need to update our cache.
761
- if ref != cachedRef {
762
- db .refCache .SetRef (startAppend , copiedLabels , ref )
763
- }
764
743
continue
765
744
}
766
745
767
746
} else {
768
- var ref uint64
769
-
770
- // Copy the label set because both TSDB and the cache may retain it.
747
+ // Copy the label set because both TSDB and the active series tracker may retain it.
771
748
copiedLabels = cortexpb .FromLabelAdaptersToLabelsWithCopy (ts .Labels )
772
749
750
+ // Retain the reference in case there are multiple samples for the series.
773
751
if ref , err = app .Append (0 , copiedLabels , s .TimestampMs , s .Value ); err == nil {
774
- db .refCache .SetRef (startAppend , copiedLabels , ref )
775
-
776
- // Set these in case there are multiple samples for the series.
777
- cachedRef = ref
778
- cachedRefExists = true
779
-
780
752
succeededSamplesCount ++
781
753
continue
782
754
}
@@ -827,11 +799,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
827
799
828
800
if i .cfg .ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
829
801
db .activeSeries .UpdateSeries (cortexpb .FromLabelAdaptersToLabels (ts .Labels ), startAppend , func (l labels.Labels ) labels.Labels {
830
- // If we have already made a copy during this push, no need to create new one.
831
- if copiedLabels != nil {
832
- return copiedLabels
833
- }
834
- return cortexpb .CopyLabels (l )
802
+ // we must already have copied the labels if succeededSamplesCount has been incremented.
803
+ return copiedLabels
835
804
})
836
805
}
837
806
}
@@ -1435,7 +1404,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
1435
1404
1436
1405
userDB := & userTSDB {
1437
1406
userID : userID ,
1438
- refCache : cortex_tsdb .NewRefCache (),
1439
1407
activeSeries : NewActiveSeries (),
1440
1408
seriesInMetric : newMetricCounter (i .limiter ),
1441
1409
ingestedAPISamples : newEWMARate (0.2 , i .cfg .RateUpdatePeriod ),
0 commit comments