Skip to content

Commit af8a37b

Browse files
owaistigrannajaryan
authored andcommitted
Ported kinesis exporter from Omnition (#60)
Porting the existing kinesis exporter from Omnition's Otel distribution to contrib. Porting from: https://github.com/Omnition/omnition-opentelemetry-collector/tree/master/exporter/kinesis
1 parent e900d2d commit af8a37b

File tree

12 files changed

+1189
-13
lines changed

12 files changed

+1189
-13
lines changed

cmd/otelcontribcol/components.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/open-telemetry/opentelemetry-collector/receiver"
2323

2424
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter"
25+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kinesisexporter"
2526
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter"
2627
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
2728
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
@@ -55,6 +56,7 @@ func components() (config.Factories, error) {
5556
&azuremonitorexporter.Factory{},
5657
&signalfxexporter.Factory{},
5758
&sapmexporter.Factory{},
59+
&kinesisexporter.Factory{},
5860
}
5961
for _, exp := range factories.Exporters {
6062
exporters = append(exporters, exp)

exporter/kinesisexporter/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../Makefile.Common

exporter/kinesisexporter/config.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2019 OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kinesisexporter
16+
17+
import (
18+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
19+
)
20+
21+
// AWSConfig contains AWS specific configuration such as kinesis stream, region, etc.
22+
type AWSConfig struct {
23+
StreamName string `mapstructure:"stream_name"`
24+
KinesisEndpoint string `mapstructure:"kinesis_endpoint"`
25+
Region string `mapstructure:"region"`
26+
Role string `mapstructure:"role"`
27+
}
28+
29+
// KPLConfig contains kinesis producer library related config to controls things
30+
// like aggregation, batching, connections, retries, etc.
31+
type KPLConfig struct {
32+
AggregateBatchCount int `mapstructure:"aggregate_batch_count"`
33+
AggregateBatchSize int `mapstructure:"aggregate_batch_size"`
34+
BatchSize int `mapstructure:"batch_size"`
35+
BatchCount int `mapstructure:"batch_count"`
36+
BacklogCount int `mapstructure:"backlog_count"`
37+
FlushIntervalSeconds int `mapstructure:"flush_interval_seconds"`
38+
MaxConnections int `mapstructure:"max_connections"`
39+
MaxRetries int `mapstructure:"max_retries"`
40+
MaxBackoffSeconds int `mapstructure:"max_backoff_seconds"`
41+
}
42+
43+
// Config contains the main configuration options for the kinesis exporter
44+
type Config struct {
45+
configmodels.ExporterSettings `mapstructure:",squash"`
46+
47+
AWS AWSConfig `mapstructure:"aws"`
48+
KPL KPLConfig `mapstructure:"kpl"`
49+
50+
QueueSize int `mapstructure:"queue_size"`
51+
NumWorkers int `mapstructure:"num_workers"`
52+
MaxBytesPerBatch int `mapstructure:"max_bytes_per_batch"`
53+
MaxBytesPerSpan int `mapstructure:"max_bytes_per_span"`
54+
FlushIntervalSeconds int `mapstructure:"flush_interval_seconds"`
55+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2019 OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kinesisexporter
16+
17+
import (
18+
"path"
19+
"testing"
20+
21+
"github.com/open-telemetry/opentelemetry-collector/config"
22+
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
23+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func TestDefaultConfig(t *testing.T) {
29+
factories, err := config.ExampleComponents()
30+
assert.Nil(t, err)
31+
32+
factory := &Factory{}
33+
factories.Exporters[factory.Type()] = factory
34+
cfg, err := config.LoadConfigFile(
35+
t, path.Join(".", "testdata", "default.yaml"), factories,
36+
)
37+
require.NoError(t, err)
38+
require.NotNil(t, cfg)
39+
40+
e := cfg.Exporters["kinesis"]
41+
42+
assert.Equal(t, e,
43+
&Config{
44+
ExporterSettings: configmodels.ExporterSettings{
45+
TypeVal: "kinesis",
46+
NameVal: "kinesis",
47+
},
48+
AWS: AWSConfig{
49+
Region: "us-west-2",
50+
},
51+
KPL: KPLConfig{
52+
BatchSize: 5242880,
53+
BatchCount: 1000,
54+
BacklogCount: 2000,
55+
FlushIntervalSeconds: 5,
56+
MaxConnections: 24,
57+
},
58+
59+
QueueSize: 100000,
60+
NumWorkers: 8,
61+
FlushIntervalSeconds: 5,
62+
MaxBytesPerBatch: 100000,
63+
MaxBytesPerSpan: 900000,
64+
},
65+
)
66+
}
67+
68+
func TestConfig(t *testing.T) {
69+
factories, err := config.ExampleComponents()
70+
assert.Nil(t, err)
71+
72+
factory := &Factory{}
73+
factories.Exporters[factory.Type()] = factory
74+
cfg, err := config.LoadConfigFile(
75+
t, path.Join(".", "testdata", "config.yaml"), factories,
76+
)
77+
78+
require.NoError(t, err)
79+
require.NotNil(t, cfg)
80+
81+
e := cfg.Exporters["kinesis"]
82+
83+
assert.Equal(t, e,
84+
&Config{
85+
ExporterSettings: configmodels.ExporterSettings{
86+
TypeVal: "kinesis",
87+
NameVal: "kinesis",
88+
},
89+
AWS: AWSConfig{
90+
StreamName: "test-stream",
91+
KinesisEndpoint: "kinesis.mars-1.aws.galactic",
92+
Region: "mars-1",
93+
Role: "arn:test-role",
94+
},
95+
KPL: KPLConfig{
96+
AggregateBatchCount: 10,
97+
AggregateBatchSize: 11,
98+
BatchSize: 12,
99+
BatchCount: 13,
100+
BacklogCount: 14,
101+
FlushIntervalSeconds: 15,
102+
MaxConnections: 16,
103+
MaxRetries: 17,
104+
MaxBackoffSeconds: 18,
105+
},
106+
107+
QueueSize: 1,
108+
NumWorkers: 2,
109+
FlushIntervalSeconds: 3,
110+
MaxBytesPerBatch: 4,
111+
MaxBytesPerSpan: 5,
112+
},
113+
)
114+
}
115+
116+
func TestConfigCheck(t *testing.T) {
117+
cfg := (&Factory{}).CreateDefaultConfig()
118+
assert.NoError(t, configcheck.ValidateConfig(cfg))
119+
}

exporter/kinesisexporter/exporter.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2019 OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kinesisexporter
16+
17+
import (
18+
"context"
19+
20+
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
21+
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
22+
"github.com/open-telemetry/opentelemetry-collector/exporter"
23+
jaegertranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/jaeger"
24+
kinesis "github.com/signalfx/opencensus-go-exporter-kinesis"
25+
"go.uber.org/zap"
26+
)
27+
28+
// Exporter implements an OpenTelemetry trace exporter that exports all spans to AWS Kinesis
29+
type Exporter struct {
30+
kinesis *kinesis.Exporter
31+
logger *zap.Logger
32+
}
33+
34+
var _ (exporter.TraceExporter) = (*Exporter)(nil)
35+
36+
// Start tells the exporter to start. The exporter may prepare for exporting
37+
// by connecting to the endpoint. Host parameter can be used for communicating
38+
// with the host after Start() has already returned. If error is returned by
39+
// Start() then the collector startup will be aborted.
40+
func (e Exporter) Start(host exporter.Host) error {
41+
return nil
42+
}
43+
44+
// Shutdown is invoked during exporter shutdown.
45+
func (e Exporter) Shutdown() error {
46+
e.kinesis.Flush()
47+
return nil
48+
}
49+
50+
// ConsumeTraceData receives a span batch and exports it to AWS Kinesis
51+
func (e Exporter) ConsumeTraceData(c context.Context, td consumerdata.TraceData) error {
52+
pBatch, err := jaegertranslator.OCProtoToJaegerProto(td)
53+
if err != nil {
54+
e.logger.Error("error translating span batch", zap.Error(err))
55+
return consumererror.Permanent(err)
56+
}
57+
// TODO: Use a multi error type
58+
var exportErr error
59+
for _, span := range pBatch.GetSpans() {
60+
if span.Process == nil {
61+
span.Process = pBatch.Process
62+
}
63+
err := e.kinesis.ExportSpan(span)
64+
if err != nil {
65+
e.logger.Error("error exporting span to kinesis", zap.Error(err))
66+
exportErr = err
67+
}
68+
}
69+
return exportErr
70+
}

exporter/kinesisexporter/factory.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2019 OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kinesisexporter
16+
17+
import (
18+
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
19+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
20+
"github.com/open-telemetry/opentelemetry-collector/exporter"
21+
kinesis "github.com/signalfx/opencensus-go-exporter-kinesis"
22+
"go.uber.org/zap"
23+
)
24+
25+
const (
26+
// The value of "type" key in configuration.
27+
typeStr = "kinesis"
28+
exportFormat = "jaeger-proto"
29+
)
30+
31+
// Factory is the factory for Kinesis exporter.
32+
type Factory struct {
33+
}
34+
35+
// Type gets the type of the Exporter config created by this factory.
36+
func (f *Factory) Type() string {
37+
return typeStr
38+
}
39+
40+
// CreateDefaultConfig creates the default configuration for exporter.
41+
func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
42+
return &Config{
43+
AWS: AWSConfig{
44+
Region: "us-west-2",
45+
},
46+
KPL: KPLConfig{
47+
BatchSize: 5242880,
48+
BatchCount: 1000,
49+
BacklogCount: 2000,
50+
FlushIntervalSeconds: 5,
51+
MaxConnections: 24,
52+
},
53+
54+
QueueSize: 100000,
55+
NumWorkers: 8,
56+
FlushIntervalSeconds: 5,
57+
MaxBytesPerBatch: 100000,
58+
MaxBytesPerSpan: 900000,
59+
}
60+
}
61+
62+
// CreateTraceExporter initializes and returns a new trace exporter
63+
func (f *Factory) CreateTraceExporter(logger *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
64+
c := cfg.(*Config)
65+
k, err := kinesis.NewExporter(kinesis.Options{
66+
Name: c.Name(),
67+
StreamName: c.AWS.StreamName,
68+
AWSRegion: c.AWS.Region,
69+
AWSRole: c.AWS.Role,
70+
AWSKinesisEndpoint: c.AWS.KinesisEndpoint,
71+
72+
KPLAggregateBatchSize: c.KPL.AggregateBatchSize,
73+
KPLAggregateBatchCount: c.KPL.AggregateBatchCount,
74+
KPLBatchSize: c.KPL.BatchSize,
75+
KPLBatchCount: c.KPL.BatchCount,
76+
KPLBacklogCount: c.KPL.BacklogCount,
77+
KPLFlushIntervalSeconds: c.KPL.FlushIntervalSeconds,
78+
KPLMaxConnections: c.KPL.MaxConnections,
79+
KPLMaxRetries: c.KPL.MaxRetries,
80+
KPLMaxBackoffSeconds: c.KPL.MaxBackoffSeconds,
81+
82+
QueueSize: c.QueueSize,
83+
NumWorkers: c.NumWorkers,
84+
MaxAllowedSizePerSpan: c.MaxBytesPerSpan,
85+
MaxListSize: c.MaxBytesPerBatch,
86+
ListFlushInterval: c.FlushIntervalSeconds,
87+
Encoding: exportFormat,
88+
}, logger)
89+
if err != nil {
90+
return nil, err
91+
}
92+
return Exporter{k, logger}, nil
93+
}
94+
95+
// CreateMetricsExporter creates a metrics exporter based on this config.
96+
func (f *Factory) CreateMetricsExporter(logger *zap.Logger, cfg configmodels.Exporter) (exporter.MetricsExporter, error) {
97+
return nil, configerror.ErrDataTypeIsNotSupported
98+
}

exporter/kinesisexporter/go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kinesisexporter
2+
3+
go 1.12
4+
5+
require (
6+
github.com/open-telemetry/opentelemetry-collector v0.2.1-0.20191119152140-567e1046cefa
7+
github.com/signalfx/opencensus-go-exporter-kinesis v0.4.0
8+
github.com/stretchr/testify v1.4.0
9+
go.uber.org/zap v1.10.0
10+
)

0 commit comments

Comments
 (0)