Skip to content

Commit 1aa69ed

Browse files
committed
Quick fix for leaks.
Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent b3f37a1 commit 1aa69ed

File tree

3 files changed

+75
-56
lines changed

3 files changed

+75
-56
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1616
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
1717
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
1818
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
19-
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive: Fixed a leak on receive Store API Series, which was leaking on errors.
19+
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive,Querier: Fixed leaks on receive and qwuerier Store API Series, which were leaking on errors.
2020

2121
### Changed
2222

pkg/store/multitsdb.go

+42-34
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ type tenantSeriesSetServer struct {
9090

9191
ctx context.Context
9292

93-
warnCh warnSender
94-
recv chan *storepb.Series
95-
cur *storepb.Series
93+
directCh directSender
94+
recv chan *storepb.Series
95+
cur *storepb.Series
9696

9797
err error
9898
tenant string
@@ -103,13 +103,13 @@ type tenantSeriesSetServer struct {
103103
func newTenantSeriesSetServer(
104104
ctx context.Context,
105105
tenant string,
106-
warnCh warnSender,
106+
directCh directSender,
107107
) *tenantSeriesSetServer {
108108
return &tenantSeriesSetServer{
109-
ctx: ctx,
110-
tenant: tenant,
111-
warnCh: warnCh,
112-
recv: make(chan *storepb.Series),
109+
ctx: ctx,
110+
tenant: tenant,
111+
directCh: directCh,
112+
recv: make(chan *storepb.Series),
113113
}
114114
}
115115

@@ -120,27 +120,30 @@ func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.Ser
120120
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
121121
err = store.Series(r, s)
122122
})
123-
124123
if err != nil {
125124
if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
126125
s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant)
127126
} else {
128127
// Consistently prefix tenant specific warnings as done in various other places.
129128
err = errors.New(prefixTenantWarning(s.tenant, err.Error()))
130-
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
129+
s.directCh.send(storepb.NewWarnSeriesResponse(err))
131130
}
132131
}
133-
134132
close(s.recv)
135133
}
136134

137135
func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
138136
series := r.GetSeries()
139-
chunks := make([]storepb.AggrChunk, len(series.Chunks))
140-
copy(chunks, series.Chunks)
141-
s.recv <- &storepb.Series{
142-
Labels: series.Labels,
143-
Chunks: chunks,
137+
if series == nil {
138+
// Proxy non series responses directly to client
139+
s.directCh.send(r)
140+
return nil
141+
}
142+
// For series, pass it to our AggChunkSeriesSet.
143+
select {
144+
case <-s.ctx.Done():
145+
return s.ctx.Err()
146+
case s.recv <- series:
144147
}
145148
return nil
146149
}
@@ -157,37 +160,39 @@ func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
157160
return s.cur.Labels, s.cur.Chunks
158161
}
159162

160-
func (s *tenantSeriesSetServer) Err() error {
161-
return s.err
162-
}
163+
func (s *tenantSeriesSetServer) Err() error { return s.err }
163164

164165
// Series returns all series for a requested time range and label matcher. The
165166
// returned data may exceed the requested time bounds. The data returned may
166167
// have been read and merged from multiple underlying TSDBStore instances.
167168
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
169+
span, ctx := tracing.StartSpan(srv.Context(), "multitsdb_series")
170+
defer span.Finish()
171+
168172
stores := s.tsdbStores()
169173
if len(stores) == 0 {
170174
return nil
171175
}
172176

173-
var (
174-
g, gctx = errgroup.WithContext(srv.Context())
175-
span, ctx = tracing.StartSpan(gctx, "multitsdb_series")
176-
// Allow to buffer max 10 series response.
177-
// Each might be quite large (multi chunk long series given by sidecar).
178-
respSender, respRecv, closeFn = newRespCh(gctx, 10)
179-
)
180-
defer span.Finish()
177+
g, gctx := errgroup.WithContext(ctx)
178+
179+
// Allow to buffer max 10 series response.
180+
// Each might be quite large (multi chunk long series given by sidecar).
181+
respSender, respCh := newCancellableRespChannel(gctx, 10)
181182

