Skip to content

Commit d2bba2e

Browse files
committed
Fixes after review
1 parent e2a020d commit d2bba2e

File tree

2 files changed

+19
-53
lines changed

2 files changed

+19
-53
lines changed

cmd/thanos/query.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func runQuery(
308308
},
309309
dialOpts,
310310
)
311-
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
311+
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
312312
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
313313
engine = promql.NewEngine(
314314
promql.EngineOpts{

pkg/store/proxy.go

+18-52
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/improbable-eng/thanos/pkg/store/storepb"
1616
"github.com/improbable-eng/thanos/pkg/strutil"
1717
"github.com/pkg/errors"
18-
"github.com/prometheus/client_golang/prometheus"
1918
"github.com/prometheus/tsdb/labels"
2019
"golang.org/x/sync/errgroup"
2120
"google.golang.org/grpc/codes"
@@ -45,50 +44,28 @@ type ProxyStore struct {
4544
component component.StoreAPI
4645
selectorLabels labels.Labels
4746

48-
responseTimeout time.Duration
49-
streamResponseDuration *prometheus.HistogramVec
50-
streamDuration *prometheus.HistogramVec
47+
responseTimeout time.Duration
5148
}
5249

5350
// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
5451
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL)
5552
func NewProxyStore(
5653
logger log.Logger,
57-
reg prometheus.Registerer,
5854
stores func() []Client,
5955
component component.StoreAPI,
6056
selectorLabels labels.Labels,
6157
responseTimeout time.Duration,
6258
) *ProxyStore {
63-
streamResponseDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
64-
Name: "thanos_query_stream_read_duration_seconds",
65-
Help: "Time it takes to perform a single read from GRPC stream.",
66-
Buckets: []float64{
67-
0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5,
68-
}}, []string{"store"})
69-
streamDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
70-
Name: "thanos_query_stream_duration_seconds",
71-
Help: "Time it takes to consume GRPC stream.",
72-
Buckets: []float64{
73-
0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 20,
74-
}}, []string{"store"})
75-
reg.MustRegister(
76-
streamResponseDuration,
77-
streamDuration,
78-
)
79-
8059
if logger == nil {
8160
logger = log.NewNopLogger()
8261
}
8362

8463
s := &ProxyStore{
85-
logger: logger,
86-
stores: stores,
87-
component: component,
88-
selectorLabels: selectorLabels,
89-
responseTimeout: responseTimeout,
90-
streamResponseDuration: streamResponseDuration,
91-
streamDuration: streamDuration,
64+
logger: logger,
65+
stores: stores,
66+
component: component,
67+
selectorLabels: selectorLabels,
68+
responseTimeout: responseTimeout,
9269
}
9370
return s
9471
}
@@ -198,7 +175,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
198175
}
199176

200177
// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
201-
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.streamResponseDuration.WithLabelValues(st.Addr()), s.streamDuration.WithLabelValues(st.Addr())))
178+
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
202179
}
203180

204181
level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
@@ -254,10 +231,8 @@ type streamSeriesSet struct {
254231
name string
255232
partialResponse bool
256233

257-
responseTimeout time.Duration
258-
closeSeries context.CancelFunc
259-
streamResponseDuration prometheus.Observer
260-
streamDuration prometheus.Observer
234+
responseTimeout time.Duration
235+
closeSeries context.CancelFunc
261236
}
262237

263238
func startStreamSeriesSet(
@@ -270,35 +245,26 @@ func startStreamSeriesSet(
270245
name string,
271246
partialResponse bool,
272247
responseTimeout time.Duration,
273-
streamResponseDuration prometheus.Observer,
274-
streamDuration prometheus.Observer,
275248
) *streamSeriesSet {
276249
s := &streamSeriesSet{
277-
ctx: ctx,
278-
logger: logger,
279-
closeSeries: closeSeries,
280-
stream: stream,
281-
warnCh: warnCh,
282-
recvCh: make(chan *storepb.Series, 10),
283-
name: name,
284-
partialResponse: partialResponse,
285-
responseTimeout: responseTimeout,
286-
streamResponseDuration: streamResponseDuration,
287-
streamDuration: streamDuration,
250+
ctx: ctx,
251+
logger: logger,
252+
closeSeries: closeSeries,
253+
stream: stream,
254+
warnCh: warnCh,
255+
recvCh: make(chan *storepb.Series, 10),
256+
name: name,
257+
partialResponse: partialResponse,
258+
responseTimeout: responseTimeout,
288259
}
289260

290261
wg.Add(1)
291262
go func() {
292263
defer wg.Done()
293264
defer close(s.recvCh)
294265

295-
timer := prometheus.NewTimer(s.streamDuration)
296-
defer timer.ObserveDuration()
297-
298266
for {
299-
timer := prometheus.NewTimer(s.streamResponseDuration)
300267
r, err := s.stream.Recv()
301-
timer.ObserveDuration()
302268

303269
if err == io.EOF {
304270
return

0 commit comments

Comments
 (0)