Skip to content

Add export timestamps; distinguish Accumulation vs. Record #835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,10 @@ func newFixture(b *testing.B) *benchFixture {
return bf
}

func (*benchFixture) Process(export.Record) error {
func (*benchFixture) Process(export.Accumulation) error {
return nil
}

func (*benchFixture) CheckpointSet() export.CheckpointSet {
return nil
}

func (*benchFixture) FinishedCollection() {
}

func (fix *benchFixture) Meter(_ string, _ ...metric.MeterOption) metric.Meter {
return fix.meter
}
Expand Down
6 changes: 4 additions & 2 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,12 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
defer c.exp.lock.RUnlock()

ctrl := c.exp.Controller()
ctrl.Collect(context.Background())
if err := ctrl.Collect(context.Background()); err != nil {
global.Handle(err)
}

err := ctrl.ForEach(func(record export.Record) error {
agg := record.Aggregator()
agg := record.Aggregation()
numberKind := record.Descriptor().NumberKind()

var labelKeys, labels []string
Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
}
aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregator()
agg := record.Aggregation()
kind := desc.NumberKind()
encodedResource := record.Resource().Encoded(e.config.LabelEncoder)

Expand Down
21 changes: 16 additions & 5 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"reflect"
"sync"
"time"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
Expand Down Expand Up @@ -47,20 +48,30 @@ type NoopAggregator struct{}
var _ export.Aggregator = (*NoopAggregator)(nil)

// Update implements export.Aggregator.
func (*NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor) error {
func (NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor) error {
return nil
}

// SynchronizedCopy implements export.Aggregator.
func (*NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
func (NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
return nil
}

// Merge implements export.Aggregator.
func (*NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
return nil
}

// Aggregation returns an interface for reading the state of this aggregator.
// The no-op aggregator carries no state, so the receiver itself is returned
// as the (empty) aggregation value; its Kind method identifies it as "Noop".
func (NoopAggregator) Aggregation() aggregation.Aggregation {
return NoopAggregator{}
}

// Kind implements aggregation.Aggregation. It reports the sentinel kind
// "Noop", distinguishing this test aggregation from any real aggregation
// kind an exporter might dispatch on.
func (NoopAggregator) Kind() aggregation.Kind {
return aggregation.Kind("Noop")
}

// NewCheckpointSet returns a test CheckpointSet to which new records can be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
Expand Down Expand Up @@ -88,10 +99,10 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
distinct: elabels.Equivalent(),
}
if record, ok := p.records[key]; ok {
return record.Aggregator(), false
return record.Aggregation().(export.Aggregator), false
}

rec := export.NewRecord(desc, &elabels, p.resource, newAgg)
rec := export.NewRecord(desc, &elabels, p.resource, newAgg.Aggregation(), time.Time{}, time.Time{})
p.updates = append(p.updates, rec)
p.records[key] = rec
return newAgg, true
Expand Down
30 changes: 21 additions & 9 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,20 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e
// Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
// error is returned if the Record's Aggregation is not supported.
func Record(r export.Record) (*metricpb.Metric, error) {
d := r.Descriptor()
l := r.Labels()
switch a := r.Aggregator().(type) {
switch a := r.Aggregation().(type) {
case aggregation.MinMaxSumCount:
return minMaxSumCount(d, l, a)
return minMaxSumCount(r, a)
case aggregation.Sum:
return sum(d, l, a)
return sum(r, a)
default:
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
}
}

// sum transforms a Sum Aggregator into an OTLP Metric.
func sum(desc *metric.Descriptor, labels *label.Set, a aggregation.Sum) (*metricpb.Metric, error) {
func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
sum, err := a.Sum()
if err != nil {
return nil, err
Expand All @@ -266,12 +266,20 @@ func sum(desc *metric.Descriptor, labels *label.Set, a aggregation.Sum) (*metric
case metric.Int64NumberKind, metric.Uint64NumberKind:
m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_INT64
m.Int64DataPoints = []*metricpb.Int64DataPoint{
{Value: sum.CoerceToInt64(n)},
{
Value: sum.CoerceToInt64(n),
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
},
}
case metric.Float64NumberKind:
m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_DOUBLE
m.DoubleDataPoints = []*metricpb.DoubleDataPoint{
{Value: sum.CoerceToFloat64(n)},
{
Value: sum.CoerceToFloat64(n),
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
},
}
default:
return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n)
Expand Down Expand Up @@ -299,7 +307,9 @@ func minMaxSumCountValues(a aggregation.MinMaxSumCount) (min, max, sum metric.Nu
}

// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
func minMaxSumCount(record export.Record, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
min, max, sum, count, err := minMaxSumCountValues(a)
if err != nil {
return nil, err
Expand Down Expand Up @@ -328,6 +338,8 @@ func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregation.Mi
Value: max.CoerceToFloat64(numKind),
},
},
StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
TimeUnixNano: uint64(record.EndTime().UnixNano()),
},
},
}, nil
Expand Down
42 changes: 34 additions & 8 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"testing"
"time"

commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
Expand All @@ -29,11 +30,19 @@ import (
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/unit"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)

var (
// Timestamps used in this test

intervalStart = time.Now()
intervalEnd = intervalStart.Add(-time.Hour)
)

func TestStringKeyValues(t *testing.T) {
tests := []struct {
kvs []kv.KeyValue
Expand Down Expand Up @@ -155,7 +164,8 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
metric.WithDescription(test.description),
metric.WithUnit(test.unit))
labels := label.NewSet(test.labels...)
got, err := minMaxSumCount(&desc, &labels, ckpt.(aggregation.MinMaxSumCount))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
got, err := minMaxSumCount(record, ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
Expand Down Expand Up @@ -184,9 +194,12 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
Value: 10,
},
},
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
},
}
m, err := minMaxSumCount(&desc, &labels, ckpt.(aggregation.MinMaxSumCount))
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
m, err := minMaxSumCount(record, ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
Expand Down Expand Up @@ -253,7 +266,9 @@ func TestSumMetricDescriptor(t *testing.T) {
metric.WithUnit(test.unit),
)
labels := label.NewSet(test.labels...)
got, err := sum(&desc, &labels, &sumAgg.New(1)[0])
emptyAgg := &sumAgg.New(1)[0]
record := export.NewRecord(&desc, &labels, nil, emptyAgg, intervalStart, intervalEnd)
got, err := sum(record, emptyAgg)
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
Expand All @@ -266,8 +281,13 @@ func TestSumInt64DataPoints(t *testing.T) {
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
if m, err := sum(&desc, &labels, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}, m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDataPoints)
Expand All @@ -280,9 +300,14 @@ func TestSumFloat64DataPoints(t *testing.T) {
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
if m, err := sum(&desc, &labels, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}, m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDataPoints)
}
Expand All @@ -292,7 +317,8 @@ func TestSumErrUnknownValueType(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.NumberKind(-1))
labels := label.NewSet()
s := &sumAgg.New(1)[0]
_, err := sum(&desc, &labels, s)
record := export.NewRecord(&desc, &labels, nil, s, intervalStart, intervalEnd)
_, err := sum(record, s)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)
Expand Down
Loading