8
8
"context"
9
9
"strings"
10
10
"sync"
11
+ "sync/atomic"
11
12
12
13
http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
13
14
"github.com/influxdata/influxdb-client-go/v2/api/write"
@@ -51,7 +52,7 @@ type WriteAPIBlocking interface {
51
52
type writeAPIBlocking struct {
52
53
service * iwrite.Service
53
54
writeOptions * write.Options
54
- batching bool
55
+ batching int32
55
56
batch []string
56
57
mu sync.Mutex
57
58
}
@@ -69,28 +70,28 @@ func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Se
69
70
}
70
71
71
72
func (w * writeAPIBlocking ) EnableBatching () {
72
- w .mu .Lock ()
73
- defer w .mu .Unlock ()
74
- if ! w .batching {
75
- w .batching = true
73
+ if atomic .LoadInt32 (& w .batching ) == 0 {
74
+ w .mu .Lock ()
75
+ w .batching = 1
76
76
w .batch = make ([]string , 0 , w .writeOptions .BatchSize ())
77
+ w .mu .Unlock ()
77
78
}
78
79
}
79
80
80
81
func (w * writeAPIBlocking ) write (ctx context.Context , line string ) error {
81
- w . mu . Lock ()
82
- defer w . mu . Unlock ()
83
- body := line
84
- if w .batching {
85
- w .batch = append (w .batch , line )
86
- if len (w .batch ) == int (w .writeOptions .BatchSize ()) {
87
- body = strings . Join ( w . batch , " \n " )
88
- w . batch = w . batch [: 0 ]
89
- } else {
90
- return nil
91
- }
82
+ if atomic . LoadInt32 ( & w . batching ) > 0 {
83
+ return func ( b string ) error {
84
+ w . mu . Lock ()
85
+ defer w .mu . Unlock ()
86
+ w .batch = append (w .batch , b )
87
+ if len (w .batch ) == int (w .writeOptions .BatchSize ()) {
88
+ return w . flush ( ctx )
89
+ } else {
90
+ return nil
91
+ }
92
+ }( line )
92
93
}
93
- err := w .service .WriteBatch (ctx , iwrite .NewBatch (body , w .writeOptions .MaxRetryTime ()))
94
+ err := w .service .WriteBatch (ctx , iwrite .NewBatch (line , w .writeOptions .MaxRetryTime ()))
94
95
if err != nil {
95
96
return err
96
97
}
@@ -112,13 +113,23 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
112
113
return w .write (ctx , line )
113
114
}
114
115
115
- func ( w * writeAPIBlocking ) Flush ( ctx context. Context ) error {
116
- w . mu . Lock ()
117
- defer w . mu . Unlock ()
118
- if w . batching && len (w .batch ) > 0 {
116
+ // flush is unsychronized helper for creating and sending batch
117
+ // Must be called from synchronized block
118
+ func ( w * writeAPIBlocking ) flush ( ctx context. Context ) error {
119
+ if len (w .batch ) > 0 {
119
120
body := strings .Join (w .batch , "\n " )
120
121
w .batch = w .batch [:0 ]
121
- return w .service .WriteBatch (ctx , iwrite .NewBatch (body , w .writeOptions .MaxRetryTime ()))
122
+ b := iwrite .NewBatch (body , w .writeOptions .MaxRetryTime ())
123
+ return w .service .WriteBatch (ctx , b )
124
+ }
125
+ return nil
126
+ }
127
+
128
+ func (w * writeAPIBlocking ) Flush (ctx context.Context ) error {
129
+ if atomic .LoadInt32 (& w .batching ) > 0 {
130
+ w .mu .Lock ()
131
+ defer w .mu .Unlock ()
132
+ return w .flush (ctx )
122
133
}
123
134
return nil
124
135
}
0 commit comments