Skip to content

Commit 4014204

Browse files
Allow multi-instrument callbacks to be unregistered (#3522)
* Update Meter RegisterCallback method Return a Registration from the method that can be used by the caller to unregister their callback. Update documentation of the method to better explain expectations of use and implementation. * Update noop impl * Update global impl * Test global Unregister concurrent safe * Use a map to track reg in global impl * Update sdk impl * Use a list for global impl * Fix prom example * Lint metric/meter.go * Fix metric example * Placeholder for changelog * Update PR number in changelog * Update sdk/metric/pipeline.go Co-authored-by: Aaron Clawson <[email protected]> * Add test unregistered callback is not called Co-authored-by: Aaron Clawson <[email protected]>
1 parent ca4cdfe commit 4014204

File tree

11 files changed

+350
-66
lines changed

11 files changed

+350
-66
lines changed

CHANGELOG.md

+6-4
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
88

99
## [Unreleased]
1010

11-
### Removed
12-
13-
- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)
14-
1511
### Added
1612

13+
- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
14+
This `Registration` can be used to unregister callbacks. (#3522)
1715
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
1816

17+
### Removed
18+
19+
- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)
20+
1921
### Changed
2022

2123
- Global error handler uses an atomic value instead of a mutex. (#3543)

example/prometheus/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func main() {
6868
if err != nil {
6969
log.Fatal(err)
7070
}
71-
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
71+
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
7272
n := -10. + rand.Float64()*(90.) // [-10, 100)
7373
gauge.Observe(ctx, n, attrs...)
7474
})

metric/example_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() {
6161
panic(err)
6262
}
6363

