Commit 2477c8e

jpkrohling, andrewhowdencom, and atoulme authored
[exporter/loadbalancing] feat(lb): Introduce the ability to load balance on composite keys in lb (#36567)
Right now, there's a problem at high throughput using the load balancer and the `service.name` resource attribute: the load balancers themselves get slow. While it's possible to scale them vertically up to a point (e.g. about 100k req/sec), as they get slow they start to back up traffic and block on requests. Applications then can't write as many spans out, and start dropping spans.

This commit seeks to address that by extending the load balancing collector to allow creating a composite key from attributes that can still keep the load balancing decision "consistent enough" to reduce cardinality, but still spread the load across ${N} collectors.

It doesn't make too many assumptions about how the operators will use this, except that the underlying data (the spans) are unlikely to be complete in all cases, and the key generation is "best effort". This is a deviation from the existing design, which hard-requires "service.name".

== Design Notes

=== Contributor Skill

As a contributor, I'm very much new to the OpenTelemetry Collector, and do not anticipate I will be contributing much except as needs require to tune the collectors that I am responsible for. Given this, the code may violate certain assumptions that are otherwise "well known".

=== Required Knowledge

The biggest surprise in this code was that, despite accepting a slice, the routingIdentifiersFromTraces function assumes spans have been processed with the batchpersignal.SplitTraces() function, which appears to ensure that each "trace" contains only a single span (thus allowing them to be multiplexed effectively). This allows the function to be simplified quite substantially.

=== Use case

The primary use case I am thinking about when writing this work is calculating metrics in the spanmetricsconnector component. Essentially, services drive far too much traffic for a single collector instance to handle, so we need to multiplex it in a way that still allows the metrics to be calculated in a single place (limiting cardinality) but also spreads the load across ${N} collectors.

=== Traces only implementation

This commit addresses this only for traces, as I only care about traces. The logic can likely be extended easily, however.

Fixes #35320
Fixes #33660

---------

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
Co-authored-by: Andrew Howden <[email protected]>
Co-authored-by: Antoine Toulme <[email protected]>
Co-authored-by: Antoine Toulme <[email protected]>
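To make the trade-off described above concrete, here is a minimal sketch of why a composite key spreads one busy service across several collectors while identical spans still land in the same place. It is an illustration under stated assumptions, not the exporter's code: `compositeKey` and `pickBackend` are hypothetical names, spans are reduced to plain string maps, and a simple FNV-1a hash with modulo stands in for the exporter's hash ring.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// compositeKey concatenates the values of the listed attributes, in order.
// A missing attribute contributes nothing, so the key is "best effort".
func compositeKey(attrs map[string]string, names []string) string {
	var b strings.Builder
	for _, n := range names {
		b.WriteString(attrs[n]) // a missing map key yields ""
	}
	return b.String()
}

// pickBackend maps a key onto one of n backends with FNV-1a and modulo.
// Hypothetical stand-in for the exporter's hash ring.
func pickBackend(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

func main() {
	spans := []map[string]string{
		{"service.name": "checkout", "span.name": "GET /cart"},
		{"service.name": "checkout", "span.name": "POST /pay"},
		{"service.name": "checkout", "span.name": "GET /cart"},
	}
	for _, s := range spans {
		svcOnly := compositeKey(s, []string{"service.name"})
		composite := compositeKey(s, []string{"service.name", "span.name"})
		fmt.Printf("%q -> backend %d | %q -> backend %d\n",
			svcOnly, pickBackend(svcOnly, 4),
			composite, pickBackend(composite, 4))
	}
}
```

With the service-only key every span of `checkout` maps to the same backend. With the composite key the two distinct span names may map to different backends, while the repeated `GET /cart` span always maps to the same one, which is what keeps per-key aggregation in a single place.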
1 parent b1d9558 commit 2477c8e

5 files changed, +273 -21 lines changed

@@ -0,0 +1,4 @@
+change_type: enhancement
+component: exporter/loadbalancing
+note: Add support for route with composite keys
+issues: [35320]

exporter/loadbalancingexporter/README.md

+3 -1
@@ -114,11 +114,13 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
 * This resolver currently returns a maximum of 100 hosts.
 * `TODO`: Feature request [29771](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29771) aims to cover the pagination for this scenario
 * The `routing_key` property is used to specify how to route values (spans or metrics) to exporters based on different parameters. This functionality is currently enabled only for `trace` and `metric` pipeline types. It supports one of the following values:
-* `service`: Routes values based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
+* `service`: Routes values based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate. In addition to resource / span attributes, `span.kind`, `span.name` (the top level properties of a span) are also supported.
+* `attributes`: Routes based on values in the attributes of the traces. This is similar to service, but useful for situations in which a single service overwhelms any given instance of the collector, and should be split over multiple collectors.
 * `traceID`: Routes spans based on their `traceID`. Invalid for metrics.
 * `metric`: Routes metrics based on their metric name. Invalid for spans.
 * `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all it's attributes, plus the attributes and identifying information of its resource, scope, and metric data
 * loadbalancing exporter supports set of standard [queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), but they are disable by default to maintain compatibility
+* The `routing_attributes` property is used to list the attributes that should be used if the `routing_key` is `attributes`.

 Simple example

exporter/loadbalancingexporter/config.go

+13 -3
@@ -20,6 +20,7 @@ const (
 	metricNameRouting
 	resourceRouting
 	streamIDRouting
+	attrRouting
 )

 const (
@@ -28,6 +29,7 @@ const (
 	metricNameRoutingStr = "metric"
 	resourceRoutingStr = "resource"
 	streamIDRoutingStr = "streamID"
+	attrRoutingStr = "attributes"
 )

 // Config defines configuration for the exporter.
@@ -36,9 +38,17 @@ type Config struct {
 	configretry.BackOffConfig `mapstructure:"retry_on_failure"`
 	QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`

-	Protocol Protocol `mapstructure:"protocol"`
-	Resolver ResolverSettings `mapstructure:"resolver"`
-	RoutingKey string `mapstructure:"routing_key"`
+	Protocol Protocol `mapstructure:"protocol"`
+	Resolver ResolverSettings `mapstructure:"resolver"`
+
+	// RoutingKey is a single routing key value
+	RoutingKey string `mapstructure:"routing_key"`
+
+	// RoutingAttributes creates a composite routing key, based on several resource attributes of the application.
+	//
+	// Supports all attributes available (both resource and span), as well as the pseudo attributes "span.kind" and
+	// "span.name".
+	RoutingAttributes []string `mapstructure:"routing_attributes"`
 }

 // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
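
The new `RoutingAttributes` field only has an effect when `routing_key` is `attributes`. The `trace_exporter.go` changes in this commit state that a key is always produced, "even if that key is ''", so an empty attribute list sends every span to the same position on the ring. A guard along the following lines could catch that at configuration time. This is a sketch under assumptions: `routingConfig` and `validateRouting` are hypothetical names that are not part of this commit, and the struct mirrors only the two routing fields shown in the hunk above.

```go
package main

import (
	"errors"
	"fmt"
)

// routingConfig mirrors only the two routing fields from the Config struct.
type routingConfig struct {
	RoutingKey        string
	RoutingAttributes []string
}

var errNoRoutingAttrs = errors.New(`routing_key "attributes" needs at least one entry in routing_attributes`)

// validateRouting is a hypothetical helper, not part of the commit. It rejects
// the combination that would collapse every span onto the empty key.
func validateRouting(c routingConfig) error {
	if c.RoutingKey == "attributes" && len(c.RoutingAttributes) == 0 {
		return errNoRoutingAttrs
	}
	return nil
}

func main() {
	ok := routingConfig{
		RoutingKey:        "attributes",
		RoutingAttributes: []string{"service.name", "span.kind"},
	}
	empty := routingConfig{RoutingKey: "attributes"}

	fmt.Println("with attributes:", validateRouting(ok))
	fmt.Println("without attributes:", validateRouting(empty))
}
```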

exporter/loadbalancingexporter/trace_exporter.go

+83 -11
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strings"
 	"sync"
 	"time"

@@ -23,13 +24,19 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
 )

+const (
+	pseudoAttrSpanName = "span.name"
+	pseudoAttrSpanKind = "span.kind"
+)
+
 var _ exporter.Traces = (*traceExporterImp)(nil)

 type exporterTraces map[*wrappedExporter]ptrace.Traces

 type traceExporterImp struct {
 	loadBalancer *loadBalancer
 	routingKey routingKey
+	routingAttrs []string

 	logger *zap.Logger
 	stopped bool
@@ -67,6 +74,9 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx
 	switch cfg.(*Config).RoutingKey {
 	case svcRoutingStr:
 		traceExporter.routingKey = svcRouting
+	case attrRoutingStr:
+		traceExporter.routingKey = attrRouting
+		traceExporter.routingAttrs = cfg.(*Config).RoutingAttributes
 	case traceIDRoutingStr, "":
 	default:
 		return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
@@ -95,7 +105,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
 	exporterSegregatedTraces := make(exporterTraces)
 	endpoints := make(map[*wrappedExporter]string)
 	for _, batch := range batches {
-		routingID, err := routingIdentifiersFromTraces(batch, e.routingKey)
+		routingID, err := routingIdentifiersFromTraces(batch, e.routingKey, e.routingAttrs)
 		if err != nil {
 			return err
 		}
@@ -137,7 +147,15 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
 	return errs
 }

-func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
+// routingIdentifiersFromTraces reads the traces and determines an identifier that can be used to define a position on the
+// ring hash. It takes the routingKey, defining what type of routing should be used, and a series of attributes
+// (optionally) used if the routingKey is attrRouting.
+//
+// only svcRouting and attrRouting are supported. For attrRouting, any attribute, as well the "pseudo" attributes span.name
+// and span.kind are supported.
+//
+// In practice, makes the assumption that ptrace.Traces includes only one trace of each kind, in the "trace tree".
+func routingIdentifiersFromTraces(td ptrace.Traces, rType routingKey, attrs []string) (map[string]bool, error) {
 	ids := make(map[string]bool)
 	rs := td.ResourceSpans()
 	if rs.Len() == 0 {
@@ -153,18 +171,72 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]
 	if spans.Len() == 0 {
 		return nil, errors.New("empty spans")
 	}
+	// Determine how the key should be populated.
+	switch rType {
+	case traceIDRouting:
+		// The simple case is the TraceID routing. In this case, we just use the string representation of the Trace ID.
+		tid := spans.At(0).TraceID()
+		ids[string(tid[:])] = true

-	if key == svcRouting {
-		for i := 0; i < rs.Len(); i++ {
-			svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
-			if !ok {
-				return nil, errors.New("unable to get service name")
+		return ids, nil
+	case svcRouting:
+		// Service Name is still handled as an "attribute router", it's just expressed as a "pseudo attribute"
+		attrs = []string{"service.name"}
+	case attrRouting:
+		// By default, we'll just use the input provided.
+		break
+	default:
+		return nil, fmt.Errorf("unsupported routing_key: %d", rType)
+	}
+
+	// Composite the attributes together as a key.
+	for i := 0; i < rs.Len(); i++ {
+		// rKey will never return an error. See
+		// 1. https://pkg.go.dev/bytes#Buffer.Write
+		// 2. https://stackoverflow.com/a/70388629
+		var rKey strings.Builder
+
+		for _, a := range attrs {
+			// resource spans
+			rAttr, ok := rs.At(i).Resource().Attributes().Get(a)
+			if ok {
+				rKey.WriteString(rAttr.Str())
+				continue
+			}
+
+			// ils or "instrumentation library spans"
+			ils := rs.At(0).ScopeSpans()
+			iAttr, ok := ils.At(0).Scope().Attributes().Get(a)
+			if ok {
+				rKey.WriteString(iAttr.Str())
+				continue
+			}
+
+			// the lowest level span (or what engineers regularly interact with)
+			spans := ils.At(0).Spans()
+
+			if a == pseudoAttrSpanKind {
+				rKey.WriteString(spans.At(0).Kind().String())
+
+				continue
+			}
+
+			if a == pseudoAttrSpanName {
+				rKey.WriteString(spans.At(0).Name())
+
+				continue
+			}
+
+			sAttr, ok := spans.At(0).Attributes().Get(a)
+			if ok {
+				rKey.WriteString(sAttr.Str())
+				continue
 			}
-			ids[svc.Str()] = true
 		}
-		return ids, nil
+
+		// No matter what, there will be a key here (even if that key is "").
+		ids[rKey.String()] = true
 	}
-	tid := spans.At(0).TraceID()
-	ids[string(tid[:])] = true
+
 	return ids, nil
 }
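
The lookup order in the hunk above (resource attributes, then scope attributes, then the `span.kind` and `span.name` pseudo attributes, then span attributes) can be restated as a small standalone sketch. It assumes a flattened `span` struct built from plain maps in place of the pdata types, which matches the single-span assumption stated in the commit message. `lookup` and `routingID` are hypothetical names. The `sep` parameter is an addition for illustration only: the commit concatenates values with no separator, and the example shows one consequence of that choice.

```go
package main

import (
	"fmt"
	"strings"
)

// span is a simplified, hypothetical stand-in for the pdata resource,
// scope, and span levels.
type span struct {
	resource map[string]string
	scope    map[string]string
	attrs    map[string]string
	name     string
	kind     string
}

// lookup resolves one attribute name in the same order as the diff:
// resource, scope, pseudo attributes, then span attributes.
func lookup(s span, name string) (string, bool) {
	if v, ok := s.resource[name]; ok {
		return v, true
	}
	if v, ok := s.scope[name]; ok {
		return v, true
	}
	switch name {
	case "span.kind":
		return s.kind, true
	case "span.name":
		return s.name, true
	}
	v, ok := s.attrs[name]
	return v, ok
}

// routingID joins the resolved values with sep. Passing sep == "" mirrors
// the plain concatenation in the diff; a missing attribute contributes "".
func routingID(s span, names []string, sep string) string {
	parts := make([]string, 0, len(names))
	for _, n := range names {
		v, _ := lookup(s, n)
		parts = append(parts, v)
	}
	return strings.Join(parts, sep)
}

func main() {
	names := []string{"tenant", "region"}
	a := span{attrs: map[string]string{"tenant": "ab", "region": "c"}}
	b := span{attrs: map[string]string{"tenant": "a", "region": "bc"}}

	// Without a separator, different attribute values can yield the same key.
	fmt.Println(routingID(a, names, "") == routingID(b, names, ""))
	// With a separator (an assumption, not in the commit) the keys differ.
	fmt.Println(routingID(a, names, "|") == routingID(b, names, "|"))
}
```

Colliding keys only mean that two attribute combinations share a collector, which is harmless for correctness but can skew the load distribution. A separator does not remove collisions entirely if the attribute values can contain the separator themselves.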
