-
Notifications
You must be signed in to change notification settings - Fork 2.9k
[exporter/elasticsearch] Dynamically route documents by default #38500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
3f35e80
365d2d6
9287519
73f16b3
0e27f5d
c820266
956f573
f1c544e
3378e02
4265639
fd4b63a
963fb57
22b7952
bdb75d9
f5d1455
6870aee
68ac209
bbc0b4d
3f6d99e
99db8e6
61a3bfa
b9a944e
0187945
232a98a
6aaa5d0
c079201
8338d13
d78af32
5c9c85a
769edb2
52b89fa
bb44b7d
4837cd3
0d86022
fdc0087
ed8d857
49139fc
378f81f
b07eb95
91112ab
7f34bdd
87c9593
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,8 +55,7 @@ func newDocumentRouter(mode MappingMode, dynamicIndex bool, defaultIndex string, | |
var router documentRouter | ||
if dynamicIndex { | ||
router = dynamicDocumentRouter{ | ||
index: elasticsearch.Index{Index: defaultIndex}, | ||
mode: mode, | ||
mode: mode, | ||
} | ||
} else { | ||
router = staticDocumentRouter{ | ||
|
@@ -94,24 +93,23 @@ func (r staticDocumentRouter) route(_ pcommon.Resource, _ pcommon.Instrumentatio | |
} | ||
|
||
type dynamicDocumentRouter struct { | ||
index elasticsearch.Index | ||
mode MappingMode | ||
mode MappingMode | ||
} | ||
|
||
func (r dynamicDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { | ||
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeLogs) | ||
return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeLogs) | ||
} | ||
|
||
func (r dynamicDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { | ||
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeMetrics) | ||
return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeMetrics) | ||
} | ||
|
||
func (r dynamicDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { | ||
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeTraces) | ||
return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeTraces) | ||
} | ||
|
||
func (r dynamicDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { | ||
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeLogs) | ||
return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeLogs) | ||
} | ||
|
||
type logstashDocumentRouter struct { | ||
|
@@ -150,20 +148,25 @@ func routeRecord( | |
resource pcommon.Resource, | ||
scope pcommon.InstrumentationScope, | ||
recordAttr pcommon.Map, | ||
index string, | ||
mode MappingMode, | ||
defaultDSType string, | ||
) (elasticsearch.Index, error) { | ||
resourceAttr := resource.Attributes() | ||
scopeAttr := scope.Attributes() | ||
|
||
// Order: | ||
// 1. read data_stream.* from attributes | ||
// 2. read elasticsearch.index.* from attributes | ||
// 1. elasticsearch.index from attributes | ||
// 2. read data_stream.* from attributes | ||
// 3. receiver-based routing | ||
// 4. use default hardcoded data_stream.* | ||
dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) | ||
namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) | ||
if esIndex, esIndexExists := getFromAttributes(elasticsearch.IndexAttributeName, "", recordAttr, scopeAttr, resourceAttr); esIndexExists { | ||
// Advanced users can route documents by setting IndexAttributeName in a processor earlier in the pipeline. | ||
// If `data_stream.*` needs to be set in the document, users should use `data_stream.*` attributes. | ||
return elasticsearch.Index{Index: esIndex}, nil | ||
} | ||
|
||
dataset, _ := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) | ||
namespace, _ := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) | ||
|
||
dsType := defaultDSType | ||
// if mapping mode is bodymap, allow overriding data_stream.type | ||
|
@@ -174,15 +177,6 @@ func routeRecord( | |
} | ||
} | ||
|
||
dataStreamMode := datasetExists || namespaceExists | ||
if !dataStreamMode { | ||
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) | ||
suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) | ||
if prefixExists || suffixExists { | ||
return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, index, suffix)}, nil | ||
} | ||
} | ||
|
||
// Receiver-based routing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that receiver-based routing has a higher precedence compared to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch. Fixed the bug and added a test: 5c9c85a |
||
// For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode) | ||
// for the scope name | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use
logs_index
for span events? Or maybe add another option for span events?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't intuitive to have logs_index as the configuration of destination of span events. What about a new config
traces_span_events_index
(that is only applicable to otel mapping mode as it sends span events as separate documents)? It should default to empty, such that dynamic routing kicks in.I have also considered
logs_index
for span events. See reasoning below.traces_span_events_index
aslogs_index
, but it seems to a big trap. Let's imagine if a user wants logs to be statically routed to anlogs_index
and traces dynamically routed. If the user sets logs_index, span events suddenly are not dynamically routed. This is not great.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we model span events as logs in OTel mode, I think it's only logical to use
logs_index
. Ack that this may not be intuitive but documenting this seems like the solution. I think we should start simple by just usinglogs_index
and only adding something liketraces_span_events_index
if necessary.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sgtm. It is explicitly stated that it is not recommended to set
logs_index
,metrics_index
andtraces_index
anyway.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 61a3bfa