64-
err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
64+
_, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
6565
func(ctx context.Context) {
6666
// instrument.WithCallbackFunc(func(ctx context.Context) {
6767
//Do Work to get the real memoryUsage
@@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() {
8686
gcCount, _ := meter.AsyncInt64().Counter("gcCount")
8787
gcPause, _ := meter.SyncFloat64().Histogram("gcPause")
8888

89-
err := meter.RegisterCallback([]instrument.Asynchronous{
89+
_, err := meter.RegisterCallback([]instrument.Asynchronous{
9090
heapAlloc,
9191
gcCount,
9292
},

metric/internal/global/meter.go

+49-14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package global // import "go.opentelemetry.io/otel/metric/internal/global"
1616

1717
import (
18+
"container/list"
1819
"context"
1920
"sync"
2021
"sync/atomic"
@@ -109,7 +110,8 @@ type meter struct {
109110

110111
mtx sync.Mutex
111112
instruments []delegatedInstrument
112-
callbacks []delegatedCallback
113+
114+
registry list.List
113115

114116
delegate atomic.Value // metric.Meter
115117
}
@@ -135,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
135137
inst.setDelegate(meter)
136138
}
137139

138-
for _, callback := range m.callbacks {
139-
callback.setDelegate(meter)
140+
for e := m.registry.Front(); e != nil; e = e.Next() {
141+
r := e.Value.(*registration)
142+
r.setDelegate(meter)
143+
m.registry.Remove(e)
140144
}
141145

142146
m.instruments = nil
143-
m.callbacks = nil
147+
m.registry.Init()
144148
}
145149

146150
// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
@@ -167,20 +171,24 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
167171
//
168172
// It is only valid to call Observe within the scope of the passed function,
169173
// and only on the instruments that were registered with this call.
170-
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
174+
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
171175
if del, ok := m.delegate.Load().(metric.Meter); ok {
172176
insts = unwrapInstruments(insts)
173-
return del.RegisterCallback(insts, function)
177+
return del.RegisterCallback(insts, f)
174178
}
175179

176180
m.mtx.Lock()
177181
defer m.mtx.Unlock()
178-
m.callbacks = append(m.callbacks, delegatedCallback{
179-
instruments: insts,
180-
function: function,
181-
})
182182

183-
return nil
183+
reg := &registration{instruments: insts, function: f}
184+
e := m.registry.PushBack(reg)
185+
reg.unreg = func() error {
186+
m.mtx.Lock()
187+
_ = m.registry.Remove(e)
188+
m.mtx.Unlock()
189+
return nil
190+
}
191+
return reg, nil
184192
}
185193

186194
type wrapped interface {
@@ -217,17 +225,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
217225
return (*sfInstProvider)(m)
218226
}
219227

220-
type delegatedCallback struct {
228+
type registration struct {
221229
instruments []instrument.Asynchronous
222230
function func(context.Context)
231+
232+
unreg func() error
233+
unregMu sync.Mutex
223234
}
224235

225-
func (c *delegatedCallback) setDelegate(m metric.Meter) {
236+
func (c *registration) setDelegate(m metric.Meter) {
226237
insts := unwrapInstruments(c.instruments)
227-
err := m.RegisterCallback(insts, c.function)
238+
239+
c.unregMu.Lock()
240+
defer c.unregMu.Unlock()
241+
242+
if c.unreg == nil {
243+
// Unregister already called.
244+
return
245+
}
246+
247+
reg, err := m.RegisterCallback(insts, c.function)
228248
if err != nil {
229249
otel.Handle(err)
230250
}
251+
252+
c.unreg = reg.Unregister
253+
}
254+
255+
func (c *registration) Unregister() error {
256+
c.unregMu.Lock()
257+
defer c.unregMu.Unlock()
258+
if c.unreg == nil {
259+
// Unregister already called.
260+
return nil
261+
}
262+
263+
var err error
264+
err, c.unreg = c.unreg(), nil
265+
return err
231266
}
232267

233268
type afInstProvider meter

metric/internal/global/meter_test.go

+81-3
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) {
6868
_, _ = mtr.SyncInt64().Counter(name)
6969
_, _ = mtr.SyncInt64().UpDownCounter(name)
7070
_, _ = mtr.SyncInt64().Histogram(name)
71-
_ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
71+
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
7272
if !once {
7373
wg.Done()
7474
once = true
@@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) {
8686
close(finish)
8787
}
8888

89+
func TestUnregisterRace(t *testing.T) {
90+
mtr := &meter{}
91+
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {})
92+
require.NoError(t, err)
93+
94+
wg := &sync.WaitGroup{}
95+
wg.Add(1)
96+
finish := make(chan struct{})
97+
go func() {
98+
for i, once := 0, false; ; i++ {
99+
_ = reg.Unregister()
100+
if !once {
101+
wg.Done()
102+
once = true
103+
}
104+
select {
105+
case <-finish:
106+
return
107+
default:
108+
}
109+
}
110+
}()
111+
_ = reg.Unregister()
112+
113+
wg.Wait()
114+
mtr.setDelegate(metric.NewNoopMeterProvider())
115+
close(finish)
116+
}
117+
89118
func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) {
90119
afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter")
91120
require.NoError(t, err)
@@ -101,9 +130,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun
101130
_, err = m.AsyncInt64().Gauge("test_Async_Gauge")
102131
assert.NoError(t, err)
103132

104-
require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
133+
_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
105134
afcounter.Observe(ctx, 3)
106-
}))
135+
})
136+
require.NoError(t, err)
107137

108138
sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter")
109139
require.NoError(t, err)
@@ -257,3 +287,51 @@ func TestMeterDefersDelegations(t *testing.T) {
257287
assert.IsType(t, &afCounter{}, actr)
258288
assert.Equal(t, 1, mp.count)
259289
}
290+
291+
func TestRegistrationDelegation(t *testing.T) {
292+
// globalMeterProvider := otel.GetMeterProvider
293+
globalMeterProvider := &meterProvider{}
294+
295+
m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
296+
require.IsType(t, &meter{}, m)
297+
mImpl := m.(*meter)
298+
299+
actr, err := m.AsyncFloat64().Counter("test_Async_Counter")
300+
require.NoError(t, err)
301+
302+
var called0 bool
303+
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
304+
called0 = true
305+
})
306+
require.NoError(t, err)
307+
require.Equal(t, 1, mImpl.registry.Len(), "callback not registered")
308+
// This means reg0 should not be delegated.
309+
assert.NoError(t, reg0.Unregister())
310+
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")
311+
312+
var called1 bool
313+
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
314+
called1 = true
315+
})
316+
require.NoError(t, err)
317+
require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered")
318+
319+
mp := &testMeterProvider{}
320+
321+
// otel.SetMeterProvider(mp)
322+
globalMeterProvider.setDelegate(mp)
323+
324+
testCollect(t, m) // This is a hacky way to emulate a read from an exporter
325+
require.False(t, called0, "pre-delegation unregistered callback called")
326+
require.True(t, called1, "callback not called")
327+
328+
called1 = false
329+
assert.NoError(t, reg1.Unregister(), "unregister second callback")
330+
331+
testCollect(t, m) // This is a hacky way to emulate a read from an exporter
332+
assert.False(t, called1, "unregistered callback called")
333+
334+
assert.NotPanics(t, func() {
335+
assert.NoError(t, reg1.Unregister(), "duplicate unregister calls")
336+
})
337+
}

