From a5e151eca65fff27a2571480526c3b0c694b20ca Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 15 May 2020 07:19:28 +0200 Subject: [PATCH 1/4] refactor: QueryApi, WriteApi, WriteApiBlocking and related objects moved into the api and bellow packages (api/write, api/query) --- .circleci/config.yml | 2 +- CHANGELOG.md | 9 +- api/doc.go | 135 +------- api/examples_test.go | 289 +++++++++++++++++- api/http/options.go | 44 +++ query.go => api/query.go | 25 +- table.go => api/query/table.go | 22 +- table_test.go => api/query/table_test.go | 14 +- query_test.go => api/query_test.go | 245 +++++++-------- write.go => api/write.go | 51 ++-- api/write/ext.go | 93 ++++++ api/write/options.go | 110 +++++++ point.go => api/write/point.go | 2 +- point_test.go => api/write/point_test.go | 100 +----- .../writeApiBlocking.go | 22 +- .../writeApiBlocking_test.go | 31 +- write_test.go => api/write_test.go | 69 ++--- client.go | 26 +- client_e2e_test.go | 76 ++--- compatibility.go | 28 ++ examples_test.go | 149 +-------- go.sum | 4 + {api => internal/examples}/fakeclient.go | 40 ++- internal/http/service.go | 9 +- internal/log/logger.go | 2 + internal/test/point.go | 1 + queue.go => internal/write/queue.go | 16 +- queue_test.go => internal/write/queue_test.go | 4 +- .../write/writeService.go | 72 +++-- options.go | 81 ++--- 30 files changed, 997 insertions(+), 774 deletions(-) create mode 100644 api/http/options.go rename query.go => api/query.go (94%) rename table.go => api/query/table.go (85%) rename table_test.go => api/query/table_test.go (95%) rename query_test.go => api/query_test.go (78%) rename write.go => api/write.go (76%) create mode 100644 api/write/ext.go create mode 100644 api/write/options.go rename point.go => api/write/point.go (99%) rename point_test.go => api/write/point_test.go (71%) rename writeApiBlocking.go => api/writeApiBlocking.go (70%) rename writeApiBlocking_test.go => api/writeApiBlocking_test.go (71%) rename write_test.go => api/write_test.go (80%) create mode 100644 compatibility.go rename {api => internal/examples}/fakeclient.go (64%) create mode 100644 internal/test/point.go rename queue.go => internal/write/queue.go (74%) rename queue_test.go => internal/write/queue_test.go (88%) rename writeService.go => internal/write/writeService.go (64%) diff --git a/.circleci/config.yml b/.circleci/config.yml index 52e1030f..b7c97849 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -39,7 +39,7 @@ jobs: mkdir -p /tmp/artifacts - run: command: | - go test -v -e2e -race -coverprofile=coverage.txt -covermode=atomic ./... + go test -v -race -coverprofile=coverage.txt -covermode=atomic ./... --tags e2e bash <(curl -s https://codecov.io/bash) go tool cover -html=coverage.txt -o coverage.html mv coverage.html /tmp/artifacts diff --git a/CHANGELOG.md b/CHANGELOG.md index 577765a1..0246f2fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,15 @@ ## 1.2.0 [in progress] + +### Breaking Changes + - [#107](https://github.com/influxdata/influxdb-client-go/pull/107) Renamed `InfluxDBClient` interface to `Client`, so the full name `influxdb2.Client` suits better to Go naming conventions + - [#125](https://github.com/influxdata/influxdb-client-go/pull/125) `WriteApi`,`WriteApiBlocking`,`QueryApi` interfaces and related objects like `Point`, `FluxTableMetadata`, `FluxTableColumn`, `FluxRecord`, moved to the `api` ( and `api/write`, `api/query`) packages + to provide consistent interface + ### Features 1. [#120](https://github.com/influxdata/influxdb-client-go/pull/120) Health check API 1. [#122](https://github.com/influxdata/influxdb-client-go/pull/122) Delete API 1. [#124](https://github.com/influxdata/influxdb-client-go/pull/124) Buckets API -### Breaking Change - - [#107](https://github.com/influxdata/influxdb-client-go/pull/100) Renamed `InfluxDBClient` interface to `Client`, so the full name `influxdb2.Client` suits better to Go naming conventions - ### Bug fixes 1. [#108](https://github.com/influxdata/influxdb-client-go/issues/108) Fix default retry interval doc 1. [#110](https://github.com/influxdata/influxdb-client-go/issues/110) Allowing empty (nil) values in query result diff --git a/api/doc.go b/api/doc.go index be6f5dbe..cd10f880 100644 --- a/api/doc.go +++ b/api/doc.go @@ -2,138 +2,5 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -// Package api provides clients for InfluxDB server APIs -// -// Examples -// -// Users API -// -// // Create influxdb client -// client := influxdb2.NewClient("http://localhost:9999", "my-token") -// -// // Find organization -// org, err := client.OrganizationsApi().FindOrganizationByName(context.Background(), "my-org") -// if err != nil { -// panic(err) -// } -// -// // Get users API client -// usersApi := client.UsersApi() -// -// // Create new user -// user, err := usersApi.CreateUserWithName(context.Background(), "user-01") -// if err != nil { -// panic(err) -// } -// -// // Set user password -// err = usersApi.UpdateUserPassword(context.Background(), user, "pass-at-least-8-chars") -// if err != nil { -// panic(err) -// } -// -// // Add user to organization -// _, err = client.OrganizationsApi().AddMember(context.Background(), org, user) -// if err != nil { -// panic(err) -// } -// -// Organizations API -// -// // Create influxdb client -// client := influxdb2.NewClient("http://localhost:9999", "my-token") -// -// // Get Organizations API client -// orgApi := client.OrganizationsApi() -// -// // Create new organization -// org, err := orgApi.CreateOrganizationWithName(context.Background(), "org-2") -// if err != nil { -// panic(err) -// } -// -// orgDescription := "My second org " -// org.Description = &orgDescription -// -// org, err = orgApi.UpdateOrganization(context.Background(), org) -// if err != nil { -// panic(err) -// } -// -// // Find user to set owner -// user, err := client.UsersApi().FindUserByName(context.Background(), "user-01") -// if err != nil { -// panic(err) -// } -// -// // Add another owner (first owner is the one who create organization -// _, err = orgApi.AddOwner(context.Background(), org, user) -// if err != nil { -// panic(err) -// } -// -// // Create new user to add to org -// newUser, err := client.UsersApi().CreateUserWithName(context.Background(), "user-02") -// if err != nil { -// panic(err) -// } -// -// // Add new user to organization -// _, err = orgApi.AddMember(context.Background(), org, newUser) -// if err != nil { -// panic(err) -// } -// -// Authorizations API -// -// // Create influxdb client -// client := influxdb2.NewClient("http://localhost:9999", "my-token") -// -// // Find user to grant permission -// user, err := client.UsersApi().FindUserByName(context.Background(), "user-01") -// if err != nil { -// panic(err) -// } -// -// // Find organization -// org, err := client.OrganizationsApi().FindOrganizationByName(context.Background(), "my-org") -// if err != nil { -// panic(err) -// } -// -// // create write permission for buckets -// permissionWrite := &domain.Permission{ -// Action: domain.PermissionActionWrite, -// Resource: domain.Resource{ -// Type: domain.ResourceTypeBuckets, -// }, -// } -// -// // create read permission for buckets -// permissionRead := &domain.Permission{ -// Action: domain.PermissionActionRead, -// Resource: domain.Resource{ -// Type: domain.ResourceTypeBuckets, -// }, -// } -// -// // group permissions -// permissions := []domain.Permission{*permissionWrite, *permissionRead} -// -// // create authorization object using info above -// auth := &domain.Authorization{ -// OrgID: org.Id, -// Permissions: &permissions, -// User: &user.Name, -// UserID: user.Id, -// } -// -// // grant permission and create token -// authCreated, err := client.AuthorizationsApi().CreateAuthorization(context.Background(), auth) -// if err != nil { -// panic(err) -// } -// -// // Use token -// fmt.Println("Token: ", *authCreated.Token) +// Package api provides clients for InfluxDB server APIs. package api diff --git a/api/examples_test.go b/api/examples_test.go index 25c55f60..7af30d29 100644 --- a/api/examples_test.go +++ b/api/examples_test.go @@ -2,8 +2,13 @@ package api_test import ( "context" - influxdb2 "github.com/influxdata/influxdb-client-go/api" + "fmt" + "github.com/influxdata/influxdb-client-go/api" + "github.com/influxdata/influxdb-client-go/api/write" "github.com/influxdata/influxdb-client-go/domain" + influxdb2 "github.com/influxdata/influxdb-client-go/internal/examples" + "math/rand" + "time" ) func ExampleBucketsApi() { @@ -36,3 +41,285 @@ func ExampleBucketsApi() { // Close the client client.Close() } + +func ExampleWriteApiBlocking() { + // Create client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + // Get blocking write client + writeApi := client.WriteApiBlocking("my-org", "my-bucket") + // write some points + for i := 0; i < 100; i++ { + // create data point + p := write.NewPoint( + "system", + map[string]string{ + "id": fmt.Sprintf("rack_%v", i%10), + "vendor": "AWS", + "hostname": fmt.Sprintf("host_%v", i%100), + }, + map[string]interface{}{ + "temperature": rand.Float64() * 80.0, + "disk_free": rand.Float64() * 1000.0, + "disk_total": (i/10 + 1) * 1000000, + "mem_total": (i/100 + 1) * 10000000, + "mem_free": rand.Uint64(), + }, + time.Now()) + // write synchronously + err := writeApi.WritePoint(context.Background(), p) + if err != nil { + panic(err) + } + } + // Ensures background processes finishes + client.Close() +} + +func ExampleWriteApi() { + // Create client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + // Get non-blocking write client + writeApi := client.WriteApi("my-org", "my-bucket") + // write some points + for i := 0; i < 100; i++ { + // create point + p := write.NewPoint( + "system", + map[string]string{ + "id": fmt.Sprintf("rack_%v", i%10), + "vendor": "AWS", + "hostname": fmt.Sprintf("host_%v", i%100), + }, + map[string]interface{}{ + "temperature": rand.Float64() * 80.0, + "disk_free": rand.Float64() * 1000.0, + "disk_total": (i/10 + 1) * 1000000, + "mem_total": (i/100 + 1) * 10000000, + "mem_free": rand.Uint64(), + }, + time.Now()) + // write asynchronously + writeApi.WritePoint(p) + } + // Force all unwritten data to be sent + writeApi.Flush() + // Ensures background processes finishes + client.Close() +} + +func ExampleWriteApi_errors() { + // Create client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + // Get non-blocking write client + writeApi := client.WriteApi("my-org", "my-bucket") + // Get errors channel + errorsCh := writeApi.Errors() + // Create go proc for reading and logging errors + go func() { + for err := range errorsCh { + fmt.Printf("write error: %s\n", err.Error()) + } + }() + // write some points + for i := 0; i < 100; i++ { + // create point + p := write.NewPointWithMeasurement("stat"). + AddTag("id", fmt.Sprintf("rack_%v", i%10)). + AddTag("vendor", "AWS"). + AddTag("hostname", fmt.Sprintf("host_%v", i%100)). + AddField("temperature", rand.Float64()*80.0). + AddField("disk_free", rand.Float64()*1000.0). + AddField("disk_total", (i/10+1)*1000000). + AddField("mem_total", (i/100+1)*10000000). + AddField("mem_free", rand.Uint64()). + SetTime(time.Now()) + // write asynchronously + writeApi.WritePoint(p) + } + // Force all unwritten data to be sent + writeApi.Flush() + // Ensures background processes finishes + client.Close() +} + +func ExampleQueryApi_query() { + // Create client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + // Get query client + queryApi := client.QueryApi("my-org") + // get QueryTableResult + result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`) + if err == nil { + // Iterate over query response + for result.Next() { + // Notice when group key has changed + if result.TableChanged() { + fmt.Printf("table: %s\n", result.TableMetadata().String()) + } + // Access data + fmt.Printf("value: %v\n", result.Record().Value()) + } + // check for an error + if result.Err() != nil { + fmt.Printf("query parsing error: %s\n", result.Err().Error()) + } + } else { + panic(err) + } + // Ensures background processes finishes + client.Close() +} + +func ExampleQueryApi_queryRaw() { + // Create client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + // Get query client + queryApi := client.QueryApi("my-org") + // Query and get complete result as a string + // Use default dialect + result, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, api.DefaultDialect()) + if err == nil { + fmt.Println("QueryResult:") + fmt.Println(result) + } else { + panic(err) + } + // Ensures background processes finishes + client.Close() +} + +func ExampleOrganizationsApi() { + // Create influxdb client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + + // Get Organizations API client + orgApi := client.OrganizationsApi() + + // Create new organization + org, err := orgApi.CreateOrganizationWithName(context.Background(), "org-2") + if err != nil { + panic(err) + } + + orgDescription := "My second org " + org.Description = &orgDescription + + org, err = orgApi.UpdateOrganization(context.Background(), org) + if err != nil { + panic(err) + } + + // Find user to set owner + user, err := client.UsersApi().FindUserByName(context.Background(), "user-01") + if err != nil { + panic(err) + } + + // Add another owner (first owner is the one who create organization + _, err = orgApi.AddOwner(context.Background(), org, user) + if err != nil { + panic(err) + } + + // Create new user to add to org + newUser, err := client.UsersApi().CreateUserWithName(context.Background(), "user-02") + if err != nil { + panic(err) + } + + // Add new user to organization + _, err = orgApi.AddMember(context.Background(), org, newUser) + if err != nil { + panic(err) + } + // Ensures background processes finishes + client.Close() +} + +func ExampleAuthorizationsApi() { + // Create influxdb client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + + // Find user to grant permission + user, err := client.UsersApi().FindUserByName(context.Background(), "user-01") + if err != nil { + panic(err) + } + + // Find organization + org, err := client.OrganizationsApi().FindOrganizationByName(context.Background(), "my-org") + if err != nil { + panic(err) + } + + // create write permission for buckets + permissionWrite := &domain.Permission{ + Action: domain.PermissionActionWrite, + Resource: domain.Resource{ + Type: domain.ResourceTypeBuckets, + }, + } + + // create read permission for buckets + permissionRead := &domain.Permission{ + Action: domain.PermissionActionRead, + Resource: domain.Resource{ + Type: domain.ResourceTypeBuckets, + }, + } + + // group permissions + permissions := []domain.Permission{*permissionWrite, *permissionRead} + + // create authorization object using info above + auth := &domain.Authorization{ + OrgID: org.Id, + Permissions: &permissions, + User: &user.Name, + UserID: user.Id, + } + + // grant permission and create token + authCreated, err := client.AuthorizationsApi().CreateAuthorization(context.Background(), auth) + if err != nil { + panic(err) + } + // Use token + fmt.Println("Token: ", *authCreated.Token) + // Ensures background processes finishes + client.Close() +} + +func ExampleUsersApi() { + // Create influxdb client + client := influxdb2.NewClient("http://localhost:9999", "my-token") + + // Find organization + org, err := client.OrganizationsApi().FindOrganizationByName(context.Background(), "my-org") + if err != nil { + panic(err) + } + + // Get users API client + usersApi := client.UsersApi() + + // Create new user + user, err := usersApi.CreateUserWithName(context.Background(), "user-01") + if err != nil { + panic(err) + } + + // Set user password + err = usersApi.UpdateUserPassword(context.Background(), user, "pass-at-least-8-chars") + if err != nil { + panic(err) + } + + // Add user to organization + _, err = client.OrganizationsApi().AddMember(context.Background(), org, user) + if err != nil { + panic(err) + } + // Ensures background processes finishes + client.Close() +} diff --git a/api/http/options.go b/api/http/options.go new file mode 100644 index 00000000..f92b835c --- /dev/null +++ b/api/http/options.go @@ -0,0 +1,44 @@ +// Copyright 2020 InfluxData, Inc. All rights reserved. +// Use of this source code is governed by MIT +// license that can be found in the LICENSE file. + +package http + +import ( + "crypto/tls" +) + +// Options holds http configuration properties for communicating with InfluxDB server +type Options struct { + // TLS configuration for secure connection. Default nil + tlsConfig *tls.Config + // HTTP request timeout in sec. Default 20 + httpRequestTimeout uint +} + +// TlsConfig returns TlsConfig +func (o *Options) TlsConfig() *tls.Config { + return o.tlsConfig +} + +// SetTlsConfig sets TLS configuration for secure connection +func (o *Options) SetTlsConfig(tlsConfig *tls.Config) *Options { + o.tlsConfig = tlsConfig + return o +} + +// HttpRequestTimeout returns HTTP request timeout +func (o *Options) HttpRequestTimeout() uint { + return o.httpRequestTimeout +} + +// SetHttpRequestTimeout sets HTTP request timeout in sec +func (o *Options) SetHttpRequestTimeout(httpRequestTimeout uint) *Options { + o.httpRequestTimeout = httpRequestTimeout + return o +} + +// DefaultOptions returns Options object with default values +func DefaultOptions() *Options { + return &Options{httpRequestTimeout: 20} +} diff --git a/query.go b/api/query.go similarity index 94% rename from query.go rename to api/query.go index 3720d3b9..edf7b0eb 100644 --- a/query.go +++ b/api/query.go @@ -2,7 +2,7 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package api import ( "bytes" @@ -13,7 +13,6 @@ import ( "encoding/json" "errors" "fmt" - ihttp "github.com/influxdata/influxdb-client-go/internal/http" "io" "io/ioutil" "net/http" @@ -24,7 +23,9 @@ import ( "sync" "time" + "github.com/influxdata/influxdb-client-go/api/query" "github.com/influxdata/influxdb-client-go/domain" + ihttp "github.com/influxdata/influxdb-client-go/internal/http" ) const ( @@ -47,11 +48,10 @@ type QueryApi interface { Query(ctx context.Context, query string) (*QueryTableResult, error) } -func newQueryApi(org string, service ihttp.Service, client Client) QueryApi { +func NewQueryApi(org string, service ihttp.Service) QueryApi { return &queryApiImpl{ org: org, httpService: service, - client: client, } } @@ -59,7 +59,6 @@ func newQueryApi(org string, service ihttp.Service, client Client) QueryApi { type queryApiImpl struct { org string httpService ihttp.Service - client Client url string lock sync.Mutex } @@ -174,8 +173,8 @@ type QueryTableResult struct { csvReader *csv.Reader tablePosition int tableChanged bool - table *FluxTableMetadata - record *FluxRecord + table *query.FluxTableMetadata + record *query.FluxRecord err error } @@ -183,13 +182,13 @@ type QueryTableResult struct { // Each new table is introduced by the #dataType annotation in csv func (q *QueryTableResult) TablePosition() int { if q.table != nil { - return q.table.position + return q.table.Position() } return -1 } // TableMetadata returns actual flux table metadata -func (q *QueryTableResult) TableMetadata() *FluxTableMetadata { +func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata { return q.table } @@ -201,7 +200,7 @@ func (q *QueryTableResult) TableChanged() bool { // Record returns last parsed flux table data row // Use Record methods to access value and row properties -func (q *QueryTableResult) Record() *FluxRecord { +func (q *QueryTableResult) Record() *query.FluxRecord { return q.record } @@ -292,13 +291,13 @@ readRow: } } } - q.record = newFluxRecord(q.table.Position(), values) + q.record = query.NewFluxRecord(q.table.Position(), values) case "#datatype": - q.table = newFluxTableMetadata(q.tablePosition) + q.table = query.NewFluxTableMetadata(q.tablePosition) q.tablePosition++ q.tableChanged = true for i, d := range row[1:] { - q.table.AddColumn(newFluxColumn(i, d)) + q.table.AddColumn(query.NewFluxColumn(i, d)) } goto readRow case "#group": diff --git a/table.go b/api/query/table.go similarity index 85% rename from table.go rename to api/query/table.go index 3fa2e2c5..e09757d1 100644 --- a/table.go +++ b/api/query/table.go @@ -2,7 +2,7 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package query import ( "fmt" @@ -32,9 +32,14 @@ type FluxRecord struct { values map[string]interface{} } -// newFluxTableMetadata creates FluxTableMetadata for the table on position -func newFluxTableMetadata(position int) *FluxTableMetadata { - return &FluxTableMetadata{position: position, columns: make([]*FluxColumn, 0, 10)} +// NewFluxTableMetadata creates FluxTableMetadata for the table on position +func NewFluxTableMetadata(position int) *FluxTableMetadata { + return NewFluxTableMetadataFull(position, make([]*FluxColumn, 0, 10)) +} + +// NewFluxTableMetadataFull creates FluxTableMetadata +func NewFluxTableMetadataFull(position int, columns []*FluxColumn) *FluxTableMetadata { + return &FluxTableMetadata{position: position, columns: columns} } // Position returns position of the table in the flux query result @@ -76,10 +81,15 @@ func (f *FluxTableMetadata) String() string { } // newFluxColumn creates FluxColumn for position and data type -func newFluxColumn(index int, dataType string) *FluxColumn { +func NewFluxColumn(index int, dataType string) *FluxColumn { return &FluxColumn{index: index, dataType: dataType} } +// newFluxColumn creates FluxColumn +func NewFluxColumnFull(dataType string, defaultValue string, name string, group bool, index int) *FluxColumn { + return &FluxColumn{index: index, name: name, dataType: dataType, group: group, defaultValue: defaultValue} +} + // SetDefaultValue sets default value for the column func (f *FluxColumn) SetDefaultValue(defaultValue string) { f.defaultValue = defaultValue @@ -131,7 +141,7 @@ func (f *FluxColumn) String() string { } // newFluxRecord returns new record for the table with values -func newFluxRecord(table int, values map[string]interface{}) *FluxRecord { +func NewFluxRecord(table int, values map[string]interface{}) *FluxRecord { return &FluxRecord{table: table, values: values} } diff --git a/table_test.go b/api/query/table_test.go similarity index 95% rename from table_test.go rename to api/query/table_test.go index b3c35b45..51b87bda 100644 --- a/table_test.go +++ b/api/query/table_test.go @@ -2,14 +2,24 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package query import ( + "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" ) +func mustParseTime(s string) time.Time { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return t +} + func TestTable(t *testing.T) { table := &FluxTableMetadata{position: 1} table.AddColumn(&FluxColumn{dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}) diff --git a/query_test.go b/api/query_test.go similarity index 78% rename from query_test.go rename to api/query_test.go index 8e5cd6f5..662e7672 100644 --- a/query_test.go +++ b/api/query_test.go @@ -2,13 +2,16 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package api import ( "context" "encoding/csv" "fmt" + http2 "github.com/influxdata/influxdb-client-go/api/http" + "github.com/influxdata/influxdb-client-go/api/query" "github.com/influxdata/influxdb-client-go/internal/gzip" + ihttp "github.com/influxdata/influxdb-client-go/internal/http" "github.com/stretchr/testify/assert" "io/ioutil" "net/http" @@ -37,22 +40,22 @@ func TestQueryCVSResultSingleTable(t *testing.T) { ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf ` - expectedTable := &FluxTableMetadata{position: 0, - columns: []*FluxColumn{ - {dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}, - {dataType: "long", defaultValue: "", name: "table", group: false, index: 1}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_start", group: true, index: 2}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_stop", group: true, index: 3}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_time", group: false, index: 4}, - {dataType: "double", defaultValue: "", name: "_value", group: false, index: 5}, - {dataType: "string", defaultValue: "", name: "_field", group: true, index: 6}, - {dataType: "string", defaultValue: "", name: "_measurement", group: true, index: 7}, - {dataType: "string", defaultValue: "", name: "a", group: true, index: 8}, - {dataType: "string", defaultValue: "", name: "b", group: true, index: 9}, + expectedTable := query.NewFluxTableMetadataFull(0, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("double", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", true, 6), + query.NewFluxColumnFull("string", "", "_measurement", true, 7), + query.NewFluxColumnFull("string", "", "a", true, 8), + query.NewFluxColumnFull("string", "", "b", true, 9), }, - } - expectedRecord1 := &FluxRecord{table: 0, - values: map[string]interface{}{ + ) + expectedRecord1 := query.NewFluxRecord(0, + map[string]interface{}{ "result": "_result", "table": int64(0), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -64,10 +67,10 @@ func TestQueryCVSResultSingleTable(t *testing.T) { "a": "1", "b": "adsfasdf", }, - } + ) - expectedRecord2 := &FluxRecord{table: 0, - values: map[string]interface{}{ + expectedRecord2 := query.NewFluxRecord(0, + map[string]interface{}{ "result": "_result", "table": int64(0), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -79,7 +82,7 @@ func TestQueryCVSResultSingleTable(t *testing.T) { "a": "1", "b": "adsfasdf", }, - } + ) reader := strings.NewReader(csvTable) csvReader := csv.NewReader(reader) @@ -133,22 +136,22 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,2,i,test,0,adsfasdf ` - expectedTable1 := &FluxTableMetadata{position: 0, - columns: []*FluxColumn{ - {dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}, - {dataType: "long", defaultValue: "", name: "table", group: false, index: 1}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_start", group: true, index: 2}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_stop", group: true, index: 3}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_time", group: false, index: 4}, - {dataType: "double", defaultValue: "", name: "_value", group: false, index: 5}, - {dataType: "string", defaultValue: "", name: "_field", group: true, index: 6}, - {dataType: "string", defaultValue: "", name: "_measurement", group: true, index: 7}, - {dataType: "string", defaultValue: "", name: "a", group: true, index: 8}, - {dataType: "string", defaultValue: "", name: "b", group: true, index: 9}, + expectedTable1 := query.NewFluxTableMetadataFull(0, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("double", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", true, 6), + query.NewFluxColumnFull("string", "", "_measurement", true, 7), + query.NewFluxColumnFull("string", "", "a", true, 8), + query.NewFluxColumnFull("string", "", "b", true, 9), }, - } - expectedRecord11 := &FluxRecord{table: 0, - values: map[string]interface{}{ + ) + expectedRecord11 := query.NewFluxRecord(0, + map[string]interface{}{ "result": "_result", "table": int64(0), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -160,9 +163,9 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "1", "b": "adsfasdf", }, - } - expectedRecord12 := &FluxRecord{table: 0, - values: map[string]interface{}{ + ) + expectedRecord12 := query.NewFluxRecord(0, + map[string]interface{}{ "result": "_result", "table": int64(0), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -174,24 +177,24 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "1", "b": "adsfasdf", }, - } - - expectedTable2 := &FluxTableMetadata{position: 1, - columns: []*FluxColumn{ - {dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}, - {dataType: "long", defaultValue: "", name: "table", group: false, index: 1}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_start", group: true, index: 2}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_stop", group: true, index: 3}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_time", group: false, index: 4}, - {dataType: "long", defaultValue: "", name: "_value", group: false, index: 5}, - {dataType: "string", defaultValue: "", name: "_field", group: true, index: 6}, - {dataType: "string", defaultValue: "", name: "_measurement", group: true, index: 7}, - {dataType: "string", defaultValue: "", name: "a", group: true, index: 8}, - {dataType: "string", defaultValue: "", name: "b", group: true, index: 9}, + ) + + expectedTable2 := query.NewFluxTableMetadataFull(1, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("long", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", true, 6), + query.NewFluxColumnFull("string", "", "_measurement", true, 7), + query.NewFluxColumnFull("string", "", "a", true, 8), + query.NewFluxColumnFull("string", "", "b", true, 9), }, - } - expectedRecord21 := &FluxRecord{table: 1, - values: map[string]interface{}{ + ) + expectedRecord21 := query.NewFluxRecord(1, + map[string]interface{}{ "result": "_result", "table": int64(1), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -203,9 +206,9 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "1", "b": "adsfasdf", }, - } - expectedRecord22 := &FluxRecord{table: 1, - values: map[string]interface{}{ + ) + expectedRecord22 := query.NewFluxRecord(1, + map[string]interface{}{ "result": "_result", "table": int64(1), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -217,24 +220,24 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "1", "b": "adsfasdf", }, - } - - expectedTable3 := &FluxTableMetadata{position: 2, - columns: []*FluxColumn{ - {dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}, - {dataType: "long", defaultValue: "", name: "table", group: false, index: 1}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_start", group: true, index: 2}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_stop", group: true, index: 3}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_time", group: false, index: 4}, - {dataType: "bool", defaultValue: "", name: "_value", group: false, index: 5}, - {dataType: "string", defaultValue: "", name: "_field", group: true, index: 6}, - {dataType: "string", defaultValue: "", name: "_measurement", group: true, index: 7}, - {dataType: "string", defaultValue: "", name: "a", group: true, index: 8}, - {dataType: "string", defaultValue: "", name: "b", group: true, index: 9}, + ) + + expectedTable3 := query.NewFluxTableMetadataFull(2, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("bool", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", true, 6), + query.NewFluxColumnFull("string", "", "_measurement", true, 7), + query.NewFluxColumnFull("string", "", "a", true, 8), + query.NewFluxColumnFull("string", "", "b", true, 9), }, - } - expectedRecord31 := &FluxRecord{table: 2, - values: map[string]interface{}{ + ) + expectedRecord31 := query.NewFluxRecord(2, + map[string]interface{}{ "result": "_result", "table": int64(2), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -246,9 +249,9 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "0", "b": "adsfasdf", }, - } - expectedRecord32 := &FluxRecord{table: 2, - values: map[string]interface{}{ + ) + expectedRecord32 := query.NewFluxRecord(2, + map[string]interface{}{ "result": "_result", "table": int64(2), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -260,24 +263,24 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "0", "b": "adsfasdf", }, - } - - expectedTable4 := &FluxTableMetadata{position: 3, - columns: []*FluxColumn{ - {dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}, - {dataType: "long", defaultValue: "", name: "table", group: false, index: 1}, - {dataType: "dateTime:RFC3339Nano", defaultValue: "", name: "_start", group: true, index: 2}, - {dataType: "dateTime:RFC3339Nano", defaultValue: "", name: "_stop", group: true, index: 3}, - {dataType: "dateTime:RFC3339Nano", defaultValue: "", name: "_time", group: false, index: 4}, - {dataType: "unsignedLong", defaultValue: "", name: "_value", group: false, index: 5}, - {dataType: "string", defaultValue: "", name: "_field", group: true, index: 6}, - {dataType: "string", defaultValue: "", name: "_measurement", group: true, index: 7}, - {dataType: "string", defaultValue: "", name: "a", group: true, index: 8}, - {dataType: "string", defaultValue: "", name: "b", group: true, index: 9}, + ) + + expectedTable4 := query.NewFluxTableMetadataFull(3, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_time", false, 4), + query.NewFluxColumnFull("unsignedLong", "", "_value", false, 5), + query.NewFluxColumnFull("string", "", "_field", true, 6), + query.NewFluxColumnFull("string", "", "_measurement", true, 7), + query.NewFluxColumnFull("string", "", "a", true, 8), + query.NewFluxColumnFull("string", "", "b", true, 9), }, - } - expectedRecord41 := &FluxRecord{table: 3, - values: map[string]interface{}{ + ) + expectedRecord41 := query.NewFluxRecord(3, + map[string]interface{}{ "result": "_result", "table": int64(3), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -289,9 +292,9 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "0", "b": "adsfasdf", }, - } - expectedRecord42 := &FluxRecord{table: 3, - values: map[string]interface{}{ + ) + expectedRecord42 := query.NewFluxRecord(3, + map[string]interface{}{ "result": "_result", "table": int64(3), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), @@ -303,7 +306,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { "a": "0", "b": "adsfasdf", }, - } + ) reader := strings.NewReader(csvTable) csvReader := csv.NewReader(reader) @@ -393,22 +396,22 @@ func TestQueryCVSResultSingleTableMultiColumnsNoValue(t *testing.T) { ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z ,,1,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z ` - expectedTable := &FluxTableMetadata{position: 0, - columns: []*FluxColumn{ - {dataType: "string", defaultValue: "_result", name: "result", group: false, index: 0}, - {dataType: "long", defaultValue: "", name: "table", group: false, index: 1}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_start", group: true, index: 2}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_stop", group: true, index: 3}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "_time", group: false, index: 4}, - {dataType: "long", defaultValue: "", name: "deviceId", group: true, index: 5}, - {dataType: "string", defaultValue: "", name: "sensor", group: true, index: 6}, - {dataType: "duration", defaultValue: "", name: "elapsed", group: false, index: 7}, - {dataType: "base64Binary", defaultValue: "", name: "note", group: false, index: 8}, - {dataType: "dateTime:RFC3339", defaultValue: "", name: "start", group: false, index: 9}, + expectedTable := query.NewFluxTableMetadataFull(0, + []*query.FluxColumn{ + query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("long", "", "table", false, 1), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), + query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4), + query.NewFluxColumnFull("long", "", "deviceId", true, 5), + query.NewFluxColumnFull("string", "", "sensor", true, 6), + query.NewFluxColumnFull("duration", "", "elapsed", false, 7), + query.NewFluxColumnFull("base64Binary", "", "note", false, 8), + query.NewFluxColumnFull("dateTime:RFC3339", "", "start", false, 9), }, - } - expectedRecord1 := &FluxRecord{table: 0, - values: map[string]interface{}{ + ) + expectedRecord1 := query.NewFluxRecord(0, + map[string]interface{}{ "result": "_result", "table": int64(0), "_start": mustParseTime("2020-04-28T12:36:50.990018157Z"), @@ -420,10 +423,10 @@ func TestQueryCVSResultSingleTableMultiColumnsNoValue(t *testing.T) { "note": []byte("datainbase64"), "start": time.Date(2020, 4, 27, 0, 0, 0, 0, time.UTC), }, - } + ) - expectedRecord2 := &FluxRecord{table: 0, - values: map[string]interface{}{ + expectedRecord2 := query.NewFluxRecord(0, + map[string]interface{}{ "result": "_result", "table": int64(1), "_start": mustParseTime("2020-04-28T12:36:50.990018157Z"), @@ -435,7 +438,7 @@ func TestQueryCVSResultSingleTableMultiColumnsNoValue(t *testing.T) { "note": []byte("xxxxxccccccddddd"), "start": time.Date(2020, 4, 28, 0, 0, 0, 0, time.UTC), }, - } + ) reader := strings.NewReader(csvTable) csvReader := csv.NewReader(reader) @@ -505,8 +508,7 @@ func TestQueryRawResult(t *testing.T) { } })) defer server.Close() - client := NewClient(server.URL, "a") - queryApi := client.QueryApi("org") + queryApi := NewQueryApi("org", ihttp.NewService(server.URL, "a", http2.DefaultOptions())) result, err := queryApi.QueryRaw(context.Background(), "flux", nil) require.Nil(t, err) @@ -692,8 +694,7 @@ func TestFluxError(t *testing.T) { } })) defer server.Close() - client := NewClient(server.URL, "a") - queryApi := client.QueryApi("org") + queryApi := NewQueryApi("org", ihttp.NewService(server.URL, "a", http2.DefaultOptions())) result, err := queryApi.QueryRaw(context.Background(), "errored flux", nil) assert.Equal(t, "", result) diff --git a/write.go b/api/write.go similarity index 76% rename from write.go rename to api/write.go index 70d77c93..b1e23f9f 100644 --- a/write.go +++ b/api/write.go @@ -2,11 +2,14 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package api import ( "context" + "github.com/influxdata/influxdb-client-go/api/write" "github.com/influxdata/influxdb-client-go/internal/http" + "github.com/influxdata/influxdb-client-go/internal/log" + iwrite "github.com/influxdata/influxdb-client-go/internal/write" "strings" "time" ) @@ -20,7 +23,7 @@ type WriteApi interface { // WritePoint writes asynchronously Point into bucket. // WritePoint adds Point into the buffer which is sent on the background when it reaches the batch size. // Blocking alternative is available in the WriteApiBlocking interface - WritePoint(point *Point) + WritePoint(point *write.Point) // Flush forces all pending writes from the buffer to be sent Flush() // Flushes all pending writes and stop async processes. After this the Write client cannot be used @@ -32,10 +35,10 @@ type WriteApi interface { } type writeApiImpl struct { - service *writeService + service *iwrite.Service writeBuffer []string - writeCh chan *batch + writeCh chan *iwrite.Batch bufferCh chan string writeStop chan int bufferStop chan int @@ -44,17 +47,18 @@ type writeApiImpl struct { errCh chan error bufferInfoCh chan writeBuffInfoReq writeInfoCh chan writeBuffInfoReq + writeOptions *write.Options } type writeBuffInfoReq struct { writeBuffLen int } -func newWriteApiImpl(org string, bucket string, service http.Service, client Client) *writeApiImpl { +func NewWriteApiImpl(org string, bucket string, service http.Service, writeOptions *write.Options) *writeApiImpl { w := &writeApiImpl{ - service: newWriteService(org, bucket, service, client), - writeBuffer: make([]string, 0, client.Options().BatchSize()+1), - writeCh: make(chan *batch), + service: iwrite.NewService(org, bucket, service, writeOptions), + writeBuffer: make([]string, 0, writeOptions.BatchSize()+1), + writeCh: make(chan *iwrite.Batch), doneCh: make(chan int), bufferCh: make(chan string), bufferStop: make(chan int), @@ -62,6 +66,7 @@ func newWriteApiImpl(org string, bucket string, service http.Service, client Cli bufferFlush: make(chan int), bufferInfoCh: make(chan writeBuffInfoReq), writeInfoCh: make(chan writeBuffInfoReq), + writeOptions: writeOptions, } go w.bufferProc() go w.writeProc() @@ -88,7 +93,7 @@ func (w *writeApiImpl) waitForFlushing() { if writeBuffInfo.writeBuffLen == 0 { break } - logger.Info("Waiting buffer is flushed") + log.Log.Info("Waiting buffer is flushed") time.Sleep(time.Millisecond) } for { @@ -97,21 +102,21 @@ func (w *writeApiImpl) waitForFlushing() { if writeBuffInfo.writeBuffLen == 0 { break } - logger.Info("Waiting buffer is flushed") + log.Log.Info("Waiting buffer is flushed") time.Sleep(time.Millisecond) } //time.Sleep(time.Millisecond) } func (w *writeApiImpl) bufferProc() { - logger.Info("Buffer proc started") - ticker := time.NewTicker(time.Duration(w.service.client.Options().FlushInterval()) * time.Millisecond) + log.Log.Info("Buffer proc started") + ticker := time.NewTicker(time.Duration(w.writeOptions.FlushInterval()) * time.Millisecond) x: for { select { case line := <-w.bufferCh: w.writeBuffer = append(w.writeBuffer, line) - if len(w.writeBuffer) == int(w.service.client.Options().BatchSize()) { + if len(w.writeBuffer) == int(w.writeOptions.BatchSize()) { w.flushBuffer() } case <-ticker.C: @@ -127,15 +132,15 @@ x: w.bufferInfoCh <- buffInfo } } - logger.Info("Buffer proc finished") + log.Log.Info("Buffer proc finished") w.doneCh <- 1 } func (w *writeApiImpl) flushBuffer() { if len(w.writeBuffer) > 0 { //go func(lines []string) { - logger.Info("sending batch") - batch := &batch{batch: buffer(w.writeBuffer)} + log.Log.Info("sending batch") + batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval()) w.writeCh <- batch // lines = lines[:0] //}(w.writeBuffer) @@ -145,24 +150,24 @@ func (w *writeApiImpl) flushBuffer() { } func (w *writeApiImpl) writeProc() { - logger.Info("Write proc started") + log.Log.Info("Write proc started") x: for { select { case batch := <-w.writeCh: - err := w.service.handleWrite(context.Background(), batch) + err := w.service.HandleWrite(context.Background(), batch) if err != nil && w.errCh != nil { w.errCh <- err } case <-w.writeStop: - logger.Info("Write proc: received stop") + log.Log.Info("Write proc: received stop") break x case buffInfo := <-w.writeInfoCh: buffInfo.writeBuffLen = len(w.writeCh) w.writeInfoCh <- buffInfo } } - logger.Info("Write proc finished") + log.Log.Info("Write proc finished") w.doneCh <- 1 } @@ -202,11 +207,11 @@ func (w *writeApiImpl) WriteRecord(line string) { w.bufferCh <- string(b) } -func (w *writeApiImpl) WritePoint(point *Point) { +func (w *writeApiImpl) WritePoint(point *write.Point) { //w.bufferCh <- point.ToLineProtocol(w.service.clientImpl.Options().Precision) - line, err := w.service.encodePoints(point) + line, err := w.service.EncodePoints(point) if err != nil { - logger.Errorf("point encoding error: %s\n", err.Error()) + log.Log.Errorf("point encoding error: %s\n", err.Error()) } else { w.bufferCh <- line } diff --git a/api/write/ext.go b/api/write/ext.go new file mode 100644 index 00000000..3014c059 --- /dev/null +++ b/api/write/ext.go @@ -0,0 +1,93 @@ +// Copyright 2020 InfluxData, Inc. All rights reserved. +// Use of this source code is governed by MIT +// license that can be found in the LICENSE file. + +package write + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// Point extension methods for test + +// ToLineProtocol creates InfluxDB line protocol string from the Point, converting associated timestamp according to precision +// and write result to the string builder +func PointToLineProtocolBuffer(p *Point, sb *strings.Builder, precision time.Duration) { + escapeKey(sb, p.Name()) + sb.WriteRune(',') + for i, t := range p.TagList() { + if i > 0 { + sb.WriteString(",") + } + escapeKey(sb, t.Key) + sb.WriteString("=") + escapeKey(sb, t.Value) + } + sb.WriteString(" ") + for i, f := range p.FieldList() { + if i > 0 { + sb.WriteString(",") + } + escapeKey(sb, f.Key) + sb.WriteString("=") + switch f.Value.(type) { + case string: + sb.WriteString(`"`) + escapeValue(sb, f.Value.(string)) + sb.WriteString(`"`) + default: + sb.WriteString(fmt.Sprintf("%v", f.Value)) + } + switch f.Value.(type) { + case int64: + sb.WriteString("i") + case uint64: + sb.WriteString("u") + } + } + if !p.Time().IsZero() { + sb.WriteString(" ") + switch precision { + case time.Microsecond: + sb.WriteString(strconv.FormatInt(p.Time().UnixNano()/1000, 10)) + case time.Millisecond: + sb.WriteString(strconv.FormatInt(p.Time().UnixNano()/1000000, 10)) + case time.Second: + sb.WriteString(strconv.FormatInt(p.Time().Unix(), 10)) + default: + sb.WriteString(strconv.FormatInt(p.Time().UnixNano(), 10)) + } + } + sb.WriteString("\n") +} + +// ToLineProtocol creates InfluxDB line protocol string from the Point, converting associated timestamp according to precision +func PointToLineProtocol(p *Point, precision time.Duration) string { + var sb strings.Builder + sb.Grow(1024) + PointToLineProtocolBuffer(p, &sb, precision) + return sb.String() +} + +func escapeKey(sb *strings.Builder, key string) { + for _, r := range key { + switch r { + case ' ', ',', '=': + sb.WriteString(`\`) + } + sb.WriteRune(r) + } +} + +func escapeValue(sb *strings.Builder, value string) { + for _, r := range value { + switch r { + case '\\', '"': + sb.WriteString(`\`) + } + sb.WriteRune(r) + } +} diff --git a/api/write/options.go b/api/write/options.go new file mode 100644 index 00000000..fd3174b6 --- /dev/null +++ b/api/write/options.go @@ -0,0 +1,110 @@ +// Copyright 2020 InfluxData, Inc. All rights reserved. +// Use of this source code is governed by MIT +// license that can be found in the LICENSE file. + +package write + +import ( + "time" +) + +// Options holds write configuration properties +type Options struct { + // Maximum number of points sent to server in single request. Default 5000 + batchSize uint + // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms + flushInterval uint + // Default retry interval in ms, if not sent by server. Default 1000ms + retryInterval uint + // Maximum count of retry attempts of failed writes + maxRetries uint + // Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 10,000 + retryBufferLimit uint + // Precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second + // Default time.Nanosecond + precision time.Duration + // Whether to use GZip compression in requests. Default false + useGZip bool +} + +// BatchSize returns size of batch +func (o *Options) BatchSize() uint { + return o.batchSize +} + +// SetBatchSize sets number of points sent in single request +func (o *Options) SetBatchSize(batchSize uint) *Options { + o.batchSize = batchSize + return o +} + +// FlushInterval returns flush interval in ms +func (o *Options) FlushInterval() uint { + return o.flushInterval +} + +// SetFlushInterval sets flush interval in ms in which is buffer flushed if it has not been already written +func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options { + o.flushInterval = flushIntervalMs + return o +} + +// RetryInterval returns the retry interval in ms +func (o *Options) RetryInterval() uint { + return o.retryInterval +} + +// SetRetryInterval sets retry interval in ms, which is set if not sent by server +func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options { + o.retryInterval = retryIntervalMs + return o +} + +// MaxRetries returns maximum count of retry attempts of failed writes +func (o *Options) MaxRetries() uint { + return o.maxRetries +} + +// SetMaxRetries sets maximum count of retry attempts of failed writes +func (o *Options) SetMaxRetries(maxRetries uint) *Options { + o.maxRetries = maxRetries + return o +} + +// RetryBufferLimit returns retry buffer limit +func (o *Options) RetryBufferLimit() uint { + return o.retryBufferLimit +} + +// SetRetryBufferLimit sets maximum number of points to keep for retry. Should be multiple of BatchSize. +func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options { + o.retryBufferLimit = retryBufferLimit + return o +} + +// Precision returns time precision for writes +func (o *Options) Precision() time.Duration { + return o.precision +} + +// SetPrecision sets time precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second +func (o *Options) SetPrecision(precision time.Duration) *Options { + o.precision = precision + return o +} + +// UseGZip returns true if write request are gzip`ed +func (o *Options) UseGZip() bool { + return o.useGZip +} + +// SetUseGZip specifies whether to use GZip compression in write requests. +func (o *Options) SetUseGZip(useGZip bool) *Options { + o.useGZip = useGZip + return o +} + +// DefaultOptions returns Options object with default values +func DefaultOptions() *Options { + return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000} +} diff --git a/point.go b/api/write/point.go similarity index 99% rename from point.go rename to api/write/point.go index c47d4188..ffed2b25 100644 --- a/point.go +++ b/api/write/point.go @@ -2,7 +2,7 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package write import ( "sort" diff --git a/point_test.go b/api/write/point_test.go similarity index 71% rename from point_test.go rename to api/write/point_test.go index 2bf3b1b0..464a4d86 100644 --- a/point_test.go +++ b/api/write/point_test.go @@ -2,13 +2,12 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package write import ( "bytes" "fmt" "math/rand" - "strconv" "strings" "testing" "time" @@ -47,85 +46,6 @@ func init() { } } -// ToLineProtocol creates InfluxDB line protocol string from the Point, converting associated timestamp according to precision -// and write result to the string builder -func (m *Point) ToLineProtocolBuffer(sb *strings.Builder, precision time.Duration) { - escapeKey(sb, m.Name()) - sb.WriteRune(',') - for i, t := range m.tags { - if i > 0 { - sb.WriteString(",") - } - escapeKey(sb, t.Key) - sb.WriteString("=") - escapeKey(sb, t.Value) - } - sb.WriteString(" ") - for i, f := range m.fields { - if i > 0 { - sb.WriteString(",") - } - escapeKey(sb, f.Key) - sb.WriteString("=") - switch f.Value.(type) { - case string: - sb.WriteString(`"`) - escapeValue(sb, f.Value.(string)) - sb.WriteString(`"`) - default: - sb.WriteString(fmt.Sprintf("%v", f.Value)) - } - switch f.Value.(type) { - case int64: - sb.WriteString("i") - case uint64: - sb.WriteString("u") - } - } - if !m.timestamp.IsZero() { - sb.WriteString(" ") - switch precision { - case time.Microsecond: - sb.WriteString(strconv.FormatInt(m.Time().UnixNano()/1000, 10)) - case time.Millisecond: - sb.WriteString(strconv.FormatInt(m.Time().UnixNano()/1000000, 10)) - case time.Second: - sb.WriteString(strconv.FormatInt(m.Time().Unix(), 10)) - default: - sb.WriteString(strconv.FormatInt(m.Time().UnixNano(), 10)) - } - } - sb.WriteString("\n") -} - -// ToLineProtocol creates InfluxDB line protocol string from the Point, converting associated timestamp according to precision -func (m *Point) ToLineProtocol(precision time.Duration) string { - var sb strings.Builder - sb.Grow(1024) - m.ToLineProtocolBuffer(&sb, precision) - return sb.String() -} - -func escapeKey(sb *strings.Builder, key string) { - for _, r := range key { - switch r { - case ' ', ',', '=': - sb.WriteString(`\`) - } - sb.WriteRune(r) - } -} - -func escapeValue(sb *strings.Builder, value string) { - for _, r := range value { - switch r { - case '\\', '"': - sb.WriteString(`\`) - } - sb.WriteRune(r) - } -} - func TestNewPoint(t *testing.T) { p := NewPoint( "test", @@ -186,7 +106,7 @@ func verifyPoint(t *testing.T, p *Point) { {Key: "uint32", Value: uint64(34578)}, {Key: "uint8", Value: uint64(34)}, }) - line := p.ToLineProtocol(time.Nanosecond) + line := PointToLineProtocol(p, time.Nanosecond) assert.True(t, strings.HasSuffix(line, "\n")) //cut off last \n char line = line[:len(line)-1] @@ -234,19 +154,19 @@ func TestPrecision(t *testing.T) { p.AddField("float64", 80.1234567) p.SetTime(time.Unix(60, 89)) - line := p.ToLineProtocol(time.Nanosecond) + line := PointToLineProtocol(p, time.Nanosecond) assert.Equal(t, line, "test,id=10 float64=80.1234567 60000000089\n") p.SetTime(time.Unix(60, 56789)) - line = p.ToLineProtocol(time.Microsecond) + line = PointToLineProtocol(p, time.Microsecond) assert.Equal(t, line, "test,id=10 float64=80.1234567 60000056\n") p.SetTime(time.Unix(60, 123456789)) - line = p.ToLineProtocol(time.Millisecond) + line = PointToLineProtocol(p, time.Millisecond) assert.Equal(t, line, "test,id=10 float64=80.1234567 60123\n") p.SetTime(time.Unix(60, 123456789)) - line = p.ToLineProtocol(time.Second) + line = PointToLineProtocol(p, time.Second) assert.Equal(t, line, "test,id=10 float64=80.1234567 60\n") } @@ -258,7 +178,7 @@ func BenchmarkPointEncoderSingle(b *testing.B) { for _, p := range points { var buffer bytes.Buffer e := lp.NewEncoder(&buffer) - e.Encode(p) + _, _ = e.Encode(p) buff.WriteString(buffer.String()) } s = buff.String() @@ -270,7 +190,7 @@ func BenchmarkPointEncoderMulti(b *testing.B) { var buffer bytes.Buffer e := lp.NewEncoder(&buffer) for _, p := range points { - e.Encode(p) + _, _ = e.Encode(p) } s = buffer.String() } @@ -280,7 +200,7 @@ func BenchmarkPointStringSingle(b *testing.B) { for n := 0; n < b.N; n++ { var buff strings.Builder for _, p := range points { - buff.WriteString(p.ToLineProtocol(time.Nanosecond)) + buff.WriteString(PointToLineProtocol(p, time.Nanosecond)) } s = buff.String() } @@ -290,7 +210,7 @@ func BenchmarkPointStringMulti(b *testing.B) { for n := 0; n < b.N; n++ { var buff strings.Builder for _, p := range points { - p.ToLineProtocolBuffer(&buff, time.Nanosecond) + PointToLineProtocolBuffer(p, &buff, time.Nanosecond) } s = buff.String() } diff --git a/writeApiBlocking.go b/api/writeApiBlocking.go similarity index 70% rename from writeApiBlocking.go rename to api/writeApiBlocking.go index 2e042283..6b3fceb2 100644 --- a/writeApiBlocking.go +++ b/api/writeApiBlocking.go @@ -2,11 +2,13 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package api import ( "context" + "github.com/influxdata/influxdb-client-go/api/write" "github.com/influxdata/influxdb-client-go/internal/http" + iwrite "github.com/influxdata/influxdb-client-go/internal/write" "strings" ) @@ -19,24 +21,22 @@ type WriteApiBlocking interface { // WritePoint data point into bucket. // WritePoint writes without implicit batching. Batch is created from given number of points // Non-blocking alternative is available in the WriteApi interface - WritePoint(ctx context.Context, point ...*Point) error + WritePoint(ctx context.Context, point ...*write.Point) error } // writeApiBlockingImpl implements WriteApiBlocking interface type writeApiBlockingImpl struct { - service *writeService + service *iwrite.Service + writeOptions *write.Options } // creates writeApiBlockingImpl for org and bucket with underlying client -func newWriteApiBlockingImpl(org string, bucket string, service http.Service, client Client) *writeApiBlockingImpl { - return &writeApiBlockingImpl{service: newWriteService(org, bucket, service, client)} +func NewWriteApiBlockingImpl(org string, bucket string, service http.Service, writeOptions *write.Options) *writeApiBlockingImpl { + return &writeApiBlockingImpl{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions} } func (w *writeApiBlockingImpl) write(ctx context.Context, line string) error { - err := w.service.handleWrite(ctx, &batch{ - batch: line, - retryInterval: w.service.client.Options().RetryInterval(), - }) + err := w.service.HandleWrite(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval())) return err } @@ -55,8 +55,8 @@ func (w *writeApiBlockingImpl) WriteRecord(ctx context.Context, line ...string) return nil } -func (w *writeApiBlockingImpl) WritePoint(ctx context.Context, point ...*Point) error { - line, err := w.service.encodePoints(point...) +func (w *writeApiBlockingImpl) WritePoint(ctx context.Context, point ...*write.Point) error { + line, err := w.service.EncodePoints(point...) if err != nil { return err } diff --git a/writeApiBlocking_test.go b/api/writeApiBlocking_test.go similarity index 71% rename from writeApiBlocking_test.go rename to api/writeApiBlocking_test.go index 0dbd80f3..24c5c3bc 100644 --- a/writeApiBlocking_test.go +++ b/api/writeApiBlocking_test.go @@ -2,29 +2,30 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package api import ( "context" - "github.com/influxdata/influxdb-client-go/internal/http" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "sync" "testing" "time" + + "github.com/influxdata/influxdb-client-go/api/write" + "github.com/influxdata/influxdb-client-go/internal/http" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWritePoint(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetBatchSize(5) - writeApi := newWriteApiBlockingImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + writeApi := NewWriteApiBlockingImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) points := genPoints(10) err := writeApi.WritePoint(context.Background(), points...) require.Nil(t, err) require.Len(t, service.lines, 10) for i, p := range points { - line := p.ToLineProtocol(client.options.Precision()) + line := write.PointToLineProtocol(p, writeApi.writeOptions.Precision()) //cut off last \n char line = line[:len(line)-1] assert.Equal(t, service.lines[i], line) @@ -32,10 +33,8 @@ func TestWritePoint(t *testing.T) { } func TestWriteRecord(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetBatchSize(5) - writeApi := newWriteApiBlockingImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + writeApi := NewWriteApiBlockingImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) lines := genRecords(10) err := writeApi.WriteRecord(context.Background(), lines...) require.Nil(t, err) @@ -56,10 +55,8 @@ func TestWriteRecord(t *testing.T) { } func TestWriteContextCancel(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetBatchSize(5) - writeApi := newWriteApiBlockingImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + writeApi := NewWriteApiBlockingImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) lines := genRecords(10) ctx, cancel := context.WithCancel(context.Background()) var err error diff --git a/write_test.go b/api/write_test.go similarity index 80% rename from write_test.go rename to api/write_test.go index 1f45f4cb..0bdd3746 100644 --- a/write_test.go +++ b/api/write_test.go @@ -2,15 +2,12 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package api import ( "compress/gzip" "context" "fmt" - ihttp "github.com/influxdata/influxdb-client-go/internal/http" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "io" "io/ioutil" "math/rand" @@ -19,13 +16,18 @@ import ( "sync" "testing" "time" + + "github.com/influxdata/influxdb-client-go/api/write" + ihttp "github.com/influxdata/influxdb-client-go/internal/http" + "github.com/influxdata/influxdb-client-go/internal/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testHttpService struct { serverUrl string authorization string lines []string - options *Options t *testing.T wasGzip bool requestHandler func(c *testHttpService, url string, body io.Reader) error @@ -129,25 +131,20 @@ func (t *testHttpService) Lines() []string { return t.lines } -func newTestClient() *clientImpl { - return &clientImpl{serverUrl: "http://locahost:4444", options: DefaultOptions()} -} - -func newTestService(t *testing.T, client Client) *testHttpService { +func newTestService(t *testing.T, serverUrl string) *testHttpService { return &testHttpService{ t: t, - options: client.Options(), - serverUrl: client.ServerUrl() + "/api/v2/", + serverUrl: serverUrl + "/api/v2/", } } -func genPoints(num int) []*Point { - points := make([]*Point, num) +func genPoints(num int) []*write.Point { + points := make([]*write.Point, num) rand.Seed(321) t := time.Now() for i := 0; i < len(points); i++ { - points[i] = NewPoint( + points[i] = write.NewPoint( "test", map[string]string{ "id": fmt.Sprintf("rack_%v", i%10), @@ -185,10 +182,8 @@ func genRecords(num int) []string { } func TestWriteApiImpl_Write(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetBatchSize(5) - writeApi := newWriteApiImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) points := genPoints(10) for _, p := range points { writeApi.WritePoint(p) @@ -196,7 +191,7 @@ func TestWriteApiImpl_Write(t *testing.T) { writeApi.Close() require.Len(t, service.Lines(), 10) for i, p := range points { - line := p.ToLineProtocol(client.options.Precision()) + line := write.PointToLineProtocol(p, writeApi.writeOptions.Precision()) //cut off last \n char line = line[:len(line)-1] assert.Equal(t, service.Lines()[i], line) @@ -204,10 +199,8 @@ func TestWriteApiImpl_Write(t *testing.T) { } func TestGzipWithFlushing(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetBatchSize(5).SetUseGZip(true) - writeApi := newWriteApiImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true)) points := genPoints(5) for _, p := range points { writeApi.WritePoint(p) @@ -217,7 +210,7 @@ func TestGzipWithFlushing(t *testing.T) { assert.True(t, service.wasGzip) service.Close() - client.options.SetUseGZip(false) + writeApi.writeOptions.SetUseGZip(false) for _, p := range points { writeApi.WritePoint(p) } @@ -228,10 +221,8 @@ func TestGzipWithFlushing(t *testing.T) { writeApi.Close() } func TestFlushInterval(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetBatchSize(10).SetFlushInterval(500) - writeApi := newWriteApiImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(500)) points := genPoints(5) for _, p := range points { writeApi.WritePoint(p) @@ -242,8 +233,7 @@ func TestFlushInterval(t *testing.T) { writeApi.Close() service.Close() - client.options.SetFlushInterval(2000) - writeApi = newWriteApiImpl("my-org", "my-bucket", service, client) + writeApi = NewWriteApiImpl("my-org", "my-bucket", service, writeApi.writeOptions.SetFlushInterval(2000)) for _, p := range points { writeApi.WritePoint(p) } @@ -255,12 +245,9 @@ func TestFlushInterval(t *testing.T) { } func TestRetry(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetLogLevel(3). - SetBatchSize(5). - SetRetryInterval(10000) - writeApi := newWriteApiImpl("my-org", "my-bucket", service, client) + service := newTestService(t, "http://localhost:8888") + log.Log.SetDebugLevel(5) + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000)) points := genPoints(15) for i := 0; i < 5; i++ { writeApi.WritePoint(points[i]) @@ -295,15 +282,14 @@ func TestRetry(t *testing.T) { } func TestWriteError(t *testing.T) { - client := newTestClient() - service := newTestService(t, client) - client.options.SetLogLevel(3).SetBatchSize(5) + service := newTestService(t, "http://localhost:8888") + log.Log.SetDebugLevel(3) service.replyError = &ihttp.Error{ StatusCode: 400, Code: "write", Message: "error", } - writeApi := newWriteApiImpl("my-org", "my-bucket", service, client) + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) errCh := writeApi.Errors() var recErr error var wg sync.WaitGroup @@ -320,5 +306,4 @@ func TestWriteError(t *testing.T) { wg.Wait() require.NotNil(t, recErr) writeApi.Close() - client.Close() } diff --git a/client.go b/client.go index 5f5a7df5..838001ef 100644 --- a/client.go +++ b/client.go @@ -10,6 +10,7 @@ import ( "context" "errors" "github.com/influxdata/influxdb-client-go/api" + "github.com/influxdata/influxdb-client-go/internal/log" "sync" "github.com/influxdata/influxdb-client-go/domain" @@ -37,11 +38,11 @@ type Client interface { // ServerUrl returns the url of the server url client talks to ServerUrl() string // WriteApi returns the asynchronous, non-blocking, Write client - WriteApi(org, bucket string) WriteApi + WriteApi(org, bucket string) api.WriteApi // WriteApi returns the synchronous, blocking, Write client - WriteApiBlocking(org, bucket string) WriteApiBlocking + WriteApiBlocking(org, bucket string) api.WriteApiBlocking // QueryApi returns Query client - QueryApi(org string) QueryApi + QueryApi(org string) api.QueryApi // AuthorizationsApi returns Authorizations API client AuthorizationsApi() api.AuthorizationsApi // OrganizationsApi returns Organizations API client @@ -58,7 +59,7 @@ type Client interface { type clientImpl struct { serverUrl string options *Options - writeApis []WriteApi + writeApis []api.WriteApi lock sync.Mutex httpService ihttp.Service apiClient *domain.ClientWithResponses @@ -81,14 +82,15 @@ func NewClient(serverUrl string, authToken string) Client { // Authentication token can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. // In such case Setup will set authentication token func NewClientWithOptions(serverUrl string, authToken string, options *Options) Client { - service := ihttp.NewService(serverUrl, "Token "+authToken, options.tlsConfig, options.httpRequestTimeout) + service := ihttp.NewService(serverUrl, "Token "+authToken, options.httpOptions) client := &clientImpl{ serverUrl: serverUrl, options: options, - writeApis: make([]WriteApi, 0, 5), + writeApis: make([]api.WriteApi, 0, 5), httpService: service, apiClient: domain.NewClientWithResponses(service), } + log.Log.SetDebugLevel(client.Options().LogLevel()) return client } func (c *clientImpl) Options() *Options { @@ -153,14 +155,14 @@ func (c *clientImpl) Health(ctx context.Context) (*domain.HealthCheck, error) { return response.JSON200, nil } -func (c *clientImpl) WriteApi(org, bucket string) WriteApi { - w := newWriteApiImpl(org, bucket, c.httpService, c) +func (c *clientImpl) WriteApi(org, bucket string) api.WriteApi { + w := api.NewWriteApiImpl(org, bucket, c.httpService, c.options.writeOptions) c.writeApis = append(c.writeApis, w) return w } -func (c *clientImpl) WriteApiBlocking(org, bucket string) WriteApiBlocking { - w := newWriteApiBlockingImpl(org, bucket, c.httpService, c) +func (c *clientImpl) WriteApiBlocking(org, bucket string) api.WriteApiBlocking { + w := api.NewWriteApiBlockingImpl(org, bucket, c.httpService, c.options.writeOptions) return w } @@ -170,8 +172,8 @@ func (c *clientImpl) Close() { } } -func (c *clientImpl) QueryApi(org string) QueryApi { - return newQueryApi(org, c.httpService, c) +func (c *clientImpl) QueryApi(org string) api.QueryApi { + return api.NewQueryApi(org, c.httpService) } func (c *clientImpl) AuthorizationsApi() api.AuthorizationsApi { diff --git a/client_e2e_test.go b/client_e2e_test.go index c7efe942..e9f074a7 100644 --- a/client_e2e_test.go +++ b/client_e2e_test.go @@ -1,36 +1,36 @@ +// +build e2e + // Copyright 2020 InfluxData, Inc. All rights reserved. // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package influxdb2_test import ( "context" "flag" "fmt" - "github.com/influxdata/influxdb-client-go/api" - "github.com/influxdata/influxdb-client-go/domain" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/influxdata/influxdb-client-go/api/write" "strconv" "strings" "testing" "time" + + influxdb2 "github.com/influxdata/influxdb-client-go" + "github.com/influxdata/influxdb-client-go/api" + "github.com/influxdata/influxdb-client-go/domain" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -var e2e bool var authToken string func init() { - flag.BoolVar(&e2e, "e2e", false, "run the end tests (requires a working influxdb instance on 127.0.0.1)") flag.StringVar(&authToken, "token", "", "authentication token") } func TestReady(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", "") + client := influxdb2.NewClient("http://localhost:9999", "") ok, err := client.Ready(context.Background()) if err != nil { @@ -42,10 +42,7 @@ func TestReady(t *testing.T) { } func TestSetup(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClientWithOptions("http://localhost:9999", "", DefaultOptions().SetLogLevel(2)) + client := influxdb2.NewClientWithOptions("http://localhost:9999", "", influxdb2.DefaultOptions().SetLogLevel(2)) response, err := client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 0) if err != nil { t.Error(err) @@ -60,10 +57,7 @@ func TestSetup(t *testing.T) { } func TestHealth(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", "") + client := influxdb2.NewClient("http://localhost:9999", "") health, err := client.Health(context.Background()) if err != nil { @@ -74,10 +68,7 @@ func TestHealth(t *testing.T) { } func TestWrite(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClientWithOptions("http://localhost:9999", authToken, DefaultOptions().SetLogLevel(3)) + client := influxdb2.NewClientWithOptions("http://localhost:9999", authToken, influxdb2.DefaultOptions().SetLogLevel(3)) writeApi := client.WriteApi("my-org", "my-bucket") errCh := writeApi.Errors() errorsCount := 0 @@ -94,7 +85,7 @@ func TestWrite(t *testing.T) { } for i, f := int64(10), 33.0; i < 20; i++ { - p := NewPoint("test", + p := influxdb2.NewPoint("test", map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, map[string]interface{}{"f": f, "i": i}, time.Now()) @@ -108,10 +99,7 @@ func TestWrite(t *testing.T) { } func TestQueryRaw(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) queryApi := client.QueryApi("my-org") res, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "test")`, nil) @@ -124,10 +112,7 @@ func TestQueryRaw(t *testing.T) { } func TestQuery(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) queryApi := client.QueryApi("my-org") fmt.Println("QueryResult") @@ -148,10 +133,7 @@ func TestQuery(t *testing.T) { } func TestAuthorizationsApi(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) authApi := client.AuthorizationsApi() listRes, err := authApi.GetAuthorizations(context.Background()) require.Nil(t, err) @@ -223,10 +205,7 @@ func TestAuthorizationsApi(t *testing.T) { } func TestOrganizations(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) orgsApi := client.OrganizationsApi() usersApi := client.UsersApi() orgName := "my-org-2" @@ -357,10 +336,7 @@ func TestOrganizations(t *testing.T) { } func TestUsers(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) usersApi := client.UsersApi() @@ -402,18 +378,15 @@ func TestUsers(t *testing.T) { } func TestDelete(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } ctx := context.Background() - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) writeApi := client.WriteApiBlocking("my-org", "my-bucket") queryApi := client.QueryApi("my-org") tmStart := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) writeF := func(start time.Time, count int64) time.Time { tm := start for i, f := int64(0), 0.0; i < count; i++ { - p := NewPoint("test", + p := write.NewPoint("test", map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, map[string]interface{}{"f": f, "i": i}, tm) @@ -476,11 +449,8 @@ func TestDelete(t *testing.T) { } func TestBuckets(t *testing.T) { - if !e2e { - t.Skip("e2e not enabled. Launch InfluxDB 2 on localhost and run test with -e2e") - } ctx := context.Background() - client := NewClient("http://localhost:9999", authToken) + client := influxdb2.NewClient("http://localhost:9999", authToken) bucketsApi := client.BucketsApi() diff --git a/compatibility.go b/compatibility.go new file mode 100644 index 00000000..2fe30695 --- /dev/null +++ b/compatibility.go @@ -0,0 +1,28 @@ +// Copyright 2020 InfluxData, Inc. All rights reserved. +// Use of this source code is governed by MIT +// license that can be found in the LICENSE file. + +package influxdb2 + +import ( + "github.com/influxdata/influxdb-client-go/api/write" + "time" +) + +// Proxy methods for backward compatibility + +// NewPointWithMeasurement creates a empty Point +// Use AddTag and AddField to fill point with data +func NewPointWithMeasurement(measurement string) *write.Point { + return write.NewPointWithMeasurement(measurement) +} + +// NewPoint creates a Point from measurement name, tags, fields and a timestamp. +func NewPoint( + measurement string, + tags map[string]string, + fields map[string]interface{}, + ts time.Time, +) *write.Point { + return write.NewPoint(measurement, tags, fields, ts) +} diff --git a/examples_test.go b/examples_test.go index 2885249a..dd2b8c6d 100644 --- a/examples_test.go +++ b/examples_test.go @@ -1,157 +1,22 @@ package influxdb2_test import ( - "context" - "fmt" - "math/rand" - "time" - "github.com/influxdata/influxdb-client-go" ) -func ExampleWriteApiBlocking() { +func ExampleClient_newClient() { // Create client client := influxdb2.NewClient("http://localhost:9999", "my-token") - // Get blocking write client - writeApi := client.WriteApiBlocking("my-org", "my-bucket") - // write some points - for i := 0; i < 100; i++ { - // create data point - p := influxdb2.NewPoint( - "system", - map[string]string{ - "id": fmt.Sprintf("rack_%v", i%10), - "vendor": "AWS", - "hostname": fmt.Sprintf("host_%v", i%100), - }, - map[string]interface{}{ - "temperature": rand.Float64() * 80.0, - "disk_free": rand.Float64() * 1000.0, - "disk_total": (i/10 + 1) * 1000000, - "mem_total": (i/100 + 1) * 10000000, - "mem_free": rand.Uint64(), - }, - time.Now()) - // write synchronously - err := writeApi.WritePoint(context.Background(), p) - if err != nil { - panic(err) - } - } - // Ensures background processes finishes - client.Close() + + // always close client at the end + defer client.Close() } -func ExampleWriteApi() { +func ExampleClient_newClientWithOptions() { // Create client and set batch size to 20 client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token", influxdb2.DefaultOptions().SetBatchSize(20)) - // Get non-blocking write client - writeApi := client.WriteApi("my-org", "my-bucket") - // write some points - for i := 0; i < 100; i++ { - // create point - p := influxdb2.NewPoint( - "system", - map[string]string{ - "id": fmt.Sprintf("rack_%v", i%10), - "vendor": "AWS", - "hostname": fmt.Sprintf("host_%v", i%100), - }, - map[string]interface{}{ - "temperature": rand.Float64() * 80.0, - "disk_free": rand.Float64() * 1000.0, - "disk_total": (i/10 + 1) * 1000000, - "mem_total": (i/100 + 1) * 10000000, - "mem_free": rand.Uint64(), - }, - time.Now()) - // write asynchronously - writeApi.WritePoint(p) - } - // Force all unwritten data to be sent - writeApi.Flush() - // Ensures background processes finishes - client.Close() -} - -func ExampleWriteApi_errors() { - // Create client - client := influxdb2.NewClient("http://localhost:9999", "my-token") - // Get non-blocking write client - writeApi := client.WriteApi("my-org", "my-bucket") - // Get errors channel - errorsCh := writeApi.Errors() - // Create go proc for reading and logging errors - go func() { - for err := range errorsCh { - fmt.Printf("write error: %s\n", err.Error()) - } - }() - // write some points - for i := 0; i < 100; i++ { - // create point - p := influxdb2.NewPointWithMeasurement("stat"). - AddTag("id", fmt.Sprintf("rack_%v", i%10)). - AddTag("vendor", "AWS"). - AddTag("hostname", fmt.Sprintf("host_%v", i%100)). - AddField("temperature", rand.Float64()*80.0). - AddField("disk_free", rand.Float64()*1000.0). - AddField("disk_total", (i/10+1)*1000000). - AddField("mem_total", (i/100+1)*10000000). - AddField("mem_free", rand.Uint64()). - SetTime(time.Now()) - // write asynchronously - writeApi.WritePoint(p) - } - // Force all unwritten data to be sent - writeApi.Flush() - // Ensures background processes finishes - client.Close() -} -func ExampleQueryApi_query() { - // Create client - client := influxdb2.NewClient("http://localhost:9999", "my-token") - // Get query client - queryApi := client.QueryApi("my-org") - // get QueryTableResult - result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`) - if err == nil { - // Iterate over query response - for result.Next() { - // Notice when group key has changed - if result.TableChanged() { - fmt.Printf("table: %s\n", result.TableMetadata().String()) - } - // Access data - fmt.Printf("value: %v\n", result.Record().Value()) - } - // check for an error - if result.Err() != nil { - fmt.Printf("query parsing error: %s\n", result.Err().Error()) - } - } else { - panic(err) - } - // Ensures background processes finishes - client.Close() -} - -func ExampleQueryApi_queryRaw() { - // Create client - client := influxdb2.NewClient("http://localhost:9999", "my-token") - // Get query client - queryApi := client.QueryApi("my-org") - // Query and get complete result as a string - // Use default dialect - result, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect()) - if err == nil { - fmt.Println("QueryResult:") - fmt.Println(result) - } else { - panic(err) - } - // Ensures background processes finishes - client.Close() + // always close client at the end + defer client.Close() } diff --git a/go.sum b/go.sum index 2e192280..adddd8fb 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,10 @@ github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxm github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/labstack/echo/v4 v4.1.11 h1:z0BZoArY4FqdpUEl+wlHp4hnr/oSR6MTmQmv8OHSoww= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= @@ -55,6 +57,7 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191115151921-52ab43148777 h1:wejkGHRTr38uaKRqECZlsCsJ1/TGxIyFbH32x5zUdu4= golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -64,6 +67,7 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/api/fakeclient.go b/internal/examples/fakeclient.go similarity index 64% rename from api/fakeclient.go rename to internal/examples/fakeclient.go index 431feccb..7eb0e84f 100644 --- a/api/fakeclient.go +++ b/internal/examples/fakeclient.go @@ -2,10 +2,11 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package api +package examples import ( "context" + "github.com/influxdata/influxdb-client-go/api" "github.com/influxdata/influxdb-client-go/domain" ) @@ -16,6 +17,14 @@ import ( type Options struct { } +func (o *Options) SetBatchSize(_ uint) *Options { + return o +} + +func DefaultOptions() *Options { + return nil +} + type fakeClient struct { } @@ -49,38 +58,37 @@ func (c *fakeClient) Health(_ context.Context) (*domain.HealthCheck, error) { return nil, nil } -//func (c *fakeClient) WriteApi(org, bucket string) WriteApi { -// return nil -//} +func (c *fakeClient) WriteApi(_, _ string) api.WriteApi { + return nil +} -//func (c *fakeClient) WriteApiBlocking(org, bucket string) WriteApiBlocking { -// w := newWriteApiBlockingImpl(org, bucket, c.httpService, c) -// return w -//} +func (c *fakeClient) WriteApiBlocking(_, _ string) api.WriteApiBlocking { + return nil +} func (c *fakeClient) Close() { } -//func (c *fakeClient) QueryApi(org string) QueryApi { -// return nil -//} +func (c *fakeClient) QueryApi(_ string) api.QueryApi { + return nil +} -func (c *fakeClient) AuthorizationsApi() AuthorizationsApi { +func (c *fakeClient) AuthorizationsApi() api.AuthorizationsApi { return nil } -func (c *fakeClient) OrganizationsApi() OrganizationsApi { +func (c *fakeClient) OrganizationsApi() api.OrganizationsApi { return nil } -func (c *fakeClient) UsersApi() UsersApi { +func (c *fakeClient) UsersApi() api.UsersApi { return nil } -func (c *fakeClient) DeleteApi() DeleteApi { +func (c *fakeClient) DeleteApi() api.DeleteApi { return nil } -func (c *fakeClient) BucketsApi() BucketsApi { +func (c *fakeClient) BucketsApi() api.BucketsApi { return nil } diff --git a/internal/http/service.go b/internal/http/service.go index 762567a0..8756cd85 100644 --- a/internal/http/service.go +++ b/internal/http/service.go @@ -6,7 +6,6 @@ package http import ( "context" - "crypto/tls" "encoding/json" "io" "io/ioutil" @@ -16,6 +15,8 @@ import ( "net/url" "strconv" "time" + + http2 "github.com/influxdata/influxdb-client-go/api/http" ) // Http operation callbacks @@ -44,7 +45,7 @@ type serviceImpl struct { } // NewService creates instance of http Service with given parameters -func NewService(serverUrl, authorization string, tlsConfig *tls.Config, httpRequestTimeout uint) Service { +func NewService(serverUrl, authorization string, httpOptions *http2.Options) Service { apiUrl, err := url.Parse(serverUrl) serverApiUrl := serverUrl if err == nil { @@ -58,13 +59,13 @@ func NewService(serverUrl, authorization string, tlsConfig *tls.Config, httpRequ serverUrl: serverUrl, authorization: authorization, client: &http.Client{ - Timeout: time.Second * time.Duration(httpRequestTimeout), + Timeout: time.Second * time.Duration(httpOptions.HttpRequestTimeout()), Transport: &http.Transport{ DialContext: (&net.Dialer{ Timeout: 5 * time.Second, }).DialContext, TLSHandshakeTimeout: 5 * time.Second, - TLSClientConfig: tlsConfig, + TLSClientConfig: httpOptions.TlsConfig(), }, }, } diff --git a/internal/log/logger.go b/internal/log/logger.go index 59f4fb2e..d87d8d92 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -9,6 +9,8 @@ import ( "log" ) +var Log Logger + // Logger provides filtered and categorized logging API. // It logs to standard logger, only errors by default type Logger struct { diff --git a/internal/test/point.go b/internal/test/point.go new file mode 100644 index 00000000..56e54040 --- /dev/null +++ b/internal/test/point.go @@ -0,0 +1 @@ +package test diff --git a/queue.go b/internal/write/queue.go similarity index 74% rename from queue.go rename to internal/write/queue.go index 1e4e0d55..f467ac8d 100644 --- a/queue.go +++ b/internal/write/queue.go @@ -2,9 +2,11 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package write -import "container/list" +import ( + "container/list" +) type queue struct { list *list.List @@ -14,7 +16,7 @@ type queue struct { func newQueue(limit int) *queue { return &queue{list: list.New(), limit: limit} } -func (q *queue) push(batch *batch) bool { +func (q *queue) push(batch *Batch) bool { overWrite := false if q.list.Len() == q.limit { q.pop() @@ -24,18 +26,18 @@ func (q *queue) push(batch *batch) bool { return overWrite } -func (q *queue) pop() *batch { +func (q *queue) pop() *Batch { el := q.list.Front() if el != nil { q.list.Remove(el) - return el.Value.(*batch) + return el.Value.(*Batch) } return nil } -func (q *queue) first() *batch { +func (q *queue) first() *Batch { el := q.list.Front() - return el.Value.(*batch) + return el.Value.(*Batch) } func (q *queue) isEmpty() bool { diff --git a/queue_test.go b/internal/write/queue_test.go similarity index 88% rename from queue_test.go rename to internal/write/queue_test.go index 328f020c..40175b29 100644 --- a/queue_test.go +++ b/internal/write/queue_test.go @@ -2,7 +2,7 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package write import ( "testing" @@ -13,7 +13,7 @@ import ( func TestQueue(t *testing.T) { que := newQueue(2) assert.True(t, que.isEmpty()) - b := &batch{batch: "batch", retryInterval: 3, retries: 3} + b := &Batch{batch: "batch", retryInterval: 3, retries: 3} que.push(b) assert.False(t, que.isEmpty()) b2 := que.pop() diff --git a/writeService.go b/internal/write/writeService.go similarity index 64% rename from writeService.go rename to internal/write/writeService.go index a75bd57e..8f0ff76c 100644 --- a/writeService.go +++ b/internal/write/writeService.go @@ -2,12 +2,11 @@ // Use of this source code is governed by MIT // license that can be found in the LICENSE file. -package influxdb2 +package write import ( "bytes" "context" - ihttp "github.com/influxdata/influxdb-client-go/internal/http" "io" "net/http" "net/url" @@ -15,20 +14,27 @@ import ( "sync" "time" + "github.com/influxdata/influxdb-client-go/api/write" "github.com/influxdata/influxdb-client-go/internal/gzip" + ihttp "github.com/influxdata/influxdb-client-go/internal/http" "github.com/influxdata/influxdb-client-go/internal/log" lp "github.com/influxdata/line-protocol" ) -var logger log.Logger - -type batch struct { +type Batch struct { batch string retryInterval uint retries uint } -type writeService struct { +func NewBatch(data string, retryInterval uint) *Batch { + return &Batch{ + batch: data, + retryInterval: retryInterval, + } +} + +type Service struct { org string bucket string httpService ihttp.Service @@ -36,38 +42,38 @@ type writeService struct { lastWriteAttempt time.Time retryQueue *queue lock sync.Mutex - client Client + writeOptions *write.Options } -func newWriteService(org string, bucket string, httpService ihttp.Service, client Client) *writeService { - logger.SetDebugLevel(client.Options().LogLevel()) - retryBufferLimit := client.Options().RetryBufferLimit() / client.Options().BatchSize() +func NewService(org string, bucket string, httpService ihttp.Service, options *write.Options) *Service { + + retryBufferLimit := options.RetryBufferLimit() / options.BatchSize() if retryBufferLimit == 0 { retryBufferLimit = 1 } - return &writeService{org: org, bucket: bucket, httpService: httpService, client: client, retryQueue: newQueue(int(retryBufferLimit))} + return &Service{org: org, bucket: bucket, httpService: httpService, writeOptions: options, retryQueue: newQueue(int(retryBufferLimit))} } -func (w *writeService) handleWrite(ctx context.Context, batch *batch) error { - logger.Debug("Write proc: received write request") +func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { + log.Log.Debug("Write proc: received write request") batchToWrite := batch retrying := false for { select { case <-ctx.Done(): - logger.Debug("Write proc: ctx cancelled req") + log.Log.Debug("Write proc: ctx cancelled req") return ctx.Err() default: } if !w.retryQueue.isEmpty() { - logger.Debug("Write proc: taking batch from retry queue") + log.Log.Debug("Write proc: taking batch from retry queue") if !retrying { b := w.retryQueue.first() // Can we write? In case of retryable error we must wait a bit if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.retryInterval))) { retrying = true } else { - logger.Warn("Write proc: cannot write yet, storing batch to queue") + log.Log.Warn("Write proc: cannot write yet, storing batch to queue") w.retryQueue.push(batch) batchToWrite = nil } @@ -77,14 +83,14 @@ func (w *writeService) handleWrite(ctx context.Context, batch *batch) error { batchToWrite.retries++ if batch != nil { if w.retryQueue.push(batch) { - logger.Warn("Write proc: Retry buffer full, discarding oldest batch") + log.Log.Warn("Write proc: Retry buffer full, discarding oldest batch") } batch = nil } } } if batchToWrite != nil { - err := w.writeBatch(ctx, batchToWrite) + err := w.WriteBatch(ctx, batchToWrite) batchToWrite = nil if err != nil { return err @@ -96,16 +102,16 @@ func (w *writeService) handleWrite(ctx context.Context, batch *batch) error { return nil } -func (w *writeService) writeBatch(ctx context.Context, batch *batch) error { - wUrl, err := w.writeUrl() +func (w *Service) WriteBatch(ctx context.Context, batch *Batch) error { + wUrl, err := w.WriteUrl() if err != nil { - logger.Errorf("%s\n", err.Error()) + log.Log.Errorf("%s\n", err.Error()) return err } var body io.Reader body = strings.NewReader(batch.batch) - logger.Debugf("Writing batch: %s", batch.batch) - if w.client.Options().UseGZip() { + log.Log.Debugf("Writing batch: %s", batch.batch) + if w.writeOptions.UseGZip() { body, err = gzip.CompressWithGzip(body) if err != nil { return err @@ -113,7 +119,7 @@ func (w *writeService) writeBatch(ctx context.Context, batch *batch) error { } w.lastWriteAttempt = time.Now() perror := w.httpService.PostRequest(ctx, wUrl, body, func(req *http.Request) { - if w.client.Options().UseGZip() { + if w.writeOptions.UseGZip() { req.Header.Set("Content-Encoding", "gzip") } }, func(r *http.Response) error { @@ -125,31 +131,31 @@ func (w *writeService) writeBatch(ctx context.Context, batch *batch) error { if perror != nil { if perror.StatusCode == http.StatusTooManyRequests || perror.StatusCode == http.StatusServiceUnavailable { - logger.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error()) + log.Log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error()) if perror.RetryAfter > 0 { batch.retryInterval = perror.RetryAfter * 1000 } else { - batch.retryInterval = w.client.Options().RetryInterval() + batch.retryInterval = w.writeOptions.RetryInterval() } - if batch.retries < w.client.Options().MaxRetries() { + if batch.retries < w.writeOptions.MaxRetries() { if w.retryQueue.push(batch) { - logger.Warn("Retry buffer full, discarding oldest batch") + log.Log.Warn("Retry buffer full, discarding oldest batch") } } } else { - logger.Errorf("Write error: %s\n", perror.Error()) + log.Log.Errorf("Write error: %s\n", perror.Error()) } return perror } return nil } -func (w *writeService) encodePoints(points ...*Point) (string, error) { +func (w *Service) EncodePoints(points ...*write.Point) (string, error) { var buffer bytes.Buffer e := lp.NewEncoder(&buffer) e.SetFieldTypeSupport(lp.UintSupport) e.FailOnFieldErr(true) - e.SetPrecision(w.client.Options().Precision()) + e.SetPrecision(w.writeOptions.Precision()) for _, point := range points { _, err := e.Encode(point) if err != nil { @@ -159,7 +165,7 @@ func (w *writeService) encodePoints(points ...*Point) (string, error) { return buffer.String(), nil } -func (w *writeService) writeUrl() (string, error) { +func (w *Service) WriteUrl() (string, error) { if w.url == "" { u, err := url.Parse(w.httpService.ServerApiUrl()) if err != nil { @@ -172,7 +178,7 @@ func (w *writeService) writeUrl() (string, error) { params := u.Query() params.Set("org", w.org) params.Set("bucket", w.bucket) - params.Set("precision", precisionToString(w.client.Options().Precision())) + params.Set("precision", precisionToString(w.writeOptions.Precision())) u.RawQuery = params.Encode() w.lock.Lock() w.url = u.String() diff --git a/options.go b/options.go index d1eaed1a..242c77d5 100644 --- a/options.go +++ b/options.go @@ -6,86 +6,73 @@ package influxdb2 import ( "crypto/tls" + "github.com/influxdata/influxdb-client-go/api/http" + "github.com/influxdata/influxdb-client-go/api/write" "time" ) // Options holds configuration properties for communicating with InfluxDB server type Options struct { - // Maximum number of points sent to server in single request. Default 5000 - batchSize uint - // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms - flushInterval uint - // Default retry interval in ms, if not sent by server. Default 1000ms - retryInterval uint - // Maximum count of retry attempts of failed writes - maxRetries uint - // Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 10,000 - retryBufferLimit uint - // DebugLevel to filter log messages. Each level mean to log all categories bellow. 0 error, 1 - warning, 2 - info, 3 - debug + // LogLevel to filter log messages. Each level mean to log all categories bellow. 0 error, 1 - warning, 2 - info, 3 - debug logLevel uint - // Precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second - // Default time.Nanosecond - precision time.Duration - // Whether to use GZip compression in requests. Default false - useGZip bool - // TLS configuration for secure connection. Default nil - tlsConfig *tls.Config - // HTTP request timeout in sec. Default 20 - httpRequestTimeout uint + // Writing options + writeOptions *write.Options + // Http options + httpOptions *http.Options } // BatchSize returns size of batch func (o *Options) BatchSize() uint { - return o.batchSize + return o.WriteOptions().BatchSize() } // SetBatchSize sets number of points sent in single request func (o *Options) SetBatchSize(batchSize uint) *Options { - o.batchSize = batchSize + o.WriteOptions().SetBatchSize(batchSize) return o } // FlushInterval returns flush interval in ms func (o *Options) FlushInterval() uint { - return o.flushInterval + return o.WriteOptions().FlushInterval() } // SetFlushInterval sets flush interval in ms in which is buffer flushed if it has not been already written func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options { - o.flushInterval = flushIntervalMs + o.WriteOptions().SetFlushInterval(flushIntervalMs) return o } // RetryInterval returns the retry interval in ms func (o *Options) RetryInterval() uint { - return o.retryInterval + return o.WriteOptions().RetryInterval() } // SetRetryInterval sets retry interval in ms, which is set if not sent by server func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options { - o.retryInterval = retryIntervalMs + o.WriteOptions().SetRetryInterval(retryIntervalMs) return o } // MaxRetries returns maximum count of retry attempts of failed writes func (o *Options) MaxRetries() uint { - return o.maxRetries + return o.WriteOptions().MaxRetries() } // SetMaxRetries sets maximum count of retry attempts of failed writes func (o *Options) SetMaxRetries(maxRetries uint) *Options { - o.maxRetries = maxRetries + o.WriteOptions().SetMaxRetries(maxRetries) return o } // RetryBufferLimit returns retry buffer limit func (o *Options) RetryBufferLimit() uint { - return o.retryBufferLimit + return o.WriteOptions().RetryBufferLimit() } // SetRetryBufferLimit sets maximum number of points to keep for retry. Should be multiple of BatchSize. func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options { - o.retryBufferLimit = retryBufferLimit + o.WriteOptions().SetRetryBufferLimit(retryBufferLimit) return o } @@ -103,49 +90,65 @@ func (o *Options) SetLogLevel(logLevel uint) *Options { // Precision returns time precision for writes func (o *Options) Precision() time.Duration { - return o.precision + return o.WriteOptions().Precision() } // SetPrecision sets time precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second func (o *Options) SetPrecision(precision time.Duration) *Options { - o.precision = precision + o.WriteOptions().SetPrecision(precision) return o } // UseGZip returns true if write request are gzip`ed func (o *Options) UseGZip() bool { - return o.useGZip + return o.WriteOptions().UseGZip() } // SetUseGZip specifies whether to use GZip compression in write requests. func (o *Options) SetUseGZip(useGZip bool) *Options { - o.useGZip = useGZip + o.WriteOptions().SetUseGZip(useGZip) return o } // TlsConfig returns TlsConfig func (o *Options) TlsConfig() *tls.Config { - return o.tlsConfig + return o.HttpOptions().TlsConfig() } // SetTlsConfig sets TLS configuration for secure connection func (o *Options) SetTlsConfig(tlsConfig *tls.Config) *Options { - o.tlsConfig = tlsConfig + o.HttpOptions().SetTlsConfig(tlsConfig) return o } // HttpRequestTimeout returns HTTP request timeout func (o *Options) HttpRequestTimeout() uint { - return o.httpRequestTimeout + return o.HttpOptions().HttpRequestTimeout() } // SetHttpRequestTimeout sets HTTP request timeout in sec func (o *Options) SetHttpRequestTimeout(httpRequestTimeout uint) *Options { - o.httpRequestTimeout = httpRequestTimeout + o.HttpOptions().SetHttpRequestTimeout(httpRequestTimeout) return o } +// WriteOptions returns write related options +func (o *Options) WriteOptions() *write.Options { + if o.writeOptions == nil { + o.writeOptions = write.DefaultOptions() + } + return o.writeOptions +} + +// HttpOptions returns http related options +func (o *Options) HttpOptions() *http.Options { + if o.httpOptions == nil { + o.httpOptions = http.DefaultOptions() + } + return o.httpOptions +} + // DefaultOptions returns Options object with default values func DefaultOptions() *Options { - return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000, httpRequestTimeout: 20} + return &Options{logLevel: 0, writeOptions: write.DefaultOptions(), httpOptions: http.DefaultOptions()} } From d6138cb5924f511eb402bf4711b3dceb2460e0ab Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 15 May 2020 10:38:49 +0200 Subject: [PATCH 2/4] fix: wait for write --- api/write_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/api/write_test.go b/api/write_test.go index 0bdd3746..66598b32 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -201,12 +201,16 @@ func TestWriteApiImpl_Write(t *testing.T) { func TestGzipWithFlushing(t *testing.T) { service := newTestService(t, "http://localhost:8888") writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true)) + log.Log.SetDebugLevel(4) points := genPoints(5) for _, p := range points { writeApi.WritePoint(p) } - time.Sleep(time.Millisecond * 10) - require.Len(t, service.Lines(), 5) + start := time.Now() + writeApi.waitForFlushing() + end := time.Now() + fmt.Printf("Flash duration: %dns\n", end.Sub(start).Nanoseconds()) + assert.Len(t, service.Lines(), 5) assert.True(t, service.wasGzip) service.Close() @@ -214,8 +218,8 @@ func TestGzipWithFlushing(t *testing.T) { for _, p := range points { writeApi.WritePoint(p) } - time.Sleep(time.Millisecond * 10) - require.Len(t, service.Lines(), 5) + writeApi.waitForFlushing() + assert.Len(t, service.Lines(), 5) assert.False(t, service.wasGzip) writeApi.Close() From ec7cde7ee0daaa97973432ad6e6035c30cda465d Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 15 May 2020 10:46:06 +0200 Subject: [PATCH 3/4] fix: data race --- api/write_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/write_test.go b/api/write_test.go index 66598b32..7ae0cab0 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -200,8 +200,8 @@ func TestWriteApiImpl_Write(t *testing.T) { func TestGzipWithFlushing(t *testing.T) { service := newTestService(t, "http://localhost:8888") - writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true)) log.Log.SetDebugLevel(4) + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true)) points := genPoints(5) for _, p := range points { writeApi.WritePoint(p) From b53376b7fdd2a4db5bf3b78742dea85f4c707563 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 15 May 2020 11:20:49 +0200 Subject: [PATCH 4/4] ci: fix coverage calculation --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b7c97849..0f1c5b4d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -39,7 +39,7 @@ jobs: mkdir -p /tmp/artifacts - run: command: | - go test -v -race -coverprofile=coverage.txt -covermode=atomic ./... --tags e2e + go test -v -race ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg ./... -tags e2e bash <(curl -s https://codecov.io/bash) go tool cover -html=coverage.txt -o coverage.html mv coverage.html /tmp/artifacts