Skip to content

Commit b3f37a1

Browse files
committed
receive: Fixed leak on receive and querier proxying Store API Series, which was leaking on errors.
Fixes: #2823 TestTenantSeriesSetServert_NotLeakingIfNotExhausted was showing leaks: ``` TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:132: leaktest: timed out checking goroutines TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]: github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Send(0xc000708360, 0xc0003104c0, 0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:141 +0x13e github.com/thanos-io/thanos/pkg/store.(*mockedStoreServer).Series(0xc0004e6330, 0xc0007083c0, 0x20ac2c0, 0xc000708360, 0x5116a0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:173 +0x76 github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series.func1(0x2097760, 0xc00003c940) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:121 +0x56 github.com/thanos-io/thanos/pkg/tracing.DoInSpan(0x2097760, 0xc00003c940, 0x1c8bace, 0x17, 0xc000173760, 0x0, 0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/tracing/tracing.go:72 +0xcc github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series(0xc000708360, 0x20983e0, 0xc0004e6330, 0xc0007083c0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:120 +0xfa github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2.1(0xc000708360, 0xc0004e6330) /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:225 +0x62 created by github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2 /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:224 +0x618 --- FAIL: TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet (10.03s) FAIL Process finished with exit code 1 ``` TestMultiTSDBStore_NotLeakingOnPrematureFinish was showing: ``` TestMultiTSDBStore_NotLeakingOnSendError: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]: github.com/thanos-io/thanos/pkg/store.ctxRespSender.send(...) /home/bwplotka/Repos/thanos/pkg/store/proxy.go:198 github.com/thanos-io/thanos/pkg/store.(*MultiTSDBStore).Series.func1(0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:214 +0x5cf golang.org/x/sync/errgroup.(*Group).Go.func1(0xc0002708d0, 0xc000416380) /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59 created by golang.org/x/sync/errgroup.(*Group).Go /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66 --- FAIL: TestMultiTSDBStore_NotLeakingOnSendError (10.02s) ``` Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent 9d0bd29 commit b3f37a1

File tree

5 files changed

+218
-11
lines changed

5 files changed

+218
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1616
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
1717
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
1818
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
19+
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive: Fixed a leak on receive Store API Series, which was leaking on errors.
1920

2021
### Changed
2122

pkg/receive/multitsdb.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/thanos-io/thanos/pkg/objstore"
2424
"github.com/thanos-io/thanos/pkg/shipper"
2525
"github.com/thanos-io/thanos/pkg/store"
26+
"github.com/thanos-io/thanos/pkg/store/storepb"
2627
"golang.org/x/sync/errgroup"
2728
)
2829

@@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
211212
return merr.Err()
212213
}
213214

214-
func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
215+
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
215216
t.mtx.RLock()
216217
defer t.mtx.RUnlock()
217218

218-
res := make(map[string]*store.TSDBStore, len(t.tenants))
219+
res := make(map[string]storepb.StoreServer, len(t.tenants))
219220
for k, tenant := range t.tenants {
220221
s := tenant.store()
221222
if s != nil {

pkg/store/multitsdb.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ import (
2424
)
2525

2626
// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
27+
// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
2728
type MultiTSDBStore struct {
2829
logger log.Logger
2930
component component.SourceStoreAPI
30-
tsdbStores func() map[string]*TSDBStore
31+
tsdbStores func() map[string]storepb.StoreServer
3132
}
3233

3334
// NewMultiTSDBStore creates a new MultiTSDBStore.
34-
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
35+
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
3536
if logger == nil {
3637
logger = log.NewNopLogger()
3738
}
@@ -97,6 +98,8 @@ type tenantSeriesSetServer struct {
9798
tenant string
9899
}
99100

101+
// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
102+
// Details https://github.com/thanos-io/thanos/issues/2864.
100103
func newTenantSeriesSetServer(
101104
ctx context.Context,
102105
tenant string,
@@ -110,11 +113,9 @@ func newTenantSeriesSetServer(
110113
}
111114
}
112115

113-
func (s *tenantSeriesSetServer) Context() context.Context {
114-
return s.ctx
115-
}
116+
func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }
116117

117-
func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
118+
func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
118119
var err error
119120
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
120121
err = store.Series(r, s)
@@ -202,7 +203,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
202203
defer wg.Done()
203204
ss.Series(store, r)
204205
}()
205-
206206
seriesSet = append(seriesSet, ss)
207207
}
208208

pkg/store/multitsdb_test.go

+140-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44
package store
55

66
import (
7+
"context"
78
"fmt"
89
"io/ioutil"
910
"math"
1011
"math/rand"
1112
"os"
1213
"path/filepath"
1314
"testing"
15+
"time"
1416

17+
"github.com/fortytw2/leaktest"
1518
"github.com/go-kit/kit/log"
19+
"github.com/prometheus/prometheus/pkg/labels"
1620
"github.com/prometheus/prometheus/tsdb"
1721
"github.com/thanos-io/thanos/pkg/component"
1822
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -21,6 +25,8 @@ import (
2125
)
2226

2327
func TestMultiTSDBSeries(t *testing.T) {
28+
defer leaktest.CheckTimeout(t, 10*time.Second)()
29+
2430
tb := testutil.NewTB(t)
2531
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
2632
if ok := t.Run("headOnly", func(t testutil.TB) {
@@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
116122
dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
117123
}
118124

119-
tsdbs := map[string]*TSDBStore{}
125+
tsdbs := map[string]storepb.StoreServer{}
120126
for i, db := range dbs {
121127
tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger}
122128
}
123129

124-
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
130+
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })
125131

