Skip to content

Commit a9c1e37

Browse files
committed
feat: add Flush()
1 parent 830a29a commit a9c1e37

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error).
88

99
### Breaking change
10-
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()`.
10+
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()` and `Flush()`.
1111

1212
## 2.9.2 [2022-07-29]
1313
### Bug fixes

api/writeAPIBlocking.go

+14
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
// to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server
2222
// and the result of the operation is returned.
2323
// When a point is written to the buffer, nil error is always returned.
24+
// Flush() can be used to trigger sending of batch when it doesn't have the batch-size.
2425
//
2526
// Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.
2627

@@ -42,6 +43,8 @@ type WriteAPIBlocking interface {
4243
// EnableBatching turns on implicit batching
4344
// Batch size is controlled via write.Options
4445
EnableBatching()
46+
// Flush forces write of buffer if batching is enabled, even buffer doesn't have the batch-size.
47+
Flush(ctx context.Context) error
4548
}
4649

4750
// writeAPIBlocking implements WriteAPIBlocking interface
@@ -108,3 +111,14 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
108111
}
109112
return w.write(ctx, line)
110113
}
114+
115+
func (w *writeAPIBlocking) Flush(ctx context.Context) error {
116+
w.mu.Lock()
117+
defer w.mu.Unlock()
118+
if w.batching && len(w.batch) > 0 {
119+
body := strings.Join(w.batch, "\n")
120+
w.batch = w.batch[:0]
121+
return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
122+
}
123+
return nil
124+
}

api/writeAPIBlocking_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,15 @@ func TestWriteBatchIng(t *testing.T) {
146146
service.Close()
147147
}
148148
}
149+
150+
for i := 0; i < 4; i++ {
151+
err := writeAPI.WriteRecord(context.Background(), lines[i])
152+
require.Nil(t, err)
153+
}
154+
assert.Equal(t, 0, service.Requests())
155+
require.Len(t, service.Lines(), 0)
156+
err := writeAPI.Flush(context.Background())
157+
require.Nil(t, err)
158+
assert.Equal(t, 1, service.Requests())
159+
require.Len(t, service.Lines(), 4)
149160
}

0 commit comments

Comments
 (0)