@@ -34,8 +34,8 @@ import (
 	"go.opentelemetry.io/otel/sdk/export/metric"
 	export "go.opentelemetry.io/otel/sdk/export/metric"
 	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
-	"go.opentelemetry.io/otel/sdk/metric/controller/push"
-	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
+	controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
+	processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
 	"go.opentelemetry.io/otel/sdk/metric/selector/simple"
 )
 
@@ -88,26 +88,25 @@ func NewRawExporter(config Config) (*Exporter, error) {
 
 // NewExportPipeline sets up a complete export pipeline with a push Controller and
 // Exporter.
-func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
+func NewExportPipeline(config Config, options ...controller.Option) (*controller.Controller, error) {
 	exporter, err := NewRawExporter(config)
 	if err != nil {
 		return nil, err
 	}
 
-	pusher := push.New(
-		basic.New(
+	pusher := controller.New(
+		processor.New(
 			simple.NewWithHistogramDistribution(config.HistogramBoundaries),
 			exporter,
 		),
-		exporter,
-		options...,
+		append(options, controller.WithPusher(exporter))...,
 	)
-	pusher.Start()
-	return pusher, nil
+
+	return pusher, pusher.Start(context.TODO())
 }
 
 // InstallNewPipeline registers a push Controller's MeterProvider globally.
-func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, error) {
+func InstallNewPipeline(config Config, options ...controller.Option) (*controller.Controller, error) {
 	pusher, err := NewExportPipeline(config, options...)
 	if err != nil {
 		return nil, err
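Not part of the diff, added for review context: a minimal sketch of how a caller might drive the new controller-based pipeline after this change. The import path, the zero-value Config, and the Stop call on shutdown are assumptions for illustration; only InstallNewPipeline and the *controller.Controller return type come from the diff itself.

package main

import (
	"context"
	"log"

	// Assumed import path for the package being modified in this diff.
	"go.opentelemetry.io/contrib/exporters/metric/cortex"
)

func main() {
	// Config construction is not shown in this diff; a zero value is used
	// purely for illustration, and a real Config needs a remote-write endpoint.
	pusher, err := cortex.InstallNewPipeline(cortex.Config{})
	if err != nil {
		log.Fatal(err)
	}
	// NewExportPipeline already starts the controller, so the caller only
	// needs to stop it on shutdown.
	defer func() { _ = pusher.Stop(context.Background()) }()
}

The sketch reflects the new behavior that NewExportPipeline starts the controller itself and surfaces the Start error, so callers no longer call Start on the returned controller.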
@@ -130,9 +129,7 @@ func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*p
 
 		// The following section uses loose type checking to determine how to
 		// convert aggregations to timeseries. More "expensive" timeseries are
-		// checked first. For example, because a Distribution has a Sum value,
-		// we must check for Distribution first or else only the Sum would be
-		// converted and the other values like Quantiles would not be.
+		// checked first.
 		//
 		// See the Aggregator Kind for more information
 		// https://github.com/open-telemetry/opentelemetry-go/blob/master/sdk/export/metric/aggregation/aggregation.go#L123-L138
@@ -142,12 +139,6 @@ func (e *Exporter) ConvertToTimeSeries(checkpointSet export.CheckpointSet) ([]*p
 				return err
 			}
 			timeSeries = append(timeSeries, tSeries...)
-		} else if distribution, ok := agg.(aggregation.Distribution); ok && len(e.config.Quantiles) != 0 {
-			tSeries, err := convertFromDistribution(record, distribution, e.config.Quantiles)
-			if err != nil {
-				return err
-			}
-			timeSeries = append(timeSeries, tSeries...)
 		} else if sum, ok := agg.(aggregation.Sum); ok {
 			tSeries, err := convertFromSum(record, sum)
 			if err != nil {
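Also for review context, not part of the diff: the shortened comment above ("More 'expensive' timeseries are checked first") still hinges on the fact that one aggregation can satisfy several of these interfaces, so the more specific type assertion has to come before the more general one. Below is a standalone, self-contained sketch of that pattern using hypothetical local types; it is not the exporter's real types or its exact branch order.

package main

import "fmt"

// Sum is the "cheap" interface: only a sum value.
type Sum interface{ Sum() float64 }

// Histogram is the "expensive" interface: it also exposes a count,
// and it happens to satisfy Sum as well.
type Histogram interface {
	Sum() float64
	Count() uint64
}

type histogramAgg struct{}

func (histogramAgg) Sum() float64  { return 42 }
func (histogramAgg) Count() uint64 { return 7 }

func describe(agg interface{}) string {
	// The more specific interface must be matched first; reversing the
	// branches would silently report a histogram as a bare sum.
	if h, ok := agg.(Histogram); ok {
		return fmt.Sprintf("histogram: sum=%v count=%v", h.Sum(), h.Count())
	} else if s, ok := agg.(Sum); ok {
		return fmt.Sprintf("sum only: %v", s.Sum())
	}
	return "unknown"
}

func main() {
	fmt.Println(describe(histogramAgg{}))
}

Dropping extra values this way is the failure mode the original, longer comment described for Distribution, which is why ordering still matters even after the Distribution branch is removed.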
@@ -257,7 +248,7 @@ func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.
 		return nil, err
 	}
 	name = sanitize(record.Descriptor().Name() + "_count")
-	countTimeSeries := createTimeSeries(record, number.NewInt64Number(count), number.Int64Kind, label.String("__name__", name))
+	countTimeSeries := createTimeSeries(record, number.NewInt64Number(int64(count)), number.Int64Kind, label.String("__name__", name))
 
 	// Return all timeSeries
 	tSeries := []*prompb.TimeSeries{
@@ -267,66 +258,6 @@ func convertFromMinMaxSumCount(record metric.Record, minMaxSumCount aggregation.
 	return tSeries, nil
 }
 
-// convertFromDistribution returns len(quantiles) number of TimeSeries in a distribution.
-func convertFromDistribution(record metric.Record, distribution aggregation.Distribution, quantiles []float64) ([]*prompb.TimeSeries, error) {
-	var timeSeries []*prompb.TimeSeries
-	metricName := sanitize(record.Descriptor().Name())
-	numberKind := record.Descriptor().NumberKind()
-
-	// Convert Min
-	min, err := distribution.Min()
-	if err != nil {
-		return nil, err
-	}
-	name := sanitize(metricName + "_min")
-	minTimeSeries := createTimeSeries(record, min, numberKind, label.String("__name__", name))
-	timeSeries = append(timeSeries, minTimeSeries)
-
-	// Convert Max
-	max, err := distribution.Max()
-	if err != nil {
-		return nil, err
-	}
-	name = sanitize(metricName + "_max")
-	maxTimeSeries := createTimeSeries(record, max, numberKind, label.String("__name__", name))
-	timeSeries = append(timeSeries, maxTimeSeries)
-
-	// Convert Sum
-	sum, err := distribution.Sum()
-	if err != nil {
-		return nil, err
-	}
-	name = sanitize(metricName + "_sum")
-	sumTimeSeries := createTimeSeries(record, sum, numberKind, label.String("__name__", name))
-	timeSeries = append(timeSeries, sumTimeSeries)
-
-	// Convert Count
-	count, err := distribution.Count()
-	if err != nil {
-		return nil, err
-	}
-	name = sanitize(record.Descriptor().Name() + "_count")
-	countTimeSeries := createTimeSeries(record, number.NewInt64Number(count), number.Int64Kind, label.String("__name__", name))
-	timeSeries = append(timeSeries, countTimeSeries)
-
-	// For each configured quantile, get the value and create a timeseries
-	for _, q := range quantiles {
-		value, err := distribution.Quantile(q)
-		if err != nil {
-			return nil, err
-		}
-
-		// Add quantile as a label. e.g. {quantile="0.5"}
-		quantileStr := strconv.FormatFloat(q, 'f', -1, 64)
-
-		// Create TimeSeries
-		tSeries := createTimeSeries(record, value, numberKind, label.String("__name__", metricName), label.String("quantile", quantileStr))
-		timeSeries = append(timeSeries, tSeries)
-	}
-
-	return timeSeries, nil
-}
-
 // convertFromHistogram returns len(histogram.Buckets) timeseries for a histogram aggregation
 func convertFromHistogram(record metric.Record, histogram aggregation.Histogram) ([]*prompb.TimeSeries, error) {
 	var timeSeries []*prompb.TimeSeries
@@ -353,7 +284,7 @@ func convertFromHistogram(record metric.Record, histogram aggregation.Histogram)
 	counts := make(map[float64]float64, len(buckets.Boundaries))
 	for i, boundary := range buckets.Boundaries {
 		// Add bucket count to totalCount and record in map
-		totalCount += buckets.Counts[i]
+		totalCount += float64(buckets.Counts[i])
 		counts[boundary] = totalCount
 
 		// Add upper boundary as a label. e.g. {le="5"}
@@ -365,7 +296,7 @@ func convertFromHistogram(record metric.Record, histogram aggregation.Histogram)
 	}
 
 	// Include the +inf boundary in the total count
-	totalCount += buckets.Counts[len(buckets.Counts)-1]
+	totalCount += float64(buckets.Counts[len(buckets.Counts)-1])
 
 	// Create a timeSeries for the +inf bucket and total count
 	// These are the same and are both required by Prometheus-based backends
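One more illustration, not part of the diff: the cumulative-count bookkeeping that convertFromHistogram performs around the casts changed above. This is a standalone sketch with made-up bucket data; only the float64 conversion of the uint64 counts mirrors the diff, everything else (data, printing) is invented for the example.

package main

import "fmt"

func main() {
	boundaries := []float64{1, 5, 10}    // upper bounds of the finite buckets
	bucketCounts := []uint64{3, 2, 4, 1} // last entry is the +Inf bucket

	// Prometheus histogram buckets are cumulative: each per-bucket count is
	// added to a running total keyed by its upper boundary.
	totalCount := 0.0
	cumulative := make(map[float64]float64, len(boundaries))
	for i, boundary := range boundaries {
		totalCount += float64(bucketCounts[i]) // cast mirrors the uint64 counts in the diff
		cumulative[boundary] = totalCount
	}
	// The +Inf bucket brings the running total up to the overall count.
	totalCount += float64(bucketCounts[len(bucketCounts)-1])

	for _, b := range boundaries {
		fmt.Printf("le=%g -> %g\n", b, cumulative[b])
	}
	fmt.Printf("le=+Inf -> %g\n", totalCount) // also serves as the total count series
}

Run as-is this prints le=1 -> 3, le=5 -> 5, le=10 -> 9 and le=+Inf -> 10, matching the requirement noted in the diff's comments that the +Inf bucket and the total count are the same value.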