Skip to content

Commit acc70ac

Browse files
committed
Merge branch 'main' into fix-3439
2 parents af45187 + b1a8002 commit acc70ac

File tree

13 files changed

+634
-124
lines changed

13 files changed

+634
-124
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
106106
Instead it uses the `net.sock.peer` attributes. (#3581)
107107
- The parameters for the `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/metric` are changed.
108108
The slice of `instrument.Asynchronous` parameter is now passed as a variadic argument. (#3587)
109+
- The `Callback` in `go.opentelemetry.io/otel/metric` has the added `Observer` parameter added.
110+
This new parameter is used by `Callback` implementations to observe values for asynchronous instruments instead of calling the `Observe` method of the instrument directly. (#3584)
111+
112+
### Fixed
113+
114+
- The `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/sdk/metric` only registers a callback for instruments created by that meter.
115+
Trying to register a callback with instruments from a different meter will result in an error being returned. (#3584)
109116

110117
### Deprecated
111118

example/prometheus/main.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"go.opentelemetry.io/otel/attribute"
3030
"go.opentelemetry.io/otel/exporters/prometheus"
31+
api "go.opentelemetry.io/otel/metric"
3132
"go.opentelemetry.io/otel/metric/instrument"
3233
"go.opentelemetry.io/otel/sdk/metric"
3334
)
@@ -68,9 +69,9 @@ func main() {
6869
if err != nil {
6970
log.Fatal(err)
7071
}
71-
_, err = meter.RegisterCallback(func(ctx context.Context) error {
72+
_, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
7273
n := -10. + rand.Float64()*(90.) // [-10, 100)
73-
gauge.Observe(ctx, n, attrs...)
74+
o.ObserveFloat64(gauge, n, attrs...)
7475
return nil
7576
}, gauge)
7677
if err != nil {

metric/example_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,13 @@ func ExampleMeter_asynchronous_multiple() {
9090
gcPause, _ := meter.Float64Histogram("gcPause")
9191

9292
_, err := meter.RegisterCallback(
93-
func(ctx context.Context) error {
93+
func(ctx context.Context, o metric.Observer) error {
9494
memStats := &runtime.MemStats{}
9595
// This call does work
9696
runtime.ReadMemStats(memStats)
9797

98-
heapAlloc.Observe(ctx, int64(memStats.HeapAlloc))
99-
gcCount.Observe(ctx, int64(memStats.NumGC))
98+
o.ObserveInt64(heapAlloc, int64(memStats.HeapAlloc))
99+
o.ObserveInt64(gcCount, int64(memStats.NumGC))
100100

101101
// This function synchronously records the pauses
102102
computeGCPauses(ctx, gcPause, memStats.PauseNs[:])

metric/internal/global/instruments.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424
"go.opentelemetry.io/otel/metric/instrument"
2525
)
2626

27+
// unwrapper unwraps to return the underlying instrument implementation.
28+
type unwrapper interface {
29+
Unwrap() instrument.Asynchronous
30+
}
31+
2732
type afCounter struct {
2833
name string
2934
opts []instrument.Float64ObserverOption
@@ -33,6 +38,9 @@ type afCounter struct {
3338
instrument.Asynchronous
3439
}
3540

41+
var _ unwrapper = (*afCounter)(nil)
42+
var _ instrument.Float64ObservableCounter = (*afCounter)(nil)
43+
3644
func (i *afCounter) setDelegate(m metric.Meter) {
3745
ctr, err := m.Float64ObservableCounter(i.name, i.opts...)
3846
if err != nil {
@@ -48,7 +56,7 @@ func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.K
4856
}
4957
}
5058

51-
func (i *afCounter) unwrap() instrument.Asynchronous {
59+
func (i *afCounter) Unwrap() instrument.Asynchronous {
5260
if ctr := i.delegate.Load(); ctr != nil {
5361
return ctr.(instrument.Float64ObservableCounter)
5462
}
@@ -64,6 +72,9 @@ type afUpDownCounter struct {
6472
instrument.Asynchronous
6573
}
6674

75+
var _ unwrapper = (*afUpDownCounter)(nil)
76+
var _ instrument.Float64ObservableUpDownCounter = (*afUpDownCounter)(nil)
77+
6778
func (i *afUpDownCounter) setDelegate(m metric.Meter) {
6879
ctr, err := m.Float64ObservableUpDownCounter(i.name, i.opts...)
6980
if err != nil {
@@ -79,7 +90,7 @@ func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attri
7990
}
8091
}
8192

82-
func (i *afUpDownCounter) unwrap() instrument.Asynchronous {
93+
func (i *afUpDownCounter) Unwrap() instrument.Asynchronous {
8394
if ctr := i.delegate.Load(); ctr != nil {
8495
return ctr.(instrument.Float64ObservableUpDownCounter)
8596
}
@@ -104,13 +115,16 @@ func (i *afGauge) setDelegate(m metric.Meter) {
104115
i.delegate.Store(ctr)
105116
}
106117

118+
var _ unwrapper = (*afGauge)(nil)
119+
var _ instrument.Float64ObservableGauge = (*afGauge)(nil)
120+
107121
func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
108122
if ctr := i.delegate.Load(); ctr != nil {
109123
ctr.(instrument.Float64ObservableGauge).Observe(ctx, x, attrs...)
110124
}
111125
}
112126

113-
func (i *afGauge) unwrap() instrument.Asynchronous {
127+
func (i *afGauge) Unwrap() instrument.Asynchronous {
114128
if ctr := i.delegate.Load(); ctr != nil {
115129
return ctr.(instrument.Float64ObservableGauge)
116130
}
@@ -126,6 +140,9 @@ type aiCounter struct {
126140
instrument.Asynchronous
127141
}
128142

143+
var _ unwrapper = (*aiCounter)(nil)
144+
var _ instrument.Int64ObservableCounter = (*aiCounter)(nil)
145+
129146
func (i *aiCounter) setDelegate(m metric.Meter) {
130147
ctr, err := m.Int64ObservableCounter(i.name, i.opts...)
131148
if err != nil {
@@ -141,7 +158,7 @@ func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.Key
141158
}
142159
}
143160

144-
func (i *aiCounter) unwrap() instrument.Asynchronous {
161+
func (i *aiCounter) Unwrap() instrument.Asynchronous {
145162
if ctr := i.delegate.Load(); ctr != nil {
146163
return ctr.(instrument.Int64ObservableCounter)
147164
}
@@ -157,6 +174,9 @@ type aiUpDownCounter struct {
157174
instrument.Asynchronous
158175
}
159176

177+
var _ unwrapper = (*aiUpDownCounter)(nil)
178+
var _ instrument.Int64ObservableUpDownCounter = (*aiUpDownCounter)(nil)
179+
160180
func (i *aiUpDownCounter) setDelegate(m metric.Meter) {
161181
ctr, err := m.Int64ObservableUpDownCounter(i.name, i.opts...)
162182
if err != nil {
@@ -172,7 +192,7 @@ func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribu
172192
}
173193
}
174194

175-
func (i *aiUpDownCounter) unwrap() instrument.Asynchronous {
195+
func (i *aiUpDownCounter) Unwrap() instrument.Asynchronous {
176196
if ctr := i.delegate.Load(); ctr != nil {
177197
return ctr.(instrument.Int64ObservableUpDownCounter)
178198
}
@@ -188,6 +208,9 @@ type aiGauge struct {
188208
instrument.Asynchronous
189209
}
190210

211+
var _ unwrapper = (*aiGauge)(nil)
212+
var _ instrument.Int64ObservableGauge = (*aiGauge)(nil)
213+
191214
func (i *aiGauge) setDelegate(m metric.Meter) {
192215
ctr, err := m.Int64ObservableGauge(i.name, i.opts...)
193216
if err != nil {
@@ -203,7 +226,7 @@ func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyVa
203226
}
204227
}
205228

206-
func (i *aiGauge) unwrap() instrument.Asynchronous {
229+
func (i *aiGauge) Unwrap() instrument.Asynchronous {
207230
if ctr := i.delegate.Load(); ctr != nil {
208231
return ctr.(instrument.Int64ObservableGauge)
209232
}
@@ -220,6 +243,8 @@ type sfCounter struct {
220243
instrument.Synchronous
221244
}
222245

246+
var _ instrument.Float64Counter = (*sfCounter)(nil)
247+
223248
func (i *sfCounter) setDelegate(m metric.Meter) {
224249
ctr, err := m.Float64Counter(i.name, i.opts...)
225250
if err != nil {
@@ -244,6 +269,8 @@ type sfUpDownCounter struct {
244269
instrument.Synchronous
245270
}
246271

272+
var _ instrument.Float64UpDownCounter = (*sfUpDownCounter)(nil)
273+
247274
func (i *sfUpDownCounter) setDelegate(m metric.Meter) {
248275
ctr, err := m.Float64UpDownCounter(i.name, i.opts...)
249276
if err != nil {
@@ -268,6 +295,8 @@ type sfHistogram struct {
268295
instrument.Synchronous
269296
}
270297

298+
var _ instrument.Float64Histogram = (*sfHistogram)(nil)
299+
271300
func (i *sfHistogram) setDelegate(m metric.Meter) {
272301
ctr, err := m.Float64Histogram(i.name, i.opts...)
273302
if err != nil {
@@ -292,6 +321,8 @@ type siCounter struct {
292321
instrument.Synchronous
293322
}
294323

324+
var _ instrument.Int64Counter = (*siCounter)(nil)
325+
295326
func (i *siCounter) setDelegate(m metric.Meter) {
296327
ctr, err := m.Int64Counter(i.name, i.opts...)
297328
if err != nil {
@@ -316,6 +347,8 @@ type siUpDownCounter struct {
316347
instrument.Synchronous
317348
}
318349

350+
var _ instrument.Int64UpDownCounter = (*siUpDownCounter)(nil)
351+
319352
func (i *siUpDownCounter) setDelegate(m metric.Meter) {
320353
ctr, err := m.Int64UpDownCounter(i.name, i.opts...)
321354
if err != nil {
@@ -340,6 +373,8 @@ type siHistogram struct {
340373
instrument.Synchronous
341374
}
342375

376+
var _ instrument.Int64Histogram = (*siHistogram)(nil)
377+
343378
func (i *siHistogram) setDelegate(m metric.Meter) {
344379
ctr, err := m.Int64Histogram(i.name, i.opts...)
345380
if err != nil {

metric/internal/global/meter.go

-3
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,6 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Float6
275275
}
276276

277277
// RegisterCallback captures the function that will be called during Collect.
278-
//
279-
// It is only valid to call Observe within the scope of the passed function,
280-
// and only on the instruments that were registered with this call.
281278
func (m *meter) RegisterCallback(f metric.Callback, insts ...instrument.Asynchronous) (metric.Registration, error) {
282279
if del, ok := m.delegate.Load().(metric.Meter); ok {
283280
insts = unwrapInstruments(insts)

metric/internal/global/meter_test.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ func TestMeterProviderRace(t *testing.T) {
4545
close(finish)
4646
}
4747

48+
var zeroCallback metric.Callback = func(ctx context.Context, or metric.Observer) error {
49+
return nil
50+
}
51+
4852
func TestMeterRace(t *testing.T) {
4953
mtr := &meter{}
5054

@@ -66,7 +70,7 @@ func TestMeterRace(t *testing.T) {
6670
_, _ = mtr.Int64Counter(name)
6771
_, _ = mtr.Int64UpDownCounter(name)
6872
_, _ = mtr.Int64Histogram(name)
69-
_, _ = mtr.RegisterCallback(func(ctx context.Context) error { return nil })
73+
_, _ = mtr.RegisterCallback(zeroCallback)
7074
if !once {
7175
wg.Done()
7276
once = true
@@ -86,7 +90,7 @@ func TestMeterRace(t *testing.T) {
8690

8791
func TestUnregisterRace(t *testing.T) {
8892
mtr := &meter{}
89-
reg, err := mtr.RegisterCallback(func(ctx context.Context) error { return nil })
93+
reg, err := mtr.RegisterCallback(zeroCallback)
9094
require.NoError(t, err)
9195

9296
wg := &sync.WaitGroup{}
@@ -128,8 +132,8 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (instrument.Float
128132
_, err = m.Int64ObservableGauge("test_Async_Gauge")
129133
assert.NoError(t, err)
130134

131-
_, err = m.RegisterCallback(func(ctx context.Context) error {
132-
afcounter.Observe(ctx, 3)
135+
_, err = m.RegisterCallback(func(ctx context.Context, obs metric.Observer) error {
136+
obs.ObserveFloat64(afcounter, 3)
133137
return nil
134138
}, afcounter)
135139
require.NoError(t, err)
@@ -323,7 +327,7 @@ func TestRegistrationDelegation(t *testing.T) {
323327
require.NoError(t, err)
324328

325329
var called0 bool
326-
reg0, err := m.RegisterCallback(func(context.Context) error {
330+
reg0, err := m.RegisterCallback(func(context.Context, metric.Observer) error {
327331
called0 = true
328332
return nil
329333
}, actr)
@@ -334,7 +338,7 @@ func TestRegistrationDelegation(t *testing.T) {
334338
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")
335339

336340
var called1 bool
337-
reg1, err := m.RegisterCallback(func(context.Context) error {
341+
reg1, err := m.RegisterCallback(func(context.Context, metric.Observer) error {
338342
called1 = true
339343
return nil
340344
}, actr)

metric/internal/global/meter_types_test.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global"
1717
import (
1818
"context"
1919

20+
"go.opentelemetry.io/otel/attribute"
2021
"go.opentelemetry.io/otel/metric"
2122
"go.opentelemetry.io/otel/metric/instrument"
2223
)
@@ -112,9 +113,6 @@ func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Fl
112113
}
113114

114115
// RegisterCallback captures the function that will be called during Collect.
115-
//
116-
// It is only valid to call Observe within the scope of the passed function,
117-
// and only on the instruments that were registered with this call.
118116
func (m *testMeter) RegisterCallback(f metric.Callback, i ...instrument.Asynchronous) (metric.Registration, error) {
119117
m.callbacks = append(m.callbacks, f)
120118
return testReg{
@@ -136,11 +134,24 @@ func (r testReg) Unregister() error {
136134
// This enables async collection.
137135
func (m *testMeter) collect() {
138136
ctx := context.Background()
137+
o := observationRecorder{ctx}
139138
for _, f := range m.callbacks {
140139
if f == nil {
141140
// Unregister.
142141
continue
143142
}
144-
_ = f(ctx)
143+
_ = f(ctx, o)
145144
}
146145
}
146+
147+
type observationRecorder struct {
148+
ctx context.Context
149+
}
150+
151+
func (o observationRecorder) ObserveFloat64(i instrument.Float64Observer, value float64, attr ...attribute.KeyValue) {
152+
i.Observe(o.ctx, value, attr...)
153+
}
154+
155+
func (o observationRecorder) ObserveInt64(i instrument.Int64Observer, value int64, attr ...attribute.KeyValue) {
156+
i.Observe(o.ctx, value, attr...)
157+
}

metric/meter.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package metric // import "go.opentelemetry.io/otel/metric"
1717
import (
1818
"context"
1919

20+
"go.opentelemetry.io/otel/attribute"
2021
"go.opentelemetry.io/otel/metric/instrument"
2122
)
2223

@@ -106,7 +107,8 @@ type Meter interface {
106107
}
107108

108109
// Callback is a function registered with a Meter that makes observations for
109-
// the set of instruments it is registered with.
110+
// the set of instruments it is registered with. The Observer parameter is used
111+
// to record measurment observations for these instruments.
110112
//
111113
// The function needs to complete in a finite amount of time and the deadline
112114
// of the passed context is expected to be honored.
@@ -116,7 +118,15 @@ type Meter interface {
116118
// the same attributes as another Callback will report.
117119
//
118120
// The function needs to be concurrent safe.
119-
type Callback func(context.Context) error
121+
type Callback func(context.Context, Observer) error
122+
123+
// Observer records measurements for multiple instruments in a Callback.
124+
type Observer interface {
125+
// ObserveFloat64 records the float64 value with attributes for obsrv.
126+
ObserveFloat64(obsrv instrument.Float64Observer, value float64, attributes ...attribute.KeyValue)
127+
// ObserveInt64 records the int64 value with attributes for obsrv.
128+
ObserveInt64(obsrv instrument.Int64Observer, value int64, attributes ...attribute.KeyValue)
129+
}
120130

121131
// Registration is an token representing the unique registration of a callback
122132
// for a set of instruments with a Meter.

0 commit comments

Comments
 (0)