Skip to content

[receiver/datadog] Address semconv noncompliance #39678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 14, 2025
Merged
32 changes: 32 additions & 0 deletions .chloggen/datadog_receiver_address_semconv_noncompliance.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Address semantic conventions noncompliance and add support for http/db

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36924]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
* Bump semantic conventions to v1.27.0
* Add support for http and db attributes
* Use datadog's base service as service.name when available
* Set `server.address` on client/producer/consumer spans
* Properly name postgresql/redis/servlet/spring spans

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 4 additions & 3 deletions receiver/datadogreceiver/internal/translator/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
)

func strPtr(s string) *string { return &s }
Expand Down Expand Up @@ -400,13 +401,13 @@ func TestTranslateSeriesV2(t *testing.T) {
requireMetricAndDataPointCounts(t, result, 1, 0)

require.Equal(t, 1, result.ResourceMetrics().Len())
v, exists := result.ResourceMetrics().At(0).Resource().Attributes().Get("host.name")
v, exists := result.ResourceMetrics().At(0).Resource().Attributes().Get(string(semconv.HostNameKey))
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = result.ResourceMetrics().At(0).Resource().Attributes().Get("deployment.environment")
v, exists = result.ResourceMetrics().At(0).Resource().Attributes().Get(string(semconv.DeploymentEnvironmentNameKey))
require.True(t, exists)
require.Equal(t, "tag1", v.AsString())
v, exists = result.ResourceMetrics().At(0).Resource().Attributes().Get("service.version")
v, exists = result.ResourceMetrics().At(0).Resource().Attributes().Get(string(semconv.ServiceVersionKey))
require.True(t, exists)
require.Equal(t, "tag2", v.AsString())

Expand Down
49 changes: 43 additions & 6 deletions receiver/datadogreceiver/internal/translator/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ import (
"sync"

"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/otel/semconv/v1.16.0"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
)

// See:
// https://docs.datadoghq.com/opentelemetry/schema_semantics/semantic_mapping/
// https://github.com/DataDog/opentelemetry-mapping-go/blob/main/pkg/otlp/attributes/attributes.go
var datadogKnownResourceAttributes = map[string]string{
"env": string(semconv.DeploymentEnvironmentKey),
"env": string(semconv.DeploymentEnvironmentNameKey),
"service": string(semconv.ServiceNameKey),
"version": string(semconv.ServiceVersionKey),

// Container-related attributes
"container_id": string(semconv.ContainerIDKey),
"container_name": string(semconv.ContainerNameKey),
"image_name": string(semconv.ContainerImageNameKey),
"image_tag": string(semconv.ContainerImageTagKey),
"image_tag": string(semconv.ContainerImageTagsKey),
"runtime": string(semconv.ContainerRuntimeKey),

// Cloud-related attributes
Expand All @@ -50,6 +50,25 @@ var datadogKnownResourceAttributes = map[string]string{
"kube_namespace": string(semconv.K8SNamespaceNameKey),
"pod_name": string(semconv.K8SPodNameKey),

// HTTP
"http.client_ip": string(semconv.ClientAddressKey),
"http.response.content_length": string(semconv.HTTPResponseBodySizeKey),
"http.status_code": string(semconv.HTTPResponseStatusCodeKey),
"http.request.content_length": string(semconv.HTTPRequestBodySizeKey),
"http.referer": "http.request.header.referer",
"http.method": string(semconv.HTTPRequestMethodKey),
"http.route": string(semconv.HTTPRouteKey),
"http.version": string(semconv.NetworkProtocolVersionKey),
"http.server_name": string(semconv.ServerAddressKey),
"http.url": string(semconv.URLFullKey),
"http.useragent": string(semconv.UserAgentOriginalKey),

// DB
"db.type": string(semconv.DBSystemNameKey),
"db.operation": string(semconv.DBOperationNameKey),
"db.instance": string(semconv.DBCollectionNameKey),
"db.pool.name": string(semconv.DBClientConnectionPoolNameKey),

// Other
"process_id": string(semconv.ProcessPIDKey),
"error.stacktrace": string(semconv.ExceptionStacktraceKey),
Expand Down Expand Up @@ -80,6 +99,15 @@ func translateDatadogKeyToOTel(k string) string {
if otelKey, ok := datadogKnownResourceAttributes[strings.ToLower(k)]; ok {
return otelKey
}

// HTTP dynamic attributes
if strings.HasPrefix(k, "http.response.headers.") { // type: string[]
header := strings.TrimPrefix(k, "http.response.headers.")
return "http.response.header." + header
} else if strings.HasPrefix(k, "http.request.headers.") { // type: string[]
header := strings.TrimPrefix(k, "http.request.headers.")
return "http.request.header." + header
}
return k
}

Expand Down Expand Up @@ -136,12 +164,21 @@ func tagsToAttributes(tags []string, host string, stringPool *StringPool) attrib
for _, tag := range tags {
key, val = translateDatadogTagToKeyValuePair(tag)
if attr, ok := datadogKnownResourceAttributes[key]; ok {
val = stringPool.Intern(val) // No need to intern the key if we already have it
attrs.resource.PutStr(attr, val)
val = stringPool.Intern(val) // No need to intern the key if we already have it
if attr == string(semconv.ContainerImageTagsKey) { // type: string[]
attrs.resource.PutEmptySlice(attr).AppendEmpty().SetStr(val)
} else {
attrs.resource.PutStr(attr, val)
}
} else {
key = stringPool.Intern(translateDatadogKeyToOTel(key))
val = stringPool.Intern(val)
attrs.dp.PutStr(key, val)
if strings.HasPrefix(key, "http.request.header.") || strings.HasPrefix(key, "http.response.header.") {
// type string[]
attrs.resource.PutEmptySlice(key).AppendEmpty().SetStr(val)
} else {
attrs.dp.PutStr(key, val)
}
}
}

Expand Down
49 changes: 42 additions & 7 deletions receiver/datadogreceiver/internal/translator/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
)

func TestGetMetricAttributes(t *testing.T) {
Expand All @@ -32,7 +33,7 @@ func TestGetMetricAttributes(t *testing.T) {
tags: []string{},
host: "host",
expectedResourceAttrs: newMapFromKV(t, map[string]any{
"host.name": "host",
string(semconv.HostNameKey): "host",
}),
expectedScopeAttrs: pcommon.NewMap(),
expectedDpAttrs: pcommon.NewMap(),
Expand All @@ -42,10 +43,10 @@ func TestGetMetricAttributes(t *testing.T) {
tags: []string{"env:prod", "service:my-service", "version:1.0"},
host: "host",
expectedResourceAttrs: newMapFromKV(t, map[string]any{
"host.name": "host",
"deployment.environment": "prod",
"service.name": "my-service",
"service.version": "1.0",
string(semconv.HostNameKey): "host",
string(semconv.DeploymentEnvironmentNameKey): "prod",
string(semconv.ServiceNameKey): "my-service",
string(semconv.ServiceVersionKey): "1.0",
}),
expectedScopeAttrs: pcommon.NewMap(),
expectedDpAttrs: pcommon.NewMap(),
Expand All @@ -55,8 +56,8 @@ func TestGetMetricAttributes(t *testing.T) {
tags: []string{"env:prod", "foo"},
host: "host",
expectedResourceAttrs: newMapFromKV(t, map[string]any{
"host.name": "host",
"deployment.environment": "prod",
string(semconv.HostNameKey): "host",
string(semconv.DeploymentEnvironmentNameKey): "prod",
}),
expectedScopeAttrs: pcommon.NewMap(),
expectedDpAttrs: newMapFromKV(t, map[string]any{
Expand Down Expand Up @@ -147,4 +148,38 @@ func TestTranslateDataDogKeyToOtel(t *testing.T) {
assert.Equal(t, v, translateDatadogKeyToOTel(k))
})
}

// test dynamic attributes:
// * http.request.header.<header_name>
// * http.response.header.<header_name>
assert.Equal(t, "http.request.header.referer", translateDatadogKeyToOTel("http.request.headers.referer"))
assert.Equal(t, "http.response.header.content-type", translateDatadogKeyToOTel("http.response.headers.content-type"))
}

func TestImageTags(t *testing.T) {
// make sure container.image.tags is a string[]
expected := "[\"tag1\"]"
tags := []string{"env:prod", "foo", "image_tag:tag1"}
host := "host"
pool := newStringPool()

attrs := tagsToAttributes(tags, host, pool)
imageTags, _ := attrs.resource.Get(string(semconv.ContainerImageTagsKey))
assert.Equal(t, expected, imageTags.AsString())
}

func TestHTTPHeaders(t *testing.T) {
// make sure container.image.tags is a string[]
expected := "[\"value\"]"
tags := []string{"env:prod", "foo", "http.request.headers.header:value", "http.response.headers.header:value"}
host := "host"
pool := newStringPool()

attrs := tagsToAttributes(tags, host, pool)
header, found := attrs.resource.Get("http.request.header.header")
assert.True(t, found)
assert.Equal(t, expected, header.AsString())
header, found = attrs.resource.Get("http.response.header.header")
assert.True(t, found)
assert.Equal(t, expected, header.AsString())
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package translator // import "github.com/open-telemetry/opentelemetry-collector-

import (
"bytes"
"cmp"
"encoding/binary"
"encoding/json"
"errors"
Expand All @@ -20,7 +21,7 @@ import (
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/otel/semconv/v1.16.0"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
oteltrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand All @@ -44,6 +45,18 @@ const (
attributeDatadogSpanID = "datadog.span.id"
)

var spanProcessor = map[string]func(*pb.Span, *ptrace.Span){
// HTTP
"servlet.request": processHTTPSpan,

// Internal
"spring.handler": processInternalSpan,

// Database
"postgresql.query": processDBSpan,
"redis.query": processDBSpan,
}

func upsertHeadersAttributes(req *http.Request, attrs pcommon.Map) {
if ddTracerVersion := req.Header.Get(header.TracerVersion); ddTracerVersion != "" {
attrs.PutStr(string(semconv.TelemetrySDKVersionKey), "Datadog-"+ddTracerVersion)
Expand Down Expand Up @@ -88,21 +101,61 @@ func traceID64to128(span *pb.Span, traceIDCache *simplelru.LRU[uint64, pcommon.T
return pcommon.TraceID{}, nil
}

func processInternalSpan(span *pb.Span, newSpan *ptrace.Span) {
newSpan.SetName(span.Resource)
newSpan.SetKind(ptrace.SpanKindInternal)
}

func processHTTPSpan(span *pb.Span, newSpan *ptrace.Span) {
// https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name
// We assume that http.route coming from datadog is low cardinality
if val, ok := span.Meta["http.method"]; ok {
if suffix, ok := span.Meta["http.route"]; ok {
newSpan.SetName(val + " " + suffix)
} else {
newSpan.SetName(val)
}
}
}

func processDBSpan(span *pb.Span, newSpan *ptrace.Span) {
// https://opentelemetry.io/docs/specs/semconv/database/database-spans/#name
if val, ok := span.Meta["db.query.summary"]; ok {
newSpan.SetName(val)
} else {
if val, ok = span.Meta["db.operation"]; ok {
newSpan.SetName(val)
suffix := cmp.Or(span.Meta["db.instance"], span.Meta["db.namespace"], span.Meta["peer.hostname"])
if suffix != "" {
newSpan.SetName(val + " " + suffix)
}
} else if val, ok = span.Meta["db.type"]; ok {
newSpan.SetName(val)
}
}
}

func processSpanByName(span *pb.Span, newSpan *ptrace.Span) {
if processor, ok := spanProcessor[span.Name]; ok {
processor(span, newSpan)
}
}

func ToTraces(logger *zap.Logger, payload *pb.TracerPayload, req *http.Request, traceIDCache *simplelru.LRU[uint64, pcommon.TraceID]) (ptrace.Traces, error) {
var traces pb.Traces
for _, p := range payload.GetChunks() {
traces = append(traces, p.GetSpans())
}
sharedAttributes := pcommon.NewMap()
for k, v := range map[string]string{
string(semconv.ContainerIDKey): payload.ContainerID,
string(semconv.TelemetrySDKLanguageKey): payload.LanguageName,
string(semconv.ProcessRuntimeVersionKey): payload.LanguageVersion,
string(semconv.DeploymentEnvironmentKey): payload.Env,
string(semconv.HostNameKey): payload.Hostname,
string(semconv.ServiceVersionKey): payload.AppVersion,
string(semconv.TelemetrySDKNameKey): "Datadog",
string(semconv.TelemetrySDKVersionKey): payload.TracerVersion,
string(semconv.ContainerIDKey): payload.ContainerID,
string(semconv.TelemetrySDKLanguageKey): payload.LanguageName,
string(semconv.ProcessRuntimeVersionKey): payload.LanguageVersion,
string(semconv.DeploymentEnvironmentNameKey): payload.Env,
string(semconv.HostNameKey): payload.Hostname,
string(semconv.ServiceVersionKey): payload.AppVersion,
string(semconv.TelemetrySDKNameKey): "Datadog",
string(semconv.TelemetrySDKVersionKey): payload.TracerVersion,
} {
if v != "" {
sharedAttributes.PutStr(k, v)
Expand All @@ -125,6 +178,11 @@ func ToTraces(logger *zap.Logger, payload *pb.TracerPayload, req *http.Request,

for _, trace := range traces {
for _, span := range trace {
// Restore base service name as the service name.
// Without this, internal spans such as postgresql queries have a service.name set to postgresql
if val, ok := span.Meta["_dd.base_service"]; ok {
span.Service = val
}
slice, exist := groupByService[span.Service]
if !exist {
slice = ptrace.NewSpanSlice()
Expand Down Expand Up @@ -193,6 +251,21 @@ func ToTraces(logger *zap.Logger, payload *pb.TracerPayload, req *http.Request,
newSpan.SetKind(ptrace.SpanKindUnspecified)
}
}

// For client/producer/consumer spans, if we have `peer.hostname`, and `server.address` is unset, set
// `server.address` to `peer.hostname`.
if newSpan.Kind() == ptrace.SpanKindClient ||
newSpan.Kind() == ptrace.SpanKindProducer ||
newSpan.Kind() == ptrace.SpanKindConsumer {
if _, ok := newSpan.Attributes().Get("server.address"); !ok {
if val, ok := span.Meta["peer.hostname"]; ok {
newSpan.Attributes().PutStr("server.address", val)
}
}
}

// Some spans need specific processing (http, db, ...)
processSpanByName(span, &newSpan)
}
}

Expand Down
Loading
Loading