Skip to content

Commit 55fdd84

Browse files
[chore] [receiver/datadog] Add support for v2 series (#34180)
**Description:** This PR adds support for Datadog V2 series. Follow up of #33631 and #33957. The full version of the code can be found in the `cedwards/datadog-metrics-receiver-full` branch, or in Grafana Alloy: https://github.com/grafana/alloy/tree/main/internal/etc/datadogreceiver **Link to tracking Issue:** #18278 **Testing:** Unit tests, as well as an end-to-end test, have been added.
1 parent c0ffc7b commit 55fdd84

File tree

13 files changed

+565
-89
lines changed

13 files changed

+565
-89
lines changed

receiver/datadogreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datado
33
go 1.21.0
44

55
require (
6+
github.com/DataDog/agent-payload/v5 v5.0.124
67
github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9
78
github.com/DataDog/datadog-api-client-go/v2 v2.28.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.107.0

receiver/datadogreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/datadogreceiver/internal/translator/batcher.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ var metricTypeMap = map[string]pmetric.MetricType{
4848
}
4949

5050
func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) dimensions {
51-
resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool)
51+
attrs := tagsToAttributes(tags, host, stringPool)
5252
return dimensions{
5353
name: name,
5454
metricType: metricTypeMap[metricType],
5555
buildInfo: version,
56-
resourceAttrs: resourceAttrs,
57-
scopeAttrs: scopeAttrs,
58-
dpAttrs: dpAttrs,
56+
resourceAttrs: attrs.resource,
57+
scopeAttrs: attrs.scope,
58+
dpAttrs: attrs.dp,
5959
}
6060
}
6161

receiver/datadogreceiver/internal/translator/batcher_test.go

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,25 @@ func TestMetricBatcher(t *testing.T) {
4747
},
4848
expect: func(t *testing.T, result pmetric.Metrics) {
4949
// Different hosts should result in different ResourceMetrics
50+
requireMetricAndDataPointCounts(t, result, 2, 2)
5051
require.Equal(t, 2, result.ResourceMetrics().Len())
5152
resource1 := result.ResourceMetrics().At(0)
5253
resource2 := result.ResourceMetrics().At(1)
53-
v, exists := resource1.Resource().Attributes().Get("host.name")
54-
require.True(t, exists)
55-
require.Equal(t, "Host1", v.AsString())
56-
v, exists = resource2.Resource().Attributes().Get("host.name")
57-
require.True(t, exists)
58-
require.Equal(t, "Host2", v.AsString())
54+
55+
res1ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool())
56+
requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource)
57+
58+
res2ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host2", newStringPool())
59+
requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource)
5960

6061
require.Equal(t, 1, resource1.ScopeMetrics().Len())
6162
require.Equal(t, 1, resource2.ScopeMetrics().Len())
6263

6364
require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len())
6465
require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len())
6566