126132
var expected []storepb.Series
127133
lastLabels := storepb.Series{}
@@ -154,3 +160,135 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
154160
},
155161
)
156162
}
163+
164+
type mockedStoreServer struct {
165+
storepb.StoreServer
166+
167+
responses []*storepb.SeriesResponse
168+
}
169+
170+
func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
171+
for _, r := range m.responses {
172+
if err := server.Send(r); err != nil {
173+
return err
174+
}
175+
}
176+
return nil
177+
}
178+
179+
// Regression test against https://github.com/thanos-io/thanos/issues/2823.
180+
func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) {
181+
t.Run("exhausted StoreSet", func(t *testing.T) {
182+
defer leaktest.CheckTimeout(t, 10*time.Second)()
183+
184+
s := newTenantSeriesSetServer(context.Background(), "a", nil)
185+
186+
resps := []*storepb.SeriesResponse{
187+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
188+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
189+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
190+
}
191+
192+
m := &mockedStoreServer{responses: resps}
193+
194+
go func() {
195+
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
196+
}()
197+
198+
testutil.Ok(t, s.Err())
199+
i := 0
200+
for s.Next() {
201+
l, c := s.At()
202+
203+
testutil.Equals(t, resps[i].GetSeries().Labels, l)
204+
testutil.Equals(t, resps[i].GetSeries().Chunks, c)
205+
206+
i++
207+
}
208+
testutil.Ok(t, s.Err())
209+
testutil.Equals(t, 3, i)
210+
})
211+
212+
t.Run("cancelled, not exhausted StoreSet", func(t *testing.T) {
213+
defer leaktest.CheckTimeout(t, 10*time.Second)()
214+
215+
ctx, cancel := context.WithCancel(context.Background())
216+
s := newTenantSeriesSetServer(ctx, "a", nil)
217+
218+
m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
219+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
220+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
221+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
222+
}}
223+
go func() {
224+
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
225+
}()
226+
227+
testutil.Ok(t, s.Err())
228+
testutil.Equals(t, true, s.Next())
229+
cancel()
230+
})
231+
}
232+
233+
type mockedSeriesServer struct {
234+
storepb.Store_SeriesServer
235+
ctx context.Context
236+
237+
send func(*storepb.SeriesResponse) error
238+
}
239+
240+
func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error {
241+
return s.send(r)
242+
}
243+
func (s *mockedSeriesServer) Context() context.Context { return s.ctx }
244+
245+
// Regression test against https://github.com/thanos-io/thanos/issues/2823.
246+
// This is different leak than in TestTenantSeriesSetServert_NotLeakingIfNotExhausted
247+
func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
248+
defer leaktest.CheckTimeout(t, 10*time.Second)()
249+
250+
m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
251+
return map[string]storepb.StoreServer{
252+
// Ensure more than 10 (internal respCh channel).
253+
"a": &mockedStoreServer{responses: []*storepb.SeriesResponse{
254+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
255+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
256+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
257+
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
258+
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
259+
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
260+
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
261+
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
262+
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
263+
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
264+
}},
265+
"b": &mockedStoreServer{responses: []*storepb.SeriesResponse{
266+
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
267+
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
268+
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
269+
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
270+
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
271+
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
272+
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
273+
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
274+
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
275+
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
276+
}},
277+
}
278+
})
279+
280+
if ok := t.Run("failing send", func(t *testing.T) {
281+
ctx, cancel := context.WithCancel(context.Background())
282+
// We mimic failing series server, but practically context cancel will do the same.
283+
testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
284+
ctx: ctx,
285+
send: func(*storepb.SeriesResponse) error {
286+
cancel()
287+
return ctx.Err()
288+
},
289+
}))
290+
testutil.NotOk(t, ctx.Err())
291+
}); !ok {
292+
return
293+
}
294+
}

pkg/store/proxy_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -1653,3 +1653,70 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
16531653
},
16541654
)
16551655
}
1656+
1657+
func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
1658+
defer leaktest.CheckTimeout(t, 10*time.Second)()
1659+
1660+
clients := []Client{
1661+
&testClient{
1662+
StoreClient: &mockedStoreAPI{
1663+
RespSeries: []*storepb.SeriesResponse{
1664+
// Ensure more than 10 (internal respCh channel).
1665+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1666+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1667+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1668+
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1669+
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1670+
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1671+
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1672+
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1673+
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1674+
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1675+
},
1676+
},
1677+
minTime: math.MinInt64,
1678+
maxTime: math.MaxInt64,
1679+
},
1680+
&testClient{
1681+
StoreClient: &mockedStoreAPI{
1682+
RespSeries: []*storepb.SeriesResponse{
1683+
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1684+
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1685+
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1686+
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1687+
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1688+
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1689+
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1690+
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1691+
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1692+
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
1693+
},
1694+
},
1695+
minTime: math.MinInt64,
1696+
maxTime: math.MaxInt64,
1697+
},
1698+
}
1699+
1700+
logger := log.NewNopLogger()
1701+
p := &ProxyStore{
1702+
logger: logger,
1703+
stores: func() []Client { return clients },
1704+
metrics: newProxyStoreMetrics(nil),
1705+
responseTimeout: 0,
1706+
}
1707+
1708+
if ok := t.Run("failling send", func(t *testing.T) {
1709+
ctx, cancel := context.WithCancel(context.Background())
1710+
// We mimic failing series server, but practically context cancel will do the same.
1711+
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
1712+
ctx: ctx,
1713+
send: func(*storepb.SeriesResponse) error {
1714+
cancel()
1715+
return ctx.Err()
1716+
},
1717+
}))
1718+
testutil.NotOk(t, ctx.Err())
1719+
}); !ok {
1720+
return
1721+
}
1722+
}

0 commit comments

Comments
 (0)