From 1ec1112e98bde8994f9f700749397a128c99e83a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 6 Dec 2022 12:31:45 -0800 Subject: [PATCH 01/14] Update Meter RegisterCallback method Return a Registration from the method that can be used by the caller to unregister their callback. Update documentation of the method to better explain expectations of use and implementation. --- metric/meter.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/metric/meter.go b/metric/meter.go index 23e6853afbb..50a11adccaf 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -51,14 +51,30 @@ type Meter interface { // To Observe data with instruments it must be registered in a callback. AsyncFloat64() asyncfloat64.InstrumentProvider - // RegisterCallback captures the function that will be called during Collect. - // - // It is only valid to call Observe within the scope of the passed function, - // and only on the instruments that were registered with this call. - RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error - // SyncInt64 is the namespace for the Synchronous Integer instruments SyncInt64() syncint64.InstrumentProvider // SyncFloat64 is the namespace for the Synchronous Float instruments SyncFloat64() syncfloat64.InstrumentProvider + + // RegisterCallback registeres f to be called during the collection of a + // measurement cycle. + // + // If Unregister of the returned Registration is called, f needs to be + // unregistered and not called during collection. + // + // The instruments f is registered with are the only instruments that f may + // observe values for. + // + // If no instruments are passed, f should not be registered nor called + // during collection. + RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error) +} + +// Registration is an token representing the unique registration of a callback +// for a set of instruments with a Meter. +type Registration interface { + // Unregister removes the callback registration from a Meter. + // + // This method needs to be idempotent and concurrent safe. + Unregister() error } From a17f123b2d9641891e6b3e527fd75226ff67230c Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 6 Dec 2022 12:34:50 -0800 Subject: [PATCH 02/14] Update noop impl --- metric/noop.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metric/noop.go b/metric/noop.go index e8b9a9a1458..7454a790337 100644 --- a/metric/noop.go +++ b/metric/noop.go @@ -64,10 +64,14 @@ func (noopMeter) SyncFloat64() syncfloat64.InstrumentProvider { } // RegisterCallback creates a register callback that does not record any metrics. -func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) error { - return nil +func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) { + return noopReg{}, nil } +type noopReg struct{} + +func (noopReg) Unregister() error { return nil } + type nonrecordingAsyncFloat64Instrument struct { instrument.Asynchronous } From 41a433e3825f5a125c8999282896921663796357 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 6 Dec 2022 14:24:04 -0800 Subject: [PATCH 03/14] Update global impl --- metric/internal/global/meter.go | 63 ++++++++++++++++++---- metric/internal/global/meter_test.go | 55 +++++++++++++++++-- metric/internal/global/meter_types_test.go | 21 +++++++- 3 files changed, 123 insertions(+), 16 deletions(-) diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 0fa924f397c..b6b8aaae628 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -109,7 +109,7 @@ type meter struct { mtx sync.Mutex instruments []delegatedInstrument - callbacks []delegatedCallback + register []*registration delegate atomic.Value // metric.Meter } @@ -135,12 +135,16 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { inst.setDelegate(meter) } - for _, callback := range m.callbacks { - callback.setDelegate(meter) + for _, r := range m.register { + if r == nil { + // Already unregistered. + continue + } + r.setDelegate(meter) } m.instruments = nil - m.callbacks = nil + m.register = nil } // AsyncInt64 is the namespace for the Asynchronous Integer instruments. @@ -167,7 +171,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) (metric.Registration, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { insts = unwrapInstruments(insts) return del.RegisterCallback(insts, function) @@ -175,12 +179,22 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func( m.mtx.Lock() defer m.mtx.Unlock() - m.callbacks = append(m.callbacks, delegatedCallback{ + reg := ®istration{ instruments: insts, function: function, - }) + unreg: m.unregister(len(m.register)), + } + m.register = append(m.register, reg) + return reg, nil +} - return nil +func (m *meter) unregister(idx int) func() error { + return func() error { + m.mtx.Lock() + defer m.mtx.Unlock() + m.register[idx] = nil + return nil + } } type wrapped interface { @@ -217,17 +231,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { return (*sfInstProvider)(m) } -type delegatedCallback struct { +type registration struct { instruments []instrument.Asynchronous function func(context.Context) + + unreg func() error + unregMu sync.Mutex } -func (c *delegatedCallback) setDelegate(m metric.Meter) { +func (c *registration) setDelegate(m metric.Meter) { insts := unwrapInstruments(c.instruments) - err := m.RegisterCallback(insts, c.function) + + c.unregMu.Lock() + defer c.unregMu.Unlock() + + if c.unreg == nil { + // Unregister already called. + return + } + + reg, err := m.RegisterCallback(insts, c.function) if err != nil { otel.Handle(err) } + + c.unreg = reg.Unregister +} + +func (c *registration) Unregister() error { + c.unregMu.Lock() + defer c.unregMu.Unlock() + if c.unreg == nil { + // Unregister already called. + return nil + } + + var err error + err, c.unreg = c.unreg(), nil + return err } type afInstProvider meter diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 8865f06d57b..60e12e97b73 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) { _, _ = mtr.SyncInt64().Counter(name) _, _ = mtr.SyncInt64().UpDownCounter(name) _, _ = mtr.SyncInt64().Histogram(name) - _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) + _, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) if !once { wg.Done() once = true @@ -101,9 +101,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun _, err = m.AsyncInt64().Gauge("test_Async_Gauge") assert.NoError(t, err) - require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { afcounter.Observe(ctx, 3) - })) + }) + require.NoError(t, err) sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter") require.NoError(t, err) @@ -257,3 +258,51 @@ func TestMeterDefersDelegations(t *testing.T) { assert.IsType(t, &afCounter{}, actr) assert.Equal(t, 1, mp.count) } + +func TestRegistrationDelegation(t *testing.T) { + // globalMeterProvider := otel.GetMeterProvider + globalMeterProvider := &meterProvider{} + + m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test") + require.IsType(t, &meter{}, m) + mImpl := m.(*meter) + + actr, err := m.AsyncFloat64().Counter("test_Async_Counter") + require.NoError(t, err) + + var called0 bool + reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + called0 = true + }) + require.NoError(t, err) + require.Len(t, mImpl.register, 1, "callback not registered") + // This means reg0 should not be delegated. + assert.NoError(t, reg0.Unregister()) + assert.Nil(t, mImpl.register[0], "callback not unregister") + + var called1 bool + reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + called1 = true + }) + require.NoError(t, err) + require.Len(t, mImpl.register, 2, "second callback not registered") + + mp := &testMeterProvider{} + + // otel.SetMeterProvider(mp) + globalMeterProvider.setDelegate(mp) + + testCollect(t, m) // This is a hacky way to emulate a read from an exporter + require.False(t, called0, "pre-delegation unregistered callback called") + require.True(t, called1, "callback not called") + + called1 = false + assert.NoError(t, reg1.Unregister(), "unregister second callback") + + testCollect(t, m) // This is a hacky way to emulate a read from an exporter + assert.False(t, called1, "unregistered callback called") + + assert.NotPanics(t, func() { + assert.NoError(t, reg1.Unregister(), "duplicate unregister calls") + }) +} diff --git a/metric/internal/global/meter_types_test.go b/metric/internal/global/meter_types_test.go index ac6e93ebe38..53dfbc7528d 100644 --- a/metric/internal/global/meter_types_test.go +++ b/metric/internal/global/meter_types_test.go @@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { - m.callbacks = append(m.callbacks, function) +func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { + m.callbacks = append(m.callbacks, f) + return testReg{ + f: func(idx int) func() { + return func() { m.callbacks[idx] = nil } + }(len(m.callbacks) - 1), + }, nil +} + +type testReg struct { + f func() +} + +func (r testReg) Unregister() error { + r.f() return nil } @@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider { func (m *testMeter) collect() { ctx := context.Background() for _, f := range m.callbacks { + if f == nil { + // Unregister. + continue + } f(ctx) } } From 6e47a27f30ad434eee1c17a369090b01b8f047f4 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 6 Dec 2022 14:28:15 -0800 Subject: [PATCH 04/14] Test global Unregister concurrent safe --- metric/internal/global/meter_test.go | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 60e12e97b73..39d63992f05 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) { close(finish) } +func TestUnregisterRace(t *testing.T) { + mtr := &meter{} + reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + finish := make(chan struct{}) + go func() { + for i, once := 0, false; ; i++ { + _ = reg.Unregister() + if !once { + wg.Done() + once = true + } + select { + case <-finish: + return + default: + } + } + }() + _ = reg.Unregister() + + wg.Wait() + mtr.setDelegate(metric.NewNoopMeterProvider()) + close(finish) +} + func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) { afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter") require.NoError(t, err) From 1b74247ec8eba1f14cc42bcf8ae66a4c7ade45d6 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 08:39:12 -0800 Subject: [PATCH 05/14] Use a map to track reg in global impl --- metric/internal/global/meter.go | 42 ++++++++++++++++++---------- metric/internal/global/meter_test.go | 6 ++-- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index b6b8aaae628..5a26597fd45 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -109,7 +109,9 @@ type meter struct { mtx sync.Mutex instruments []delegatedInstrument - register []*registration + + regID uint + reg map[uint]*registration delegate atomic.Value // metric.Meter } @@ -135,16 +137,12 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { inst.setDelegate(meter) } - for _, r := range m.register { - if r == nil { - // Already unregistered. - continue - } + for _, r := range m.reg { r.setDelegate(meter) } m.instruments = nil - m.register = nil + m.reg = nil } // AsyncInt64 is the namespace for the Asynchronous Integer instruments. @@ -171,28 +169,42 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) (metric.Registration, error) { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { insts = unwrapInstruments(insts) - return del.RegisterCallback(insts, function) + return del.RegisterCallback(insts, f) } + return m.register(insts, f) +} + +func (m *meter) registrationID() uint { + defer func() { m.regID++ }() + return m.regID +} +func (m *meter) register(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { m.mtx.Lock() defer m.mtx.Unlock() + + if m.reg == nil { + m.reg = make(map[uint]*registration) + } + + id := m.registrationID() reg := ®istration{ instruments: insts, - function: function, - unreg: m.unregister(len(m.register)), + function: f, + unreg: m.unregister(id), } - m.register = append(m.register, reg) + m.reg[id] = reg return reg, nil } -func (m *meter) unregister(idx int) func() error { +func (m *meter) unregister(id uint) func() error { return func() error { m.mtx.Lock() - defer m.mtx.Unlock() - m.register[idx] = nil + delete(m.reg, id) + m.mtx.Unlock() return nil } } diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 39d63992f05..77924c25aac 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -304,17 +304,17 @@ func TestRegistrationDelegation(t *testing.T) { called0 = true }) require.NoError(t, err) - require.Len(t, mImpl.register, 1, "callback not registered") + require.Len(t, mImpl.reg, 1, "callback not registered") // This means reg0 should not be delegated. assert.NoError(t, reg0.Unregister()) - assert.Nil(t, mImpl.register[0], "callback not unregister") + assert.Nil(t, mImpl.reg[0], "callback not unregister") var called1 bool reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { called1 = true }) require.NoError(t, err) - require.Len(t, mImpl.register, 2, "second callback not registered") + require.Len(t, mImpl.reg, 1, "second callback not registered") mp := &testMeterProvider{} From ad30134a2889db01b33186434eec000c6405bcbb Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 09:57:39 -0800 Subject: [PATCH 06/14] Update sdk impl --- sdk/metric/meter.go | 15 +++++-- sdk/metric/meter_test.go | 95 ++++++++++++++++++++++++++++++++-------- sdk/metric/pipeline.go | 39 ++++++++++++----- 3 files changed, 117 insertions(+), 32 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 418827e9672..c2c515af35c 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -69,7 +69,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { for _, inst := range insts { // Only register if at least one instrument has a non-drop aggregation. // Otherwise, calling f during collection will be wasted computation. @@ -91,14 +91,21 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context } } // All insts use drop aggregation. - return nil + return noopRegister{}, nil } -func (m *meter) registerCallback(f func(context.Context)) error { - m.pipes.registerCallback(f) +type noopRegister struct{} + +func (noopRegister) Unregister() error { return nil } +type callback func(context.Context) + +func (m *meter) registerCallback(c callback) (metric.Registration, error) { + return m.pipes.registerCallback(c), nil +} + // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { return syncInt64Provider{m.instProviderInt64} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 013a52e5924..226d31b82e5 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -103,11 +103,63 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) { m := NewMeterProvider().Meter("callback-concurrency") go func() { - _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) wg.Done() }() go func() { - _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + wg.Done() + }() + wg.Wait() +} + +func TestNoopCallbackUnregisterConcurrency(t *testing.T) { + m := NewMeterProvider().Meter("noop-unregister-concurrency") + reg, err := m.RegisterCallback(nil, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + _ = reg.Unregister() + wg.Done() + }() + go func() { + _ = reg.Unregister() + wg.Done() + }() + wg.Wait() +} + +func TestCallbackUnregisterConcurrency(t *testing.T) { + reader := NewManualReader() + provider := NewMeterProvider(WithReader(reader)) + meter := provider.Meter("unregister-concurrency") + + actr, err := meter.AsyncFloat64().Counter("counter") + require.NoError(t, err) + + ag, err := meter.AsyncInt64().Gauge("gauge") + require.NoError(t, err) + + i := []instrument.Asynchronous{actr} + regCtr, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + require.NoError(t, err) + + i = []instrument.Asynchronous{ag} + regG, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + _ = regCtr.Unregister() + _ = regG.Unregister() + wg.Done() + }() + go func() { + _ = regCtr.Unregister() + _ = regG.Unregister() wg.Done() }() wg.Wait() @@ -126,7 +178,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncInt64().Counter("aint") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) }) assert.NoError(t, err) @@ -150,7 +202,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncInt64().UpDownCounter("aint") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) }) assert.NoError(t, err) @@ -174,7 +226,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { gauge, err := m.AsyncInt64().Gauge("agauge") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) }) assert.NoError(t, err) @@ -196,7 +248,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncFloat64().Counter("afloat") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) }) assert.NoError(t, err) @@ -220,7 +272,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncFloat64().UpDownCounter("afloat") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) }) assert.NoError(t, err) @@ -244,7 +296,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { gauge, err := m.AsyncFloat64().Gauge("agauge") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) }) assert.NoError(t, err) @@ -418,7 +470,7 @@ func TestMetersProvideScope(t *testing.T) { m1 := mp.Meter("scope1") ctr1, err := m1.AsyncFloat64().Counter("ctr1") assert.NoError(t, err) - err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { ctr1.Observe(ctx, 5) }) assert.NoError(t, err) @@ -426,7 +478,7 @@ func TestMetersProvideScope(t *testing.T) { m2 := mp.Meter("scope2") ctr2, err := m2.AsyncInt64().Counter("ctr2") assert.NoError(t, err) - err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { ctr2.Observe(ctx, 7) }) assert.NoError(t, err) @@ -507,14 +559,15 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { require.NoError(t, err) var called bool - require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{ + _, err = m.RegisterCallback([]instrument.Asynchronous{ int64Counter, int64UpDownCounter, int64Gauge, floag64Counter, floag64UpDownCounter, floag64Gauge, - }, func(context.Context) { called = true })) + }, func(context.Context) { called = true }) + require.NoError(t, err) data, err := r.Collect(context.Background()) require.NoError(t, err) @@ -538,10 +591,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afcounter", @@ -564,10 +618,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afupdowncounter", @@ -590,10 +645,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afgauge", @@ -614,10 +670,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aicounter", @@ -640,10 +697,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aiupdowncounter", @@ -666,10 +724,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aigauge", diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index bc6901e5775..00dea212f1a 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -15,6 +15,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( + "container/list" "context" "errors" "fmt" @@ -22,6 +23,7 @@ import ( "sync" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -75,7 +77,7 @@ type pipeline struct { sync.Mutex aggregations map[instrumentation.Scope][]instrumentSync - callbacks []func(context.Context) + callbacks list.List } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -94,10 +96,15 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { } // addCallback registers a callback to be run when `produce()` is called. -func (p *pipeline) addCallback(callback func(context.Context)) { +func (p *pipeline) addCallback(c callback) func() { p.Lock() defer p.Unlock() - p.callbacks = append(p.callbacks, callback) + e := p.callbacks.PushBack(c) + return func() { + p.Lock() + p.callbacks.Remove(e) + p.Unlock() + } } // callbackKey is a context key type used to identify context that came from the SDK. @@ -112,14 +119,15 @@ const produceKey callbackKey = 0 // // This method is safe to call concurrently. func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { + ctx = context.WithValue(ctx, produceKey, struct{}{}) + p.Lock() defer p.Unlock() - ctx = context.WithValue(ctx, produceKey, struct{}{}) - - for _, callback := range p.callbacks { + for e := p.callbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) - callback(ctx) + f := e.Value.(callback) + f(ctx) if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. return metricdata.ResourceMetrics{}, err @@ -439,10 +447,21 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli return pipes } -func (p pipelines) registerCallback(fn func(context.Context)) { - for _, pipe := range p { - pipe.addCallback(fn) +func (p pipelines) registerCallback(c callback) metric.Registration { + unregs := make([]func(), len(p)) + for i, pipe := range p { + unregs[i] = pipe.addCallback(c) + } + return unregisterFuncs(unregs) +} + +type unregisterFuncs []func() + +func (u unregisterFuncs) Unregister() error { + for _, f := range u { + f() } + return nil } // resolver facilitates resolving Aggregators an instrument needs to aggregate From cfa2348b9fb8adac43648f5c8b850300b1bed67a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 10:10:31 -0800 Subject: [PATCH 07/14] Use a list for global impl --- metric/internal/global/meter.go | 40 ++++++++-------------------- metric/internal/global/meter_test.go | 6 ++--- 2 files changed, 14 insertions(+), 32 deletions(-) diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 5a26597fd45..e8c83578459 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -15,6 +15,7 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global" import ( + "container/list" "context" "sync" "sync/atomic" @@ -110,8 +111,7 @@ type meter struct { mtx sync.Mutex instruments []delegatedInstrument - regID uint - reg map[uint]*registration + registry list.List delegate atomic.Value // metric.Meter } @@ -137,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { inst.setDelegate(meter) } - for _, r := range m.reg { + for e := m.registry.Front(); e != nil; e = e.Next() { + r := e.Value.(*registration) r.setDelegate(meter) + m.registry.Remove(e) } m.instruments = nil - m.reg = nil + m.registry.Init() } // AsyncInt64 is the namespace for the Asynchronous Integer instruments. @@ -174,39 +176,19 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context insts = unwrapInstruments(insts) return del.RegisterCallback(insts, f) } - return m.register(insts, f) -} - -func (m *meter) registrationID() uint { - defer func() { m.regID++ }() - return m.regID -} -func (m *meter) register(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { m.mtx.Lock() defer m.mtx.Unlock() - if m.reg == nil { - m.reg = make(map[uint]*registration) - } - - id := m.registrationID() - reg := ®istration{ - instruments: insts, - function: f, - unreg: m.unregister(id), - } - m.reg[id] = reg - return reg, nil -} - -func (m *meter) unregister(id uint) func() error { - return func() error { + reg := ®istration{instruments: insts, function: f} + e := m.registry.PushBack(reg) + reg.unreg = func() error { m.mtx.Lock() - delete(m.reg, id) + _ = m.registry.Remove(e) m.mtx.Unlock() return nil } + return reg, nil } type wrapped interface { diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 77924c25aac..15a0bf877af 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -304,17 +304,17 @@ func TestRegistrationDelegation(t *testing.T) { called0 = true }) require.NoError(t, err) - require.Len(t, mImpl.reg, 1, "callback not registered") + require.Equal(t, 1, mImpl.registry.Len(), "callback not registered") // This means reg0 should not be delegated. assert.NoError(t, reg0.Unregister()) - assert.Nil(t, mImpl.reg[0], "callback not unregister") + assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered") var called1 bool reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { called1 = true }) require.NoError(t, err) - require.Len(t, mImpl.reg, 1, "second callback not registered") + require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered") mp := &testMeterProvider{} From be40648bbd09df82446b3db7be4b5681b613d0f5 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 10:14:02 -0800 Subject: [PATCH 08/14] Fix prom example --- example/prometheus/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/prometheus/main.go b/example/prometheus/main.go index bc15f041486..39015994517 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -68,7 +68,7 @@ func main() { if err != nil { log.Fatal(err) } - err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { n := -10. + rand.Float64()*(90.) // [-10, 100) gauge.Observe(ctx, n, attrs...) }) From 35ba6ad8dfdb207d5ad0f21a3ff6eed44aa3a7e1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 10:14:22 -0800 Subject: [PATCH 09/14] Lint metric/meter.go --- metric/meter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metric/meter.go b/metric/meter.go index 50a11adccaf..3a505264ca0 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -56,7 +56,7 @@ type Meter interface { // SyncFloat64 is the namespace for the Synchronous Float instruments SyncFloat64() syncfloat64.InstrumentProvider - // RegisterCallback registeres f to be called during the collection of a + // RegisterCallback registers f to be called during the collection of a // measurement cycle. // // If Unregister of the returned Registration is called, f needs to be From 958b5a2a2d1ad80423684f413034d1dd22feacc4 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 10:15:31 -0800 Subject: [PATCH 10/14] Fix metric example --- metric/example_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metric/example_test.go b/metric/example_test.go index bc94d7572ef..dc22989f627 100644 --- a/metric/example_test.go +++ b/metric/example_test.go @@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() { panic(err) } - err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, + _, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, func(ctx context.Context) { // instrument.WithCallbackFunc(func(ctx context.Context) { //Do Work to get the real memoryUsage @@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() { gcCount, _ := meter.AsyncInt64().Counter("gcCount") gcPause, _ := meter.SyncFloat64().Histogram("gcPause") - err := meter.RegisterCallback([]instrument.Asynchronous{ + _, err := meter.RegisterCallback([]instrument.Asynchronous{ heapAlloc, gcCount, }, From 84266b2796c9230f268a01e9db1bf1fe2b296d73 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 10:18:39 -0800 Subject: [PATCH 11/14] Placeholder for changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f130b8be10..b6860782855 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package. + This `Registration` can be used to unregister callbacks. (TBD) + ## [1.11.2/0.34.0] 2022-12-05 ### Added From ea064bbcac2ce6dea374cc7fa2a883c52c33356b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 7 Dec 2022 10:19:51 -0800 Subject: [PATCH 12/14] Update PR number in changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6860782855..f38252e6e54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package. - This `Registration` can be used to unregister callbacks. (TBD) + This `Registration` can be used to unregister callbacks. (#3522) ## [1.11.2/0.34.0] 2022-12-05 From db62d9e35512b4b592c5e1553511a910dd8ef030 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 8 Dec 2022 15:07:29 -0800 Subject: [PATCH 13/14] Update sdk/metric/pipeline.go Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> --- sdk/metric/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 00dea212f1a..f9938bf617f 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -96,7 +96,7 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { } // addCallback registers a callback to be run when `produce()` is called. -func (p *pipeline) addCallback(c callback) func() { +func (p *pipeline) addCallback(c callback) (unregister func()) { p.Lock() defer p.Unlock() e := p.callbacks.PushBack(c) From c56c1409cf4b968a2933c4d6a9f50a2bef9096cc Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 8 Dec 2022 15:20:18 -0800 Subject: [PATCH 14/14] Add test unregistered callback is not called --- sdk/metric/meter_test.go | 47 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 226d31b82e5..d904b118ad4 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -532,6 +532,53 @@ func TestMetersProvideScope(t *testing.T) { metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } +func TestUnregisterUnregisters(t *testing.T) { + r := NewManualReader() + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("TestUnregisterUnregisters") + + int64Counter, err := m.AsyncInt64().Counter("int64.counter") + require.NoError(t, err) + + int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter") + require.NoError(t, err) + + int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge") + require.NoError(t, err) + + floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter") + require.NoError(t, err) + + floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter") + require.NoError(t, err) + + floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge") + require.NoError(t, err) + + var called bool + reg, err := m.RegisterCallback([]instrument.Asynchronous{ + int64Counter, + int64UpDownCounter, + int64Gauge, + floag64Counter, + floag64UpDownCounter, + floag64Gauge, + }, func(context.Context) { called = true }) + require.NoError(t, err) + + ctx := context.Background() + _, err = r.Collect(ctx) + require.NoError(t, err) + assert.True(t, called, "callback not called for registered callback") + + called = false + require.NoError(t, reg.Unregister(), "unregister") + + _, err = r.Collect(ctx) + require.NoError(t, err) + assert.False(t, called, "callback called for unregistered callback") +} + func TestRegisterCallbackDropAggregations(t *testing.T) { aggFn := func(InstrumentKind) aggregation.Aggregation { return aggregation.Drop{}