Skip to content

Commit 4e97d74

Browse files
moh-osman3kristinapathakjmacd
authored andcommitted
(otelarrowexporter): Create exporter per unique value of configured metadataKeys (open-telemetry#34827)
**Description:** This PR forks open-telemetry#34235 and adds unit tests. **Link to tracking Issue:** open-telemetry#34178 --------- Co-authored-by: kristina.pathak <[email protected]> Co-authored-by: Joshua MacDonald <[email protected]>
1 parent 5b67c15 commit 4e97d74

File tree

9 files changed

+547
-43
lines changed

9 files changed

+547
-43
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: otelarrowexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow separate arrow exporter per unique value of configured metadataKeys.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34178]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/otelarrowexporter/README.md

+11
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ streams.
115115

116116
- `prioritizer` (default: "leastloaded"): policy for distributing load across multiple streams.
117117

118+
### Matching Metadata Per Stream
119+
120+
The following configuration values allow for separate streams per unique
121+
metadata combinations:
122+
- `metadata_keys` (default = empty): When set, this exporter will create one
123+
arrow exporter instance per distinct combination of values in the
124+
client.Metadata.
125+
- `metadata_cardinality_limit` (default = 1000): When metadata_keys is not empty,
126+
this setting limits the number of unique combinations of metadata key values
127+
that will be processed over the lifetime of the exporter.
128+
118129
### Network Configuration
119130

120131
This component uses `round_robin` by default as the gRPC load

exporter/otelarrowexporter/config.go

+36
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"fmt"
8+
"strings"
89
"time"
910

