Skip to content

Commit efe7e7f

Browse files
committed
fix(write): blocking write doesn't use reply strategy
1 parent 7f61a1a commit efe7e7f

9 files changed

+165
-138
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 2.2.3 [in progress]
2+
### Bug fixes
3+
1. [#236](https://github.com/influxdata/influxdb-client-go/pull/236) Setting MaxRetries to zero value disables retry strategy.
4+
1. [#239](https://github.com/influxdata/influxdb-client-go/pull/239) Blocking write client doesn't use retry handling.
5+
16
## 2.2.2 [2021-01-29]
27
### Bug fixes
38
1. [#229](https://github.com/influxdata/influxdb-client-go/pull/229) Connection errors are also subject for retrying.

api/http/error.go

+5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package http
66

77
import (
8+
"errors"
89
"fmt"
910
"strconv"
1011
)
@@ -30,6 +31,10 @@ func (e *Error) Error() string {
3031
}
3132
}
3233

34+
func (e *Error) ToError() error {
35+
return errors.New(e.Error())
36+
}
37+
3338
// NewError returns newly created Error initialised with nested error and default values
3439
func NewError(err error) *Error {
3540
return &Error{

api/writeAPIBlocking.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package api
77
import (
88
"context"
99
"strings"
10-
"sync"
1110

1211
http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
1312
"github.com/influxdata/influxdb-client-go/v2/api/write"
@@ -61,7 +60,6 @@ type WriteAPIBlocking interface {
6160
type writeAPIBlocking struct {
6261
service *iwrite.Service
6362
writeOptions *write.Options
64-
lock sync.Mutex
6563
}
6664

6765
// NewWriteAPIBlocking creates new WriteAPIBlocking instance for org and bucket with underlying client
@@ -70,10 +68,11 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write
7068
}
7169

7270
func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
73-
w.lock.Lock()
74-
defer w.lock.Unlock()
75-
err := w.service.HandleWrite(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval()))
76-
return err
71+
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval()))
72+
if err != nil {
73+
return err.ToError()
74+
}
75+
return nil
7776
}
7877

7978
func (w *writeAPIBlocking) WriteRecord(ctx context.Context, line ...string) error {

api/writeAPIBlocking_test.go

+27-22
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package api
66

77
import (
88
"context"
9+
"net"
10+
"net/http"
911
"sync"
1012
"testing"
1113
"time"
@@ -20,7 +22,7 @@ import (
2022
func TestWritePoint(t *testing.T) {
2123
service := test.NewTestService(t, "http://localhost:8888")
2224
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
23-
points := genPoints(10)
25+
points := test.GenPoints(10)
2426
err := writeAPI.WritePoint(context.Background(), points...)
2527
require.Nil(t, err)
2628
require.Len(t, service.Lines(), 10)
@@ -35,7 +37,7 @@ func TestWritePoint(t *testing.T) {
3537
func TestWriteRecord(t *testing.T) {
3638
service := test.NewTestService(t, "http://localhost:8888")
3739
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
38-
lines := genRecords(10)
40+
lines := test.GenRecords(10)
3941
err := writeAPI.WriteRecord(context.Background(), lines...)
4042
require.Nil(t, err)
4143
require.Len(t, service.Lines(), 10)
@@ -54,29 +56,10 @@ func TestWriteRecord(t *testing.T) {
5456
require.Equal(t, "invalid: data", err.Error())
5557
}
5658

57-
func TestWriteContextCancel(t *testing.T) {
58-
service := test.NewTestService(t, "http://localhost:8888")
59-
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
60-
lines := genRecords(10)
61-
ctx, cancel := context.WithCancel(context.Background())
62-
var err error
63-
var wg sync.WaitGroup
64-
wg.Add(1)
65-
go func() {
66-
<-time.After(time.Second)
67-
err = writeAPI.WriteRecord(ctx, lines...)
68-
wg.Done()
69-
}()
70-
cancel()
71-
wg.Wait()
72-
require.Equal(t, context.Canceled, err)
73-
assert.Len(t, service.Lines(), 0)
74-
}
75-
7659
func TestWriteParallel(t *testing.T) {
7760
service := test.NewTestService(t, "http://localhost:8888")
7861
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
79-
lines := genRecords(1000)
62+
lines := test.GenRecords(1000)
8063

8164
chanLine := make(chan string)
8265
var wg sync.WaitGroup
@@ -99,3 +82,25 @@ func TestWriteParallel(t *testing.T) {
9982

10083
service.Close()
10184
}
85+
86+
func TestWriteErrors(t *testing.T) {
87+
service := http2.NewService("http://locl:866", "", http2.DefaultOptions().SetHTTPClient(&http.Client{
88+
Timeout: 100 * time.Millisecond,
89+
Transport: &http.Transport{
90+
DialContext: (&net.Dialer{
91+
Timeout: 100 * time.Millisecond,
92+
}).DialContext,
93+
},
94+
}))
95+
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
96+
points := test.GenPoints(10)
97+
errors := 0
98+
for _, p := range points {
99+
err := writeAPI.WritePoint(context.Background(), p)
100+
if assert.Error(t, err) {
101+
errors++
102+
}
103+
}
104+
require.Equal(t, 10, errors)
105+
106+
}

api/write_test.go

+5-49
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package api
66

77
import (
88
"fmt"
9-
"math/rand"
109
"strings"
1110
"sync"
1211
"testing"
@@ -20,49 +19,6 @@ import (
2019
"github.com/stretchr/testify/require"
2120
)
2221

23-
func genPoints(num int) []*write.Point {
24-
points := make([]*write.Point, num)
25-
rand.Seed(321)
26-
27-
t := time.Now()
28-
for i := 0; i < len(points); i++ {
29-
points[i] = write.NewPoint(
30-
"test",
31-
map[string]string{
32-
"id": fmt.Sprintf("rack_%v", i%10),
33-
"vendor": "AWS",
34-
"hostname": fmt.Sprintf("host_%v", i%100),
35-
},
36-
map[string]interface{}{
37-
"temperature": rand.Float64() * 80.0,
38-
"disk_free": rand.Float64() * 1000.0,
39-
"disk_total": (i/10 + 1) * 1000000,
40-
"mem_total": (i/100 + 1) * 10000000,
41-
"mem_free": rand.Uint64(),
42-
},
43-
t)
44-
if i%10 == 0 {
45-
t = t.Add(time.Second)
46-
}
47-
}
48-
return points
49-
}
50-
51-
func genRecords(num int) []string {
52-
lines := make([]string, num)
53-
rand.Seed(321)
54-
55-
t := time.Now()
56-
for i := 0; i < len(lines); i++ {
57-
lines[i] = fmt.Sprintf("test,id=rack_%v,vendor=AWS,hostname=host_%v temperature=%v,disk_free=%v,disk_total=%vi,mem_total=%vi,mem_free=%vu %v",
58-
i%10, i%100, rand.Float64()*80.0, rand.Float64()*1000.0, (i/10+1)*1000000, (i/100+1)*10000000, rand.Uint64(), t.UnixNano())
59-
if i%10 == 0 {
60-
t = t.Add(time.Second)
61-
}
62-
}
63-
return lines
64-
}
65-
6622
func TestWriteAPIWriteDefaultTag(t *testing.T) {
6723
service := test.NewTestService(t, "http://localhost:8888")
6824
opts := write.DefaultOptions().
@@ -85,7 +41,7 @@ func TestWriteAPIWriteDefaultTag(t *testing.T) {
8541
func TestWriteAPIImpl_Write(t *testing.T) {
8642
service := test.NewTestService(t, "http://localhost:8888")
8743
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
88-
points := genPoints(10)
44+
points := test.GenPoints(10)
8945
for _, p := range points {
9046
writeAPI.WritePoint(p)
9147
}
@@ -103,7 +59,7 @@ func TestGzipWithFlushing(t *testing.T) {
10359
service := test.NewTestService(t, "http://localhost:8888")
10460
log.Log.SetLogLevel(log.DebugLevel)
10561
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true))
106-
points := genPoints(5)
62+
points := test.GenPoints(5)
10763
for _, p := range points {
10864
writeAPI.WritePoint(p)
10965
}
@@ -128,7 +84,7 @@ func TestGzipWithFlushing(t *testing.T) {
12884
func TestFlushInterval(t *testing.T) {
12985
service := test.NewTestService(t, "http://localhost:8888")
13086
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(500))
131-
points := genPoints(5)
87+
points := test.GenPoints(5)
13288
for _, p := range points {
13389
writeAPI.WritePoint(p)
13490
}
@@ -153,7 +109,7 @@ func TestRetry(t *testing.T) {
153109
service := test.NewTestService(t, "http://localhost:8888")
154110
log.Log.SetLogLevel(log.DebugLevel)
155111
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
156-
points := genPoints(15)
112+
points := test.GenPoints(15)
157113
for i := 0; i < 5; i++ {
158114
writeAPI.WritePoint(points[i])
159115
}
@@ -203,7 +159,7 @@ func TestWriteError(t *testing.T) {
203159
recErr = <-errCh
204160
wg.Done()
205161
}()
206-
points := genPoints(15)
162+
points := test.GenPoints(15)
207163
for i := 0; i < 5; i++ {
208164
writeAPI.WritePoint(points[i])
209165
}

client_test.go

+10-24
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"testing"
1313
"time"
1414

15-
http3 "github.com/influxdata/influxdb-client-go/v2/api/http"
1615
http2 "github.com/influxdata/influxdb-client-go/v2/internal/http"
1716
iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
1817
"github.com/stretchr/testify/assert"
@@ -39,8 +38,7 @@ func TestUrls(t *testing.T) {
3938
assert.Equal(t, url.serverURL, ci.serverURL)
4039
assert.Equal(t, url.serverAPIURL, ci.httpService.ServerAPIURL())
4140
ws := iwrite.NewService("org", "bucket", ci.httpService, c.Options().WriteOptions())
42-
wu, err := ws.WriteURL()
43-
require.Nil(t, err)
41+
wu := ws.WriteURL()
4442
assert.Equal(t, url.writeURLPrefix+"?bucket=bucket&org=org&precision=ns", wu)
4543
})
4644
}
@@ -94,7 +92,7 @@ func TestUserAgent(t *testing.T) {
9492
assert.Nil(t, err)
9593

9694
err = c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
97-
assert.Nil(t, err)
95+
assert.NoError(t, err)
9896
}
9997

10098
func TestServerError429(t *testing.T) {
@@ -109,12 +107,8 @@ func TestServerError429(t *testing.T) {
109107
defer server.Close()
110108
c := NewClient(server.URL, "x")
111109
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
112-
require.NotNil(t, err)
113-
perror, ok := err.(*http3.Error)
114-
require.True(t, ok)
115-
require.NotNil(t, perror)
116-
assert.Equal(t, "too many requests", perror.Code)
117-
assert.Equal(t, "exceeded rate limit", perror.Message)
110+
require.Error(t, err)
111+
assert.Equal(t, "too many requests: exceeded rate limit", err.Error())
118112
}
119113

120114
func TestServerOnPath(t *testing.T) {
@@ -130,7 +124,7 @@ func TestServerOnPath(t *testing.T) {
130124
defer server.Close()
131125
c := NewClient(server.URL+"/proxy/0:0/influx/", "x")
132126
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
133-
require.Nil(t, err)
127+
require.NoError(t, err)
134128
}
135129

136130
func TestServerErrorNonJSON(t *testing.T) {
@@ -143,12 +137,8 @@ func TestServerErrorNonJSON(t *testing.T) {
143137
defer server.Close()
144138
c := NewClient(server.URL, "x")
145139
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
146-
require.NotNil(t, err)
147-
perror, ok := err.(*http3.Error)
148-
require.True(t, ok)
149-
require.NotNil(t, perror)
150-
assert.Equal(t, "500 Internal Server Error", perror.Code)
151-
assert.Equal(t, "internal server error", perror.Message)
140+
require.Error(t, err)
141+
assert.Equal(t, "500 Internal Server Error: internal server error", err.Error())
152142
}
153143

154144
func TestServerErrorInflux1_8(t *testing.T) {
@@ -162,12 +152,8 @@ func TestServerErrorInflux1_8(t *testing.T) {
162152
defer server.Close()
163153
c := NewClient(server.URL, "x")
164154
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
165-
require.NotNil(t, err)
166-
perror, ok := err.(*http3.Error)
167-
require.True(t, ok)
168-
require.NotNil(t, perror)
169-
assert.Equal(t, "404 Not Found", perror.Code)
170-
assert.Equal(t, "bruh moment", perror.Message)
155+
require.Error(t, err)
156+
assert.Equal(t, "404 Not Found: bruh moment", err.Error())
171157
}
172158

173159
func TestServerErrorEmptyBody(t *testing.T) {
@@ -178,6 +164,6 @@ func TestServerErrorEmptyBody(t *testing.T) {
178164
defer server.Close()
179165
c := NewClient(server.URL, "x")
180166
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
181-
require.NotNil(t, err)
167+
require.Error(t, err)
182168
assert.Equal(t, "Unexpected status code 404", err.Error())
183169
}

internal/test/generators.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package test
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"time"
7+
8+
"github.com/influxdata/influxdb-client-go/v2/api/write"
9+
)
10+
11+
func GenPoints(num int) []*write.Point {
12+
points := make([]*write.Point, num)
13+
rand.Seed(321)
14+
15+
t := time.Now()
16+
for i := 0; i < len(points); i++ {
17+
points[i] = write.NewPoint(
18+
"test",
19+
map[string]string{
20+
"id": fmt.Sprintf("rack_%v", i%10),
21+
"vendor": "AWS",
22+
"hostname": fmt.Sprintf("host_%v", i%100),
23+
},
24+
map[string]interface{}{
25+
"temperature": rand.Float64() * 80.0,
26+
"disk_free": rand.Float64() * 1000.0,
27+
"disk_total": (i/10 + 1) * 1000000,
28+
"mem_total": (i/100 + 1) * 10000000,
29+
"mem_free": rand.Uint64(),
30+
},
31+
t)
32+
if i%10 == 0 {
33+
t = t.Add(time.Second)
34+
}
35+
}
36+
return points
37+
}
38+
39+
func GenRecords(num int) []string {
40+
lines := make([]string, num)
41+
rand.Seed(321)
42+
43+
t := time.Now()
44+
for i := 0; i < len(lines); i++ {
45+
lines[i] = fmt.Sprintf("test,id=rack_%v,vendor=AWS,hostname=host_%v temperature=%v,disk_free=%v,disk_total=%vi,mem_total=%vi,mem_free=%vu %v",
46+
i%10, i%100, rand.Float64()*80.0, rand.Float64()*1000.0, (i/10+1)*1000000, (i/100+1)*10000000, rand.Uint64(), t.UnixNano())
47+
if i%10 == 0 {
48+
t = t.Add(time.Second)
49+
}
50+
}
51+
return lines
52+
}

0 commit comments

Comments
 (0)