Skip to content

Commit 60ede4c

Browse files
authored
receive: Fixed leak on receive and querier proxying Store API Series, which was leaking on errors. (#2866)
* receive: Fixed leak on receive and querier proxying Store API Series, which was leaking on errors. Fixes: #2823 TestTenantSeriesSetServert_NotLeakingIfNotExhausted was showing leaks: ``` TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:132: leaktest: timed out checking goroutines TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]: github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Send(0xc000708360, 0xc0003104c0, 0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:141 +0x13e github.com/thanos-io/thanos/pkg/store.(*mockedStoreServer).Series(0xc0004e6330, 0xc0007083c0, 0x20ac2c0, 0xc000708360, 0x5116a0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:173 +0x76 github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series.func1(0x2097760, 0xc00003c940) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:121 +0x56 github.com/thanos-io/thanos/pkg/tracing.DoInSpan(0x2097760, 0xc00003c940, 0x1c8bace, 0x17, 0xc000173760, 0x0, 0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/tracing/tracing.go:72 +0xcc github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series(0xc000708360, 0x20983e0, 0xc0004e6330, 0xc0007083c0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:120 +0xfa github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2.1(0xc000708360, 0xc0004e6330) /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:225 +0x62 created by github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2 /home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:224 +0x618 --- FAIL: TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet (10.03s) FAIL Process finished with exit code 1 ``` TestMultiTSDBStore_NotLeakingOnPrematureFinish was showing: ``` TestMultiTSDBStore_NotLeakingOnSendError: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]: github.com/thanos-io/thanos/pkg/store.ctxRespSender.send(...) /home/bwplotka/Repos/thanos/pkg/store/proxy.go:198 github.com/thanos-io/thanos/pkg/store.(*MultiTSDBStore).Series.func1(0x0, 0x0) /home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:214 +0x5cf golang.org/x/sync/errgroup.(*Group).Go.func1(0xc0002708d0, 0xc000416380) /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59 created by golang.org/x/sync/errgroup.(*Group).Go /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66 --- FAIL: TestMultiTSDBStore_NotLeakingOnSendError (10.02s) ``` Signed-off-by: Bartlomiej Plotka <[email protected]> * Quick fix for leaks. Signed-off-by: Bartlomiej Plotka <[email protected]> * Fixed issues found by lint. Signed-off-by: Bartlomiej Plotka <[email protected]> * Get back copying. Signed-off-by: Bartlomiej Plotka <[email protected]> * Lint Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent cdac8a1 commit 60ede4c

File tree

6 files changed

+293
-63
lines changed

6 files changed

+293
-63
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1616
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
1717
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
1818
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
19+
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and qwuerier Store API Series, which were leaking on errors.
1920

2021
### Changed
2122

pkg/receive/multitsdb.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/thanos-io/thanos/pkg/objstore"
2424
"github.com/thanos-io/thanos/pkg/shipper"
2525
"github.com/thanos-io/thanos/pkg/store"
26+
"github.com/thanos-io/thanos/pkg/store/storepb"
2627
"golang.org/x/sync/errgroup"
2728
)
2829

@@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
211212
return merr.Err()
212213
}
213214

214-
func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
215+
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
215216
t.mtx.RLock()
216217
defer t.mtx.RUnlock()
217218

218-
res := make(map[string]*store.TSDBStore, len(t.tenants))
219+
res := make(map[string]storepb.StoreServer, len(t.tenants))
219220
for k, tenant := range t.tenants {
220221
s := tenant.store()
221222
if s != nil {

pkg/store/multitsdb.go

+54-38
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ import (
2424
)
2525

2626
// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
27+
// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
2728
type MultiTSDBStore struct {
2829
logger log.Logger
2930
component component.SourceStoreAPI
30-
tsdbStores func() map[string]*TSDBStore
31+
tsdbStores func() map[string]storepb.StoreServer
3132
}
3233

3334
// NewMultiTSDBStore creates a new MultiTSDBStore.
34-
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
35+
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
3536
if logger == nil {
3637
logger = log.NewNopLogger()
3738
}
@@ -89,59 +90,70 @@ type tenantSeriesSetServer struct {
8990

9091
ctx context.Context
9192

92-
warnCh warnSender
93-
recv chan *storepb.Series
94-
cur *storepb.Series
93+
directCh directSender
94+
recv chan *storepb.Series
95+
cur *storepb.Series
9596

9697
err error
9798
tenant string
9899
}
99100

101+
// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
102+
// Details https://github.com/thanos-io/thanos/issues/2864.
100103
func newTenantSeriesSetServer(
101104
ctx context.Context,
102105
tenant string,
103-
warnCh warnSender,
106+
directCh directSender,
104107
) *tenantSeriesSetServer {
105108
return &tenantSeriesSetServer{
106-
ctx: ctx,
107-
tenant: tenant,
108-
warnCh: warnCh,
109-
recv: make(chan *storepb.Series),
109+
ctx: ctx,
110+
tenant: tenant,
111+
directCh: directCh,
112+
recv: make(chan *storepb.Series),
110113
}
111114
}
112115

113-
func (s *tenantSeriesSetServer) Context() context.Context {
114-
return s.ctx
115-
}
116+
func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }
116117

117-
func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
118+
func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
118119
var err error
119120
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
120121
err = store.Series(r, s)
121122
})
122-
123123
if err != nil {
124124
if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
125125
s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant)
126126
} else {
127127
// Consistently prefix tenant specific warnings as done in various other places.
128128
err = errors.New(prefixTenantWarning(s.tenant, err.Error()))
129-
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
129+
s.directCh.send(storepb.NewWarnSeriesResponse(err))
130130
}
131131
}
132-
133132
close(s.recv)
134133
}
135134

