Skip to content

Commit e4b3530

Browse files
committed
Fixes after review
1 parent 3280c51 commit e4b3530

File tree

3 files changed

+25
-60
lines changed

3 files changed

+25
-60
lines changed

cmd/thanos/query.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func runQuery(
308308
},
309309
dialOpts,
310310
)
311-
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
311+
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
312312
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
313313
engine = promql.NewEngine(
314314
promql.EngineOpts{

pkg/store/proxy.go

+18-52
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/improbable-eng/thanos/pkg/store/storepb"
1616
"github.com/improbable-eng/thanos/pkg/strutil"
1717
"github.com/pkg/errors"
18-
"github.com/prometheus/client_golang/prometheus"
1918
"github.com/prometheus/tsdb/labels"
2019
"golang.org/x/sync/errgroup"
2120
"google.golang.org/grpc/codes"
@@ -45,50 +44,28 @@ type ProxyStore struct {
4544
component component.StoreAPI
4645
selectorLabels labels.Labels
4746

48-
responseTimeout time.Duration
49-
streamResponseDuration *prometheus.HistogramVec
50-
streamDuration *prometheus.HistogramVec
47+
responseTimeout time.Duration
5148
}
5249

5350
// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
5451
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL)
5552
func NewProxyStore(
5653
logger log.Logger,
57-
reg prometheus.Registerer,
5854
stores func() []Client,
5955
component component.StoreAPI,
6056
selectorLabels labels.Labels,
6157
responseTimeout time.Duration,
6258
) *ProxyStore {
63-
streamResponseDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
64-
Name: "thanos_query_stream_read_duration_seconds",
65-
Help: "Time it takes to perform a single read from GRPC stream.",
66-
Buckets: []float64{
67-
0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5,
68-
}}, []string{"store"})
69-
streamDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
70-
Name: "thanos_query_stream_duration_seconds",
71-
Help: "Time it takes to consume GRPC stream.",
72-
Buckets: []float64{
73-
0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 20,
74-
}}, []string{"store"})
75-
reg.MustRegister(
76-
streamResponseDuration,
77-
streamDuration,
78-
)
79-
8059
if logger == nil {
8160
logger = log.NewNopLogger()
8261
}
8362

8463
s := &ProxyStore{
85-
logger: logger,
86-
stores: stores,
87-
component: component,
88-
selectorLabels: selectorLabels,
89-
responseTimeout: responseTimeout,
90-
streamResponseDuration: streamResponseDuration,
91-
streamDuration: streamDuration,
64+
logger: logger,
65+
stores: stores,
66+
component: component,
67+
selectorLabels: selectorLabels,
68+
responseTimeout: responseTimeout,
9269
}
9370
return s
9471
}
@@ -198,7 +175,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
198175
}
199176

200177
// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
201-
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.streamResponseDuration.WithLabelValues(st.Addr()), s.streamDuration.WithLabelValues(st.Addr())))
178+
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
202179
}
203180

204181
level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
@@ -254,10 +231,8 @@ type streamSeriesSet struct {
254231
name string
255232
partialResponse bool
256233

257-
responseTimeout time.Duration
258-
closeSeries context.CancelFunc
259-
streamResponseDuration prometheus.Observer
260-
streamDuration prometheus.Observer
234+
responseTimeout time.Duration
235+
closeSeries context.CancelFunc
261236
}
262237

263238
func startStreamSeriesSet(
@@ -270,35 +245,26 @@ func startStreamSeriesSet(
270245
name string,
271246
partialResponse bool,
272247
responseTimeout time.Duration,
273-
streamResponseDuration prometheus.Observer,
274-
streamDuration prometheus.Observer,
275248
) *streamSeriesSet {
276249
s := &streamSeriesSet{
277-
ctx: ctx,
278-
logger: logger,
279-
closeSeries: closeSeries,
280-
stream: stream,
281-
warnCh: warnCh,
282-
recvCh: make(chan *storepb.Series, 10),
283-
name: name,
284-
partialResponse: partialResponse,
285-
responseTimeout: responseTimeout,
286-
streamResponseDuration: streamResponseDuration,
287-
streamDuration: streamDuration,
250+
ctx: ctx,
251+
logger: logger,
252+
closeSeries: closeSeries,
253+
stream: stream,
254+
warnCh: warnCh,
255+
recvCh: make(chan *storepb.Series, 10),
256+
name: name,
257+
partialResponse: partialResponse,
258+
responseTimeout: responseTimeout,
288259
}
289260

290261
wg.Add(1)
291262
go func() {
292263
defer wg.Done()
293264
defer close(s.recvCh)
294265

295-
timer := prometheus.NewTimer(s.streamDuration)
296-
defer timer.ObserveDuration()
297-
298266
for {
299-
timer := prometheus.NewTimer(s.streamResponseDuration)
300267
r, err := s.stream.Recv()
301-
timer.ObserveDuration()
302268

303269
if err == io.EOF {
304270
return

pkg/store/proxy_test.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/improbable-eng/thanos/pkg/store/storepb"
1515
"github.com/improbable-eng/thanos/pkg/testutil"
1616
"github.com/pkg/errors"
17-
"github.com/prometheus/client_golang/prometheus"
1817
"github.com/prometheus/prometheus/pkg/labels"
1918
"github.com/prometheus/tsdb/chunkenc"
2019
tlabels "github.com/prometheus/tsdb/labels"
@@ -53,7 +52,7 @@ func TestProxyStore_Info(t *testing.T) {
5352
ctx, cancel := context.WithCancel(context.Background())
5453
defer cancel()
5554

56-
q := NewProxyStore(nil, prometheus.NewRegistry(),
55+
q := NewProxyStore(nil,
5756
func() []Client { return nil },
5857
component.Query,
5958
nil, 0*time.Second,
@@ -407,7 +406,7 @@ func TestProxyStore_Series(t *testing.T) {
407406
} {
408407

409408
if ok := t.Run(tc.title, func(t *testing.T) {
410-
q := NewProxyStore(nil, prometheus.NewRegistry(),
409+
q := NewProxyStore(nil,
411410
func() []Client { return tc.storeAPIs },
412411
component.Query,
413412
tc.selectorLabels,
@@ -529,7 +528,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
529528
},
530529
} {
531530
if ok := t.Run(tc.title, func(t *testing.T) {
532-
q := NewProxyStore(nil, prometheus.NewRegistry(),
531+
q := NewProxyStore(nil,
533532
func() []Client { return tc.storeAPIs },
534533
component.Query,
535534
tc.selectorLabels,
@@ -571,7 +570,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
571570
maxTime: 300,
572571
},
573572
}
574-
q := NewProxyStore(nil, prometheus.NewRegistry(),
573+
q := NewProxyStore(nil,
575574
func() []Client { return cls },
576575
component.Query,
577576
nil,
@@ -630,7 +629,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
630629

631630
}
632631

633-
q := NewProxyStore(nil, prometheus.NewRegistry(),
632+
q := NewProxyStore(nil,
634633
func() []Client { return cls },
635634
component.Query,
636635
tlabels.FromStrings("fed", "a"),
@@ -668,7 +667,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
668667
},
669668
}},
670669
}
671-
q := NewProxyStore(nil, prometheus.NewRegistry(),
670+
q := NewProxyStore(nil,
672671
func() []Client { return cls },
673672
component.Query,
674673
nil,

0 commit comments

Comments
 (0)