1011
"github.com/open-telemetry/otel-arrow/pkg/config"
@@ -45,6 +46,23 @@ type Config struct {
4546
// exporter is built and configured via code instead of yaml.
4647
// Uses include custom dialer, custom user-agent, etc.
4748
UserDialOptions []grpc.DialOption `mapstructure:"-"`
49+
50+
// MetadataKeys is a list of client.Metadata keys that will be
51+
// used to form distinct exporters. If this setting is empty,
52+
// a single exporter instance will be used. When this setting
53+
// is not empty, one exporter will be used per distinct
54+
// combination of values for the listed metadata keys.
55+
//
56+
// Empty value and unset metadata are treated as distinct cases.
57+
//
58+
// Entries are case-insensitive. Duplicated entries will
59+
// trigger a validation error.
60+
MetadataKeys []string `mapstructure:"metadata_keys"`
61+
62+
// MetadataCardinalityLimit indicates the maximum number of
63+
// exporter instances that will be created through a distinct
64+
// combination of MetadataKeys.
65+
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`
4866
}
4967

5068
// ArrowConfig includes whether Arrow is enabled and the number of
@@ -90,6 +108,24 @@ var _ component.Config = (*Config)(nil)
90108

91109
var _ component.ConfigValidator = (*ArrowConfig)(nil)
92110

111+
func (cfg *Config) Validate() error {
112+
err := cfg.Arrow.Validate()
113+
if err != nil {
114+
return err
115+
}
116+
117+
uniq := map[string]bool{}
118+
for _, k := range cfg.MetadataKeys {
119+
l := strings.ToLower(k)
120+
if _, has := uniq[l]; has {
121+
return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l)
122+
}
123+
uniq[l] = true
124+
}
125+
126+
return nil
127+
}
128+
93129
// Validate returns an error when the number of streams is less than 1.
94130
func (cfg *ArrowConfig) Validate() error {
95131
if cfg.NumStreams < 1 {

exporter/otelarrowexporter/factory.go

+20-19
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,16 @@ func createDefaultConfig() component.Config {
7272
}
7373
}
7474

75-
func (exp *baseExporter) helperOptions() []exporterhelper.Option {
75+
func helperOptions(e exp) []exporterhelper.Option {
76+
cfg := e.getConfig().(*Config)
7677
return []exporterhelper.Option{
7778
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
78-
exporterhelper.WithTimeout(exp.config.TimeoutSettings),
79-
exporterhelper.WithRetry(exp.config.RetryConfig),
80-
exporterhelper.WithQueue(exp.config.QueueSettings),
81-
exporterhelper.WithStart(exp.start),
82-
exporterhelper.WithBatcher(exp.config.BatcherConfig),
83-
exporterhelper.WithShutdown(exp.shutdown),
79+
exporterhelper.WithTimeout(cfg.TimeoutSettings),
80+
exporterhelper.WithRetry(cfg.RetryConfig),
81+
exporterhelper.WithQueue(cfg.QueueSettings),
82+
exporterhelper.WithStart(e.start),
83+
exporterhelper.WithBatcher(cfg.BatcherConfig),
84+
exporterhelper.WithShutdown(e.shutdown),
8485
}
8586
}
8687

@@ -103,13 +104,13 @@ func createTracesExporter(
103104
set exporter.Settings,
104105
cfg component.Config,
105106
) (exporter.Traces, error) {
106-
exp, err := newExporter(cfg, set, createArrowTracesStream)
107+
e, err := newMetadataExporter(cfg, set, createArrowTracesStream)
107108
if err != nil {
108109
return nil, err
109110
}
110-
return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config,
111-
exp.pushTraces,
112-
exp.helperOptions()...,
111+
return exporterhelper.NewTracesExporter(ctx, e.getSettings(), e.getConfig(),
112+
e.pushTraces,
113+
helperOptions(e)...,
113114
)
114115
}
115116

@@ -122,13 +123,13 @@ func createMetricsExporter(
122123
set exporter.Settings,
123124
cfg component.Config,
124125
) (exporter.Metrics, error) {
125-
exp, err := newExporter(cfg, set, createArrowMetricsStream)
126+
e, err := newMetadataExporter(cfg, set, createArrowMetricsStream)
126127
if err != nil {
127128
return nil, err
128129
}
129-
return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config,
130-
exp.pushMetrics,
131-
exp.helperOptions()...,
130+
return exporterhelper.NewMetricsExporter(ctx, e.getSettings(), e.getConfig(),
131+
e.pushMetrics,
132+
helperOptions(e)...,
132133
)
133134
}
134135

@@ -141,12 +142,12 @@ func createLogsExporter(
141142
set exporter.Settings,
142143
cfg component.Config,
143144
) (exporter.Logs, error) {
144-
exp, err := newExporter(cfg, set, createArrowLogsStream)
145+
e, err := newMetadataExporter(cfg, set, createArrowLogsStream)
145146
if err != nil {
146147
return nil, err
147148
}
148-
return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config,
149-
exp.pushLogs,
150-
exp.helperOptions()...,
149+
return exporterhelper.NewLogsExporter(ctx, e.getSettings(), e.getConfig(),
150+
e.pushLogs,
151+
helperOptions(e)...,
151152
)
152153
}

exporter/otelarrowexporter/factory_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) {
3131
cfg := factory.CreateDefaultConfig()
3232
assert.NotNil(t, cfg, "failed to create default config")
3333
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
34-
ocfg, ok := factory.CreateDefaultConfig().(*Config)
35-
assert.True(t, ok)
34+
ocfg := factory.CreateDefaultConfig().(*Config)
3635
assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig())
3736
assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig())
3837
assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig())
+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"runtime"
11+
"sort"
12+
"strings"
13+
"sync"
14+
15+
arrowPkg "github.com/apache/arrow/go/v16/arrow"
16+
"go.opentelemetry.io/collector/client"
17+
"go.opentelemetry.io/collector/component"
18+
"go.opentelemetry.io/collector/consumer/consumererror"
19+
"go.opentelemetry.io/collector/exporter"
20+
"go.opentelemetry.io/collector/pdata/plog"
21+
"go.opentelemetry.io/collector/pdata/pmetric"
22+
"go.opentelemetry.io/collector/pdata/ptrace"
23+
"go.opentelemetry.io/otel/attribute"
24+
"go.uber.org/multierr"
25+
"google.golang.org/grpc/metadata"
26+
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
29+
)
30+
31+
var (
32+
// errTooManyExporters is returned when the MetadataCardinalityLimit has been reached.
33+
errTooManyExporters = consumererror.NewPermanent(errors.New("too many exporter metadata-value combinations"))
34+
)
35+
36+
type metadataExporter struct {
37+
config *Config
38+
settings exporter.Settings
39+
scf streamClientFactory
40+
host component.Host
41+
42+
metadataKeys []string
43+
exporters sync.Map
44+
netReporter *netstats.NetworkReporter
45+
46+
userAgent string
47+
48+
// Guards the size and the storing logic to ensure no more than limit items are stored.
49+
// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
50+
lock sync.Mutex
51+
size int
52+
}
53+
54+
var _ exp = (*metadataExporter)(nil)
55+
56+
func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) {
57+
oCfg := cfg.(*Config)
58+
netReporter, err := netstats.NewExporterNetworkReporter(set)
59+
if err != nil {
60+
return nil, err
61+
}
62+
userAgent := fmt.Sprintf("%s/%s (%s/%s)",
63+
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)
64+
65+
if !oCfg.Arrow.Disabled {
66+
// Ignoring an error because Validate() was called.
67+
_ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd)
68+
69+
userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams)
70+
}
71+
// use lower-case, to be consistent with http/2 headers.
72+
mks := make([]string, len(oCfg.MetadataKeys))
73+
for i, k := range oCfg.MetadataKeys {
74+
mks[i] = strings.ToLower(k)
75+
}
76+
sort.Strings(mks)
77+
if len(mks) == 0 {
78+
return newExporter(cfg, set, streamClientFactory, userAgent, netReporter)
79+
}
80+
return &metadataExporter{
81+
config: oCfg,
82+
settings: set,
83+
scf: streamClientFactory,
84+
metadataKeys: mks,
85+
userAgent: userAgent,
86+
netReporter: netReporter,
87+
}, nil
88+
}
89+
90+
func (e *metadataExporter) getSettings() exporter.Settings {
91+
return e.settings
92+
}
93+
94+
func (e *metadataExporter) getConfig() component.Config {
95+
return e.config
96+
}
97+
98+
func (e *metadataExporter) start(_ context.Context, host component.Host) (err error) {
99+
e.host = host
100+
return nil
101+
}
102+
103+
func (e *metadataExporter) shutdown(ctx context.Context) error {
104+
var err error
105+
e.exporters.Range(func(_ any, value any) bool {
106+
be := value.(exp)
107+
err = multierr.Append(err, be.shutdown(ctx))
108+
return true
109+
})
110+
return err
111+
}
112+
113+
func (e *metadataExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
114+
s, mdata := e.getAttrSet(ctx, e.metadataKeys)
115+
116+
be, err := e.getOrCreateExporter(ctx, s, mdata)
117+
if err != nil {
118+
return err
119+
}
120+
return be.pushTraces(ctx, td)
121+
}
122+
123+
func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
124+
s, mdata := e.getAttrSet(ctx, e.metadataKeys)
125+
126+
be, err := e.getOrCreateExporter(ctx, s, mdata)
127+
if err != nil {
128+
return err
129+
}
130+
131+
return be.pushMetrics(ctx, md)
132+
}
133+
134+
func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
135+
s, mdata := e.getAttrSet(ctx, e.metadataKeys)
136+
137+
be, err := e.getOrCreateExporter(ctx, s, mdata)
138+
if err != nil {
139+
return err
140+
}
141+
142+
return be.pushLogs(ctx, ld)
143+
}
144+
145+
func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) {
146+
e.lock.Lock()
147+
defer e.lock.Unlock()
148+
149+
if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) {
150+
return nil, errTooManyExporters
151+
}
152+
153+
v, ok := e.exporters.Load(s)
154+
if ok {
155+
return v.(exp), nil
156+
}
157+
158+
newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent, e.netReporter)
159+
if err != nil {
160+
return nil, fmt.Errorf("failed to create exporter: %w", err)
161+
}
162+
163+
var loaded bool
164+
v, loaded = e.exporters.LoadOrStore(s, newExp)
165+
if !loaded {
166+
// set metadata keys for base exporter to add them to the outgoing context.
167+
newExp.(*baseExporter).setMetadata(md)
168+
169+
// Start the goroutine only if we added the object to the map, otherwise is already started.
170+
err = newExp.start(ctx, e.host)
171+
if err != nil {
172+
e.exporters.Delete(s)
173+
return nil, fmt.Errorf("failed to start exporter: %w", err)
174+
}
175+
176+
e.size++
177+
}
178+
179+
return v.(exp), nil
180+
}
181+
182+
// getAttrSet is code taken from the core collector's batchprocessor multibatch logic.
183+
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.107.0/processor/batchprocessor/batch_processor.go#L298
184+
func (e *metadataExporter) getAttrSet(ctx context.Context, keys []string) (attribute.Set, metadata.MD) {
185+
// Get each metadata key value, form the corresponding
186+
// attribute set for use as a map lookup key.
187+
info := client.FromContext(ctx)
188+
md := map[string][]string{}
189+
var attrs []attribute.KeyValue
190+
for _, k := range keys {
191+
// Lookup the value in the incoming metadata, copy it
192+
// into the outgoing metadata, and create a unique
193+
// value for the attributeSet.
194+
vs := info.Metadata.Get(k)
195+
md[k] = vs
196+
if len(vs) == 1 {
197+
attrs = append(attrs, attribute.String(k, vs[0]))
198+
} else {
199+
attrs = append(attrs, attribute.StringSlice(k, vs))
200+
}
201+
}
202+
return attribute.NewSet(attrs...), metadata.MD(md)
203+
}

0 commit comments

Comments
 (0)