Skip to content

Commit 27dbdd3

Browse files
authored
Enhance EMFExporter for Metrics Batching in AWS EMF Logs (#2271)
We sent a large PR #1891 to support batching metrics that share the same dimensions in AWS EMF Log requests, reducing customers' billing cost and request throughput. At the same time, there was a fairly large code refactor of EMFExporter. For easier code review, I plan to split #1891 into 2 PRs. (This is PR #1.) In this PR, we refactored EMFExporter without introducing any new features. For each OTel metrics data point, we defined a `datapoint` file: it wraps `pdata.DataPointSlice` in custom structures for each type of metric data point. We also moved the metric data handling functions — data conversion and rate calculation — to `datapoint`. It also fixed the metric `timestamp` bug.
1 parent d430156 commit 27dbdd3

File tree

9 files changed

+1499
-391
lines changed

9 files changed

+1499
-391
lines changed

exporter/awsemfexporter/datapoint.go

+315
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package awsemfexporter
16+
17+
import (
18+
"time"
19+
20+
"go.opentelemetry.io/collector/consumer/pdata"
21+
"go.opentelemetry.io/otel/label"
22+
"go.uber.org/zap"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/mapwithexpiry"
25+
)
26+
27+
const (
	// cleanInterval is how often the shared rate-state map evicts expired entries.
	cleanInterval = 5 * time.Minute
	// minTimeDiff is the minimal gap between two collected data samples for a
	// delta/rate calculation to be considered valid; smaller gaps would make
	// the computed rate unreliable.
	minTimeDiff = 50 * time.Millisecond
)

// currentState stores the previously observed value/timestamp per metric key,
// consumed by calculateRate to compute deltas. Entries expire after cleanInterval.
var currentState = mapwithexpiry.NewMapWithExpiry(cleanInterval)
33+
34+
// DataPoint represents a processed metric data point: the computed value, its
// labels, and its timestamp in milliseconds since the Unix epoch.
type DataPoint struct {
	// Value holds either a float64 (gauge/sum, possibly rate-converted) or a
	// *CWMetricStats (histogram/summary), depending on the source slice type.
	Value interface{}
	// Labels maps label keys to values for this data point.
	Labels map[string]string
	// TimestampMs is the data point's timestamp in milliseconds.
	TimestampMs int64
}
40+
41+
// DataPoints is a wrapper interface for:
//   - pdata.IntDataPointSlice
//   - pdata.DoubleDataPointSlice
//   - pdata.IntHistogramDataPointSlice
//   - pdata.DoubleHistogramDataPointSlice
//   - pdata.DoubleSummaryDataPointSlice
type DataPoints interface {
	// Len returns the number of data points in the slice.
	Len() int
	// At returns the processed DataPoint at index i.
	// NOTE: At() is an expensive call as it calculates the metric's value.
	At(i int) DataPoint
}
52+
53+
// rateCalculationMetadata contains the metadata required to perform rate calculation
type rateCalculationMetadata struct {
	// needsCalculateRate is set for cumulative sums (see getDataPoints), whose
	// values are converted to rates of change in At().
	needsCalculateRate bool
	// rateKeyParams identifies the metric stream; At() adds the data point's
	// sorted labels before using it as the rate-state map key.
	rateKeyParams rateKeyParams
	// timestampMs is the fallback timestamp (milliseconds) used when a data
	// point carries no timestamp of its own.
	timestampMs int64
}
59+
60+
// rateKeyParams uniquely identifies a metric stream for rate calculation;
// the whole struct is used as a comparable map key.
type rateKeyParams struct {
	namespaceKey  string
	metricNameKey string
	logGroupKey   string
	logStreamKey  string
	// labels is the canonical (sorted) form of the data point's labels,
	// filled in by At() just before the key is used.
	labels label.Distinct
}
67+
68+
// rateState stores a metric's most recently observed value and timestamp,
// acting as the "previous sample" in calculateRate.
type rateState struct {
	value       float64 // last observed value
	timestampMs int64   // timestamp of that observation, in milliseconds
}
73+
74+
// IntDataPointSlice is a wrapper for pdata.IntDataPointSlice that implements
// DataPoints; its At() converts values to float64 and applies rate
// calculation when needsCalculateRate is set.
type IntDataPointSlice struct {
	instrumentationLibraryName string
	rateCalculationMetadata
	pdata.IntDataPointSlice
}
80+
81+
// DoubleDataPointSlice is a wrapper for pdata.DoubleDataPointSlice that
// implements DataPoints; its At() applies rate calculation when
// needsCalculateRate is set.
type DoubleDataPointSlice struct {
	instrumentationLibraryName string
	rateCalculationMetadata
	pdata.DoubleDataPointSlice
}
87+
88+
// DoubleHistogramDataPointSlice is a wrapper for
// pdata.DoubleHistogramDataPointSlice that implements DataPoints; its At()
// exposes each point's count and sum as a *CWMetricStats.
type DoubleHistogramDataPointSlice struct {
	instrumentationLibraryName string
	pdata.DoubleHistogramDataPointSlice
}
93+
94+
// DoubleSummaryDataPointSlice is a wrapper for
// pdata.DoubleSummaryDataPointSlice that implements DataPoints; its At()
// exposes count/sum and derives min/max from the quantile values.
type DoubleSummaryDataPointSlice struct {
	instrumentationLibraryName string
	pdata.DoubleSummaryDataPointSlice
}
99+
100+
// At retrieves the IntDataPoint at the given index and performs rate calculation if necessary.
101+
func (dps IntDataPointSlice) At(i int) DataPoint {
102+
metric := dps.IntDataPointSlice.At(i)
103+
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
104+
labels := createLabels(metric.LabelsMap())
105+
106+
var metricVal float64
107+
metricVal = float64(metric.Value())
108+
if dps.needsCalculateRate {
109+
sortedLabels := getSortedLabels(metric.LabelsMap())
110+
dps.rateKeyParams.labels = sortedLabels
111+
rateKey := dps.rateKeyParams
112+
rateTS := dps.timestampMs
113+
if timestampMs > 0 {
114+
// Use metric timestamp if available
115+
rateTS = timestampMs
116+
}
117+
metricVal = calculateRate(rateKey, metricVal, rateTS)
118+
}
119+
120+
return DataPoint{
121+
Value: metricVal,
122+
Labels: labels,
123+
TimestampMs: timestampMs,
124+
}
125+
}
126+
127+
// At retrieves the DoubleDataPoint at the given index and performs rate calculation if necessary.
128+
func (dps DoubleDataPointSlice) At(i int) DataPoint {
129+
metric := dps.DoubleDataPointSlice.At(i)
130+
labels := createLabels(metric.LabelsMap())
131+
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
132+
133+
var metricVal float64
134+
metricVal = metric.Value()
135+
if dps.needsCalculateRate {
136+
sortedLabels := getSortedLabels(metric.LabelsMap())
137+
dps.rateKeyParams.labels = sortedLabels
138+
rateKey := dps.rateKeyParams
139+
rateTS := dps.timestampMs
140+
if timestampMs > 0 {
141+
// Use metric timestamp if available
142+
rateTS = timestampMs
143+
}
144+
metricVal = calculateRate(rateKey, metricVal, rateTS)
145+
}
146+
147+
return DataPoint{
148+
Value: metricVal,
149+
Labels: labels,
150+
TimestampMs: timestampMs,
151+
}
152+
}
153+
154+
// At retrieves the DoubleHistogramDataPoint at the given index.
155+
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
156+
metric := dps.DoubleHistogramDataPointSlice.At(i)
157+
labels := createLabels(metric.LabelsMap())
158+
timestamp := unixNanoToMilliseconds(metric.Timestamp())
159+
160+
return DataPoint{
161+
Value: &CWMetricStats{
162+
Count: metric.Count(),
163+
Sum: metric.Sum(),
164+
},
165+
Labels: labels,
166+
TimestampMs: timestamp,
167+
}
168+
}
169+
170+
// At retrieves the DoubleSummaryDataPoint at the given index.
171+
func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint {
172+
metric := dps.DoubleSummaryDataPointSlice.At(i)
173+
labels := createLabels(metric.LabelsMap())
174+
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
175+
176+
metricVal := &CWMetricStats{
177+
Count: metric.Count(),
178+
Sum: metric.Sum(),
179+
}
180+
if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
181+
metricVal.Min = quantileValues.At(0).Value()
182+
metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
183+
}
184+
185+
return DataPoint{
186+
Value: metricVal,
187+
Labels: labels,
188+
TimestampMs: timestampMs,
189+
}
190+
}
191+
192+
// createLabels converts OTel StringMap labels to a map
193+
func createLabels(labelsMap pdata.StringMap) map[string]string {
194+
labels := make(map[string]string, labelsMap.Len()+1)
195+
labelsMap.ForEach(func(k, v string) {
196+
labels[k] = v
197+
})
198+
199+
return labels
200+
}
201+
202+
// getSortedLabels converts OTel StringMap labels to sorted labels as label.Distinct
203+
func getSortedLabels(labelsMap pdata.StringMap) label.Distinct {
204+
var kvs []label.KeyValue
205+
var sortable label.Sortable
206+
labelsMap.ForEach(func(k, v string) {
207+
kvs = append(kvs, label.String(k, v))
208+
})
209+
set := label.NewSetWithSortable(kvs, &sortable)
210+
211+
return set.Equivalent()
212+
}
213+
214+
// calculateRate calculates the metric value's rate of change using valDelta / timeDelta.
215+
func calculateRate(metricKey interface{}, val float64, timestampMs int64) float64 {
216+
var metricRate float64
217+
// get previous Metric content from map. Need to lock the map until set the new state
218+
currentState.Lock()
219+
if state, ok := currentState.Get(metricKey); ok {
220+
prevStats := state.(*rateState)
221+
deltaTime := timestampMs - prevStats.timestampMs
222+
223+
deltaVal := val - prevStats.value
224+
if deltaTime > minTimeDiff.Milliseconds() && deltaVal >= 0 {
225+
metricRate = deltaVal * 1e3 / float64(deltaTime)
226+
}
227+
}
228+
content := &rateState{
229+
value: val,
230+
timestampMs: timestampMs,
231+
}
232+
currentState.Set(metricKey, content)
233+
currentState.Unlock()
234+
return metricRate
235+
}
236+
237+
// getDataPoints retrieves data points from OT Metric.
238+
func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Logger) (dps DataPoints) {
239+
if pmd == nil {
240+
return
241+
}
242+
243+
rateKeys := rateKeyParams{
244+
namespaceKey: metadata.Namespace,
245+
metricNameKey: pmd.Name(),
246+
logGroupKey: metadata.LogGroup,
247+
logStreamKey: metadata.LogStream,
248+
}
249+
250+
switch pmd.DataType() {
251+
case pdata.MetricDataTypeIntGauge:
252+
metric := pmd.IntGauge()
253+
dps = IntDataPointSlice{
254+
metadata.InstrumentationLibraryName,
255+
rateCalculationMetadata{
256+
false,
257+
rateKeys,
258+
metadata.TimestampMs,
259+
},
260+
metric.DataPoints(),
261+
}
262+
case pdata.MetricDataTypeDoubleGauge:
263+
metric := pmd.DoubleGauge()
264+
dps = DoubleDataPointSlice{
265+
metadata.InstrumentationLibraryName,
266+
rateCalculationMetadata{
267+
false,
268+
rateKeys,
269+
metadata.TimestampMs,
270+
},
271+
metric.DataPoints(),
272+
}
273+
case pdata.MetricDataTypeIntSum:
274+
metric := pmd.IntSum()
275+
dps = IntDataPointSlice{
276+
metadata.InstrumentationLibraryName,
277+
rateCalculationMetadata{
278+
metric.AggregationTemporality() == pdata.AggregationTemporalityCumulative,
279+
rateKeys,
280+
metadata.TimestampMs,
281+
},
282+
metric.DataPoints(),
283+
}
284+
case pdata.MetricDataTypeDoubleSum:
285+
metric := pmd.DoubleSum()
286+
dps = DoubleDataPointSlice{
287+
metadata.InstrumentationLibraryName,
288+
rateCalculationMetadata{
289+
metric.AggregationTemporality() == pdata.AggregationTemporalityCumulative,
290+
rateKeys,
291+
metadata.TimestampMs,
292+
},
293+
metric.DataPoints(),
294+
}
295+
case pdata.MetricDataTypeDoubleHistogram:
296+
metric := pmd.DoubleHistogram()
297+
dps = DoubleHistogramDataPointSlice{
298+
metadata.InstrumentationLibraryName,
299+
metric.DataPoints(),
300+
}
301+
case pdata.MetricDataTypeDoubleSummary:
302+
metric := pmd.DoubleSummary()
303+
dps = DoubleSummaryDataPointSlice{
304+
metadata.InstrumentationLibraryName,
305+
metric.DataPoints(),
306+
}
307+
default:
308+
logger.Warn("Unhandled metric data type.",
309+
zap.String("DataType", pmd.DataType().String()),
310+
zap.String("Name", pmd.Name()),
311+
zap.String("Unit", pmd.Unit()),
312+
)
313+
}
314+
return
315+
}

0 commit comments

Comments
 (0)