diff --git a/Makefile b/Makefile index 8217befbaa..edebb4c6c8 100644 --- a/Makefile +++ b/Makefile @@ -266,7 +266,8 @@ github.com/prometheus/prometheus/pkg/testutils=github.com/thanos-io/thanos/pkg/t github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUntypedFunc,UntypedFunc},\ github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\ NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/client_golang/prometheus/promauto.{NewCounter,\ -NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}" ./... +NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\ +sync/atomic=go.uber.org/atomic" ./... @$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./... @echo ">> linting all of the Go files GOGC=${GOGC}" @$(GOLANGCI_LINT) run diff --git a/go.mod b/go.mod index 398b2adb87..3562c93cd0 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/uber/jaeger-lib v2.2.0+incompatible go.elastic.co/apm v1.5.0 go.elastic.co/apm/module/apmot v1.5.0 + go.uber.org/atomic v1.6.0 go.uber.org/automaxprocs v1.2.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index 0a7b4b30e1..59dd4c83f5 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -14,7 +14,6 @@ import ( "net/url" "path" "sync" - "sync/atomic" "time" "github.com/go-kit/kit/log" @@ -25,6 +24,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" + "go.uber.org/atomic" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/tracing" @@ -370,7 +370,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { var ( wg sync.WaitGroup - numSuccess uint64 + numSuccess atomic.Uint64 ) for _, am := range s.alertmanagers { for _, u := range am.dispatcher.Endpoints() { @@ -396,14 +396,14 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds()) s.sent.WithLabelValues(u.Host).Add(float64(len(alerts))) - atomic.AddUint64(&numSuccess, 1) + numSuccess.Inc() }) }(am, *u) } } wg.Wait() - if numSuccess > 0 { + if numSuccess.Load() > 0 { return } diff --git a/pkg/prober/http.go b/pkg/prober/http.go index fe273741f1..74bc5f6dcb 100644 --- a/pkg/prober/http.go +++ b/pkg/prober/http.go @@ -6,18 +6,18 @@ package prober import ( "io" "net/http" - "sync/atomic" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "go.uber.org/atomic" ) type check func() bool // HTTPProbe represents health and readiness status of given component, and provides HTTP integration. type HTTPProbe struct { - ready uint32 - healthy uint32 + ready atomic.Uint32 + healthy atomic.Uint32 } // NewHTTP returns HTTPProbe representing readiness and healthiness of given component. @@ -49,34 +49,33 @@ func (p *HTTPProbe) handler(logger log.Logger, c check) http.HandlerFunc { // isReady returns true if component is ready. func (p *HTTPProbe) isReady() bool { - ready := atomic.LoadUint32(&p.ready) + ready := p.ready.Load() return ready > 0 } // isHealthy returns true if component is healthy. func (p *HTTPProbe) isHealthy() bool { - healthy := atomic.LoadUint32(&p.healthy) + healthy := p.healthy.Load() return healthy > 0 } // Ready sets components status to ready. func (p *HTTPProbe) Ready() { - atomic.SwapUint32(&p.ready, 1) + p.ready.Swap(1) } // NotReady sets components status to not ready with given error as a cause. func (p *HTTPProbe) NotReady(err error) { - atomic.SwapUint32(&p.ready, 0) + p.ready.Swap(0) } // Healthy sets components status to healthy. func (p *HTTPProbe) Healthy() { - atomic.SwapUint32(&p.healthy, 1) - + p.healthy.Swap(1) } // NotHealthy sets components status to not healthy with given error as a cause. func (p *HTTPProbe) NotHealthy(err error) { - atomic.SwapUint32(&p.healthy, 0) + p.healthy.Swap(0) } diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 30fe5f4c13..af8b1f2593 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -15,12 +15,12 @@ import ( "path/filepath" "strings" "sync" - "sync/atomic" "testing" "time" "github.com/fortytw2/leaktest" "github.com/thanos-io/thanos/pkg/testutil" + "go.uber.org/atomic" ) func TestReloader_ConfigApply(t *testing.T) { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 951b83f732..ef6a306854 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -19,7 +19,6 @@ import ( "sort" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -52,6 +51,7 @@ import ( storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" + "go.uber.org/atomic" ) var emptyRelabelConfig = make([]*relabel.Config, 0) @@ -1281,10 +1281,10 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request if !t.IsBenchmark() { // Make sure the pool is correctly used. This is expected for 200k numbers. - testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets)) + testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets.Load())) // TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate. - testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance)) - chunkPool.(*mockedPool).gets = 0 + testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance.Load())) + chunkPool.(*mockedPool).gets.Store(0) for _, b := range blocks { // NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series. @@ -1306,8 +1306,8 @@ func (m fakePool) Put(_ *[]byte) {} type mockedPool struct { parent pool.BytesPool - balance uint64 - gets uint64 + balance atomic.Uint64 + gets atomic.Uint64 } func (m *mockedPool) Get(sz int) (*[]byte, error) { @@ -1315,13 +1315,13 @@ func (m *mockedPool) Get(sz int) (*[]byte, error) { if err != nil { return nil, err } - atomic.AddUint64(&m.balance, uint64(cap(*b))) - atomic.AddUint64(&m.gets, uint64(1)) + m.balance.Add(uint64(cap(*b))) + m.gets.Add(uint64(1)) return b, nil } func (m *mockedPool) Put(b *[]byte) { - atomic.AddUint64(&m.balance, ^uint64(cap(*b)-1)) + m.balance.Sub(uint64(cap(*b))) m.parent.Put(b) } diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 1e354721c2..c60be901e9 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -5,10 +5,10 @@ package store import ( "sync" - "sync/atomic" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" ) type ChunksLimiter interface { @@ -25,7 +25,7 @@ type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter // Limiter is a simple mechanism for checking if something has passed a certain threshold. type Limiter struct { limit uint64 - reserved uint64 + reserved atomic.Uint64 // Counter metric which we will increase if limit is exceeded. failedCounter prometheus.Counter @@ -42,7 +42,7 @@ func (l *Limiter) Reserve(num uint64) error { if l.limit == 0 { return nil } - if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit { + if reserved := l.reserved.Add(num); reserved > l.limit { // We need to protect from the counter being incremented twice due to concurrency // while calling Reserve(). l.failedOnce.Do(l.failedCounter.Inc)