66-
require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name())
67-
require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name())
67+
requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
68+
requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
6869
},
6970
},
7071
{
@@ -98,18 +99,19 @@ func TestMetricBatcher(t *testing.T) {
9899
expect: func(t *testing.T, result pmetric.Metrics) {
99100
// The different metrics will fall under the same ResourceMetric and ScopeMetric
100101
// and there will be separate metrics under the ScopeMetric.Metrics()
102+
requireMetricAndDataPointCounts(t, result, 2, 2)
101103
require.Equal(t, 1, result.ResourceMetrics().Len())
102104
resource := result.ResourceMetrics().At(0)
103105

104-
v, exists := resource.Resource().Attributes().Get("host.name")
105-
require.True(t, exists)
106-
require.Equal(t, "Host1", v.AsString())
106+
expectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool())
107+
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)
107108

108109
require.Equal(t, 1, resource.ScopeMetrics().Len())
109110

110-
require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len())
111111
require.Equal(t, "TestCount1", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
112112
require.Equal(t, "TestCount2", resource.ScopeMetrics().At(0).Metrics().At(1).Name())
113+
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
114+
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestCount2", 1)
113115
},
114116
},
115117
{
@@ -142,21 +144,16 @@ func TestMetricBatcher(t *testing.T) {
142144
},
143145
expect: func(t *testing.T, result pmetric.Metrics) {
144146
// Differences in attribute values should result in different resourceMetrics
147+
requireMetricAndDataPointCounts(t, result, 2, 2)
145148
require.Equal(t, 2, result.ResourceMetrics().Len())
146149
resource1 := result.ResourceMetrics().At(0)
147150
resource2 := result.ResourceMetrics().At(1)
148-
v, exists := resource1.Resource().Attributes().Get("host.name")
149-
require.True(t, exists)
150-
require.Equal(t, "Host1", v.AsString())
151-
v, exists = resource2.Resource().Attributes().Get("host.name")
152-
require.True(t, exists)
153-
require.Equal(t, "Host1", v.AsString())
154-
v, exists = resource1.Resource().Attributes().Get("deployment.environment")
155-
require.True(t, exists)
156-
require.Equal(t, "dev", v.AsString())
157-
v, exists = resource2.Resource().Attributes().Get("deployment.environment")
158-
require.True(t, exists)
159-
require.Equal(t, "prod", v.AsString())
151+
152+
res1ExpectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
153+
requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource)
154+
155+
res2ExpectedAttrs := tagsToAttributes([]string{"env:prod", "version:tag1"}, "Host1", newStringPool())
156+
requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource)
160157

161158
require.Equal(t, 1, resource1.ScopeMetrics().Len())
162159
require.Equal(t, 1, resource1.ScopeMetrics().Len())
@@ -167,8 +164,8 @@ func TestMetricBatcher(t *testing.T) {
167164
require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len())
168165
require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len())
169166

170-
require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name())
171-
require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name())
167+
requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
168+
requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
172169
},
173170
},
174171
{
@@ -203,22 +200,20 @@ func TestMetricBatcher(t *testing.T) {
203200
// The different metrics will fall under the same ResourceMetric and ScopeMetric
204201
// and there will be separate metrics under the ScopeMetric.Metrics() due to the different
205202
// data types
203+
requireMetricAndDataPointCounts(t, result, 2, 2)
206204
require.Equal(t, 1, result.ResourceMetrics().Len())
207205
resource := result.ResourceMetrics().At(0)
208206

209-
v, exists := resource.Resource().Attributes().Get("host.name")
210-
require.True(t, exists)
211-
require.Equal(t, "Host1", v.AsString())
207+
expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
208+
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)
212209

213210
require.Equal(t, 1, resource.ScopeMetrics().Len())
214211

215-
require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len())
216-
217212
require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
218213
require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(1).Name())
219214

220-
require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type())
221-
require.Equal(t, pmetric.MetricTypeGauge, resource.ScopeMetrics().At(0).Metrics().At(1).Type())
215+
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 1)
216+
requireGauge(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestMetric", 1)
222217
},
223218
},
224219
{
@@ -253,21 +248,16 @@ func TestMetricBatcher(t *testing.T) {
253248
// Same host, tags, and metric name but two different datapoints
254249
// should result in a single resourceMetric, scopeMetric, and metric
255250
// but two different datapoints under that metric
251+
requireMetricAndDataPointCounts(t, result, 1, 2)
256252
require.Equal(t, 1, result.ResourceMetrics().Len())
257253
resource := result.ResourceMetrics().At(0)
258254

259-
v, exists := resource.Resource().Attributes().Get("host.name")
260-
require.True(t, exists)
261-
require.Equal(t, "Host1", v.AsString())
255+
expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
256+
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)
262257

263258
require.Equal(t, 1, resource.ScopeMetrics().Len())
264259

265-
require.Equal(t, 1, resource.ScopeMetrics().At(0).Metrics().Len())
266-
267-
require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
268-
269-
require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type())
270-
require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len())
260+
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 2)
271261
},
272262
},
273263
}

receiver/datadogreceiver/internal/translator/series.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"
55

