Skip to content

Commit 9cd18ee

Browse files
committed
Use metric timestamp
1 parent fd1a25b commit 9cd18ee

File tree

6 files changed

+117
-18
lines changed

6 files changed

+117
-18
lines changed

exporter/awsemfexporter/datapoint.go

+30-11
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ var currentState = mapwithexpiry.NewMapWithExpiry(CleanInterval)
3232

3333
// DataPoint represents a processed metric data point
3434
type DataPoint struct {
35-
Value interface{}
36-
Labels map[string]string
35+
Value interface{}
36+
Labels map[string]string
37+
Timestamp int64
3738
}
3839

3940
// DataPoints is a wrapper interface for:
@@ -90,42 +91,57 @@ type DoubleSummaryDataPointSlice struct {
9091
func (dps IntDataPointSlice) At(i int) DataPoint {
9192
metric := dps.IntDataPointSlice.At(i)
9293
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
94+
timestamp := unixNanoToMilliseconds(metric.Timestamp())
9395

9496
var metricVal interface{}
9597
metricVal = metric.Value()
9698
if dps.needsCalculateRate {
9799
rateKey := createMetricKey(labels, dps.rateKeyParams)
98-
metricVal = calculateRate(rateKey, metricVal, dps.timestamp)
100+
rateTS := dps.timestamp
101+
if timestamp > 0 {
102+
// Use metric timestamp if available
103+
rateTS = timestamp
104+
}
105+
metricVal = calculateRate(rateKey, metricVal, rateTS)
99106
}
100107

101108
return DataPoint{
102-
Value: metricVal,
103-
Labels: labels,
109+
Value: metricVal,
110+
Labels: labels,
111+
Timestamp: timestamp,
104112
}
105113
}
106114

107115
// At retrieves the DoubleDataPoint at the given index and performs rate calculation if necessary.
108116
func (dps DoubleDataPointSlice) At(i int) DataPoint {
109117
metric := dps.DoubleDataPointSlice.At(i)
110118
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
119+
timestamp := unixNanoToMilliseconds(metric.Timestamp())
111120

112121
var metricVal interface{}
113122
metricVal = metric.Value()
114123
if dps.needsCalculateRate {
115124
rateKey := createMetricKey(labels, dps.rateKeyParams)
116-
metricVal = calculateRate(rateKey, metricVal, dps.timestamp)
125+
rateTS := dps.timestamp
126+
if timestamp > 0 {
127+
// Use metric timestamp if available
128+
rateTS = timestamp
129+
}
130+
metricVal = calculateRate(rateKey, metricVal, rateTS)
117131
}
118132

119133
return DataPoint{
120-
Value: metricVal,
121-
Labels: labels,
134+
Value: metricVal,
135+
Labels: labels,
136+
Timestamp: timestamp,
122137
}
123138
}
124139

125140
// At retrieves the DoubleHistogramDataPoint at the given index.
126141
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
127142
metric := dps.DoubleHistogramDataPointSlice.At(i)
128143
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
144+
timestamp := unixNanoToMilliseconds(metric.Timestamp())
129145

130146
var minBound, maxBound float64
131147
bucketBounds := metric.ExplicitBounds()
@@ -141,14 +157,16 @@ func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
141157
Count: metric.Count(),
142158
Sum: metric.Sum(),
143159
},
144-
Labels: labels,
160+
Labels: labels,
161+
Timestamp: timestamp,
145162
}
146163
}
147164