metric/internal/global/meter_types_test.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider {
6464
//
6565
// It is only valid to call Observe within the scope of the passed function,
6666
// and only on the instruments that were registered with this call.
67-
func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
68-
m.callbacks = append(m.callbacks, function)
67+
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
68+
m.callbacks = append(m.callbacks, f)
69+
return testReg{
70+
f: func(idx int) func() {
71+
return func() { m.callbacks[idx] = nil }
72+
}(len(m.callbacks) - 1),
73+
}, nil
74+
}
75+
76+
type testReg struct {
77+
f func()
78+
}
79+
80+
func (r testReg) Unregister() error {
81+
r.f()
6982
return nil
7083
}
7184

@@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider {
8598
func (m *testMeter) collect() {
8699
ctx := context.Background()
87100
for _, f := range m.callbacks {
101+
if f == nil {
102+
// Unregister.
103+
continue
104+
}
88105
f(ctx)
89106
}
90107
}

metric/meter.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,30 @@ type Meter interface {
5151
// To Observe data with instruments it must be registered in a callback.
5252
AsyncFloat64() asyncfloat64.InstrumentProvider
5353

54-
// RegisterCallback captures the function that will be called during Collect.
55-
//
56-
// It is only valid to call Observe within the scope of the passed function,
57-
// and only on the instruments that were registered with this call.
58-
RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error
59-
6054
// SyncInt64 is the namespace for the Synchronous Integer instruments
6155
SyncInt64() syncint64.InstrumentProvider
6256
// SyncFloat64 is the namespace for the Synchronous Float instruments
6357
SyncFloat64() syncfloat64.InstrumentProvider
58+
59+
// RegisterCallback registers f to be called during the collection of a
60+
// measurement cycle.
61+
//
62+
// If Unregister of the returned Registration is called, f needs to be
63+
// unregistered and not called during collection.
64+
//
65+
// The instruments f is registered with are the only instruments that f may
66+
// observe values for.
67+
//
68+
// If no instruments are passed, f should not be registered nor called
69+
// during collection.
70+
RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error)
71+
}
72+
73+
// Registration is an token representing the unique registration of a callback
74+
// for a set of instruments with a Meter.
75+
type Registration interface {
76+
// Unregister removes the callback registration from a Meter.
77+
//
78+
// This method needs to be idempotent and concurrent safe.
79+
Unregister() error
6480
}

metric/noop.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,14 @@ func (noopMeter) SyncFloat64() syncfloat64.InstrumentProvider {
6464
}
6565

6666
// RegisterCallback creates a register callback that does not record any metrics.
67-
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) error {
68-
return nil
67+
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) {
68+
return noopReg{}, nil
6969
}
7070

71+
type noopReg struct{}
72+
73+
func (noopReg) Unregister() error { return nil }
74+
7175
type nonrecordingAsyncFloat64Instrument struct {
7276
instrument.Asynchronous
7377
}

sdk/metric/meter.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
6969

7070
// RegisterCallback registers the function f to be called when any of the
7171
// insts Collect method is called.
72-
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
72+
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
7373
for _, inst := range insts {
7474
// Only register if at least one instrument has a non-drop aggregation.
7575
// Otherwise, calling f during collection will be wasted computation.
@@ -91,14 +91,21 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context
9191
}
9292
}
9393
// All insts use drop aggregation.
94-
return nil
94+
return noopRegister{}, nil
9595
}
9696

97-
func (m *meter) registerCallback(f func(context.Context)) error {
98-
m.pipes.registerCallback(f)
97+
type noopRegister struct{}
98+
99+
func (noopRegister) Unregister() error {
99100
return nil
100101
}
101102

103+
type callback func(context.Context)
104+
105+
func (m *meter) registerCallback(c callback) (metric.Registration, error) {
106+
return m.pipes.registerCallback(c), nil
107+
}
108+
102109
// SyncInt64 returns the synchronous integer instrument provider.
103110
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
104111
return syncInt64Provider{m.instProviderInt64}

0 commit comments

Comments
 (0)