Skip to content

Commit c01f532

Browse files
authored
[signalfx_correlation] Add signalfx_correlation exporter skeleton (#1332)
* [signalfx_correlation] Add signalfx_correlation exporter skeleton This is for moving the correlation out of sapmexporter into a dedicated exporter so that the correlation can be used even when sapm isn't (for example, on an agent that is exporting in otlp to a gateway instead of sapm.) * fix readme
1 parent ab86ac9 commit c01f532

File tree

12 files changed

+2126
-0
lines changed

12 files changed

+2126
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../Makefile.Common
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# SignalFx Correlation Exporter
2+
3+
This exporter observes traces being sent for their service and environment. When a new service or environment is
4+
seen it associates the source (e.g. host or pod) to that source or environment in SignalFx metrics so that those
5+
metrics can be filtered by the service and environment.
6+
7+
Supported pipeline types: traces
8+
9+
## Configuration
10+
11+
The following configuration options are required:
12+
13+
- `endpoint` (required, no default): This is the base URL for API requests (e.g. https://api.signalfx.com).
14+
- `access_token` (no default): The authentication token provided by SignalFx.
15+
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
16+
- `stale_service_timeout` (default = 5 minutes): How long to wait after a span's service name is last seen before uncorrelating it.
17+
- `max_requests` (default = 20): Max HTTP requests to be made in parallel.
18+
- `max_buffered` (default = 10,000): Max number of correlation updates that can be buffered before updates are dropped.
19+
- `max_retries` (default = 2): Max number of retries that will be made for failed correlation updates.
20+
- `log_updates` (default = false): Whether or not to log correlation updates to dimensions (at `DEBUG` level).
21+
- `retry_delay` (default = 30 seconds): How long to wait between retries.
22+
- `cleanup_interval` (default = 1 minute): How frequently to purge duplicate requests.
23+
- `sync_attributes` (default = `{"k8s.pod.uid": "k8s.pod.uid", "container.id": "container.id"}`) Map containing key of the attribute to read from spans to sync to dimensions specified as the value.
24+
25+
Example:
26+
27+
```yaml
28+
exporters:
29+
signalfx_correlation:
30+
access_token: YOUR_ACCESS_TOKEN
31+
endpoint: https://api.YOUR_SIGNALFX_REALM.signalfx.com
32+
```
33+
34+
The full list of settings exposed for this exporter are documented [here](config.go)
35+
with detailed sample configurations [here](testdata/config.yaml).
36+
37+
This exporter also offers proxy support as documented
38+
[here](https://github.com/open-telemetry/opentelemetry-collector/tree/master/exporter#proxy-support).
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalfxcorrelationexporter
16+
17+
import (
18+
"errors"
19+
"net/url"
20+
"time"
21+
22+
"github.com/signalfx/signalfx-agent/pkg/apm/correlations"
23+
"go.opentelemetry.io/collector/config/confighttp"
24+
"go.opentelemetry.io/collector/config/configmodels"
25+
)
26+
27+
// Config defines configuration for signalfx_correlation exporter.
28+
type Config struct {
29+
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
30+
confighttp.HTTPClientSettings `mapstructure:",squash"`
31+
correlations.Config `mapstructure:",squash"`
32+
33+
// How long to wait after a trace span's service name is last seen before
34+
// uncorrelating that service.
35+
StaleServiceTimeout time.Duration `mapstructure:"stale_service_timeout"`
36+
// SyncAttributes is a key of the span attribute name to sync to the dimension as the value.
37+
SyncAttributes map[string]string `mapstructure:"sync_attributes"`
38+
39+
// AccessToken is the authentication token provided by SignalFx.
40+
AccessToken string `mapstructure:"access_token"`
41+
}
42+
43+
func (c *Config) validate() error {
44+
if c.Endpoint == "" {
45+
return errors.New("`endpoint` not specified")
46+
}
47+
48+
_, err := url.Parse(c.Endpoint)
49+
if err != nil {
50+
return err
51+
}
52+
53+
return nil
54+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalfxcorrelationexporter
16+
17+
import (
18+
"path"
19+
"testing"
20+
"time"
21+
22+
"github.com/signalfx/signalfx-agent/pkg/apm/correlations"
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
"go.opentelemetry.io/collector/component/componenttest"
26+
"go.opentelemetry.io/collector/config/confighttp"
27+
"go.opentelemetry.io/collector/config/configmodels"
28+
"go.opentelemetry.io/collector/config/configtest"
29+
)
30+
31+
func TestLoadConfig(t *testing.T) {
32+
facotries, err := componenttest.ExampleComponents()
33+
assert.Nil(t, err)
34+
35+
factory := NewFactory()
36+
facotries.Exporters[configmodels.Type(typeStr)] = factory
37+
cfg, err := configtest.LoadConfigFile(
38+
t, path.Join(".", "testdata", "config.yaml"), facotries,
39+
)
40+
41+
require.NoError(t, err)
42+
require.NotNil(t, cfg)
43+
44+
assert.Equal(t, len(cfg.Exporters), 2)
45+
46+
r0 := cfg.Exporters["signalfx_correlation"]
47+
assert.Equal(t, r0, factory.CreateDefaultConfig())
48+
49+
r1 := cfg.Exporters["signalfx_correlation/configured"].(*Config)
50+
assert.Equal(t, r1,
51+
&Config{
52+
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "signalfx_correlation/configured"},
53+
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "https://api.signalfx.com", Timeout: 10 * time.Second},
54+
AccessToken: "abcd1234",
55+
StaleServiceTimeout: 5 * time.Minute,
56+
SyncAttributes: map[string]string{
57+
"k8s.pod.uid": "k8s.pod.uid",
58+
"container.id": "container.id",
59+
},
60+
Config: correlations.Config{
61+
MaxRequests: 20,
62+
MaxBuffered: 10_000,
63+
MaxRetries: 2,
64+
LogUpdates: false,
65+
RetryDelay: 30 * time.Second,
66+
CleanupInterval: 1 * time.Minute,
67+
},
68+
})
69+
}
70+
71+
func TestInvalidConfig(t *testing.T) {
72+
invalid := Config{
73+
AccessToken: "abcd1234",
74+
}
75+
noEndpointErr := invalid.validate()
76+
require.Error(t, noEndpointErr)
77+
78+
invalid = Config{
79+
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ":123:456"},
80+
AccessToken: "abcd1234",
81+
}
82+
invalidURLErr := invalid.validate()
83+
require.Error(t, invalidURLErr)
84+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package signalfxcorrelationexporter performs span to metric correlation for SignalFx.
16+
package signalfxcorrelationexporter
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalfxcorrelationexporter
16+
17+
import (
18+
"context"
19+
20+
"go.opentelemetry.io/collector/component"
21+
"go.opentelemetry.io/collector/consumer/pdata"
22+
"go.opentelemetry.io/collector/exporter/exporterhelper"
23+
"go.uber.org/zap"
24+
)
25+
26+
// corrExporter is a wrapper struct of the correlation exporter
27+
type corrExporter struct {
28+
logger *zap.Logger
29+
config *Config
30+
}
31+
32+
func (se *corrExporter) Shutdown(context.Context) error {
33+
return nil
34+
}
35+
36+
func newCorrExporter(cfg *Config, params component.ExporterCreateParams) (corrExporter, error) {
37+
err := cfg.validate()
38+
if err != nil {
39+
return corrExporter{}, err
40+
}
41+
42+
return corrExporter{
43+
logger: params.Logger,
44+
config: cfg,
45+
}, err
46+
}
47+
48+
func newTraceExporter(cfg *Config, params component.ExporterCreateParams) (component.TraceExporter, error) {
49+
se, err := newCorrExporter(cfg, params)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
return exporterhelper.NewTraceExporter(
55+
cfg,
56+
se.pushTraceData,
57+
exporterhelper.WithShutdown(se.Shutdown))
58+
}
59+
60+
// pushTraceData processes traces to extract services and environments to associate them to their emitting host/pods.
61+
func (se *corrExporter) pushTraceData(ctx context.Context, td pdata.Traces) (droppedSpansCount int, err error) {
62+
return 0, nil
63+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package signalfxcorrelationexporter
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/config/confighttp"
25+
"go.opentelemetry.io/collector/config/configmodels"
26+
"go.opentelemetry.io/collector/consumer/pdata"
27+
"go.uber.org/zap"
28+
)
29+
30+
func TestCreateTraceExporter(t *testing.T) {
31+
config := &Config{
32+
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost"},
33+
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "signalfx_correlation/configured"},
34+
AccessToken: "abcd1234",
35+
}
36+
params := component.ExporterCreateParams{Logger: zap.NewNop()}
37+
38+
te, err := newCorrExporter(config, params)
39+
assert.Nil(t, err)
40+
assert.NotNil(t, te, "failed to create trace exporter")
41+
42+
assert.NoError(t, te.Shutdown(context.Background()), "trace exporter shutdown failed")
43+
}
44+
45+
func TestCreateTraceExporterWithInvalidConfig(t *testing.T) {
46+
config := &Config{}
47+
params := component.ExporterCreateParams{Logger: zap.NewNop()}
48+
te, err := newTraceExporter(config, params)
49+
require.Error(t, err)
50+
assert.Nil(t, te)
51+
}
52+
53+
func TestExporterConsumeTraces(t *testing.T) {
54+
config := &Config{
55+
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: "http://localhost"},
56+
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "signalfx_correlation/configured"},
57+
AccessToken: "abcd1234",
58+
}
59+
params := component.ExporterCreateParams{Logger: zap.NewNop()}
60+
61+
te, err := newTraceExporter(config, params)
62+
assert.Nil(t, err)
63+
assert.NotNil(t, te, "failed to create trace exporter")
64+
defer te.Shutdown(context.Background())
65+
66+
assert.NoError(t, te.ConsumeTraces(context.Background(), pdata.NewTraces()))
67+
}

0 commit comments

Comments
 (0)