Skip to content

Commit 9d45dd9

Browse files
committed
fix: use mutex only if necessary (#352)
1 parent 21f5a82 commit 9d45dd9

File tree

4 files changed

+46
-30
lines changed

4 files changed

+46
-30
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
## [unreleased]
2+
### Bug fixes
3+
- [#354](https://github.com/influxdata/influxdb-client-go/pull/354) More efficient synchronization in WriteAPIBlocking.
4+
25

36
## 2.10.0 [2022-08-25]
47
### Features

api/write.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,18 @@ type WriteAPIImpl struct {
5151
service *iwrite.Service
5252
writeBuffer []string
5353

54-
errCh chan error
55-
writeCh chan *iwrite.Batch
56-
bufferCh chan string
57-
writeStop chan struct{}
58-
bufferStop chan struct{}
59-
bufferFlush chan struct{}
60-
doneCh chan struct{}
61-
bufferInfoCh chan writeBuffInfoReq
62-
writeInfoCh chan writeBuffInfoReq
63-
writeOptions *write.Options
64-
closingMu *sync.Mutex
54+
errCh chan error
55+
writeCh chan *iwrite.Batch
56+
bufferCh chan string
57+
writeStop chan struct{}
58+
bufferStop chan struct{}
59+
bufferFlush chan struct{}
60+
doneCh chan struct{}
61+
bufferInfoCh chan writeBuffInfoReq
62+
writeInfoCh chan writeBuffInfoReq
63+
writeOptions *write.Options
64+
closingMu *sync.Mutex
65+
// more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
6566
isErrChReader int32
6667
}
6768

api/writeAPIBlocking.go

+29-19
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"strings"
1010
"sync"
11+
"sync/atomic"
1112

1213
http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
1314
"github.com/influxdata/influxdb-client-go/v2/api/write"
@@ -51,9 +52,10 @@ type WriteAPIBlocking interface {
5152
type writeAPIBlocking struct {
5253
service *iwrite.Service
5354
writeOptions *write.Options
54-
batching bool
55-
batch []string
56-
mu sync.Mutex
55+
// more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
56+
batching int32
57+
batch []string
58+
mu sync.Mutex
5759
}
5860

5961
// NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org
@@ -69,28 +71,26 @@ func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Se
6971
}
7072

7173
func (w *writeAPIBlocking) EnableBatching() {
72-
w.mu.Lock()
73-
defer w.mu.Unlock()
74-
if !w.batching {
75-
w.batching = true
74+
if atomic.LoadInt32(&w.batching) == 0 {
75+
w.mu.Lock()
76+
w.batching = 1
7677
w.batch = make([]string, 0, w.writeOptions.BatchSize())
78+
w.mu.Unlock()
7779
}
7880
}
7981

8082
func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
81-
w.mu.Lock()
82-
defer w.mu.Unlock()
83-
body := line
84-
if w.batching {
83+
if atomic.LoadInt32(&w.batching) > 0 {
84+
w.mu.Lock()
85+
defer w.mu.Unlock()
8586
w.batch = append(w.batch, line)
8687
if len(w.batch) == int(w.writeOptions.BatchSize()) {
87-
body = strings.Join(w.batch, "\n")
88-
w.batch = w.batch[:0]
88+
return w.flush(ctx)
8989
} else {
9090
return nil
9191
}
9292
}
93-
err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
93+
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
9494
if err != nil {
9595
return err
9696
}
@@ -112,13 +112,23 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
112112
return w.write(ctx, line)
113113
}
114114

115-
func (w *writeAPIBlocking) Flush(ctx context.Context) error {
116-
w.mu.Lock()
117-
defer w.mu.Unlock()
118-
if w.batching && len(w.batch) > 0 {
115+
// flush is unsychronized helper for creating and sending batch
116+
// Must be called from synchronized block
117+
func (w *writeAPIBlocking) flush(ctx context.Context) error {
118+
if len(w.batch) > 0 {
119119
body := strings.Join(w.batch, "\n")
120120
w.batch = w.batch[:0]
121-
return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
121+
b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())
122+
return w.service.WriteBatch(ctx, b)
123+
}
124+
return nil
125+
}
126+
127+
func (w *writeAPIBlocking) Flush(ctx context.Context) error {
128+
if atomic.LoadInt32(&w.batching) > 0 {
129+
w.mu.Lock()
130+
defer w.mu.Unlock()
131+
return w.flush(ctx)
122132
}
123133
return nil
124134
}

internal/test/http_service.go

+2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request
123123
// DoPostRequest reads http request, validates URL and stores data in the request
124124
func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error {
125125
req, err := http.NewRequest("POST", url, nil)
126+
t.lock.Lock()
126127
t.requests++
128+
t.lock.Unlock()
127129
if err != nil {
128130
return http2.NewError(err)
129131
}

0 commit comments

Comments
 (0)