@@ -231,7 +231,7 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *ins
231
231
//
232
232
// If an instrument is determined to use a Drop aggregation, that instrument is
233
233
// not inserted nor returned.
234
- func (i * inserter [N ]) Instrument (inst Instrument ) ([]aggregate.Measure [N ], error ) {
234
+ func (i * inserter [N ]) Instrument (inst Instrument , readerAggregation Aggregation ) ([]aggregate.Measure [N ], error ) {
235
235
var (
236
236
matched bool
237
237
measures []aggregate.Measure [N ]
@@ -245,8 +245,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
245
245
continue
246
246
}
247
247
matched = true
248
-
249
- in , id , err := i .cachedAggregator (inst .Scope , inst .Kind , stream )
248
+ in , id , err := i .cachedAggregator (inst .Scope , inst .Kind , stream , readerAggregation )
250
249
if err != nil {
251
250
errs .append (err )
252
251
}
@@ -271,7 +270,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
271
270
Description : inst .Description ,
272
271
Unit : inst .Unit ,
273
272
}
274
- in , _ , err := i .cachedAggregator (inst .Scope , inst .Kind , stream )
273
+ in , _ , err := i .cachedAggregator (inst .Scope , inst .Kind , stream , readerAggregation )
275
274
if err != nil {
276
275
errs .append (err )
277
276
}
@@ -291,6 +290,31 @@ type aggVal[N int64 | float64] struct {
291
290
Err error
292
291
}
293
292
293
+ // readerDefaultAggregation returns the default aggregation for the instrument
294
+ // kind based on the reader's aggregation preferences. This is used unless the
295
+ // aggregation is overridden with a view.
296
+ func (i * inserter [N ]) readerDefaultAggregation (kind InstrumentKind ) Aggregation {
297
+ aggregation := i .pipeline .reader .aggregation (kind )
298
+ switch aggregation .(type ) {
299
+ case nil , AggregationDefault :
300
+ // If the reader returns default or nil use the default selector.
301
+ aggregation = DefaultAggregationSelector (kind )
302
+ default :
303
+ // Deep copy and validate before using.
304
+ aggregation = aggregation .copy ()
305
+ if err := aggregation .err (); err != nil {
306
+ orig := aggregation
307
+ aggregation = DefaultAggregationSelector (kind )
308
+ global .Error (
309
+ err , "using default aggregation instead" ,
310
+ "aggregation" , orig ,
311
+ "replacement" , aggregation ,
312
+ )
313
+ }
314
+ }
315
+ return aggregation
316
+ }
317
+
294
318
// cachedAggregator returns the appropriate aggregate input and output
295
319
// functions for an instrument configuration. If the exact instrument has been
296
320
// created within the inst.Scope, those aggregate function instances will be
@@ -305,29 +329,14 @@ type aggVal[N int64 | float64] struct {
305
329
//
306
330
// If the instrument defines an unknown or incompatible aggregation, an error
307
331
// is returned.
308
- func (i * inserter [N ]) cachedAggregator (scope instrumentation.Scope , kind InstrumentKind , stream Stream ) (meas aggregate.Measure [N ], aggID uint64 , err error ) {
332
+ func (i * inserter [N ]) cachedAggregator (scope instrumentation.Scope , kind InstrumentKind , stream Stream , readerAggregation Aggregation ) (meas aggregate.Measure [N ], aggID uint64 , err error ) {
309
333
switch stream .Aggregation .(type ) {
310
334
case nil :
311
- // Undefined, nil, means to use the default from the reader.
312
- stream .Aggregation = i .pipeline .reader .aggregation (kind )
313
- switch stream .Aggregation .(type ) {
314
- case nil , AggregationDefault :
315
- // If the reader returns default or nil use the default selector.
316
- stream .Aggregation = DefaultAggregationSelector (kind )
317
- default :
318
- // Deep copy and validate before using.
319
- stream .Aggregation = stream .Aggregation .copy ()
320
- if err := stream .Aggregation .err (); err != nil {
321
- orig := stream .Aggregation
322
- stream .Aggregation = DefaultAggregationSelector (kind )
323
- global .Error (
324
- err , "using default aggregation instead" ,
325
- "aggregation" , orig ,
326
- "replacement" , stream .Aggregation ,
327
- )
328
- }
329
- }
335
+ // The aggregation was not overridden with a view. Use the aggregation
336
+ // provided by the reader.
337
+ stream .Aggregation = readerAggregation
330
338
case AggregationDefault :
339
+ // The view explicitly requested the default aggregation.
331
340
stream .Aggregation = DefaultAggregationSelector (kind )
332
341
}
333
342
@@ -596,7 +605,29 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
596
605
597
606
errs := & multierror {}
598
607
for _ , i := range r .inserters {
599
- in , err := i .Instrument (id )
608
+ in , err := i .Instrument (id , i .readerDefaultAggregation (id .Kind ))
609
+ if err != nil {
610
+ errs .append (err )
611
+ }
612
+ measures = append (measures , in ... )
613
+ }
614
+ return measures , errs .errorOrNil ()
615
+ }
616
+
617
+ // HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
618
+ // defined by key. If boundaries were provided on instrument instantiation, those take precedence
619
+ // over boundaries provided by the reader.
620
+ func (r resolver [N ]) HistogramAggregators (id Instrument , boundaries []float64 ) ([]aggregate.Measure [N ], error ) {
621
+ var measures []aggregate.Measure [N ]
622
+
623
+ errs := & multierror {}
624
+ for _ , i := range r .inserters {
625
+ agg := i .readerDefaultAggregation (id .Kind )
626
+ if histAgg , ok := agg .(AggregationExplicitBucketHistogram ); ok && len (boundaries ) > 0 {
627
+ histAgg .Boundaries = boundaries
628
+ agg = histAgg
629
+ }
630
+ in , err := i .Instrument (id , agg )
600
631
if err != nil {
601
632
errs .append (err )
602
633
}
0 commit comments