9
9
"net"
10
10
"net/http"
11
11
"strconv"
12
+ "sync"
12
13
"sync/atomic"
13
14
14
15
"github.com/go-kit/kit/log"
@@ -46,10 +47,12 @@ type Handler struct {
46
47
logger log.Logger
47
48
receiver * Writer
48
49
router * route.Router
49
- hashring Hashring
50
50
options * Options
51
51
listener net.Listener
52
52
53
+ mtx sync.RWMutex
54
+ hashring Hashring
55
+
53
56
// Metrics
54
57
requestDuration * prometheus.HistogramVec
55
58
requestsTotal * prometheus.CounterVec
@@ -140,6 +143,9 @@ func (h *Handler) StorageReady() {
140
143
// Hashring sets the hashring for the handler and marks the hashring as ready.
141
144
// If the hashring is nil, then the hashring is marked as not ready.
142
145
func (h * Handler ) Hashring (hashring Hashring ) {
146
+ h .mtx .Lock ()
147
+ defer h .mtx .Unlock ()
148
+
143
149
if hashring == nil {
144
150
atomic .StoreUint32 (& h .hashringReady , 0 )
145
151
h .hashring = nil
@@ -275,6 +281,15 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) {
275
281
func (h * Handler ) forward (ctx context.Context , tenant string , r replica , wreq * prompb.WriteRequest ) error {
276
282
wreqs := make (map [string ]* prompb.WriteRequest )
277
283
replicas := make (map [string ]replica )
284
+
285
+ h .mtx .RLock ()
286
+ // It is possible that hashring is ready in testReady() but become unready now,
287
+ // so we need to lock here.
288
+ if h .hashring == nil {
289
+ h .mtx .RUnlock ()
290
+ return errors .New ("hashring is not ready" )
291
+ }
292
+
278
293
// Batch all of the time series in the write request
279
294
// into several smaller write requests that are
280
295
// grouped by target endpoint. This ensures that
@@ -285,6 +300,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
285
300
for i := range wreq .Timeseries {
286
301
endpoint , err := h .hashring .GetN (tenant , & wreq .Timeseries [i ], r .n )
287
302
if err != nil {
303
+ h .mtx .RUnlock ()
288
304
return err
289
305
}
290
306
if _ , ok := wreqs [endpoint ]; ! ok {
@@ -294,6 +310,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
294
310
wr := wreqs [endpoint ]
295
311
wr .Timeseries = append (wr .Timeseries , wreq .Timeseries [i ])
296
312
}
313
+ h .mtx .RUnlock ()
297
314
298
315
return h .parallelizeRequests (ctx , tenant , replicas , wreqs )
299
316
}
@@ -400,14 +417,25 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
400
417
wreqs := make (map [string ]* prompb.WriteRequest )
401
418
replicas := make (map [string ]replica )
402
419
var i uint64
420
+
421
+ h .mtx .RLock ()
422
+ // It is possible that hashring is ready in testReady() but become unready now,
423
+ // so we need to lock here.
424
+ if h .hashring == nil {
425
+ h .mtx .RLock ()
426
+ return errors .New ("hashring is not ready" )
427
+ }
428
+
403
429
for i = 0 ; i < h .options .ReplicationFactor ; i ++ {
404
430
endpoint , err := h .hashring .GetN (tenant , & wreq .Timeseries [0 ], i )
405
431
if err != nil {
432
+ h .mtx .RUnlock ()
406
433
return err
407
434
}
408
435
wreqs [endpoint ] = wreq
409
436
replicas [endpoint ] = replica {i , true }
410
437
}
438
+ h .mtx .RUnlock ()
411
439
412
440
err := h .parallelizeRequests (ctx , tenant , replicas , wreqs )
413
441
if errs , ok := err .(terrors.MultiError ); ok {
0 commit comments