Skip to content

Commit bfdf241

Browse files
carrieedwardsfedetorres93krajorama
authored andcommitted
[receiver/datadog] Add support for sketches (open-telemetry#34662)
**Description:** This PR adds support for translating Datadog sketches into Exponential Histograms. Follow up of open-telemetry#33631, open-telemetry#33957 and open-telemetry#34180. The full version of the code can be found in the `cedwards/datadog-metrics-receiver-full` branch, or in Grafana Alloy: https://github.com/grafana/alloy/tree/main/internal/etc/datadogreceiver **Link to tracking Issue:** open-telemetry#18278 **Testing:** Unit tests, as well as an end-to-end test, have been added. --------- Signed-off-by: Federico Torres <[email protected]> Signed-off-by: György Krajcsovits <[email protected]> Co-authored-by: Federico Torres <[email protected]> Co-authored-by: György Krajcsovits <[email protected]>
1 parent cc99a4c commit bfdf241

File tree

6 files changed

+1163
-3
lines changed

6 files changed

+1163
-3
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: datadogreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add support for sketch metrics in Datadog receiver
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [18278]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"
5+
6+
import (
7+
"fmt"
8+
"io"
9+
"math"
10+
"net/http"
11+
"sort"
12+
"time"
13+
14+
"github.com/DataDog/agent-payload/v5/gogen"
15+
"go.opentelemetry.io/collector/pdata/pcommon"
16+
"go.opentelemetry.io/collector/pdata/pmetric"
17+
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
19+
)
20+
21+
const (
22+
// The relativeAccuracy (also called epsilon or eps) comes from DDSketch's logarithmic mapping, which is used for sketches
23+
// in the Datadog agent. The Datadog agent uses the default value from opentelemetry-go-mapping configuration
24+
// See:
25+
// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L15
26+
relativeAccuracy = 1.0 / 128
27+
28+
// The gamma value comes from the default values of the epsilon/relative accuracy from opentelemetry-go-mapping. This value is used for
29+
// finding the lower boundary of the bucket at a specific index
30+
// See:
31+
// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L138
32+
gamma = 1 + 2*relativeAccuracy
33+
34+
// Since the default bucket factor for Sketches (gamma value) is 1.015625, this corresponds to a scale between 5 (2^2^-5=1.0219)
35+
// and 6 (2^2^-6=1.01088928605). However, the lower resolution of 5 will produce larger buckets which allows for easier mapping
36+
scale = 5
37+
38+
// The agentSketchOffset value comes from the following calculation:
39+
// min = 1e-9
40+
// emin = math.Floor((math.Log(min)/math.Log1p(2*relativeAccuracy))
41+
// offset = -emin + 1
42+
// The resulting value is 1338.
43+
// See: https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L154
44+
// (Note: in Datadog's code, it is referred to as 'bias')
45+
agentSketchOffset int32 = 1338
46+
47+
// The max limit for the index of a sketch bucket
48+
// See https://github.com/DataDog/opentelemetry-mapping-go/blob/00c3f838161a00de395d7d0ed44d967ac71e43b9/pkg/quantile/ddsketch.go#L21
49+
// and https://github.com/DataDog/opentelemetry-mapping-go/blob/00c3f838161a00de395d7d0ed44d967ac71e43b9/pkg/quantile/ddsketch.go#L127
50+
maxIndex = math.MaxInt16
51+
)
52+
53+
// Unmarshal the sketch payload, which contains the underlying Dogsketch structure used for the translation
54+
func (mt *MetricsTranslator) HandleSketchesPayload(req *http.Request) (sp []gogen.SketchPayload_Sketch, err error) {
55+
buf := GetBuffer()
56+
defer PutBuffer(buf)
57+
if _, err := io.Copy(buf, req.Body); err != nil {
58+
return sp, err
59+
}
60+
61+
pl := new(gogen.SketchPayload)
62+
if err := pl.Unmarshal(buf.Bytes()); err != nil {
63+
return sp, err
64+
}
65+
66+
return pl.GetSketches(), nil
67+
}
68+
69+
func (mt *MetricsTranslator) TranslateSketches(sketches []gogen.SketchPayload_Sketch) pmetric.Metrics {
70+
bt := newBatcher()
71+
bt.Metrics = pmetric.NewMetrics()
72+
73+
for _, sketch := range sketches {
74+
dimensions := parseSeriesProperties(sketch.Metric, "sketch", sketch.Tags, sketch.Host, mt.buildInfo.Version, mt.stringPool)
75+
metric, metricID := bt.Lookup(dimensions)
76+
metric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
77+
dps := metric.ExponentialHistogram().DataPoints()
78+
79+
dps.EnsureCapacity(len(sketch.Dogsketches))
80+
81+
// The dogsketches field of the payload contains the sketch data
82+
for i := range sketch.Dogsketches {
83+
dp := dps.AppendEmpty()
84+
85+
err := sketchToDatapoint(sketch.Dogsketches[i], dp, dimensions.dpAttrs)
86+
if err != nil {
87+
// If a sketch is invalid, remove this datapoint
88+
metric.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
89+
if dp.Positive().BucketCounts().Len() == 0 && dp.Negative().BucketCounts().Len() == 0 {
90+
return true
91+
}
92+
return false
93+
})
94+
continue
95+
}
96+
stream := identity.OfStream(metricID, dp)
97+
if ts, ok := mt.streamHasTimestamp(stream); ok {
98+
dp.SetStartTimestamp(ts)
99+
}
100+
mt.updateLastTsForStream(stream, dp.Timestamp())
101+
}
102+
}
103+
104+
return bt.Metrics
105+
}
106+
107+
func sketchToDatapoint(sketch gogen.SketchPayload_Sketch_Dogsketch, dp pmetric.ExponentialHistogramDataPoint, attributes pcommon.Map) error {
108+
dp.SetTimestamp(pcommon.Timestamp(sketch.Ts * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds
109+
110+
dp.SetCount(uint64(sketch.Cnt))
111+
dp.SetSum(sketch.Sum)
112+
dp.SetMin(sketch.Min)
113+
dp.SetMax(sketch.Max)
114+
dp.SetScale(scale)
115+
dp.SetZeroThreshold(math.Exp(float64(1-agentSketchOffset) / (1 / math.Log(gamma)))) // See https://github.com/DataDog/sketches-go/blob/7546f8f95179bb41d334d35faa281bfe97812a86/ddsketch/mapping/logarithmic_mapping.go#L48
116+
117+
attributes.CopyTo(dp.Attributes())
118+
119+
negativeBuckets, positiveBuckets, zeroCount, err := mapSketchBucketsToHistogramBuckets(sketch.K, sketch.N)
120+
if err != nil {
121+
return err
122+
}
123+
124+
dp.SetZeroCount(zeroCount)
125+
126+
convertBucketLayout(positiveBuckets, dp.Positive())
127+
convertBucketLayout(negativeBuckets, dp.Negative())
128+
129+
return nil
130+
}
131+
132+
// mapSketchBucketsToHistogramBuckets attempts to map the counts in each Sketch bucket to the closest equivalent Exponential Histogram
133+
// bucket(s). It works by first calculating an Exponential Histogram key that corresponds most closely with the Sketch key (using the lower
134+
// bound of the sketch bucket the key corresponds to), calculates differences in the range of the Sketch bucket and exponential histogram bucket,
135+
// and distributes the count to the corresponding bucket, and the bucket(s) after it, based on the proportion of overlap between the
136+
// exponential histogram buckets and the Sketch bucket. Note that the Sketch buckets are not separated into positive and negative buckets, but exponential
137+
// histograms store positive and negative buckets separately. Negative buckets in exponential histograms are mapped in the same way as positive buckets.
138+
// Note that negative indices in exponential histograms do not necessarily correspond to negative values; they correspond with values between 0 and 1,
139+
// on either the negative or positive side
140+
func mapSketchBucketsToHistogramBuckets(sketchKeys []int32, sketchCounts []uint32) (map[int]uint64, map[int]uint64, uint64, error) {
141+
var zeroCount uint64
142+
143+
var positiveBuckets = make(map[int]uint64)
144+
var negativeBuckets = make(map[int]uint64)
145+
146+
// The data format for the sketch received from the sketch payload does not have separate positive and negative buckets,
147+
// and instead just uses a single list of sketch keys that are in order by increasing bucket index, starting with negative indices,
148+
// which correspond to negative buckets
149+
for i := range sketchKeys {
150+
if sketchKeys[i] == 0 { // A sketch key of 0 corresponds to the zero bucket
151+
zeroCount += uint64(sketchCounts[i])
152+
continue
153+
}
154+
if sketchKeys[i] >= maxIndex {
155+
// This should not happen, as sketches that contain bucket(s) with an index higher than the max
156+
// limit should have already been discarded. However, if there happens to be an index > maxIndex,
157+
// it can cause an infinite loop within the below inner for loop on some operating systems. Therefore,
158+
// throw an error for sketches that have an index above the max limit
159+
return nil, nil, 0, fmt.Errorf("Sketch contains bucket index %d which exceeds maximum supported index value %d", sketchKeys[i], maxIndex)
160+
}
161+
162+
// The approach here is to use the Datadog sketch index's lower bucket boundary to find the
163+
// OTel exponential histogram bucket that with the closest range to the sketch bucket. Then,
164+
// the buckets before and after that bucket are also checked for overlap with the sketch bucket.
165+
// A count proportional to the intersection of the sketch bucket with the OTel bucket(s) is then
166+
// added to the OTel bucket(s). After looping through all possible buckets that are within the Sketch
167+
// bucket range, the bucket with the highest proportion of overlap is given the remaining count
168+
sketchLowerBound, sketchUpperBound := getSketchBounds(sketchKeys[i])
169+
sketchBucketSize := sketchUpperBound - sketchLowerBound
170+
histogramKey := sketchLowerBoundToHistogramIndex(sketchLowerBound)
171+
highestCountProportion := 0.0
172+
highestCountIdx := 0
173+
targetBucketCount := uint64(sketchCounts[i])
174+
var currentAssignedCount uint64
175+
176+
//TODO: look into better algorithms for applying fractional counts
177+
for outIndex := histogramKey; histogramLowerBound(outIndex) < sketchUpperBound; outIndex++ {
178+
histogramLowerBound, histogramUpperBound := getHistogramBounds(outIndex)
179+
lowerIntersection := math.Max(histogramLowerBound, sketchLowerBound)
180+
higherIntersection := math.Min(histogramUpperBound, sketchUpperBound)
181+
182+
intersectionSize := higherIntersection - lowerIntersection
183+
proportion := intersectionSize / sketchBucketSize
184+
if proportion <= 0 {
185+
continue // In this case, the bucket does not overlap with the sketch bucket, so continue to the next bucket
186+
}
187+
if proportion > highestCountProportion {
188+
highestCountProportion = proportion
189+
highestCountIdx = outIndex
190+
}
191+
// OTel exponential histograms only support integer bucket counts, so rounding needs to be done here
192+
roundedCount := uint64(proportion * float64(sketchCounts[i]))
193+
if sketchKeys[i] < 0 {
194+
negativeBuckets[outIndex] += roundedCount
195+
} else {
196+
positiveBuckets[outIndex] += roundedCount
197+
}
198+
currentAssignedCount += roundedCount
199+
}
200+
// Add the difference between the original sketch bucket's count and the total count that has been
201+
// added to the matching OTel bucket(s) thus far to the bucket that had the highest proportion of
202+
// overlap between the original sketch bucket and the corresponding exponential histogram buckets
203+
if highestCountProportion > 0 {
204+
additionalCount := targetBucketCount - currentAssignedCount
205+
if sketchKeys[i] < 0 {
206+
negativeBuckets[highestCountIdx] += additionalCount
207+
} else {
208+
positiveBuckets[highestCountIdx] += additionalCount
209+
}
210+
}
211+
212+
}
213+
214+
return negativeBuckets, positiveBuckets, zeroCount, nil
215+
}
216+
217+
// convertBucketLayout populates the count for positive or negative buckets in the resulting OTel
218+
// exponential histogram structure. The bucket layout is dense and consists of an offset, which is the
219+
// index of the first populated bucket, and a list of counts, which correspond to the counts at the offset
220+
// bucket's index, and the counts of each bucket after. Unpopulated/empty buckets must be represented with
221+
// a count of 0. After assigning bucket counts, it sets the offset for the bucket layout
222+
func convertBucketLayout(inputBuckets map[int]uint64, outputBuckets pmetric.ExponentialHistogramDataPointBuckets) {
223+
if len(inputBuckets) == 0 {
224+
return
225+
}
226+
bucketIdxs := make([]int, 0, len(inputBuckets))
227+
for k := range inputBuckets {
228+
bucketIdxs = append(bucketIdxs, k)
229+
}
230+
sort.Ints(bucketIdxs)
231+
232+
bucketsSize := bucketIdxs[len(bucketIdxs)-1] - bucketIdxs[0] + 1 // find total number of buckets needed
233+
outputBuckets.BucketCounts().EnsureCapacity(bucketsSize)
234+
outputBuckets.BucketCounts().Append(make([]uint64, bucketsSize)...)
235+
236+
offset := bucketIdxs[0]
237+
outputBuckets.SetOffset(int32(offset))
238+
239+
for _, idx := range bucketIdxs {
240+
delta := idx - offset
241+
outputBuckets.BucketCounts().SetAt(delta, inputBuckets[idx])
242+
}
243+
}
244+
245+
// getSketchBounds calculates the lower and upper bounds of a sketch bucket based on the index of the bucket.
246+
// This is based on sketch buckets placing values in bucket so that γ^k <= v < γ^(k+1)
247+
// See https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/quantile/config.go#L83
248+
// and https://github.com/DataDog/sketches-go/blob/8a1961cf57f80fbbe26e7283464fcc01ebf17d5c/ddsketch/ddsketch.go#L468
249+
func getSketchBounds(index int32) (float64, float64) {
250+
if index < 0 {
251+
index = -index
252+
}
253+
return sketchLowerBound(index), sketchLowerBound(index + 1)
254+
}
255+
256+
// sketchLowerBound calculates the lower bound of a sketch bucket based on the index of the bucket.
257+
// It uses the index offset and multiplier (represented by (1 / math.Log(gamma))). The logic behind this
258+
// is based on the DD agent using logarithmic mapping for definition DD agent sketches
259+
// See:
260+
// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L54
261+
// https://github.com/DataDog/sketches-go/blob/8a1961cf57f80fbbe26e7283464fcc01ebf17d5c/ddsketch/mapping/logarithmic_mapping.go#L39
262+
func sketchLowerBound(index int32) float64 {
263+
if index < 0 {
264+
index = -index
265+
}
266+
return math.Exp((float64(index-agentSketchOffset) / (1 / math.Log(gamma))))
267+
}
268+
269+
// getHistogramBounds returns the lower and upper boundaries of the histogram bucket that
270+
// corresponds to the specified bucket index
271+
func getHistogramBounds(histIndex int) (float64, float64) {
272+
return histogramLowerBound(histIndex), histogramLowerBound(histIndex + 1)
273+
}
274+
275+
// This equation for finding the lower bound of the exponential histogram bucket
276+
// Based on: https://github.com/open-telemetry/opentelemetry-go/blob/3a72c5ea94bf843beeaa044b0dda2ce4d627bb7b/sdk/metric/internal/aggregate/exponential_histogram.go#L122
277+
// See also: https://github.com/open-telemetry/opentelemetry-go/blob/3a72c5ea94bf843beeaa044b0dda2ce4d627bb7b/sdk/metric/internal/aggregate/exponential_histogram.go#L139
278+
func histogramLowerBound(histIndex int) float64 {
279+
inverseFactor := math.Ldexp(math.Ln2, -scale)
280+
return math.Exp(float64(histIndex) * inverseFactor)
281+
}
282+
283+
// sketchLowerBoundToHistogramIndex takes the lower boundary of a sketch bucket and computes the
284+
// closest equivalent exponential histogram index that corresponds to an exponential histogram
285+
// bucket that has a range covering that lower bound
286+
// See: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function
287+
func sketchLowerBoundToHistogramIndex(value float64) int {
288+
if frac, exp := math.Frexp(value); frac == 0.5 {
289+
return ((exp - 1) << scale) - 1
290+
}
291+
scaleFactor := math.Ldexp(math.Log2E, scale)
292+
293+
return int(math.Floor(math.Log(value) * scaleFactor))
294+
}

0 commit comments

Comments
 (0)