Skip to content

Commit dc43fee

Browse files
authored
Prevent read on TSDB once closeAllTSDB function has been called (#4304)
* Prevent read on TSDB once closeAllTSDB function has been called
  Signed-off-by: ilangofman <[email protected]>
* Fix formatting issues
  Signed-off-by: ilangofman <[email protected]>
* Address PR comments and remove unit test no longer required
  Signed-off-by: ilangofman <[email protected]>
* Remove closed bool no longer used
  Signed-off-by: ilangofman <[email protected]>
* Remove error no longer used
  Signed-off-by: ilangofman <[email protected]>
* Remove comment and change return var
  Signed-off-by: ilangofman <[email protected]>
* Update log message to debug
  Signed-off-by: ilangofman <[email protected]>
* Moved log message to separate function
  Signed-off-by: ilangofman <[email protected]>
* change function name for checking if tsdb is closing
  Signed-off-by: ilangofman <[email protected]>
* ingester should read/write only when state is running for block store
  Signed-off-by: ilangofman <[email protected]>
* Move running check to ingester v2 file
  Signed-off-by: ilangofman <[email protected]>
* Remove extra space
  Signed-off-by: ilangofman <[email protected]>
* Remove duplication from push func
  Signed-off-by: ilangofman <[email protected]>
1 parent 485474c commit dc43fee

File tree

3 files changed

+76
-33
lines changed

3 files changed

+76
-33
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
2929
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
3030
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
31+
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This prevents access to TSDB blocks after they have already been closed. #4304
3132

3233
## 1.10.0-rc.0 / 2021-06-28
3334

pkg/ingester/ingester.go

+39-33
Original file line numberDiff line numberDiff line change
@@ -480,9 +480,19 @@ func (i *Ingester) checkRunningOrStopping() error {
480480
return status.Error(codes.Unavailable, s.String())
481481
}
482482

483+
// When using block storage, the ingester is only available while it is in the Running state. The ingester is not available
484+
// when stopping, to prevent any reads or writes to the TSDB after the ingester has closed them.
485+
func (i *Ingester) checkRunning() error {
486+
s := i.State()
487+
if s == services.Running {
488+
return nil
489+
}
490+
return status.Error(codes.Unavailable, s.String())
491+
}
492+
483493
// Push implements client.IngesterServer
484494
func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
485-
if err := i.checkRunningOrStopping(); err != nil {
495+
if err := i.checkRunning(); err != nil {
486496
return nil, err
487497
}
488498

@@ -762,14 +772,14 @@ func (i *Ingester) purgeUserMetricsMetadata() {
762772

763773
// Query implements service.IngesterServer
764774
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
765-
if err := i.checkRunningOrStopping(); err != nil {
766-
return nil, err
767-
}
768-
769775
if i.cfg.BlocksStorageEnabled {
770776
return i.v2Query(ctx, req)
771777
}
772778

779+
if err := i.checkRunningOrStopping(); err != nil {
780+
return nil, err
781+
}
782+
773783
userID, err := tenant.TenantID(ctx)
774784
if err != nil {
775785
return nil, err
@@ -829,14 +839,14 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
829839

830840
// QueryStream implements service.IngesterServer
831841
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
832-
if err := i.checkRunningOrStopping(); err != nil {
833-
return err
834-
}
835-
836842
if i.cfg.BlocksStorageEnabled {
837843
return i.v2QueryStream(req, stream)
838844
}
839845

846+
if err := i.checkRunningOrStopping(); err != nil {
847+
return err
848+
}
849+
840850
spanLog, ctx := spanlogger.New(stream.Context(), "QueryStream")
841851
defer spanLog.Finish()
842852

@@ -913,10 +923,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
913923

914924
// QueryExemplars implements service.IngesterServer
915925
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
916-
if err := i.checkRunningOrStopping(); err != nil {
917-
return nil, err
918-
}
919-
920926
if !i.cfg.BlocksStorageEnabled {
921927
return nil, errors.New("not supported")
922928
}
@@ -926,14 +932,14 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
926932

927933
// LabelValues returns all label values that are associated with a given label name.
928934
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
929-
if err := i.checkRunningOrStopping(); err != nil {
930-
return nil, err
931-
}
932-
933935
if i.cfg.BlocksStorageEnabled {
934936
return i.v2LabelValues(ctx, req)
935937
}
936938

939+
if err := i.checkRunningOrStopping(); err != nil {
940+
return nil, err
941+
}
942+
937943
i.userStatesMtx.RLock()
938944
defer i.userStatesMtx.RUnlock()
939945
state, ok, err := i.userStates.getViaContext(ctx)
@@ -951,14 +957,14 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque
951957

952958
// LabelNames returns all the label names.
953959
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
954-
if err := i.checkRunningOrStopping(); err != nil {
955-
return nil, err
956-
}
957-
958960
if i.cfg.BlocksStorageEnabled {
959961
return i.v2LabelNames(ctx, req)
960962
}
961963

964+
if err := i.checkRunningOrStopping(); err != nil {
965+
return nil, err
966+
}
967+
962968
i.userStatesMtx.RLock()
963969
defer i.userStatesMtx.RUnlock()
964970
state, ok, err := i.userStates.getViaContext(ctx)
@@ -976,14 +982,14 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
976982

977983
// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
978984
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
979-
if err := i.checkRunningOrStopping(); err != nil {
980-
return nil, err
981-
}
982-
983985
if i.cfg.BlocksStorageEnabled {
984986
return i.v2MetricsForLabelMatchers(ctx, req)
985987
}
986988

989+
if err := i.checkRunningOrStopping(); err != nil {
990+
return nil, err
991+
}
992+
987993
i.userStatesMtx.RLock()
988994
defer i.userStatesMtx.RUnlock()
989995
state, ok, err := i.userStates.getViaContext(ctx)
@@ -1046,14 +1052,14 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad
10461052

10471053
// UserStats returns ingestion statistics for the current user.
10481054
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
1049-
if err := i.checkRunningOrStopping(); err != nil {
1050-
return nil, err
1051-
}
1052-
10531055
if i.cfg.BlocksStorageEnabled {
10541056
return i.v2UserStats(ctx, req)
10551057
}
10561058

1059+
if err := i.checkRunningOrStopping(); err != nil {
1060+
return nil, err
1061+
}
1062+
10571063
i.userStatesMtx.RLock()
10581064
defer i.userStatesMtx.RUnlock()
10591065
state, ok, err := i.userStates.getViaContext(ctx)
@@ -1075,14 +1081,14 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)
10751081

10761082
// AllUserStats returns ingestion statistics for all users known to this ingester.
10771083
func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
1078-
if err := i.checkRunningOrStopping(); err != nil {
1079-
return nil, err
1080-
}
1081-
10821084
if i.cfg.BlocksStorageEnabled {
10831085
return i.v2AllUserStats(ctx, req)
10841086
}
10851087

1088+
if err := i.checkRunningOrStopping(); err != nil {
1089+
return nil, err
1090+
}
1091+
10861092
i.userStatesMtx.RLock()
10871093
defer i.userStatesMtx.RUnlock()
10881094
users := i.userStates.cp()

pkg/ingester/ingester_v2.go

+36
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,10 @@ func (u *userTSDB) releaseAppendLock() {
980980
}
981981

982982
func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
983+
if err := i.checkRunning(); err != nil {
984+
return nil, err
985+
}
986+
983987
userID, err := tenant.TenantID(ctx)
984988
if err != nil {
985989
return nil, err
@@ -1036,6 +1040,10 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
10361040
}
10371041

10381042
func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
1043+
if err := i.checkRunning(); err != nil {
1044+
return nil, err
1045+
}
1046+
10391047
userID, err := tenant.TenantID(ctx)
10401048
if err != nil {
10411049
return nil, err
@@ -1083,6 +1091,10 @@ func (i *Ingester) v2QueryExemplars(ctx context.Context, req *client.ExemplarQue
10831091
}
10841092

10851093
func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
1094+
if err := i.checkRunning(); err != nil {
1095+
return nil, err
1096+
}
1097+
10861098
labelName, startTimestampMs, endTimestampMs, matchers, err := client.FromLabelValuesRequest(req)
10871099
if err != nil {
10881100
return nil, err
@@ -1120,6 +1132,10 @@ func (i *Ingester) v2LabelValues(ctx context.Context, req *client.LabelValuesReq
11201132
}
11211133

11221134
func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
1135+
if err := i.checkRunning(); err != nil {
1136+
return nil, err
1137+
}
1138+
11231139
userID, err := tenant.TenantID(ctx)
11241140
if err != nil {
11251141
return nil, err
@@ -1152,6 +1168,10 @@ func (i *Ingester) v2LabelNames(ctx context.Context, req *client.LabelNamesReque
11521168
}
11531169

11541170
func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
1171+
if err := i.checkRunning(); err != nil {
1172+
return nil, err
1173+
}
1174+
11551175
userID, err := tenant.TenantID(ctx)
11561176
if err != nil {
11571177
return nil, err
@@ -1219,6 +1239,10 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx context.Context, req *client.Me
12191239
}
12201240

12211241
func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
1242+
if err := i.checkRunning(); err != nil {
1243+
return nil, err
1244+
}
1245+
12221246
userID, err := tenant.TenantID(ctx)
12231247
if err != nil {
12241248
return nil, err
@@ -1233,6 +1257,10 @@ func (i *Ingester) v2UserStats(ctx context.Context, req *client.UserStatsRequest
12331257
}
12341258

12351259
func (i *Ingester) v2AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
1260+
if err := i.checkRunning(); err != nil {
1261+
return nil, err
1262+
}
1263+
12361264
i.userStatesMtx.RLock()
12371265
defer i.userStatesMtx.RUnlock()
12381266

@@ -1265,6 +1293,10 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024
12651293

12661294
// v2QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
12671295
func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
1296+
if err := i.checkRunning(); err != nil {
1297+
return err
1298+
}
1299+
12681300
spanlog, ctx := spanlogger.New(stream.Context(), "v2QueryStream")
12691301
defer spanlog.Finish()
12701302

@@ -1788,6 +1820,10 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
17881820

17891821
// getMemorySeriesMetric returns the total number of in-memory series across all open TSDBs.
17901822
func (i *Ingester) getMemorySeriesMetric() float64 {
1823+
if err := i.checkRunning(); err != nil {
1824+
return 0
1825+
}
1826+
17911827
i.userStatesMtx.RLock()
17921828
defer i.userStatesMtx.RUnlock()
17931829

0 commit comments

Comments
 (0)