Skip to content

Commit 7a4de55

Browse files
committed
receive: Fixed goroutine leak in receive Store API Series, which occurred on errors.
Fixes: #2823 TestTenantSeriesSetServert_NotLeakingIfNotExhausted was showing leaks: ``` TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:132: leaktest: timed out checking goroutines TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]: github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Send(0xc000708360, 0xc0003104c0, 0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:141 +0x13e github.com/thanos-io/thanos/pkg/store.(*mockedStoreServer).Series(0xc0004e6330, 0xc0007083c0, 0x20ac2c0, 0xc000708360, 0x5116a0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:173 +0x76 github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series.func1(0x2097760, 0xc00003c940) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:121 +0x56 github.com/thanos-io/thanos/pkg/tracing.DoInSpan(0x2097760, 0xc00003c940, 0x1c8bace, 0x17, 0xc000173760, 0x0, 0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/tracing/tracing.go:72 +0xcc github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series(0xc000708360, 0x20983e0, 0xc0004e6330, 0xc0007083c0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:120 +0xfa github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2.1(0xc000708360, 0xc0004e6330) /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:225 +0x62 created by github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2 /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:224 +0x618 --- FAIL: TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet (10.03s) FAIL Process finished with exit code 1 ``` TestMultiTSDBStore_NotLeakingOnSendError was showing: ``` TestMultiTSDBStore_NotLeakingOnSendError: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]: 
github.com/thanos-io/thanos/pkg/store.ctxRespSender.send(...) /home/bwplotka/Repos/thanos/pkg/store/proxy.go:198 github.com/thanos-io/thanos/pkg/store.(*MultiTSDBStore).Series.func1(0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:214 +0x5cf golang.org/x/sync/errgroup.(*Group).Go.func1(0xc0002708d0, 0xc000416380) /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59 created by golang.org/x/sync/errgroup.(*Group).Go /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66 --- FAIL: TestMultiTSDBStore_NotLeakingOnSendError (10.02s) ``` Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent 9d0bd29 commit 7a4de55

File tree

4 files changed

+133
-11
lines changed

4 files changed

+133
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1616
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
1717
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
1818
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts; it should be in lowercase.
19+
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive: Fixed a goroutine leak in receive Store API Series, which occurred on errors.
1920

2021
### Changed
2122

pkg/receive/multitsdb.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/thanos-io/thanos/pkg/objstore"
2424
"github.com/thanos-io/thanos/pkg/shipper"
2525
"github.com/thanos-io/thanos/pkg/store"
26+
"github.com/thanos-io/thanos/pkg/store/storepb"
2627
"golang.org/x/sync/errgroup"
2728
)
2829

@@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
211212
return merr.Err()
212213
}
213214

214-
func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
215+
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
215216
t.mtx.RLock()
216217
defer t.mtx.RUnlock()
217218

218-
res := make(map[string]*store.TSDBStore, len(t.tenants))
219+
res := make(map[string]storepb.StoreServer, len(t.tenants))
219220
for k, tenant := range t.tenants {
220221
s := tenant.store()
221222
if s != nil {

pkg/store/multitsdb.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ import (
2424
)
2525

2626
// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
27+
// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
2728
type MultiTSDBStore struct {
2829
logger log.Logger
2930
component component.SourceStoreAPI
30-
tsdbStores func() map[string]*TSDBStore
31+
tsdbStores func() map[string]storepb.StoreServer
3132
}
3233

3334
// NewMultiTSDBStore creates a new MultiTSDBStore.
34-
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
35+
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
3536
if logger == nil {
3637
logger = log.NewNopLogger()
3738
}
@@ -97,6 +98,8 @@ type tenantSeriesSetServer struct {
9798
tenant string
9899
}
99100

101+
// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
102+
// Details: https://github.com/thanos-io/thanos/issues/2864.
100103
func newTenantSeriesSetServer(
101104
ctx context.Context,
102105
tenant string,
@@ -110,11 +113,9 @@ func newTenantSeriesSetServer(
110113
}
111114
}
112115

113-
func (s *tenantSeriesSetServer) Context() context.Context {
114-
return s.ctx
115-
}
116+
func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }
116117

117-
func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
118+
func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
118119
var err error
119120
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
120121
err = store.Series(r, s)
@@ -202,7 +203,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
202203
defer wg.Done()
203204
ss.Series(store, r)
204205
}()
205-
206206
seriesSet = append(seriesSet, ss)
207207
}
208208

pkg/store/multitsdb_test.go

+122-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@
44
package store
55

66
import (
7+
"context"
78
"fmt"
89
"io/ioutil"
910
"math"
1011
"math/rand"
1112
"os"
1213
"path/filepath"
1314
"testing"
15+
"time"
1416

17+
"github.com/fortytw2/leaktest"
1518
"github.com/go-kit/kit/log"
19+
"github.com/pkg/errors"
20+
"github.com/prometheus/prometheus/pkg/labels"
1621
"github.com/prometheus/prometheus/tsdb"
1722
"github.com/thanos-io/thanos/pkg/component"
1823
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -21,6 +26,8 @@ import (
2126
)
2227

2328
func TestMultiTSDBSeries(t *testing.T) {
29+
defer leaktest.CheckTimeout(t, 10*time.Second)()
30+
2431
tb := testutil.NewTB(t)
2532
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
2633
if ok := t.Run("headOnly", func(t testutil.TB) {
@@ -116,12 +123,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
116123
dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
117124
}
118125

119-
tsdbs := map[string]*TSDBStore{}
126+
tsdbs := map[string]storepb.StoreServer{}
120127
for i, db := range dbs {
121128
tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger}
122129
}
123130

124-
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
131+
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })
125132

126133
var expected []storepb.Series
127134
lastLabels := storepb.Series{}
@@ -154,3 +161,116 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
154161
},
155162
)
156163
}
164+
165+
type mockedStoreServer struct {
166+
storepb.StoreServer
167+
168+
responses []*storepb.SeriesResponse
169+
}
170+
171+
func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
172+
for _, r := range m.responses {
173+
if err := server.Send(r); err != nil {
174+
return err
175+
}
176+
}
177+
return nil
178+
}
179+
180+
// Regression test against https://github.com/thanos-io/thanos/issues/2823.
181+
func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) {
182+
t.Run("exhausted StoreSet", func(t *testing.T) {
183+
defer leaktest.CheckTimeout(t, 10*time.Second)()
184+
185+
s := newTenantSeriesSetServer(context.Background(), "a", nil)
186+
187+
resps := []*storepb.SeriesResponse{
188+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
189+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
190+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
191+
}
192+
193+
m := &mockedStoreServer{responses: resps}
194+
195+
go func() {
196+
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
197+
}()
198+
199+
testutil.Ok(t, s.Err())
200+
i := 0
201+
for s.Next() {
202+
l, c := s.At()
203+
204+
testutil.Equals(t, resps[i].GetSeries().Labels, l)
205+
testutil.Equals(t, resps[i].GetSeries().Chunks, c)
206+
207+
i++
208+
}
209+
testutil.Ok(t, s.Err())
210+
testutil.Equals(t, 3, i)
211+
})
212+
213+
t.Run("cancelled, not exhausted StoreSet", func(t *testing.T) {
214+
defer leaktest.CheckTimeout(t, 10*time.Second)()
215+
216+
ctx, cancel := context.WithCancel(context.Background())
217+
s := newTenantSeriesSetServer(ctx, "a", nil)
218+
219+
m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
220+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
221+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
222+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
223+
}}
224+
go func() {
225+
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
226+
}()
227+
228+
testutil.Ok(t, s.Err())
229+
testutil.Equals(t, true, s.Next())
230+
cancel()
231+
})
232+
}
233+
234+
type faillingSeriesServer struct {
235+
storepb.Store_SeriesServer
236+
237+
ctx context.Context
238+
}
239+
240+
func (s *faillingSeriesServer) Send(*storepb.SeriesResponse) error {
241+
return errors.New("I am broken")
242+
}
243+
func (s *faillingSeriesServer) Context() context.Context { return s.ctx }
244+
245+
// Regression test against https://github.com/thanos-io/thanos/issues/2823.
246+
// This is a different leak than the one covered by TestTenantSeriesSetServert_NotLeakingIfNotExhausted.
247+
func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
248+
defer leaktest.CheckTimeout(t, 10*time.Second)()
249+
250+
m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
251+
return map[string]storepb.StoreServer{
252+
// Ensure more than 10 (internal respCh channel).
253+
"a": &mockedStoreServer{responses: []*storepb.SeriesResponse{
254+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
255+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
256+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
257+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
258+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
259+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
260+
}},
261+
"b": &mockedStoreServer{responses: []*storepb.SeriesResponse{
262+
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
263+
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
264+
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
265+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
266+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
267+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
268+
}},
269+
}
270+
})
271+
272+
ctx, cancel := context.WithCancel(context.Background())
273+
// We mimic a failing series server; in practice, a context cancel would do the same.
274+
testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &faillingSeriesServer{ctx: ctx}))
275+
cancel()
276+
}

0 commit comments

Comments
 (0)