Skip to content

Commit b02a754

Browse files
authored
Merge pull request #344 from bonitoo-io/fix/write_flush_closing
fix: WriteAPI.Flush() also flushes retry queue
2 parents 725321d + 8a71e86 commit b02a754

File tree

7 files changed

+114
-6
lines changed

7 files changed

+114
-6
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## [unreleased]
22
### Bug fixes
33
- [#341](https://github.com/influxdata/influxdb-client-go/issues/341) Changing logging level of messages about discarding batch to Error.
4+
- [#344](https://github.com/influxdata/influxdb-client-go/issues/344) `WriteAPI.Flush()` writes also batches from the retry queue.
45

56
## 2.9.1 [2022-06-24]
67
### Bug fixes

api/write.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,12 @@ func (w *WriteAPIImpl) Errors() <-chan error {
109109
return w.errCh
110110
}
111111

112-
// Flush forces all pending writes from the buffer to be sent
112+
// Flush forces all pending writes from the buffer to be sent.
113+
// Flush also tries sending batches from retry queue without additional retrying.
113114
func (w *WriteAPIImpl) Flush() {
114115
w.bufferFlush <- struct{}{}
115116
w.waitForFlushing()
117+
w.service.Flush()
116118
}
117119

118120
func (w *WriteAPIImpl) waitForFlushing() {

api/write_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package api
66

77
import (
88
"fmt"
9+
"io"
910
"math"
1011
"runtime"
1112
"strings"
@@ -232,3 +233,35 @@ func TestClosing(t *testing.T) {
232233
assert.Len(t, service.Lines(), 0)
233234

234235
}
236+
237+
func TestFlushWithRetries(t *testing.T) {
238+
service := test.NewTestService(t, "http://localhost:8888")
239+
log.Log.SetLogLevel(log.DebugLevel)
240+
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetRetryInterval(200).SetBatchSize(1))
241+
points := test.GenPoints(5)
242+
fails := 0
243+
244+
var mu sync.Mutex
245+
246+
service.SetRequestHandler(func(url string, body io.Reader) error {
247+
mu.Lock()
248+
defer mu.Unlock()
249+
// fail 4 times, then succeed on the 5th try - maxRetries default is 5
250+
if fails >= 4 {
251+
_ = service.DecodeLines(body)
252+
return nil
253+
}
254+
fails++
255+
return fmt.Errorf("spurious failure")
256+
})
257+
// write will try first batch and others will be put to the retry queue of retry delay caused by first write error
258+
for i := 0; i < len(points); i++ {
259+
writeAPI.WritePoint(points[i])
260+
}
261+
// Flush will try sending first batch again and then others
262+
// 1st, 2nd and 3rd will fail, because test service rejects 4 writes
263+
writeAPI.Flush()
264+
writeAPI.Close()
265+
// two remained
266+
assert.Equal(t, 2, len(service.Lines()))
267+
}

internal/test/http_service.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type HTTPService struct {
2727
lines []string
2828
t *testing.T
2929
wasGzip bool
30-
requestHandler func(c *HTTPService, url string, body io.Reader) error
30+
requestHandler func(url string, body io.Reader) error
3131
replyError *http2.Error
3232
lock sync.Mutex
3333
}
@@ -42,6 +42,10 @@ func (t *HTTPService) SetWasGzip(wasGzip bool) {
4242
t.wasGzip = wasGzip
4343
}
4444

45+
func (t *HTTPService) SetRequestHandler(fn func(url string, body io.Reader) error) {
46+
t.requestHandler = fn
47+
}
48+
4549
// ServerURL returns testing URL
4650
func (t *HTTPService) ServerURL() string {
4751
return t.serverURL
@@ -127,9 +131,9 @@ func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reade
127131
return t.ReplyError()
128132
}
129133
if t.requestHandler != nil {
130-
err = t.requestHandler(t, url, body)
134+
err = t.requestHandler(url, body)
131135
} else {
132-
err = t.decodeLines(body)
136+
err = t.DecodeLines(body)
133137
}
134138

135139
if err != nil {
@@ -138,7 +142,7 @@ func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reade
138142
return nil
139143
}
140144

141-
func (t *HTTPService) decodeLines(body io.Reader) error {
145+
func (t *HTTPService) DecodeLines(body io.Reader) error {
142146
bytes, err := ioutil.ReadAll(body)
143147
if err != nil {
144148
return err

internal/write/service.go

+14
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,20 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error {
261261
return perror
262262
}
263263

264+
// Flush sends batches from retry queue immediately, without retrying
265+
func (w *Service) Flush() {
266+
for !w.retryQueue.isEmpty() {
267+
b := w.retryQueue.pop()
268+
if time.Now().After(b.Expires) {
269+
log.Error("Oldest batch in retry queue expired, discarding")
270+
continue
271+
}
272+
if err := w.WriteBatch(context.Background(), b); err != nil {
273+
log.Errorf("Error flushing batch from retry queue: %w", err.Unwrap())
274+
}
275+
}
276+
}
277+
264278
// pointWithDefaultTags encapsulates Point with default tags
265279
type pointWithDefaultTags struct {
266280
point *write.Point

internal/write/service_test.go

+54
Original file line numberDiff line numberDiff line change
@@ -593,3 +593,57 @@ func TestRetryIntervalAccumulation(t *testing.T) {
593593
// Debug line to capture output of successful test
594594
// assert.True(t, false)
595595
}
596+
597+
func TestFlush(t *testing.T) {
598+
log.Log.SetLogLevel(log.DebugLevel)
599+
hs := test.NewTestService(t, "http://localhost:8086")
600+
//
601+
opts := write.DefaultOptions().SetRetryInterval(1)
602+
ctx := context.Background()
603+
srv := NewService("my-org", "my-bucket", hs, opts)
604+
605+
hs.SetReplyError(&http.Error{
606+
Err: errors.New("connection refused"),
607+
})
608+
609+
lines := test.GenRecords(5)
610+
// Test flush will fail all batches
611+
for _, line := range lines {
612+
b := NewBatch(line, 20)
613+
_ = srv.HandleWrite(ctx, b)
614+
}
615+
assert.Equal(t, 5, srv.retryQueue.list.Len())
616+
srv.Flush()
617+
assert.Len(t, hs.Lines(), 0)
618+
619+
// Test flush will find all batches expired
620+
for _, line := range lines {
621+
b := NewBatch(line, 5)
622+
_ = srv.HandleWrite(ctx, b)
623+
}
624+
625+
assert.Equal(t, 5, srv.retryQueue.list.Len())
626+
<-time.After(5 * time.Millisecond)
627+
628+
hs.SetReplyError(nil)
629+
// all batches should expire
630+
srv.Flush()
631+
assert.Len(t, hs.Lines(), 0)
632+
assert.Equal(t, 0, srv.retryQueue.list.Len())
633+
634+
// Test flush will succeed
635+
hs.SetReplyError(&http.Error{
636+
Err: errors.New("connection refused"),
637+
})
638+
for _, line := range lines {
639+
b := NewBatch(line, 5)
640+
_ = srv.HandleWrite(ctx, b)
641+
}
642+
643+
assert.Equal(t, 5, srv.retryQueue.list.Len())
644+
hs.SetReplyError(nil)
645+
// all batches should expire
646+
srv.Flush()
647+
assert.Len(t, hs.Lines(), 5)
648+
assert.Equal(t, 0, srv.retryQueue.list.Len())
649+
}

log/logger.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,5 @@ func (l *logger) Errorf(format string, v ...interface{}) {
129129
func (l *logger) Error(msg string) {
130130
l.lock.Lock()
131131
defer l.lock.Unlock()
132-
log.Print(l.prefix, " [E]! ", msg)
132+
log.Print(l.prefix, " E! ", msg)
133133
}

0 commit comments

Comments
 (0)