182183
g.Go(func() error {
184+
// This go routine is responsible for calling store's Series concurrently. Merged results
185+
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
186+
// When this go routine finishes or is cancelled, respCh channel is closed.
187+
183188
var (
184189
seriesSet []storepb.SeriesSet
185190
wg = &sync.WaitGroup{}
186191
)
187192

188193
defer func() {
189194
wg.Wait()
190-
closeFn()
195+
close(respCh)
191196
}()
192197

193198
for tenant, store := range stores {
@@ -214,13 +219,16 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
214219
}
215220
return mergedSet.Err()
216221
})
217-
218-
for resp := range respRecv {
219-
if err := srv.Send(resp); err != nil {
220-
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
222+
g.Go(func() error {
223+
// Go routine for gathering merged responses and sending them over to client. It stops when
224+
// respCh channel is closed OR on error from client.
225+
for resp := range respCh {
226+
if err := srv.Send(resp); err != nil {
227+
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
228+
}
221229
}
222-
}
223-
230+
return nil
231+
})
224232
return g.Wait()
225233
}
226234

pkg/store/proxy.go

+32-21
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,23 @@ func mergeLabels(a []storepb.Label, b labels.Labels) []storepb.Label {
184184
return res
185185
}
186186

187-
type ctxRespSender struct {
187+
// cancellableRespSender is a response channel that does need to be exhausted on cancel.
188+
type cancellableRespSender struct {
188189
ctx context.Context
189190
ch chan<- *storepb.SeriesResponse
190191
}
191192

192-
func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb.SeriesResponse, func()) {
193+
func newCancellableRespChannel(ctx context.Context, buffer int) (*cancellableRespSender, chan *storepb.SeriesResponse) {
193194
respCh := make(chan *storepb.SeriesResponse, buffer)
194-
return &ctxRespSender{ctx: ctx, ch: respCh}, respCh, func() { close(respCh) }
195+
return &cancellableRespSender{ctx: ctx, ch: respCh}, respCh
195196
}
196197

197-
func (s ctxRespSender) send(r *storepb.SeriesResponse) {
198-
s.ch <- r
198+
// send or returns on cancel.
199+
func (s cancellableRespSender) send(r *storepb.SeriesResponse) {
200+
select {
201+
case <-s.ctx.Done():
202+
case s.ch <- r:
203+
}
199204
}
200205

201206
// Series returns all series for a requested time range and label matcher. Requested series are taken from other
@@ -213,15 +218,17 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
213218
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
214219
}
215220

216-
var (
217-
g, gctx = errgroup.WithContext(srv.Context())
221+
g, gctx := errgroup.WithContext(srv.Context())
218222

219-
// Allow to buffer max 10 series response.
220-
// Each might be quite large (multi chunk long series given by sidecar).
221-
respSender, respRecv, closeFn = newRespCh(gctx, 10)
222-
)
223+
// Allow to buffer max 10 series response.
224+
// Each might be quite large (multi chunk long series given by sidecar).
225+
respSender, respCh := newCancellableRespChannel(gctx, 10)
223226

224227
g.Go(func() error {
228+
// This go routine is responsible for calling store's Series concurrently. Merged results
229+
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
230+
// When this go routine finishes or is cancelled, respCh channel is closed.
231+
225232
var (
226233
seriesSet []storepb.SeriesSet
227234
storeDebugMsgs []string
@@ -239,7 +246,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
239246

240247
defer func() {
241248
wg.Wait()
242-
closeFn()
249+
close(respCh)
243250
}()
244251

245252
for _, st := range s.stores() {
@@ -306,21 +313,25 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
306313
}
307314
return mergedSet.Err()
308315
})
309-
310-
for resp := range respRecv {
311-
if err := srv.Send(resp); err != nil {
312-
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
316+
g.Go(func() error {
317+
// Go routine for gathering merged responses and sending them over to client. It stops when
318+
// respCh channel is closed OR on error from client.
319+
for resp := range respCh {
320+
if err := srv.Send(resp); err != nil {
321+
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
322+
}
313323
}
314-
}
315-
324+
return nil
325+
})
316326
if err := g.Wait(); err != nil {
327+
// TODO(bwplotka): Replace with request logger.
317328
level.Error(s.logger).Log("err", err)
318329
return err
319330
}
320331
return nil
321332
}
322333

323-
type warnSender interface {
334+
type directSender interface {
324335
send(*storepb.SeriesResponse)
325336
}
326337

@@ -331,7 +342,7 @@ type streamSeriesSet struct {
331342
logger log.Logger
332343

333344
stream storepb.Store_SeriesClient
334-
warnCh warnSender
345+
warnCh directSender
335346

336347
currSeries *storepb.Series
337348
recvCh chan *storepb.Series
@@ -367,7 +378,7 @@ func startStreamSeriesSet(
367378
closeSeries context.CancelFunc,
368379
wg *sync.WaitGroup,
369380
stream storepb.Store_SeriesClient,
370-
warnCh warnSender,
381+
warnCh directSender,
371382
name string,
372383
partialResponse bool,
373384
responseTimeout time.Duration,

0 commit comments

Comments
 (0)