136135
func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
137136
series := r.GetSeries()
137+
if series == nil {
138+
// Proxy non series responses directly to client
139+
s.directCh.send(r)
140+
return nil
141+
}
142+
143+
// TODO(bwplotka): Consider avoid copying / learn why it has to copied.
138144
chunks := make([]storepb.AggrChunk, len(series.Chunks))
139145
copy(chunks, series.Chunks)
140-
s.recv <- &storepb.Series{
146+
147+
// For series, pass it to our AggChunkSeriesSet.
148+
select {
149+
case <-s.ctx.Done():
150+
return s.ctx.Err()
151+
case s.recv <- &storepb.Series{
141152
Labels: series.Labels,
142153
Chunks: chunks,
154+
}:
155+
return nil
143156
}
144-
return nil
145157
}
146158

147159
func (s *tenantSeriesSetServer) Next() (ok bool) {
@@ -156,37 +168,39 @@ func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
156168
return s.cur.Labels, s.cur.Chunks
157169
}
158170

159-
func (s *tenantSeriesSetServer) Err() error {
160-
return s.err
161-
}
171+
func (s *tenantSeriesSetServer) Err() error { return s.err }
162172

163173
// Series returns all series for a requested time range and label matcher. The
164174
// returned data may exceed the requested time bounds. The data returned may
165175
// have been read and merged from multiple underlying TSDBStore instances.
166176
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
177+
span, ctx := tracing.StartSpan(srv.Context(), "multitsdb_series")
178+
defer span.Finish()
179+
167180
stores := s.tsdbStores()
168181
if len(stores) == 0 {
169182
return nil
170183
}
171184

172-
var (
173-
g, gctx = errgroup.WithContext(srv.Context())
174-
span, ctx = tracing.StartSpan(gctx, "multitsdb_series")
175-
// Allow to buffer max 10 series response.
176-
// Each might be quite large (multi chunk long series given by sidecar).
177-
respSender, respRecv, closeFn = newRespCh(gctx, 10)
178-
)
179-
defer span.Finish()
185+
g, gctx := errgroup.WithContext(ctx)
186+
187+
// Allow to buffer max 10 series response.
188+
// Each might be quite large (multi chunk long series given by sidecar).
189+
respSender, respCh := newCancelableRespChannel(gctx, 10)
180190

181191
g.Go(func() error {
192+
// This go routine is responsible for calling store's Series concurrently. Merged results
193+
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
194+
// When this go routine finishes or is canceled, respCh channel is closed.
195+
182196
var (
183197
seriesSet []storepb.SeriesSet
184198
wg = &sync.WaitGroup{}
185199
)
186200

187201
defer func() {
188202
wg.Wait()
189-
closeFn()
203+
close(respCh)
190204
}()
191205

192206
for tenant, store := range stores {
@@ -202,7 +216,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
202216
defer wg.Done()
203217
ss.Series(store, r)
204218
}()
205-
206219
seriesSet = append(seriesSet, ss)
207220
}
208221

@@ -214,13 +227,16 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
214227
}
215228
return mergedSet.Err()
216229
})
217-
218-
for resp := range respRecv {
219-
if err := srv.Send(resp); err != nil {
220-
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
230+
g.Go(func() error {
231+
// Go routine for gathering merged responses and sending them over to client. It stops when
232+
// respCh channel is closed OR on error from client.
233+
for resp := range respCh {
234+
if err := srv.Send(resp); err != nil {
235+
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
236+
}
221237
}
222-
}
223-
238+
return nil
239+
})
224240
return g.Wait()
225241
}
226242

pkg/store/multitsdb_test.go

+138-2
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44
package store
55

