Skip to content

Commit 4de0d40

Browse files
alanprotpracucci
andauthored
Fix race condition when opening and closing tsdb concurrently (#3959)
* Fix race condition when opening and closing tsdb concurrently Signed-off-by: Alan Protasio <[email protected]> * Changelog Signed-off-by: Alan Protasio <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent ccdbe91 commit 4de0d40

File tree

3 files changed

+62
-3
lines changed

3 files changed

+62
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
1919
* [ENHANCEMENT] Ingester: reduce CPU and memory when an high number of errors are returned by the ingester on the write path with the blocks storage. #3969 #3971 #3973
2020
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
21+
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
2122
* [BUGFIX] Querier: streamline tracing spans. #3924
2223

2324
## 1.8.0 in progress

pkg/ingester/ingester_v2.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -1924,9 +1924,16 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
19241924
// This will prevent going back to "active" state in deferred statement.
19251925
userDB.casState(closing, closed)
19261926

1927-
i.userStatesMtx.Lock()
1928-
delete(i.TSDBState.dbs, userID)
1929-
i.userStatesMtx.Unlock()
1927+
// Only remove user from TSDBState when everything is cleaned up
1928+
// This will prevent concurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
1929+
// came in - while closing the tsdb for the same tenant.
1930+
// If this happens now, the request will get reject as the push will not be able to acquire the lock as the tsdb will be
1931+
// in closed state
1932+
defer func() {
1933+
i.userStatesMtx.Lock()
1934+
delete(i.TSDBState.dbs, userID)
1935+
i.userStatesMtx.Unlock()
1936+
}()
19301937

19311938
i.metrics.memUsers.Dec()
19321939
i.TSDBState.tsdbMetrics.removeRegistryForUser(userID)

pkg/ingester/ingester_v2_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -2190,6 +2190,57 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP
21902190
assert.Equal(t, tsdbNotActive, i.closeAndDeleteUserTSDBIfIdle(userID))
21912191
}
21922192

2193+
func TestIngester_closingAndOpeningTsdbConcurrently(t *testing.T) {
2194+
ctx := context.Background()
2195+
cfg := defaultIngesterTestConfig()
2196+
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.
2197+
2198+
// Create ingester
2199+
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
2200+
require.NoError(t, err)
2201+
2202+
require.NoError(t, services.StartAndAwaitRunning(ctx, i))
2203+
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck
2204+
2205+
// Wait until it's ACTIVE
2206+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
2207+
return i.lifecycler.GetState()
2208+
})
2209+
2210+
_, err = i.getOrCreateTSDB(userID, false)
2211+
require.NoError(t, err)
2212+
2213+
iterations := 5000
2214+
chanErr := make(chan error, 1)
2215+
quit := make(chan bool)
2216+
2217+
go func() {
2218+
for {
2219+
select {
2220+
case <-quit:
2221+
return
2222+
default:
2223+
_, err = i.getOrCreateTSDB(userID, false)
2224+
if err != nil {
2225+
chanErr <- err
2226+
}
2227+
}
2228+
}
2229+
}()
2230+
2231+
for k := 0; k < iterations; k++ {
2232+
i.closeAndDeleteUserTSDBIfIdle(userID)
2233+
}
2234+
2235+
select {
2236+
case err := <-chanErr:
2237+
assert.Fail(t, err.Error())
2238+
quit <- true
2239+
default:
2240+
quit <- true
2241+
}
2242+
}
2243+
21932244
func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
21942245
ctx := context.Background()
21952246
cfg := defaultIngesterTestConfig()

0 commit comments

Comments
 (0)