148165
// At retrieves the DoubleSummaryDataPoint at the given index.
149166
func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint {
150167
metric := dps.DoubleSummaryDataPointSlice.At(i)
151168
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
169+
timestamp := unixNanoToMilliseconds(metric.Timestamp())
152170

153171
metricVal := &CWMetricStats{
154172
Count: metric.Count(),
@@ -160,8 +178,9 @@ func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint {
160178
}
161179

162180
return DataPoint{
163-
Value: metricVal,
164-
Labels: labels,
181+
Value: metricVal,
182+
Labels: labels,
183+
Timestamp: timestamp,
165184
}
166185
}
167186

exporter/awsemfexporter/emf_exporter_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestConsumeMetrics(t *testing.T) {
105105
Points: []*metricspb.Point{
106106
{
107107
Timestamp: &timestamp.Timestamp{
108-
Seconds: 100,
108+
Seconds: int64(1608068109),
109109
},
110110
Value: &metricspb.Point_Int64Value{
111111
Int64Value: 1,
@@ -166,7 +166,7 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
166166
Points: []*metricspb.Point{
167167
{
168168
Timestamp: &timestamp.Timestamp{
169-
Seconds: 100,
169+
Seconds: int64(1608068109),
170170
},
171171
Value: &metricspb.Point_Int64Value{
172172
Int64Value: 1,
@@ -228,7 +228,7 @@ func TestConsumeMetricsWithoutLogGroupStreamConfig(t *testing.T) {
228228
Points: []*metricspb.Point{
229229
{
230230
Timestamp: &timestamp.Timestamp{
231-
Seconds: 100,
231+
Seconds: int64(1608068109),
232232
},
233233
Value: &metricspb.Point_Int64Value{
234234
Int64Value: 1,
@@ -297,7 +297,7 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
297297
Points: []*metricspb.Point{
298298
{
299299
Timestamp: &timestamp.Timestamp{
300-
Seconds: 100,
300+
Seconds: int64(1608068109),
301301
},
302302
Value: &metricspb.Point_Int64Value{
303303
Int64Value: 1,
@@ -366,7 +366,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
366366
Points: []*metricspb.Point{
367367
{
368368
Timestamp: &timestamp.Timestamp{
369-
Seconds: 100,
369+
Seconds: int64(1608068109),
370370
},
371371
Value: &metricspb.Point_Int64Value{
372372
Int64Value: 1,
@@ -435,7 +435,7 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
435435
Points: []*metricspb.Point{
436436
{
437437
Timestamp: &timestamp.Timestamp{
438-
Seconds: 100,
438+
Seconds: int64(1608068109),
439439
},
440440
Value: &metricspb.Point_Int64Value{
441441
Int64Value: 1,
@@ -512,7 +512,7 @@ func TestPushMetricsDataWithErr(t *testing.T) {
512512
Points: []*metricspb.Point{
513513
{
514514
Timestamp: &timestamp.Timestamp{
515-
Seconds: 100,
515+
Seconds: int64(1608068109),
516516
},
517517
Value: &metricspb.Point_Int64Value{
518518
Int64Value: 1,

exporter/awsemfexporter/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ require (
1010
github.com/stretchr/testify v1.6.1
1111
go.opentelemetry.io/collector v0.16.1-0.20201207152538-326931de8c32
1212
go.uber.org/zap v1.16.0
13+
google.golang.org/protobuf v1.25.0
1314
)

exporter/awsemfexporter/groupedmetric.go

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func addToGroupedMetric(pmd *pdata.Metric, groupedMetrics map[string]*GroupedMet
6262
Unit: pmd.Unit(),
6363
}
6464

65+
if dp.Timestamp > 0 {
66+
metadata.Timestamp = dp.Timestamp
67+
}
68+
6569
// Extra params to use when grouping metrics
6670
groupKeyParams := map[string]string{
6771
(namespaceKey): metadata.Namespace,

exporter/awsemfexporter/groupedmetric_test.go

+69
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.uber.org/zap"
3030
"go.uber.org/zap/zapcore"
3131
"go.uber.org/zap/zaptest/observer"
32+
"google.golang.org/protobuf/types/known/timestamppb"
3233
)
3334

3435
func TestAddToGroupedMetric(t *testing.T) {
@@ -211,6 +212,74 @@ func TestAddToGroupedMetric(t *testing.T) {
211212
}
212213
})
213214

215+
t.Run("Add multiple metrics w/ different timestamps", func(t *testing.T) {
216+
groupedMetrics := make(map[string]*GroupedMetric)
217+
oc := consumerdata.MetricsData{
218+
Node: &commonpb.Node{},
219+
Resource: &resourcepb.Resource{
220+
Labels: map[string]string{
221+
conventions.AttributeServiceName: "myServiceName",
222+
conventions.AttributeServiceNamespace: "myServiceNS",
223+
},
224+
},
225+
Metrics: []*metricspb.Metric{
226+
generateTestIntGauge("int-gauge"),
227+
generateTestDoubleGauge("double-gauge"),
228+
generateTestIntSum("int-sum"),
229+
generateTestSummary("summary"),
230+
},
231+
}
232+
233+
timestamp1 := &timestamppb.Timestamp{
234+
Seconds: int64(1608068109),
235+
Nanos: 347942000,
236+
}
237+
timestamp2 := &timestamppb.Timestamp{
238+
Seconds: int64(1608068110),
239+
Nanos: 347942000,
240+
}
241+
242+
// Give int gauge and int-sum the same timestamp
243+
oc.Metrics[0].Timeseries[0].Points[0].Timestamp = timestamp1
244+
oc.Metrics[2].Timeseries[0].Points[0].Timestamp = timestamp1
245+
// Give summary a different timestamp
246+
oc.Metrics[3].Timeseries[0].Points[0].Timestamp = timestamp2
247+
248+
rm := internaldata.OCToMetrics(oc)
249+
rms := rm.ResourceMetrics()
250+
ilms := rms.At(0).InstrumentationLibraryMetrics()
251+
metrics := ilms.At(0).Metrics()
252+
assert.Equal(t, 4, metrics.Len())
253+
254+
for i := 0; i < metrics.Len(); i++ {
255+
metric := metrics.At(i)
256+
addToGroupedMetric(&metric, groupedMetrics, metadata, logger)
257+
}
258+
259+
assert.Equal(t, 3, len(groupedMetrics))
260+
for _, group := range groupedMetrics {
261+
for metricName := range group.Metrics {
262+
if metricName == "int-gauge" || metricName == "int-sum" {
263+
assert.Equal(t, 2, len(group.Metrics))
264+
assert.Equal(t, int64(1608068109347), group.Metadata.Timestamp)
265+
} else if metricName == "summary" {
266+
assert.Equal(t, 1, len(group.Metrics))
267+
assert.Equal(t, int64(1608068110347), group.Metadata.Timestamp)
268+
} else {
269+
// double-gauge should use the default timestamp
270+
assert.Equal(t, 1, len(group.Metrics))
271+
assert.Equal(t, "double-gauge", metricName)
272+
assert.Equal(t, timestamp, group.Metadata.Timestamp)
273+
}
274+
}
275+
expectedLabels := map[string]string{
276+
(OTellibDimensionKey): "cloudwatch-otel",
277+
"label1": "value1",
278+
}
279+
assert.Equal(t, expectedLabels, group.Labels)
280+
}
281+
})
282+
214283
t.Run("Add same metric but different log group", func(t *testing.T) {
215284
groupedMetrics := make(map[string]*GroupedMetric)
216285
oc := consumerdata.MetricsData{

exporter/awsemfexporter/util.go

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919
"sort"
2020
"strings"
21+
"time"
2122

2223
"go.opentelemetry.io/collector/consumer/pdata"
2324
"go.opentelemetry.io/collector/translator/conventions"
@@ -174,3 +175,8 @@ func dimensionRollup(dimensionRollupOption string, labels map[string]string) [][
174175

175176
return rollupDimensionArray
176177
}
178+
179+
// unixNanoToMilliseconds converts a timestamp in nanoseconds to milliseconds.
180+
func unixNanoToMilliseconds(timestamp pdata.TimestampUnixNano) int64 {
181+
return int64(uint64(timestamp) / uint64(time.Millisecond))
182+
}

0 commit comments

Comments
 (0)