Skip to content

Commit 83dd84e

Browse files
committed
Add support for V1 series endpoint
Co-authored-by: Jesus Vazquez <[email protected]>
1 parent 4010573 commit 83dd84e

File tree

2 files changed

+127
-6
lines changed

2 files changed

+127
-6
lines changed

receiver/datadogreceiver/metrics_translator.go

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
55
import (
6+
datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
7+
"go.opentelemetry.io/collector/pdata/pmetric"
68
"sync"
79

810
"go.opentelemetry.io/collector/component"
@@ -13,12 +15,104 @@ import (
1315

1416
// MetricsTranslator converts Datadog metric payloads into OTel metrics,
// carrying the state shared across requests. The embedded RWMutex guards
// lastTs, which is read and written via streamHasTimestamp /
// updateLastTsForStream.
type MetricsTranslator struct {
	sync.RWMutex
	buildInfo component.BuildInfo
	// lastTs records the most recently seen timestamp per metric stream;
	// it is used as the start timestamp of the next data point in that stream.
	lastTs map[identity.Stream]pcommon.Timestamp
	// stringPool deduplicates repeated strings (tags, names) across payloads.
	stringPool *StringPool
}
1922

2023
func newMetricsTranslator() *MetricsTranslator {
2124
return &MetricsTranslator{
22-
lastTs: make(map[identity.Stream]pcommon.Timestamp),
25+
lastTs: make(map[identity.Stream]pcommon.Timestamp),
26+
stringPool: newStringPool(),
2327
}
2428
}
29+
30+
func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) {
31+
mt.RLock()
32+
defer mt.RUnlock()
33+
ts, ok := mt.lastTs[stream]
34+
return ts, ok
35+
}
36+
37+
func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, ts pcommon.Timestamp) {
38+
mt.Lock()
39+
defer mt.Unlock()
40+
mt.lastTs[stream] = ts
41+
}
42+
43+
// Datadog metric type names as they appear in v1 series payloads.
// See https://docs.datadoghq.com/metrics/types/ for their semantics.
const (
	TypeGauge string = "gauge"
	TypeRate  string = "rate"
	TypeCount string = "count"
)
48+
49+
// SeriesList is the JSON body accepted by the Datadog v1 series endpoint:
// a wrapper around a list of metric series.
type SeriesList struct {
	Series []datadogV1.Series `json:"series"`
}
52+
53+
func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metrics {
54+
bt := newBatcher()
55+
bt.Metrics = pmetric.NewMetrics()
56+
57+
for _, serie := range series.Series {
58+
var dps pmetric.NumberDataPointSlice
59+
60+
dimensions := parseSeriesProperties(serie.Metric, serie.GetType(), serie.GetTags(), serie.GetHost(), mt.buildInfo.Version, mt.stringPool)
61+
metric, metricID := bt.Lookup(dimensions)
62+
63+
switch serie.GetType() {
64+
case TypeCount:
65+
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
66+
metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition
67+
dps = metric.Sum().DataPoints()
68+
case TypeGauge:
69+
dps = metric.Gauge().DataPoints()
70+
case TypeRate:
71+
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
72+
dps = metric.Sum().DataPoints()
73+
default:
74+
// Type is unset/unspecified
75+
continue
76+
}
77+
78+
dps.EnsureCapacity(len(serie.Points))
79+
80+
var dp pmetric.NumberDataPoint
81+
var ts uint64
82+
var value float64
83+
// The Datadog API returns a slice of slices of points [][]*float64 which is a bit awkward to work with.
84+
// It looks like this:
85+
// points := [][]*float64{
86+
// {&timestamp1, &value1},
87+
// {&timestamp2, &value2},
88+
// }
89+
// We need to flatten this to a slice of *float64 to work with it. And we know that in that slice, the first
90+
// element is the timestamp and the second is the value.
91+
for _, points := range serie.Points {
92+
if len(points) != 2 {
93+
continue // The datapoint is missing a timestamp and/or value, so this point should be skipped
94+
}
95+
ts = uint64(*points[0])
96+
value = *points[1]
97+
98+
dp = dps.AppendEmpty()
99+
dp.SetTimestamp(pcommon.Timestamp(ts * 1_000_000_000)) // OTel uses nanoseconds, while Datadog uses seconds
100+
101+
if *serie.Type == TypeRate {
102+
if serie.Interval.IsSet() {
103+
value *= float64(serie.GetInterval())
104+
}
105+
}
106+
dp.SetDoubleValue(value)
107+
dimensions.dpAttrs.CopyTo(dp.Attributes())
108+
109+
stream := identity.OfStream(metricID, dp)
110+
ts, ok := mt.streamHasTimestamp(stream)
111+
if ok {
112+
dp.SetStartTimestamp(ts)
113+
}
114+
mt.updateLastTsForStream(stream, dp.Timestamp())
115+
}
116+
}
117+
return bt.Metrics
118+
}

receiver/datadogreceiver/receiver.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ package datadogreceiver // import "github.com/open-telemetry/opentelemetry-colle
55

66
import (
77
"context"
8+
"encoding/json"
89
"errors"
910
"fmt"
11+
"io"
1012
"net/http"
1113

1214
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
@@ -143,9 +145,34 @@ func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Requ
143145
ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err)
144146
}(&metricsCount)
145147

146-
err = fmt.Errorf("series v1 endpoint not implemented")
147-
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
148-
ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err))
148+
buf := getBuffer()
149+
defer putBuffer(buf)
150+
if _, err = io.Copy(buf, req.Body); err != nil {
151+
http.Error(w, err.Error(), http.StatusInternalServerError)
152+
ddr.params.Logger.Error(err.Error())
153+
return
154+
}
155+
156+
seriesList := SeriesList{}
157+
err = json.Unmarshal(buf.Bytes(), &seriesList)
158+
if err != nil {
159+
http.Error(w, err.Error(), http.StatusBadRequest)
160+
ddr.params.Logger.Error(err.Error())
161+
return
162+
}
163+
164+
metrics := translateMetricsV1(seriesList, ddr.metricsTranslator)
165+
metricsCount = metrics.DataPointCount()
166+
167+
err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics)
168+
if err != nil {
169+
http.Error(w, err.Error(), http.StatusInternalServerError)
170+
ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err))
171+
return
172+
}
173+
174+
w.WriteHeader(http.StatusAccepted)
175+
_, err = w.Write([]byte("OK"))
149176
}
150177

151178
// handleV2Series handles the v2 series endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-metrics

0 commit comments

Comments
 (0)