diff --git a/.chloggen/elasticsearchexporter_dynamic-routing-default.yaml b/.chloggen/elasticsearchexporter_dynamic-routing-default.yaml new file mode 100644 index 0000000000000..16125459ecdff --- /dev/null +++ b/.chloggen/elasticsearchexporter_dynamic-routing-default.yaml @@ -0,0 +1,32 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Dynamically route documents by default unless `{logs,metrics,traces}_index` is non-empty + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38361] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Overhaul of document routing. + Deprecate the `{logs,metrics,traces}_dynamic_index` config and make it a no-op. + Return a config validation error when `{logs,metrics,traces}_dynamic_index::enabled` and `{logs,metrics,traces}_index` are set at the same time, as users who rely on dynamic routing should not set `{logs,metrics,traces}_index`. + Remove `elasticsearch.index.{prefix,suffix}` handling. Replace it with `elasticsearch.index` handling that uses the attribute value directly as the index. Users relying on the previously supported `elasticsearch.index.prefix` and `elasticsearch.index.suffix` should migrate to a transform processor that sets `elasticsearch.index`. + Fix a bug where receiver-based routing overwrites data_stream.dataset. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5e39a08ffea60..0793e8a9af3ce 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -118,33 +118,39 @@ Using the common `batcher` functionality provides several benefits over the defa ### Elasticsearch document routing -Telemetry data will be written to signal specific data streams by default: -logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`. +Documents are statically or dynamically routed to the target index / data stream in the following order. The first routing mode that applies will be used. +1. "Static mode": Route to `logs_index` for log records, `metrics_index` for data points, and `traces_index` for spans, if the corresponding config is non-empty. [^3] +2. "Dynamic - Index attribute mode": Route to the index name specified in the `elasticsearch.index` attribute (precedence: log record / data point / span attribute > scope attribute > resource attribute) if the attribute exists. [^3] +3. 
"Dynamic - Data stream routing mode": Route to data stream constructed from `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`, +where `data_stream.type` is `logs` for log records, `metrics` for data points, and `traces` for spans, and is static. [^3] +In a special case with `mapping::mode: bodymap`, `data_stream.type` field (valid values: `logs`, `metrics`) can be dynamically set from attributes. +The resulting documents will contain the corresponding `data_stream.*` fields, see restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html). + 1. `data_stream.dataset` or `data_stream.namespace` in attributes (precedence: log record / data point / span attribute > scope attribute > resource attribute) + 2. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1 + 3. Otherwise, `data_stream.dataset` falls back to `generic` and `data_stream.namespace` falls back to `default`. + +[^3]: See additional handling in [Document routing exceptions for OTel data mode](#document-routing-exceptions-for-otel-data-mode) + This can be customised through the following settings: -- `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default` +- `logs_index` (optional): The [index] or [data stream] name to publish logs (and span events in OTel mapping mode) to. `logs_index` should be empty unless all logs should be sent to the same index. - `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name. - - `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. In a special case with `mapping::mode: bodymap`, `data_stream.type` field (valid values: `logs`, `metrics`) is also supported to dynamically construct index in the form `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields, see restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html). + - `enabled`(DEPRECATED): No-op. Documents are now always routed dynamically unless `logs_index` is not empty. Will be removed in a future version. -- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`. - ⚠️ Note that metrics support is currently in development. +- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. `metrics_index` should be empty unless all metrics should be sent to the same index. 
Note that metrics support is currently in development. - `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. - ⚠️ Note that metrics support is currently in development. - - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields, see restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html). + - `enabled`(DEPRECATED): No-op. Documents are now always routed dynamically unless `metrics_index` is not empty. Will be removed in a future version. -- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. +- `traces_index` (optional): The [index] or [data stream] name to publish traces to. `traces_index` should be empty unless all traces should be sent to the same index. - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name. - - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields, see restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html). There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`. + - `enabled`(DEPRECATED): No-op. Documents are now always routed dynamically unless `traces_index` is not empty. Will be removed in a future version. - `logstash_format` (optional): Logstash format compatibility. 
Logs, metrics and traces can be written into an index in Logstash format. - - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, - e.g: If `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`. + - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format::enabled` is `true`, the index name is composed using the document routing rules above as the prefix and the date as the suffix, + e.g.: If the computed index name is `logs-generic-default`, the resulting index will be `logs-generic-default-YYYY.MM.DD`. The last string appended belongs to the date when the data is being generated. - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. @@ -154,6 +160,19 @@ This can be customised through the following settings: +#### Document routing exceptions for OTel data mode + +In OTel mapping mode (`mapping::mode: otel`), the following special handling applies in addition to the document routing rules in [Elasticsearch document routing](#elasticsearch-document-routing). +The order in which the routing mode is determined is the same as in [Elasticsearch document routing](#elasticsearch-document-routing). + +1. "Static mode": Span events are separate documents routed to `logs_index` if non-empty. +2. "Dynamic - Index attribute mode": Span events are separate documents routed using the `elasticsearch.index` attribute (precedence: span event attribute > scope attribute > resource attribute) if the attribute exists. +3. "Dynamic - Data stream routing mode": + - For all documents, `data_stream.dataset` will always be appended with `.otel`. + - As a special case of rule (3)(1) in [Elasticsearch document routing](#elasticsearch-document-routing), span events are separate documents that have `data_stream.type: logs` and are routed using data stream attributes (precedence: span event attribute > scope attribute > resource attribute). + + + ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing @@ -198,7 +217,9 @@ scheme that maps these as `constant_keyword` fields. `data_stream.dataset` will always be appended with `.otel` if [dynamic data stream routing mode](#elasticsearch-document-routing) is active. Span events are stored in separate documents. They will be routed with `data_stream.type` set to -`logs` if `traces_dynamic_index::enabled` is `true`. +`logs` if [dynamic data stream routing mode](#elasticsearch-document-routing) is active. + +The `elasticsearch.index` attribute will be removed from the final document if it exists. 
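For illustration, the snippet below is a minimal collector configuration sketch showing how the routing attributes can be set upstream with the transform processor, which is the suggested replacement for the removed `elasticsearch.index.prefix` / `elasticsearch.index.suffix` handling. The receiver, processor name, service name, and index values are placeholders, not defaults of this exporter.

```yaml
receivers:
  otlp:
    protocols:
      grpc:

processors:
  transform/es-routing:
    log_statements:
      - context: log
        statements:
          # Route one service's logs to an explicit index ("Index attribute mode").
          - set(attributes["elasticsearch.index"], "checkout-logs") where resource.attributes["service.name"] == "checkout"
          # Route everything else via data stream fields ("Data stream routing mode"),
          # i.e. logs-${data_stream.dataset}-${data_stream.namespace}
          # (with ".otel" appended to the dataset in OTel mapping mode).
          - set(attributes["data_stream.dataset"], "myapp") where attributes["elasticsearch.index"] == nil
          - set(attributes["data_stream.namespace"], "production") where attributes["elasticsearch.index"] == nil

exporters:
  elasticsearch:
    endpoints: [https://elasticsearch.example.com:9200]
    mapping:
      mode: otel
    # logs_index is left empty so documents are routed dynamically.

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [transform/es-routing]
      exporters: [elasticsearch]
```

Documents that end up with `elasticsearch.index` set are routed to that index verbatim, and the attribute itself is dropped from the stored document as described above.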
| Signal | Supported | | --------- | ------------------ | diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index c83b21eb8de27..87f72c45b7030 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -7,8 +7,6 @@ import "go.opentelemetry.io/collector/pdata/pcommon" // dynamic index attribute key constants const ( - indexPrefix = "elasticsearch.index.prefix" - indexSuffix = "elasticsearch.index.suffix" defaultDataStreamDataset = "generic" defaultDataStreamNamespace = "default" defaultDataStreamTypeLogs = "logs" diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 05e76df9f833f..2d9f84c4d0f2a 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -40,19 +40,19 @@ type Config struct { // NumWorkers configures the number of workers publishing bulk requests. NumWorkers int `mapstructure:"num_workers"` - // This setting is required when logging pipelines used. - LogsIndex string `mapstructure:"logs_index"` - // fall back to pure LogsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) + // LogsIndex configures the static index used for document routing for logs. + // It should be empty if dynamic document routing is preferred. + LogsIndex string `mapstructure:"logs_index"` LogsDynamicIndex DynamicIndexSetting `mapstructure:"logs_dynamic_index"` - // This setting is required when the exporter is used in a metrics pipeline. - MetricsIndex string `mapstructure:"metrics_index"` - // fall back to pure MetricsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource attributes + // MetricsIndex configures the static index used for document routing for metrics. + // It should be empty if dynamic document routing is preferred. + MetricsIndex string `mapstructure:"metrics_index"` MetricsDynamicIndex DynamicIndexSetting `mapstructure:"metrics_dynamic_index"` - // This setting is required when traces pipelines used. - TracesIndex string `mapstructure:"traces_index"` - // fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) + // TracesIndex configures the static index used for document routing for traces. + // It should be empty if dynamic document routing is preferred. + TracesIndex string `mapstructure:"traces_index"` TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"` // LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES. @@ -121,6 +121,9 @@ type LogstashFormatSettings struct { } type DynamicIndexSetting struct { + // Enabled enables dynamic index routing. + // + // Deprecated: [v0.122.0] This config is now ignored. Dynamic index routing is always done by default. 
Enabled bool `mapstructure:"enabled"` } @@ -288,6 +291,16 @@ func (cfg *Config) Validate() error { return errors.New("retry::max_retries should be non-negative") } + if cfg.LogsIndex != "" && cfg.LogsDynamicIndex.Enabled { + return errors.New("must not specify both logs_index and logs_dynamic_index; logs_index should be empty unless all documents should be sent to the same index") + } + if cfg.MetricsIndex != "" && cfg.MetricsDynamicIndex.Enabled { + return errors.New("must not specify both metrics_index and metrics_dynamic_index; metrics_index should be empty unless all documents should be sent to the same index") + } + if cfg.TracesIndex != "" && cfg.TracesDynamicIndex.Enabled { + return errors.New("must not specify both traces_index and traces_dynamic_index; traces_index should be empty unless all documents should be sent to the same index") + } + return nil } @@ -397,4 +410,13 @@ func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { // Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequest = 1 to avoid breaking change on behavior logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.") } + if cfg.LogsDynamicIndex.Enabled { + logger.Warn("logs_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") + } + if cfg.MetricsDynamicIndex.Enabled { + logger.Warn("metrics_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. See Elasticsearch Exporter README.") + } + if cfg.TracesDynamicIndex.Enabled { + logger.Warn("traces_dynamic_index::enabled has been deprecated, and will be removed in a future version. It is now a no-op. Dynamic document routing is now the default. 
See Elasticsearch Exporter README.") + } } diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 468ff4a928bd9..dfba0b34d7406 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -63,13 +63,11 @@ func TestConfig(t *testing.T) { QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, }, Endpoints: []string{"https://elastic.example.com:9200"}, - LogsIndex: "logs-generic-default", LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, }, - MetricsIndex: "metrics-generic-default", MetricsDynamicIndex: DynamicIndexSetting{ - Enabled: true, + Enabled: false, }, TracesIndex: "trace_index", TracesDynamicIndex: DynamicIndexSetting{ @@ -145,11 +143,9 @@ func TestConfig(t *testing.T) { LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, }, - MetricsIndex: "metrics-generic-default", MetricsDynamicIndex: DynamicIndexSetting{ - Enabled: true, + Enabled: false, }, - TracesIndex: "traces-generic-default", TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, @@ -219,15 +215,13 @@ func TestConfig(t *testing.T) { QueueSize: exporterhelper.NewDefaultQueueConfig().QueueSize, }, Endpoints: []string{"http://localhost:9200"}, - LogsIndex: "logs-generic-default", LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, }, MetricsIndex: "my_metric_index", MetricsDynamicIndex: DynamicIndexSetting{ - Enabled: true, + Enabled: false, }, - TracesIndex: "traces-generic-default", TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index 999c9fdfdd0cf..5a6f19a808805 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -51,16 +51,16 @@ type documentRouter interface { routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) } -func newDocumentRouter(mode MappingMode, dynamicIndex bool, defaultIndex string, cfg *Config) documentRouter { +// newDocumentRouter returns a router that routes documents based on the mapping mode and the static index config; if staticIndex is empty, documents are routed dynamically. 
+func newDocumentRouter(mode MappingMode, staticIndex string, cfg *Config) documentRouter { var router documentRouter - if dynamicIndex { + if staticIndex == "" { router = dynamicDocumentRouter{ - index: elasticsearch.Index{Index: defaultIndex}, - mode: mode, + mode: mode, } } else { router = staticDocumentRouter{ - index: elasticsearch.Index{Index: defaultIndex}, + index: elasticsearch.Index{Index: staticIndex}, } } if cfg.LogstashFormat.Enabled { @@ -94,24 +94,23 @@ func (r staticDocumentRouter) route(_ pcommon.Resource, _ pcommon.Instrumentatio } type dynamicDocumentRouter struct { - index elasticsearch.Index - mode MappingMode + mode MappingMode } func (r dynamicDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { - return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeLogs) + return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeLogs) } func (r dynamicDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { - return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeMetrics) + return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeMetrics) } func (r dynamicDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { - return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeTraces) + return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeTraces) } func (r dynamicDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { - return routeRecord(resource, scope, recordAttrs, r.index.Index, r.mode, defaultDataStreamTypeLogs) + return routeRecord(resource, scope, recordAttrs, r.mode, defaultDataStreamTypeLogs) } type logstashDocumentRouter struct { @@ -150,7 +149,6 @@ func routeRecord( resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttr pcommon.Map, - index string, mode MappingMode, defaultDSType string, ) (elasticsearch.Index, error) { @@ -158,12 +156,18 @@ func routeRecord( scopeAttr := scope.Attributes() // Order: - // 1. read data_stream.* from attributes - // 2. read elasticsearch.index.* from attributes + // 1. elasticsearch.index from attributes + // 2. read data_stream.* from attributes // 3. receiver-based routing // 4. use default hardcoded data_stream.* + if esIndex, esIndexExists := getFromAttributes(elasticsearch.IndexAttributeName, "", recordAttr, scopeAttr, resourceAttr); esIndexExists { + // Advanced users can route documents by setting IndexAttributeName in a processor earlier in the pipeline. + // If `data_stream.*` needs to be set in the document, users should use `data_stream.*` attributes. 
+ return elasticsearch.Index{Index: esIndex}, nil + } + dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) - namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) + namespace, _ := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) dsType := defaultDSType // if mapping mode is bodymap, allow overriding data_stream.type @@ -174,24 +178,18 @@ func routeRecord( } } - dataStreamMode := datasetExists || namespaceExists - if !dataStreamMode { - prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) - suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) - if prefixExists || suffixExists { - return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, index, suffix)}, nil + // Only use receiver-based routing if dataset is not specified. + if !datasetExists { + // Receiver-based routing + // For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode) + // for the scope name + // github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper + if submatch := receiverRegex.FindStringSubmatch(scope.Name()); len(submatch) > 0 { + receiverName := submatch[1] + dataset = receiverName } } - // Receiver-based routing - // For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode) - // for the scope name - // github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper - if submatch := receiverRegex.FindStringSubmatch(scope.Name()); len(submatch) > 0 { - receiverName := submatch[1] - dataset = receiverName - } - // For dataset, the naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]". // This is in order to match the built-in logs-*.otel-* index template. 
var datasetSuffix string diff --git a/exporter/elasticsearchexporter/data_stream_router_test.go b/exporter/elasticsearchexporter/data_stream_router_test.go index 3bf74abb25b34..90e2786fdcb9e 100644 --- a/exporter/elasticsearchexporter/data_stream_router_test.go +++ b/exporter/elasticsearchexporter/data_stream_router_test.go @@ -14,54 +14,76 @@ import ( ) type routeTestCase struct { - name string - mode MappingMode - scopeName string - want elasticsearch.Index + name string + mode MappingMode + scopeName string + recordAttrs map[string]any + want elasticsearch.Index } func createRouteTests(dsType string) []routeTestCase { - renderWantRoute := func(dsType, dsDataset string, mode MappingMode) elasticsearch.Index { + renderWantRoute := func(dsType, dsDataset, dsNamespace string, mode MappingMode) elasticsearch.Index { if mode == MappingOTel { dsDataset += ".otel" } - return elasticsearch.NewDataStreamIndex(dsType, dsDataset, defaultDataStreamNamespace) + return elasticsearch.NewDataStreamIndex(dsType, dsDataset, dsNamespace) } return []routeTestCase{ { name: "default", mode: MappingNone, - want: renderWantRoute(dsType, defaultDataStreamDataset, MappingNone), + want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingNone), }, { name: "otel", mode: MappingOTel, - want: renderWantRoute(dsType, defaultDataStreamDataset, MappingOTel), + want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingOTel), }, { name: "default with receiver scope name", mode: MappingNone, scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper", - want: renderWantRoute(dsType, "hostmetricsreceiver", MappingNone), + want: renderWantRoute(dsType, "hostmetricsreceiver", defaultDataStreamNamespace, MappingNone), }, { name: "otel with receiver scope name", mode: MappingOTel, scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper", - want: renderWantRoute(dsType, "hostmetricsreceiver", MappingOTel), + want: renderWantRoute(dsType, "hostmetricsreceiver", defaultDataStreamNamespace, MappingOTel), }, { name: "default with non-receiver scope name", mode: MappingNone, scopeName: "some_other_scope_name", - want: renderWantRoute(dsType, defaultDataStreamDataset, MappingNone), + want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingNone), }, { name: "otel with non-receiver scope name", mode: MappingOTel, scopeName: "some_other_scope_name", - want: renderWantRoute(dsType, defaultDataStreamDataset, MappingOTel), + want: renderWantRoute(dsType, defaultDataStreamDataset, defaultDataStreamNamespace, MappingOTel), + }, + { + name: "otel with elasticsearch.index", + mode: MappingOTel, + scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/should/be/ignored", + recordAttrs: map[string]any{ + "elasticsearch.index": "my-index", + }, + want: elasticsearch.Index{ + Index: "my-index", + }, + }, + { + name: "otel with data_stream attrs", + mode: MappingOTel, + scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/should/be/ignored", + recordAttrs: map[string]any{ + "data_stream.dataset": "foo", + "data_stream.namespace": "bar", + }, + want: renderWantRoute(dsType, "foo", "bar", MappingOTel), }, } } @@ -75,7 +97,10 @@ func TestRouteLogRecord(t *testing.T) { scope := pcommon.NewInstrumentationScope() 
scope.SetName(tc.scopeName) - ds, err := router.routeLogRecord(pcommon.NewResource(), scope, pcommon.NewMap()) + recordAttrMap := pcommon.NewMap() + fillAttributeMap(recordAttrMap, tc.recordAttrs) + + ds, err := router.routeLogRecord(pcommon.NewResource(), scope, recordAttrMap) require.NoError(t, err) assert.Equal(t, tc.want, ds) }) @@ -119,7 +144,10 @@ func TestRouteDataPoint(t *testing.T) { scope := pcommon.NewInstrumentationScope() scope.SetName(tc.scopeName) - ds, err := router.routeDataPoint(pcommon.NewResource(), scope, pcommon.NewMap()) + recordAttrMap := pcommon.NewMap() + fillAttributeMap(recordAttrMap, tc.recordAttrs) + + ds, err := router.routeDataPoint(pcommon.NewResource(), scope, recordAttrMap) require.NoError(t, err) assert.Equal(t, tc.want, ds) }) @@ -135,7 +163,10 @@ func TestRouteSpan(t *testing.T) { scope := pcommon.NewInstrumentationScope() scope.SetName(tc.scopeName) - ds, err := router.routeSpan(pcommon.NewResource(), scope, pcommon.NewMap()) + recordAttrMap := pcommon.NewMap() + fillAttributeMap(recordAttrMap, tc.recordAttrs) + + ds, err := router.routeSpan(pcommon.NewResource(), scope, recordAttrMap) require.NoError(t, err) assert.Equal(t, tc.want, ds) }) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 3fb6dd849685b..a80eb0dd79862 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -30,7 +30,6 @@ type elasticsearchExporter struct { set exporter.Settings config *Config index string - dynamicIndex bool logstashFormat LogstashFormatSettings defaultMappingMode MappingMode allowedMappingModes map[string]MappingMode @@ -38,19 +37,13 @@ type elasticsearchExporter struct { bufferPool *pool.BufferPool } -func newExporter( - cfg *Config, - set exporter.Settings, - index string, - dynamicIndex bool, -) *elasticsearchExporter { +func newExporter(cfg *Config, set exporter.Settings, index string) *elasticsearchExporter { allowedMappingModes := cfg.allowedMappingModes() defaultMappingMode := allowedMappingModes[canonicalMappingModeName(cfg.Mapping.Mode)] return &elasticsearchExporter{ set: set, config: cfg, index: index, - dynamicIndex: dynamicIndex, logstashFormat: cfg.LogstashFormat, allowedMappingModes: allowedMappingModes, defaultMappingMode: defaultMappingMode, @@ -77,7 +70,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) if err != nil { return err } - router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config) + router := newDocumentRouter(mappingMode, e.index, e.config) encoder, err := newEncoder(mappingMode) if err != nil { return err @@ -177,7 +170,7 @@ func (e *elasticsearchExporter) pushMetricsData( if err != nil { return err } - router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config) + router := newDocumentRouter(mappingMode, e.index, e.config) hasher := newDataPointHasher(mappingMode) encoder, err := newEncoder(mappingMode) if err != nil { @@ -340,7 +333,8 @@ func (e *elasticsearchExporter) pushTraceData( if err != nil { return err } - router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config) + router := newDocumentRouter(mappingMode, e.index, e.config) + spanEventRouter := newDocumentRouter(mappingMode, e.config.LogsIndex, e.config) encoder, err := newEncoder(mappingMode) if err != nil { return err @@ -379,7 +373,7 @@ func (e *elasticsearchExporter) pushTraceData( } for ii := 0; ii < span.Events().Len(); ii++ { spanEvent := span.Events().At(ii) - if err := 
e.pushSpanEvent(ctx, router, encoder, ec, span, spanEvent, session); err != nil { + if err := e.pushSpanEvent(ctx, spanEventRouter, encoder, ec, span, spanEvent, session); err != nil { errs = append(errs, err) } } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index c09fd54dabf26..7a11c179d1af0 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -62,9 +62,9 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - expected := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","attrKey1":"abc","attrKey2":"def","error":{"stacktrace":"no no no no"},"message":"hello world","service":{"name":"myservice"}}` + expected := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","attrKey1":"abc","attrKey2":"def","data_stream":{"dataset":"generic","namespace":"default","type":"logs"},"error":{"stacktrace":"no no no no"},"message":"hello world","service":{"name":"myservice"}}` actual := string(docs[0].Document) - assert.Equal(t, expected, actual) + assert.JSONEq(t, expected, actual) return itemsAllOK(docs) }) @@ -229,6 +229,7 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "ecs" + cfg.LogsIndex = "index" // deduplication is always performed except in otel mapping mode - // there is no other configuration that controls it }) @@ -251,6 +252,7 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "raw" + cfg.LogsIndex = "index" // deduplication is always performed - there is no configuration that controls it }) logs := newLogsWithAttributes( @@ -308,41 +310,36 @@ func TestExporterLogs(t *testing.T) { <-done }) - t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { + t.Run("publish with elasticsearch.index", func(t *testing.T) { rec := newBulkRecorder() - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) + index := "someindex" server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) - return itemsAllOK(docs) }) exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsIndex = index - cfg.LogsDynamicIndex.Enabled = true + cfg.Mapping.Mode = "otel" }) logs := newLogsWithAttributes( map[string]any{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, + "elasticsearch.index": index, }, - nil, map[string]any{ - indexPrefix: prefix, + "elasticsearch.index": "ignored", + }, + map[string]any{ + "elasticsearch.index": "ignored", }, ) logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") mustSendLogs(t, exporter, logs) - rec.WaitItems(1) + docs := rec.WaitItems(1) + doc := docs[0] + assert.Equal(t, index, actionJSONToIndex(t, doc.Action)) + assert.JSONEq(t, `{}`, gjson.GetBytes(doc.Document, `attributes`).Raw) }) t.Run("publish with dynamic index, data_stream", func(t *testing.T) { @@ -358,7 +355,6 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "none" - cfg.LogsDynamicIndex.Enabled = true }) logs := newLogsWithAttributes( 
map[string]any{ @@ -376,55 +372,26 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - assert.Contains(t, actionJSONToIndex(t, docs[0].Action), "not-used-index") - - return itemsAllOK(docs) - }) - - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogstashFormat.Enabled = true - cfg.LogsIndex = "not-used-index" - }) - mustSendLogs(t, exporter, newLogsWithAttributes(nil, nil, nil)) - - rec.WaitItems(1) - }) - - t.Run("publish with logstash index format enabled and dynamic index enabled", func(t *testing.T) { - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) + t.Run("publish with logstash index format enabled", func(t *testing.T) { + index := "someindex" rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Contains(t, actionJSONToIndex(t, docs[0].Action), expected) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), index) return itemsAllOK(docs) }) exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsIndex = index - cfg.LogsDynamicIndex.Enabled = true cfg.LogstashFormat.Enabled = true }) mustSendLogs(t, exporter, newLogsWithAttributes( map[string]any{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, + "elasticsearch.index": index, }, nil, - map[string]any{ - indexPrefix: prefix, - }, + nil, )) rec.WaitItems(1) }) @@ -495,7 +462,6 @@ func TestExporterLogs(t *testing.T) { }) exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsDynamicIndex.Enabled = true cfg.Mapping.Mode = "otel" }) recordAttrs := map[string]any{ @@ -951,36 +917,34 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { + t.Run("publish with elasticsearch.index", func(t *testing.T) { + index := "someindex" rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - - expected := "resource.prefix-metrics.index-resource.suffix" - assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) - return itemsAllOK(docs) }) exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { - cfg.MetricsIndex = "metrics.index" - cfg.Mapping.Mode = "ecs" + cfg.Mapping.Mode = "otel" }) metrics := newMetricsWithAttributes( map[string]any{ - indexSuffix: "-data.point.suffix", + "elasticsearch.index": index, }, - nil, map[string]any{ - indexPrefix: "resource.prefix-", - indexSuffix: "-resource.suffix", + "elasticsearch.index": "ignored", + }, + map[string]any{ + "elasticsearch.index": "ignored", }, ) - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) mustSendMetrics(t, exporter, metrics) - rec.WaitItems(1) + docs := rec.WaitItems(1) + doc := docs[0] + assert.Equal(t, index, actionJSONToIndex(t, doc.Action)) + assert.JSONEq(t, `{}`, gjson.GetBytes(doc.Document, `attributes`).Raw) }) t.Run("publish with dynamic index, data_stream", func(t *testing.T) { @@ -995,7 +959,6 @@ func TestExporterMetrics(t *testing.T) { }) exporter := 
newTestMetricsExporter(t, server.URL, func(cfg *Config) { - cfg.MetricsIndex = "metrics.index" cfg.Mapping.Mode = "ecs" }) metrics := newMetricsWithAttributes( @@ -1614,7 +1577,6 @@ func TestExporterMetrics_Grouping(t *testing.T) { }) exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { - cfg.MetricsIndex = "metrics.index" cfg.Mapping.Mode = "ecs" }) @@ -1747,49 +1709,42 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { + t.Run("publish with elasticsearch.index", func(t *testing.T) { rec := newBulkRecorder() - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) + index := "someindex" + eventIndex := "some-event-index" server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, create["_index"].(string)) - return itemsAllOK(docs) }) exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.TracesIndex = index - cfg.TracesDynamicIndex.Enabled = true + cfg.Mapping.Mode = "otel" }) - mustSendTraces(t, exporter, newTracesWithAttributes( + traces := newTracesWithAttributes( map[string]any{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, + "elasticsearch.index": index, }, - nil, map[string]any{ - indexPrefix: prefix, + "elasticsearch.index": "ignored", }, - )) + map[string]any{ + "elasticsearch.index": "ignored", + }, + ) + event := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().AppendEmpty() + event.Attributes().PutStr("elasticsearch.index", eventIndex) + mustSendTraces(t, exporter, traces) - rec.WaitItems(1) + docs := rec.WaitItems(2) + doc := docs[0] + assert.Equal(t, index, actionJSONToIndex(t, doc.Action)) + assert.JSONEq(t, `{}`, gjson.GetBytes(doc.Document, `attributes`).Raw) + eventDoc := docs[1] + assert.Equal(t, eventIndex, actionJSONToIndex(t, eventDoc.Action)) + assert.JSONEq(t, `{}`, gjson.GetBytes(eventDoc.Document, `attributes`).Raw) }) t.Run("publish with dynamic index, data_stream", func(t *testing.T) { @@ -1806,7 +1761,6 @@ func TestExporterTraces(t *testing.T) { exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { cfg.Mapping.Mode = "none" - cfg.TracesDynamicIndex.Enabled = true }) mustSendTraces(t, exporter, newTracesWithAttributes( @@ -1822,7 +1776,7 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with logstash format index", func(t *testing.T) { + t.Run("publish with logstash format index, default traces index", func(t *testing.T) { var defaultCfg Config rec := newBulkRecorder() @@ -1836,7 +1790,6 @@ func TestExporterTraces(t *testing.T) { exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { cfg.LogstashFormat.Enabled = true - cfg.TracesIndex = "not-used-index" defaultCfg = *cfg }) @@ -1845,38 +1798,28 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with logstash format index and dynamic index enabled", func(t *testing.T) { - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) + t.Run("publish with logstash format index", func(t *testing.T) { + index := "someindex" rec := newBulkRecorder() server := newESTestServer(t, func(docs 
[]itemRequest) ([]itemResponse, error) { rec.Record(docs) - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Contains(t, actionJSONToIndex(t, docs[0].Action), expected) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), index) return itemsAllOK(docs) }) exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.TracesIndex = index - cfg.TracesDynamicIndex.Enabled = true cfg.LogstashFormat.Enabled = true }) mustSendTraces(t, exporter, newTracesWithAttributes( map[string]any{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, + "elasticsearch.index": index, }, nil, - map[string]any{ - indexPrefix: prefix, - }, + nil, )) rec.WaitItems(1) }) @@ -1889,7 +1832,6 @@ func TestExporterTraces(t *testing.T) { }) exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.TracesDynamicIndex.Enabled = true cfg.Mapping.Mode = "otel" }) @@ -1950,6 +1892,79 @@ func TestExporterTraces(t *testing.T) { assertRecordedItems(t, expected, rec, false) }) + t.Run("otel mode span event routing", func(t *testing.T) { + for _, tc := range []struct { + name string + config func(cfg *Config) + spanEventAttrs map[string]any + wantIndex string + }{ + { + name: "default", + wantIndex: "logs-generic.otel-default", + }, + { + name: "static index config", + config: func(cfg *Config) { + cfg.LogsIndex = "someindex" + cfg.MetricsIndex = "ignored" + cfg.TracesIndex = "ignored" + }, + wantIndex: "someindex", + }, + { + name: "dynamic elasticsearch.index", + spanEventAttrs: map[string]any{ + "elasticsearch.index": "someindex", + }, + wantIndex: "someindex", + }, + { + name: "dynamic data_stream.*", + spanEventAttrs: map[string]any{ + "data_stream.dataset": "foo", + "data_stream.namespace": "bar", + }, + wantIndex: "logs-foo.otel-bar", + }, + } { + t.Run(tc.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + configs := []func(cfg *Config){ + func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }, + } + if tc.config != nil { + configs = append(configs, tc.config) + } + + exporter := newTestTracesExporter(t, server.URL, configs...) + + traces := newTracesWithAttributes(nil, nil, nil) + spanEvent := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().AppendEmpty() + spanEvent.SetName("some_event_name") + fillAttributeMap(spanEvent.Attributes(), tc.spanEventAttrs) + mustSendTraces(t, exporter, traces) + + rec.WaitItems(2) + var spanEventDocs []itemRequest + for _, doc := range rec.Items() { + if result := gjson.GetBytes(doc.Document, "event_name"); result.Raw != "" { + spanEventDocs = append(spanEventDocs, doc) + } + } + require.Len(t, spanEventDocs, 1) + assert.Equal(t, tc.wantIndex, gjson.GetBytes(spanEventDocs[0].Action, "create._index").Str) + }) + } + }) + t.Run("otel mode attribute array value", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 487cd410b3c80..f36a49f9ee1cc 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -26,13 +26,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) -const ( - // The value of "type" key in configuration. 
- defaultLogsIndex = "logs-generic-default" - defaultMetricsIndex = "metrics-generic-default" - defaultTracesIndex = "traces-generic-default" -) - var defaultBatcherMinSizeItems = 5000 // NewFactory creates a factory for Elastic exporter. @@ -59,18 +52,6 @@ func createDefaultConfig() component.Config { return &Config{ QueueSettings: qs, ClientConfig: httpClientConfig, - LogsIndex: defaultLogsIndex, - LogsDynamicIndex: DynamicIndexSetting{ - Enabled: false, - }, - MetricsIndex: defaultMetricsIndex, - MetricsDynamicIndex: DynamicIndexSetting{ - Enabled: true, - }, - TracesIndex: defaultTracesIndex, - TracesDynamicIndex: DynamicIndexSetting{ - Enabled: false, - }, LogsDynamicID: DynamicIDSettings{ Enabled: false, }, @@ -127,7 +108,7 @@ func createLogsExporter( handleDeprecatedConfig(cf, set.Logger) - exporter := newExporter(cf, set, cf.LogsIndex, cf.LogsDynamicIndex.Enabled) + exporter := newExporter(cf, set, cf.LogsIndex) return exporterhelper.NewLogs( ctx, @@ -146,7 +127,7 @@ func createMetricsExporter( cf := cfg.(*Config) handleDeprecatedConfig(cf, set.Logger) - exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) + exporter := newExporter(cf, set, cf.MetricsIndex) return exporterhelper.NewMetrics( ctx, @@ -164,7 +145,7 @@ func createTracesExporter(ctx context.Context, cf := cfg.(*Config) handleDeprecatedConfig(cf, set.Logger) - exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) + exporter := newExporter(cf, set, cf.TracesIndex) return exporterhelper.NewTraces( ctx, @@ -187,7 +168,7 @@ func createProfilesExporter( handleDeprecatedConfig(cf, set.Logger) - exporter := newExporter(cf, set, "", false) + exporter := newExporter(cf, set, "") return xexporterhelper.NewProfilesExporter( ctx, diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 3afeb272af3a2..ae65e4fbe2a94 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -169,11 +169,8 @@ func prepareBenchmark( cfg.esCfg.Mapping.Mode = mappingMode cfg.esCfg.Endpoints = []string{receiver.endpoint} cfg.esCfg.LogsIndex = TestLogsIndex - cfg.esCfg.LogsDynamicIndex.Enabled = false cfg.esCfg.MetricsIndex = TestMetricsIndex - cfg.esCfg.MetricsDynamicIndex.Enabled = false cfg.esCfg.TracesIndex = TestTracesIndex - cfg.esCfg.TracesDynamicIndex.Enabled = false cfg.esCfg.Flush.Interval = 10 * time.Millisecond cfg.esCfg.NumWorkers = 1 diff --git a/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go b/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go index 531b5876f8ee1..5c70fdb834ef2 100644 --- a/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go +++ b/exporter/elasticsearchexporter/internal/elasticsearch/attribute.go @@ -14,4 +14,7 @@ const ( // DocumentPipelineAttributeName is the attribute name used to specify the document ingest pipeline. DocumentPipelineAttributeName = "elasticsearch.ingest_pipeline" + + // IndexAttributeName is the attribute name used to specify the index to which the document should be routed. 
+ IndexAttributeName = "elasticsearch.index" ) diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go index ae7a3c0f9e106..09da7a62c4644 100644 --- a/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/common.go @@ -59,7 +59,7 @@ func writeAttributes(v *json.Visitor, attributes pcommon.Map, stringifyMapValues _ = v.OnObjectStart(-1, structform.AnyType) attributes.Range(func(k string, val pcommon.Value) bool { switch k { - case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace, elasticsearch.MappingHintsAttrKey, elasticsearch.DocumentIDAttributeName, elasticsearch.DocumentPipelineAttributeName: + case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace, elasticsearch.MappingHintsAttrKey, elasticsearch.DocumentIDAttributeName, elasticsearch.DocumentPipelineAttributeName, elasticsearch.IndexAttributeName: return true } if isGeoAttribute(k, val) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 455e408cfb591..d1c2c5637231d 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1255,7 +1255,7 @@ func TestEncodeLogOtelMode(t *testing.T) { for _, tc := range tests { record, scope, resource := createTestOTelLogRecord(t, tc.rec) - router := newDocumentRouter(MappingOTel, true, "", &Config{}) + router := newDocumentRouter(MappingOTel, "", &Config{}) idx, err := router.routeLogRecord(resource, scope, record.Attributes()) require.NoError(t, err) diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index ec262e4fb329e..2ddd8f8b6c847 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -8,6 +8,12 @@ elasticsearch/trace: headers: myheader: test traces_index: trace_index + traces_dynamic_index: + enabled: false + logs_dynamic_index: + enabled: false + metrics_dynamic_index: + enabled: false pipeline: mypipeline user: elastic password: search @@ -26,6 +32,12 @@ elasticsearch/metric: insecure: false endpoints: [http://localhost:9200] metrics_index: my_metric_index + traces_dynamic_index: + enabled: false + logs_dynamic_index: + enabled: false + metrics_dynamic_index: + enabled: false timeout: 2m headers: myheader: test @@ -49,6 +61,12 @@ elasticsearch/log: insecure: false endpoints: [http://localhost:9200] logs_index: my_log_index + traces_dynamic_index: + enabled: false + logs_dynamic_index: + enabled: false + metrics_dynamic_index: + enabled: false timeout: 2m headers: myheader: test diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index 36785b7b329f5..a40deb7c63a09 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -329,6 +329,7 @@ func fillAttributeMap(attrs pcommon.Map, m map[string]any) { func TestGetSuffixTime(t *testing.T) { defaultCfg := createDefaultConfig().(*Config) defaultCfg.LogstashFormat.Enabled = true + defaultCfg.LogsIndex = "logs-generic-default" testTime := time.Date(2023, 12, 2, 10, 10, 10, 1, time.UTC) index, err := generateIndexWithLogstashFormat(defaultCfg.LogsIndex, &defaultCfg.LogstashFormat, testTime) 
assert.NoError(t, err)
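As a further illustration of the new validation behavior (the endpoint and index name below are placeholders), a configuration that combines a static index with the deprecated dynamic-index flag is now rejected by `Validate()`:

```yaml
exporters:
  elasticsearch:
    endpoints: [http://localhost:9200]
    logs_index: my_log_index   # static routing: all logs go to this index
    logs_dynamic_index:
      enabled: true            # deprecated no-op flag; combining it with a
                               # non-empty logs_index fails config validation
```

Dropping either `logs_index` (to keep the default dynamic routing) or the `logs_dynamic_index` block resolves the error; the same applies to the metrics and traces variants.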