Skip to content

Replace sync/atomic with uber-go/atomic #2935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ github.com/prometheus/prometheus/pkg/testutils=github.com/thanos-io/thanos/pkg/t
github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUntypedFunc,UntypedFunc},\
github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\
NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/client_golang/prometheus/promauto.{NewCounter,\
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}" ./...
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
sync/atomic=go.uber.org/atomic" ./...
@$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
@$(GOLANGCI_LINT) run
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/uber/jaeger-lib v2.2.0+incompatible
go.elastic.co/apm v1.5.0
go.elastic.co/apm/module/apmot v1.5.0
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.2.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
Expand Down
8 changes: 4 additions & 4 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"net/url"
"path"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -25,6 +24,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -370,7 +370,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {

var (
wg sync.WaitGroup
numSuccess uint64
numSuccess atomic.Uint64
)
for _, am := range s.alertmanagers {
for _, u := range am.dispatcher.Endpoints() {
Expand All @@ -396,14 +396,14 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds())
s.sent.WithLabelValues(u.Host).Add(float64(len(alerts)))

atomic.AddUint64(&numSuccess, 1)
numSuccess.Inc()
})
}(am, *u)
}
}
wg.Wait()

if numSuccess > 0 {
if numSuccess.Load() > 0 {
return
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/prober/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ package prober
import (
"io"
"net/http"
"sync/atomic"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
)

type check func() bool

// HTTPProbe represents health and readiness status of given component, and provides HTTP integration.
type HTTPProbe struct {
ready uint32
healthy uint32
ready atomic.Uint32
healthy atomic.Uint32
}

// NewHTTP returns HTTPProbe representing readiness and healthiness of given component.
Expand Down Expand Up @@ -49,34 +49,33 @@ func (p *HTTPProbe) handler(logger log.Logger, c check) http.HandlerFunc {

// isReady returns true if component is ready.
func (p *HTTPProbe) isReady() bool {
ready := atomic.LoadUint32(&p.ready)
ready := p.ready.Load()
return ready > 0
}

// isHealthy returns true if component is healthy.
func (p *HTTPProbe) isHealthy() bool {
healthy := atomic.LoadUint32(&p.healthy)
healthy := p.healthy.Load()
return healthy > 0
}

// Ready sets components status to ready.
func (p *HTTPProbe) Ready() {
atomic.SwapUint32(&p.ready, 1)
p.ready.Swap(1)
}

// NotReady sets components status to not ready with given error as a cause.
func (p *HTTPProbe) NotReady(err error) {
atomic.SwapUint32(&p.ready, 0)
p.ready.Swap(0)

}

// Healthy sets components status to healthy.
func (p *HTTPProbe) Healthy() {
atomic.SwapUint32(&p.healthy, 1)

p.healthy.Swap(1)
}

// NotHealthy sets components status to not healthy with given error as a cause.
func (p *HTTPProbe) NotHealthy(err error) {
atomic.SwapUint32(&p.healthy, 0)
p.healthy.Swap(0)
}
2 changes: 1 addition & 1 deletion pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/thanos-io/thanos/pkg/testutil"
"go.uber.org/atomic"
)

func TestReloader_ConfigApply(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -52,6 +51,7 @@ import (
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"go.uber.org/atomic"
)

var emptyRelabelConfig = make([]*relabel.Config, 0)
Expand Down Expand Up @@ -1281,10 +1281,10 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request

if !t.IsBenchmark() {
// Make sure the pool is correctly used. This is expected for 200k numbers.
testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets))
testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets.Load()))
// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance))
chunkPool.(*mockedPool).gets = 0
testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance.Load()))
chunkPool.(*mockedPool).gets.Store(0)

for _, b := range blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
Expand All @@ -1306,22 +1306,22 @@ func (m fakePool) Put(_ *[]byte) {}

type mockedPool struct {
parent pool.BytesPool
balance uint64
gets uint64
balance atomic.Uint64
gets atomic.Uint64
}

func (m *mockedPool) Get(sz int) (*[]byte, error) {
b, err := m.parent.Get(sz)
if err != nil {
return nil, err
}
atomic.AddUint64(&m.balance, uint64(cap(*b)))
atomic.AddUint64(&m.gets, uint64(1))
m.balance.Add(uint64(cap(*b)))
m.gets.Add(uint64(1))
return b, nil
}

func (m *mockedPool) Put(b *[]byte) {
atomic.AddUint64(&m.balance, ^uint64(cap(*b)-1))
m.balance.Sub(uint64(cap(*b)))
m.parent.Put(b)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package store

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

type ChunksLimiter interface {
Expand All @@ -25,7 +25,7 @@ type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter
// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
limit uint64
reserved uint64
reserved atomic.Uint64

// Counter metric which we will increase if limit is exceeded.
failedCounter prometheus.Counter
Expand All @@ -42,7 +42,7 @@ func (l *Limiter) Reserve(num uint64) error {
if l.limit == 0 {
return nil
}
if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit {
if reserved := l.reserved.Add(num); reserved > l.limit {
// We need to protect from the counter being incremented twice due to concurrency
// while calling Reserve().
l.failedOnce.Do(l.failedCounter.Inc)
Expand Down