66
import (
7+
"io"
8+
"net/http"
9+
"strings"
710
"time"
811

12+
"github.com/DataDog/agent-payload/v5/gogen"
913
datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
1014
"go.opentelemetry.io/collector/pdata/pcommon"
1115
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -23,6 +27,22 @@ type SeriesList struct {
2327
Series []datadogV1.Series `json:"series"`
2428
}
2529

30+
// TODO: add handling for JSON format in additional to protobuf?
31+
func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gogen.MetricPayload_MetricSeries, err error) {
32+
buf := GetBuffer()
33+
defer PutBuffer(buf)
34+
if _, err := io.Copy(buf, req.Body); err != nil {
35+
return mp, err
36+
}
37+
38+
pl := new(gogen.MetricPayload)
39+
if err := pl.Unmarshal(buf.Bytes()); err != nil {
40+
return mp, err
41+
}
42+
43+
return pl.GetSeries(), nil
44+
}
45+
2646
func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metrics {
2747
bt := newBatcher()
2848

@@ -87,3 +107,68 @@ func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metric
87107
}
88108
return bt.Metrics
89109
}
110+
111+
func (mt *MetricsTranslator) TranslateSeriesV2(series []*gogen.MetricPayload_MetricSeries) pmetric.Metrics {
112+
bt := newBatcher()
113+
114+
for _, serie := range series {
115+
var dps pmetric.NumberDataPointSlice
116+
117+
// The V2 payload stores the host name under in the Resources field
118+
resourceMap := getV2Resources(serie.Resources)
119+
// TODO(jesus.vazquez) (Do this with string interning)
120+
dimensions := parseSeriesProperties(serie.Metric, strings.ToLower(serie.Type.String()), serie.Tags, resourceMap["host"], mt.buildInfo.Version, mt.stringPool)
121+
for k, v := range resourceMap {
122+
if k == "host" {
123+
continue // Host has already been added as a resource attribute in parseSeriesProperties(), so avoid duplicating that attribute
124+
}
125+
dimensions.resourceAttrs.PutStr(k, v)
126+
}
127+
dimensions.resourceAttrs.PutStr("source", serie.SourceTypeName) //TODO: check if this is correct handling of SourceTypeName field
128+
metric, metricID := bt.Lookup(dimensions)
129+
130+
switch serie.Type {
131+
case gogen.MetricPayload_COUNT:
132+
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
133+
metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition
134+
dps = metric.Sum().DataPoints()
135+
case gogen.MetricPayload_GAUGE:
136+
dps = metric.Gauge().DataPoints()
137+
case gogen.MetricPayload_RATE:
138+
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) //TODO: verify that this is always the case
139+
dps = metric.Sum().DataPoints()
140+
case gogen.MetricPayload_UNSPECIFIED:
141+
// Type is unset/unspecified
142+
continue
143+
}
144+
145+
dps.EnsureCapacity(len(serie.Points))
146+
147+
for _, point := range serie.Points {
148+
dp := dps.AppendEmpty()
149+
dp.SetTimestamp(pcommon.Timestamp(point.Timestamp * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds
150+
dimensions.dpAttrs.CopyTo(dp.Attributes()) // TODO(jesus.vazquez) Review this copy
151+
val := point.Value
152+
if serie.Type == gogen.MetricPayload_RATE && serie.Interval != 0 {
153+
val *= float64(serie.Interval)
154+
}
155+
dp.SetDoubleValue(val)
156+
157+
stream := identity.OfStream(metricID, dp)
158+
ts, ok := mt.streamHasTimestamp(stream)
159+
if ok {
160+
dp.SetStartTimestamp(ts)
161+
}
162+
mt.updateLastTsForStream(stream, dp.Timestamp())
163+
}
164+
}
165+
return bt.Metrics
166+
}
167+
168+
func getV2Resources(resources []*gogen.MetricPayload_Resource) map[string]string {
169+
resourceMap := make(map[string]string)
170+
for i := range resources {
171+
resourceMap[resources[i].Type] = resources[i].Name
172+
}
173+
return resourceMap
174+
}

0 commit comments

Comments
 (0)