66
import (
7+
"context"
78
"fmt"
89
"io/ioutil"
910
"math"
1011
"math/rand"
1112
"os"
1213
"path/filepath"
1314
"testing"
15+
"time"
1416

17+
"github.com/fortytw2/leaktest"
1518
"github.com/go-kit/kit/log"
19+
"github.com/prometheus/prometheus/pkg/labels"
1620
"github.com/prometheus/prometheus/tsdb"
1721
"github.com/thanos-io/thanos/pkg/component"
1822
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -21,6 +25,8 @@ import (
2125
)
2226

2327
func TestMultiTSDBSeries(t *testing.T) {
28+
defer leaktest.CheckTimeout(t, 10*time.Second)()
29+
2430
tb := testutil.NewTB(t)
2531
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
2632
if ok := t.Run("headOnly", func(t testutil.TB) {
@@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
116122
dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
117123
}
118124

119-
tsdbs := map[string]*TSDBStore{}
125+
tsdbs := map[string]storepb.StoreServer{}
120126
for i, db := range dbs {
121127
tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger, maxSamplesPerChunk: 120} // On production we have math.MaxInt64
122128
}
123129

124-
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
130+
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })
125131

126132
var expected []storepb.Series
127133
lastLabels := storepb.Series{}
@@ -154,3 +160,133 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
154160
},
155161
)
156162
}
163+
164+
type mockedStoreServer struct {
165+
storepb.StoreServer
166+
167+
responses []*storepb.SeriesResponse
168+
}
169+
170+
func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
171+
for _, r := range m.responses {
172+
if err := server.Send(r); err != nil {
173+
return err
174+
}
175+
}
176+
return nil
177+
}
178+
179+
// Regression test against https://github.com/thanos-io/thanos/issues/2823.
180+
func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) {
181+
t.Run("exhausted StoreSet", func(t *testing.T) {
182+
defer leaktest.CheckTimeout(t, 10*time.Second)()
183+
184+
s := newTenantSeriesSetServer(context.Background(), "a", nil)
185+
186+
resps := []*storepb.SeriesResponse{
187+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
188+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
189+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
190+
}
191+
192+
m := &mockedStoreServer{responses: resps}
193+
194+
go func() {
195+
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
196+
}()
197+
198+
testutil.Ok(t, s.Err())
199+
i := 0
200+
for s.Next() {
201+
l, c := s.At()
202+
203+
testutil.Equals(t, resps[i].GetSeries().Labels, l)
204+
testutil.Equals(t, resps[i].GetSeries().Chunks, c)
205+
206+
i++
207+
}
208+
testutil.Ok(t, s.Err())
209+
testutil.Equals(t, 3, i)
210+
})
211+
212+
t.Run("canceled, not exhausted StoreSet", func(t *testing.T) {
213+
defer leaktest.CheckTimeout(t, 10*time.Second)()
214+
215+
ctx, cancel := context.WithCancel(context.Background())
216+
s := newTenantSeriesSetServer(ctx, "a", nil)
217+
218+
m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
219+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
220+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
221+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
222+
}}
223+
go func() {
224+
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
225+
}()
226+
227+
testutil.Ok(t, s.Err())
228+
testutil.Equals(t, true, s.Next())
229+
cancel()
230+
})
231+
}
232+
233+
type mockedSeriesServer struct {
234+
storepb.Store_SeriesServer
235+
ctx context.Context
236+
237+
send func(*storepb.SeriesResponse) error
238+
}
239+
240+
func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error {
241+
return s.send(r)
242+
}
243+
func (s *mockedSeriesServer) Context() context.Context { return s.ctx }
244+
245+
// Regression test against https://github.com/thanos-io/thanos/issues/2823.
246+
// This is different leak than in TestTenantSeriesSetServert_NotLeakingIfNotExhausted.
247+
func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
248+
defer leaktest.CheckTimeout(t, 10*time.Second)()
249+
250+
m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
251+
return map[string]storepb.StoreServer{
252+
// Ensure more than 10 (internal respCh channel).
253+
"a": &mockedStoreServer{responses: []*storepb.SeriesResponse{
254+
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
255+
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
256+
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
257+
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
258+
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
259+
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
260+
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
261+
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
262+
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
263+
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
264+
}},
265+
"b": &mockedStoreServer{responses: []*storepb.SeriesResponse{
266+
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
267+
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
268+
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
269+
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
270+
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
271+
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
272+
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
273+
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
274+
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
275+
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
276+
}},
277+
}
278+
})
279+
280+
t.Run("failing send", func(t *testing.T) {
281+
ctx, cancel := context.WithCancel(context.Background())
282+
// We mimic failing series server, but practically context cancel will do the same.
283+
testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
284+
ctx: ctx,
285+
send: func(*storepb.SeriesResponse) error {
286+
cancel()
287+
return ctx.Err()
288+
},
289+
}))
290+
testutil.NotOk(t, ctx.Err())
291+
})
292+
}

0 commit comments

Comments
 (0)