@@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
114
114
cancel : cancel ,
115
115
done : make (chan struct {}),
116
116
}
117
+ r .externalProducers .Store ([]Producer {})
117
118
118
119
go func () {
119
120
defer func () { close (r .done ) }()
@@ -126,7 +127,11 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
126
127
// periodicReader is a Reader that continuously collects and exports metric
127
128
// data at a set interval.
128
129
type periodicReader struct {
129
- producer atomic.Value
130
+ sdkProducer atomic.Value
131
+
132
+ mu sync.Mutex
133
+ isShutdown bool
134
+ externalProducers atomic.Value
130
135
131
136
timeout time.Duration
132
137
exporter Exporter
@@ -166,14 +171,28 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
166
171
}
167
172
168
173
// register registers p as the producer of this reader.
169
- func (r * periodicReader ) register (p producer ) {
174
+ func (r * periodicReader ) register (p sdkProducer ) {
170
175
// Only register once. If producer is already set, do nothing.
171
- if ! r .producer .CompareAndSwap (nil , produceHolder {produce : p .produce }) {
176
+ if ! r .sdkProducer .CompareAndSwap (nil , produceHolder {produce : p .produce }) {
172
177
msg := "did not register periodic reader"
173
178
global .Error (errDuplicateRegister , msg )
174
179
}
175
180
}
176
181
182
+ // RegisterProducer registers p as an external Producer of this reader.
183
+ func (r * periodicReader ) RegisterProducer (p Producer ) {
184
+ r .mu .Lock ()
185
+ defer r .mu .Unlock ()
186
+ if r .isShutdown {
187
+ return
188
+ }
189
+ currentProducers := r .externalProducers .Load ().([]Producer )
190
+ newProducers := []Producer {}
191
+ newProducers = append (newProducers , currentProducers ... )
192
+ newProducers = append (newProducers , p )
193
+ r .externalProducers .Store (newProducers )
194
+ }
195
+
177
196
// temporality reports the Temporality for the instrument kind provided.
178
197
func (r * periodicReader ) temporality (kind InstrumentKind ) metricdata.Temporality {
179
198
return r .exporter .Temporality (kind )
@@ -195,12 +214,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
195
214
}
196
215
197
216
// Collect gathers and returns all metric data related to the Reader from
198
- // the SDK. The returned metric data is not exported to the configured
199
- // exporter, it is left to the caller to handle that if desired.
217
+ // the SDK and other Producers. The returned metric data is not exported
218
+ // to the configured exporter, it is left to the caller to handle that if
219
+ // desired.
200
220
//
201
221
// An error is returned if this is called after Shutdown.
202
222
func (r * periodicReader ) Collect (ctx context.Context ) (metricdata.ResourceMetrics , error ) {
203
- return r .collect (ctx , r .producer .Load ())
223
+ return r .collect (ctx , r .sdkProducer .Load ())
204
224
}
205
225
206
226
// collect unwraps p as a produceHolder and returns its produce results.
@@ -218,7 +238,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
218
238
err := fmt .Errorf ("periodic reader: invalid producer: %T" , p )
219
239
return metricdata.ResourceMetrics {}, err
220
240
}
221
- return ph .produce (ctx )
241
+
242
+ rm , err := ph .produce (ctx )
243
+ if err != nil {
244
+ return metricdata.ResourceMetrics {}, err
245
+ }
246
+ var errs []error
247
+ for _ , producer := range r .externalProducers .Load ().([]Producer ) {
248
+ externalMetrics , err := producer .Produce (ctx )
249
+ if err != nil {
250
+ errs = append (errs , err )
251
+ }
252
+ rm .ScopeMetrics = append (rm .ScopeMetrics , externalMetrics ... )
253
+ }
254
+ return rm , unifyErrors (errs )
222
255
}
223
256
224
257
// export exports metric data m using r's exporter.
@@ -259,7 +292,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
259
292
<- r .done
260
293
261
294
// Any future call to Collect will now return ErrReaderShutdown.
262
- ph := r .producer .Swap (produceHolder {
295
+ ph := r .sdkProducer .Swap (produceHolder {
263
296
produce : shutdownProducer {}.produce ,
264
297
})
265
298
@@ -276,6 +309,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
276
309
if err == nil || err == ErrReaderShutdown {
277
310
err = sErr
278
311
}
312
+
313
+ r .mu .Lock ()
314
+ defer r .mu .Unlock ()
315
+ r .isShutdown = true
316
+ // release references to Producer(s)
317
+ r .externalProducers .Store ([]Producer {})
279
318
})
280
319
return err
281
320
}
0 commit comments