diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cc4cfe0817..2252e56d8c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added +- The `WithInt64Callback` option is added to `go.opentelemetry.io/otel/metric/instrument` to configure int64 Observer callbacks during their creation. (#3507) +- The `WithFloat64Callback` option is added to `go.opentelemetry.io/otel/metric/instrument` to configure float64 Observer callbacks during their creation. (#3507) - Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package. This `Registration` can be used to unregister callbacks. (#3522) - Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) @@ -30,6 +32,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed +- Instrument configuration in `go.opentelemetry.io/otel/metric/instrument` is split into specific options and confguration based on the instrument type. (#3507) + - Use the added `Int64Option` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/syncint64`. + - Use the added `Float64Option` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/syncfloat64`. + - Use the added `Int64ObserverOption` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/asyncint64`. + - Use the added `Float64ObserverOption` type to configure instruments from `go.opentelemetry.io/otel/metric/instrument/asyncfloat64`. - The `InstrumentProvider` from `go.opentelemetry.io/otel/sdk/metric/asyncint64` is removed. Use the new creation methods of the `Meter` in `go.opentelemetry.io/otel/sdk/metric` instead. (#3530) - The `Counter` method is replaced by `Meter.Int64ObservableCounter` diff --git a/metric/instrument/asyncfloat64.go b/metric/instrument/asyncfloat64.go new file mode 100644 index 00000000000..9f77afb7d38 --- /dev/null +++ b/metric/instrument/asyncfloat64.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/unit" +) + +// Float64Observer is a recorder of float64 measurement values. +// Warning: methods may be added to this interface in minor releases. +type Float64Observer interface { + Asynchronous + + // Observe records the measurement value for a set of attributes. + // + // It is only valid to call this within a callback. If called outside of + // the registered callback it should have no effect on the instrument, and + // an error will be reported via the error handler. + Observe(ctx context.Context, value float64, attributes ...attribute.KeyValue) +} + +// Float64Callback is a function registered with a Meter that makes +// observations for a Float64Observer it is registered with. +// +// The function needs to complete in a finite amount of time and the deadline +// of the passed context is expected to be honored. +// +// The function needs to make unique observations across all registered +// Float64Callbacks. Meaning, it should not report measurements with the same +// attributes as another Float64Callbacks also registered for the same +// instrument. +// +// The function needs to be concurrent safe. +type Float64Callback func(context.Context, Float64Observer) error + +// Float64ObserverConfig contains options for Asynchronous instruments that +// observe float64 values. +type Float64ObserverConfig struct { + description string + unit unit.Unit + callbacks []Float64Callback +} + +// NewFloat64ObserverConfig returns a new Float64ObserverConfig with all opts +// applied. +func NewFloat64ObserverConfig(opts ...Float64ObserverOption) Float64ObserverConfig { + var config Float64ObserverConfig + for _, o := range opts { + config = o.applyFloat64Observer(config) + } + return config +} + +// Description returns the Config description. +func (c Float64ObserverConfig) Description() string { + return c.description +} + +// Unit returns the Config unit. +func (c Float64ObserverConfig) Unit() unit.Unit { + return c.unit +} + +// Callbacks returns the Config callbacks. +func (c Float64ObserverConfig) Callbacks() []Float64Callback { + return c.callbacks +} + +// Float64ObserverOption applies options to float64 Observer instruments. +type Float64ObserverOption interface { + applyFloat64Observer(Float64ObserverConfig) Float64ObserverConfig +} + +type float64ObserverOptionFunc func(Float64ObserverConfig) Float64ObserverConfig + +func (fn float64ObserverOptionFunc) applyFloat64Observer(cfg Float64ObserverConfig) Float64ObserverConfig { + return fn(cfg) +} + +// WithFloat64Callback adds callback to be called for an instrument. +func WithFloat64Callback(callback Float64Callback) Float64ObserverOption { + return float64ObserverOptionFunc(func(cfg Float64ObserverConfig) Float64ObserverConfig { + cfg.callbacks = append(cfg.callbacks, callback) + return cfg + }) +} diff --git a/metric/instrument/asyncfloat64/asyncfloat64.go b/metric/instrument/asyncfloat64/asyncfloat64.go index 038e55db0e5..1932d55d225 100644 --- a/metric/instrument/asyncfloat64/asyncfloat64.go +++ b/metric/instrument/asyncfloat64/asyncfloat64.go @@ -14,53 +14,28 @@ package asyncfloat64 // import "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" -import ( - "context" +import "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument" -) - -// Counter is an instrument that records increasing values. +// Counter is an instrument used to asynchronously record increasing float64 +// measurements once per a measurement collection cycle. The Observe method is +// used to record the measured state of the instrument when it is called. +// Implementations will assume the observed value to be the cumulative sum of +// the count. // // Warning: methods may be added to this interface in minor releases. -type Counter interface { - // Observe records the state of the instrument to be x. Implementations - // will assume x to be the cumulative sum of the count. - // - // It is only valid to call this within a callback. If called outside of the - // registered callback it should have no effect on the instrument, and an - // error will be reported via the error handler. - Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) - - instrument.Asynchronous -} +type Counter interface{ instrument.Float64Observer } -// UpDownCounter is an instrument that records increasing or decreasing values. +// UpDownCounter is an instrument used to asynchronously record float64 +// measurements once per a measurement collection cycle. The Observe method is +// used to record the measured state of the instrument when it is called. +// Implementations will assume the observed value to be the cumulative sum of +// the count. // // Warning: methods may be added to this interface in minor releases. -type UpDownCounter interface { - // Observe records the state of the instrument to be x. Implementations - // will assume x to be the cumulative sum of the count. - // - // It is only valid to call this within a callback. If called outside of the - // registered callback it should have no effect on the instrument, and an - // error will be reported via the error handler. - Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) +type UpDownCounter interface{ instrument.Float64Observer } - instrument.Asynchronous -} - -// Gauge is an instrument that records independent readings. +// Gauge is an instrument used to asynchronously record instantaneous float64 +// measurements once per a measurement collection cycle. // // Warning: methods may be added to this interface in minor releases. -type Gauge interface { - // Observe records the state of the instrument to be x. - // - // It is only valid to call this within a callback. If called outside of the - // registered callback it should have no effect on the instrument, and an - // error will be reported via the error handler. - Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) - - instrument.Asynchronous -} +type Gauge interface{ instrument.Float64Observer } diff --git a/metric/instrument/asyncfloat64_test.go b/metric/instrument/asyncfloat64_test.go new file mode 100644 index 00000000000..ab7d0fe5e13 --- /dev/null +++ b/metric/instrument/asyncfloat64_test.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/unit" +) + +func TestFloat64ObserverOptions(t *testing.T) { + const ( + token float64 = 43 + desc = "Instrument description." + uBytes = unit.Bytes + ) + + got := NewFloat64ObserverConfig( + WithDescription(desc), + WithUnit(uBytes), + WithFloat64Callback(func(ctx context.Context, o Float64Observer) error { + o.Observe(ctx, token) + return nil + }), + ) + assert.Equal(t, desc, got.Description(), "description") + assert.Equal(t, uBytes, got.Unit(), "unit") + + // Functions are not comparable. + cBacks := got.Callbacks() + require.Len(t, cBacks, 1, "callbacks") + o := &float64Observer{} + err := cBacks[0](context.Background(), o) + require.NoError(t, err) + assert.Equal(t, token, o.got, "callback not set") +} + +type float64Observer struct { + Asynchronous + got float64 +} + +func (o *float64Observer) Observe(_ context.Context, v float64, _ ...attribute.KeyValue) { + o.got = v +} diff --git a/metric/instrument/asyncint64.go b/metric/instrument/asyncint64.go new file mode 100644 index 00000000000..296da3e6f94 --- /dev/null +++ b/metric/instrument/asyncint64.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/unit" +) + +// Int64Observer is a recorder of int64 measurement values. +// +// Warning: methods may be added to this interface in minor releases. +type Int64Observer interface { + Asynchronous + + // Observe records the measurement value for a set of attributes. + // + // It is only valid to call this within a callback. If called outside of + // the registered callback it should have no effect on the instrument, and + // an error will be reported via the error handler. + Observe(ctx context.Context, value int64, attributes ...attribute.KeyValue) +} + +// Int64Callback is a function registered with a Meter that makes +// observations for an Int64Observer it is registered with. +// +// The function needs to complete in a finite amount of time and the deadline +// of the passed context is expected to be honored. +// +// The function needs to make unique observations across all registered +// Int64Callback. Meaning, it should not report measurements with the same +// attributes as another Int64Callbacks also registered for the same +// instrument. +// +// The function needs to be concurrent safe. +type Int64Callback func(context.Context, Int64Observer) error + +// Int64ObserverConfig contains options for Asynchronous instruments that +// observe int64 values. +type Int64ObserverConfig struct { + description string + unit unit.Unit + callbacks []Int64Callback +} + +// NewInt64ObserverConfig returns a new Int64ObserverConfig with all opts +// applied. +func NewInt64ObserverConfig(opts ...Int64ObserverOption) Int64ObserverConfig { + var config Int64ObserverConfig + for _, o := range opts { + config = o.applyInt64Observer(config) + } + return config +} + +// Description returns the Config description. +func (c Int64ObserverConfig) Description() string { + return c.description +} + +// Unit returns the Config unit. +func (c Int64ObserverConfig) Unit() unit.Unit { + return c.unit +} + +// Callbacks returns the Config callbacks. +func (c Int64ObserverConfig) Callbacks() []Int64Callback { + return c.callbacks +} + +// Int64ObserverOption applies options to int64 Observer instruments. +type Int64ObserverOption interface { + applyInt64Observer(Int64ObserverConfig) Int64ObserverConfig +} + +type int64ObserverOptionFunc func(Int64ObserverConfig) Int64ObserverConfig + +func (fn int64ObserverOptionFunc) applyInt64Observer(cfg Int64ObserverConfig) Int64ObserverConfig { + return fn(cfg) +} + +// WithInt64Callback adds callback to be called for an instrument. +func WithInt64Callback(callback Int64Callback) Int64ObserverOption { + return int64ObserverOptionFunc(func(cfg Int64ObserverConfig) Int64ObserverConfig { + cfg.callbacks = append(cfg.callbacks, callback) + return cfg + }) +} diff --git a/metric/instrument/asyncint64/asyncint64.go b/metric/instrument/asyncint64/asyncint64.go index 0d727e08f86..8cf970855c8 100644 --- a/metric/instrument/asyncint64/asyncint64.go +++ b/metric/instrument/asyncint64/asyncint64.go @@ -14,53 +14,28 @@ package asyncint64 // import "go.opentelemetry.io/otel/metric/instrument/asyncint64" -import ( - "context" +import "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument" -) - -// Counter is an instrument that records increasing values. +// Counter is an instrument used to asynchronously record increasing int64 +// measurements once per a measurement collection cycle. The Observe method is +// used to record the measured state of the instrument when it is called. +// Implementations will assume the observed value to be the cumulative sum of +// the count. // // Warning: methods may be added to this interface in minor releases. -type Counter interface { - // Observe records the state of the instrument to be x. Implementations - // will assume x to be the cumulative sum of the count. - // - // It is only valid to call this within a callback. If called outside of the - // registered callback it should have no effect on the instrument, and an - // error will be reported via the error handler. - Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) - - instrument.Asynchronous -} +type Counter interface{ instrument.Int64Observer } -// UpDownCounter is an instrument that records increasing or decreasing values. +// UpDownCounter is an instrument used to asynchronously record int64 +// measurements once per a measurement collection cycle. The Observe method is +// used to record the measured state of the instrument when it is called. +// Implementations will assume the observed value to be the cumulative sum of +// the count. // // Warning: methods may be added to this interface in minor releases. -type UpDownCounter interface { - // Observe records the state of the instrument to be x. Implementations - // will assume x to be the cumulative sum of the count. - // - // It is only valid to call this within a callback. If called outside of the - // registered callback it should have no effect on the instrument, and an - // error will be reported via the error handler. - Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) +type UpDownCounter interface{ instrument.Int64Observer } - instrument.Asynchronous -} - -// Gauge is an instrument that records independent readings. +// Gauge is an instrument used to asynchronously record instantaneous int64 +// measurements once per a measurement collection cycle. // // Warning: methods may be added to this interface in minor releases. -type Gauge interface { - // Observe records the state of the instrument to be x. - // - // It is only valid to call this within a callback. If called outside of the - // registered callback it should have no effect on the instrument, and an - // error will be reported via the error handler. - Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) - - instrument.Asynchronous -} +type Gauge interface{ instrument.Int64Observer } diff --git a/metric/instrument/asyncint64_test.go b/metric/instrument/asyncint64_test.go new file mode 100644 index 00000000000..a9ad527f1ea --- /dev/null +++ b/metric/instrument/asyncint64_test.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/unit" +) + +func TestInt64ObserverOptions(t *testing.T) { + const ( + token int64 = 43 + desc = "Instrument description." + uBytes = unit.Bytes + ) + + got := NewInt64ObserverConfig( + WithDescription(desc), + WithUnit(uBytes), + WithInt64Callback(func(ctx context.Context, o Int64Observer) error { + o.Observe(ctx, token) + return nil + }), + ) + assert.Equal(t, desc, got.Description(), "description") + assert.Equal(t, uBytes, got.Unit(), "unit") + + // Functions are not comparable. + cBacks := got.Callbacks() + require.Len(t, cBacks, 1, "callbacks") + o := &int64Observer{} + err := cBacks[0](context.Background(), o) + require.NoError(t, err) + assert.Equal(t, token, o.got, "callback not set") +} + +type int64Observer struct { + Asynchronous + got int64 +} + +func (o *int64Observer) Observe(_ context.Context, v int64, _ ...attribute.KeyValue) { + o.got = v +} diff --git a/metric/instrument/config.go b/metric/instrument/config.go deleted file mode 100644 index 8778bce1619..00000000000 --- a/metric/instrument/config.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package instrument // import "go.opentelemetry.io/otel/metric/instrument" - -import "go.opentelemetry.io/otel/metric/unit" - -// Config contains options for metric instrument descriptors. -type Config struct { - description string - unit unit.Unit -} - -// Description describes the instrument in human-readable terms. -func (cfg Config) Description() string { - return cfg.description -} - -// Unit describes the measurement unit for an instrument. -func (cfg Config) Unit() unit.Unit { - return cfg.unit -} - -// Option is an interface for applying metric instrument options. -type Option interface { - applyInstrument(Config) Config -} - -// NewConfig creates a new Config and applies all the given options. -func NewConfig(opts ...Option) Config { - var config Config - for _, o := range opts { - config = o.applyInstrument(config) - } - return config -} - -type optionFunc func(Config) Config - -func (fn optionFunc) applyInstrument(cfg Config) Config { - return fn(cfg) -} - -// WithDescription applies provided description. -func WithDescription(desc string) Option { - return optionFunc(func(cfg Config) Config { - cfg.description = desc - return cfg - }) -} - -// WithUnit applies provided unit. -func WithUnit(u unit.Unit) Option { - return optionFunc(func(cfg Config) Config { - cfg.unit = u - return cfg - }) -} diff --git a/metric/instrument/instrument.go b/metric/instrument/instrument.go index e1bbb850d76..c583be6fbf1 100644 --- a/metric/instrument/instrument.go +++ b/metric/instrument/instrument.go @@ -14,6 +14,8 @@ package instrument // import "go.opentelemetry.io/otel/metric/instrument" +import "go.opentelemetry.io/otel/metric/unit" + // Asynchronous instruments are instruments that are updated within a Callback. // If an instrument is observed outside of it's callback it should be an error. // @@ -28,3 +30,61 @@ type Asynchronous interface { type Synchronous interface { synchronous() } + +// Option applies options to all instruments. +type Option interface { + Float64ObserverOption + Int64ObserverOption + Float64Option + Int64Option +} + +type descOpt string + +func (o descOpt) applyFloat64(c Float64Config) Float64Config { + c.description = string(o) + return c +} + +func (o descOpt) applyInt64(c Int64Config) Int64Config { + c.description = string(o) + return c +} + +func (o descOpt) applyFloat64Observer(c Float64ObserverConfig) Float64ObserverConfig { + c.description = string(o) + return c +} + +func (o descOpt) applyInt64Observer(c Int64ObserverConfig) Int64ObserverConfig { + c.description = string(o) + return c +} + +// WithDescription sets the instrument description. +func WithDescription(desc string) Option { return descOpt(desc) } + +type unitOpt unit.Unit + +func (o unitOpt) applyFloat64(c Float64Config) Float64Config { + c.unit = unit.Unit(o) + return c +} + +func (o unitOpt) applyInt64(c Int64Config) Int64Config { + c.unit = unit.Unit(o) + return c +} + +func (o unitOpt) applyFloat64Observer(c Float64ObserverConfig) Float64ObserverConfig { + c.unit = unit.Unit(o) + return c +} + +func (o unitOpt) applyInt64Observer(c Int64ObserverConfig) Int64ObserverConfig { + c.unit = unit.Unit(o) + return c +} + +// WithUnit sets the instrument unit. +func WithUnit(u unit.Unit) Option { return unitOpt(u) } diff --git a/metric/instrument/syncfloat64.go b/metric/instrument/syncfloat64.go new file mode 100644 index 00000000000..b3df2b02f20 --- /dev/null +++ b/metric/instrument/syncfloat64.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "go.opentelemetry.io/otel/metric/unit" +) + +// Float64Config contains options for Asynchronous instruments that +// observe float64 values. +type Float64Config struct { + description string + unit unit.Unit +} + +// Float64Config contains options for Synchronous instruments that record +// float64 values. +func NewFloat64Config(opts ...Float64Option) Float64Config { + var config Float64Config + for _, o := range opts { + config = o.applyFloat64(config) + } + return config +} + +// Description returns the Config description. +func (c Float64Config) Description() string { + return c.description +} + +// Unit returns the Config unit. +func (c Float64Config) Unit() unit.Unit { + return c.unit +} + +// Float64Option applies options to synchronous float64 instruments. +type Float64Option interface { + applyFloat64(Float64Config) Float64Config +} diff --git a/metric/instrument/syncfloat64_test.go b/metric/instrument/syncfloat64_test.go new file mode 100644 index 00000000000..8e90e15ddde --- /dev/null +++ b/metric/instrument/syncfloat64_test.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/metric/unit" +) + +func TestFloat64Options(t *testing.T) { + const ( + token float64 = 43 + desc = "Instrument description." + uBytes = unit.Bytes + ) + + got := NewFloat64Config(WithDescription(desc), WithUnit(uBytes)) + assert.Equal(t, desc, got.Description(), "description") + assert.Equal(t, uBytes, got.Unit(), "unit") +} diff --git a/metric/instrument/syncint64.go b/metric/instrument/syncint64.go new file mode 100644 index 00000000000..d49aed6c85d --- /dev/null +++ b/metric/instrument/syncint64.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "go.opentelemetry.io/otel/metric/unit" +) + +// Int64Config contains options for Synchronous instruments that record int64 +// values. +type Int64Config struct { + description string + unit unit.Unit +} + +// NewInt64Config returns a new Int64Config with all opts +// applied. +func NewInt64Config(opts ...Int64Option) Int64Config { + var config Int64Config + for _, o := range opts { + config = o.applyInt64(config) + } + return config +} + +// Description returns the Config description. +func (c Int64Config) Description() string { + return c.description +} + +// Unit returns the Config unit. +func (c Int64Config) Unit() unit.Unit { + return c.unit +} + +// Int64Option applies options to synchronous int64 instruments. +type Int64Option interface { + applyInt64(Int64Config) Int64Config +} diff --git a/metric/instrument/syncint64_test.go b/metric/instrument/syncint64_test.go new file mode 100644 index 00000000000..3eb39915499 --- /dev/null +++ b/metric/instrument/syncint64_test.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package instrument // import "go.opentelemetry.io/otel/metric/instrument" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/metric/unit" +) + +func TestInt64Options(t *testing.T) { + const ( + token int64 = 43 + desc = "Instrument description." + uBytes = unit.Bytes + ) + + got := NewInt64Config(WithDescription(desc), WithUnit(uBytes)) + assert.Equal(t, desc, got.Description(), "description") + assert.Equal(t, uBytes, got.Unit(), "unit") +} diff --git a/metric/internal/global/instruments.go b/metric/internal/global/instruments.go index 9d28d5884d7..4b4e77ff988 100644 --- a/metric/internal/global/instruments.go +++ b/metric/internal/global/instruments.go @@ -30,7 +30,7 @@ import ( type afCounter struct { name string - opts []instrument.Option + opts []instrument.Float64ObserverOption delegate atomic.Value //asyncfloat64.Counter @@ -61,7 +61,7 @@ func (i *afCounter) unwrap() instrument.Asynchronous { type afUpDownCounter struct { name string - opts []instrument.Option + opts []instrument.Float64ObserverOption delegate atomic.Value //asyncfloat64.UpDownCounter @@ -92,7 +92,7 @@ func (i *afUpDownCounter) unwrap() instrument.Asynchronous { type afGauge struct { name string - opts []instrument.Option + opts []instrument.Float64ObserverOption delegate atomic.Value //asyncfloat64.Gauge @@ -123,7 +123,7 @@ func (i *afGauge) unwrap() instrument.Asynchronous { type aiCounter struct { name string - opts []instrument.Option + opts []instrument.Int64ObserverOption delegate atomic.Value //asyncint64.Counter @@ -154,7 +154,7 @@ func (i *aiCounter) unwrap() instrument.Asynchronous { type aiUpDownCounter struct { name string - opts []instrument.Option + opts []instrument.Int64ObserverOption delegate atomic.Value //asyncint64.UpDownCounter @@ -185,7 +185,7 @@ func (i *aiUpDownCounter) unwrap() instrument.Asynchronous { type aiGauge struct { name string - opts []instrument.Option + opts []instrument.Int64ObserverOption delegate atomic.Value //asyncint64.Gauge @@ -217,7 +217,7 @@ func (i *aiGauge) unwrap() instrument.Asynchronous { // Sync Instruments. type sfCounter struct { name string - opts []instrument.Option + opts []instrument.Float64Option delegate atomic.Value //syncfloat64.Counter @@ -241,7 +241,7 @@ func (i *sfCounter) Add(ctx context.Context, incr float64, attrs ...attribute.Ke type sfUpDownCounter struct { name string - opts []instrument.Option + opts []instrument.Float64Option delegate atomic.Value //syncfloat64.UpDownCounter @@ -265,7 +265,7 @@ func (i *sfUpDownCounter) Add(ctx context.Context, incr float64, attrs ...attrib type sfHistogram struct { name string - opts []instrument.Option + opts []instrument.Float64Option delegate atomic.Value //syncfloat64.Histogram @@ -289,7 +289,7 @@ func (i *sfHistogram) Record(ctx context.Context, x float64, attrs ...attribute. type siCounter struct { name string - opts []instrument.Option + opts []instrument.Int64Option delegate atomic.Value //syncint64.Counter @@ -313,7 +313,7 @@ func (i *siCounter) Add(ctx context.Context, x int64, attrs ...attribute.KeyValu type siUpDownCounter struct { name string - opts []instrument.Option + opts []instrument.Int64Option delegate atomic.Value //syncint64.UpDownCounter @@ -337,7 +337,7 @@ func (i *siUpDownCounter) Add(ctx context.Context, x int64, attrs ...attribute.K type siHistogram struct { name string - opts []instrument.Option + opts []instrument.Int64Option delegate atomic.Value //syncint64.Histogram diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 06de9176983..8d71aa050cf 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -147,7 +147,7 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { m.registry.Init() } -func (m *meter) Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) { +func (m *meter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Int64Counter(name, options...) } @@ -158,7 +158,7 @@ func (m *meter) Int64Counter(name string, options ...instrument.Option) (syncint return i, nil } -func (m *meter) Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) { +func (m *meter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Int64UpDownCounter(name, options...) } @@ -169,7 +169,7 @@ func (m *meter) Int64UpDownCounter(name string, options ...instrument.Option) (s return i, nil } -func (m *meter) Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) { +func (m *meter) Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Int64Histogram(name, options...) } @@ -180,7 +180,7 @@ func (m *meter) Int64Histogram(name string, options ...instrument.Option) (synci return i, nil } -func (m *meter) Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) { +func (m *meter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Int64ObservableCounter(name, options...) } @@ -191,7 +191,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...instrument.Option return i, nil } -func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) { +func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Int64ObservableUpDownCounter(name, options...) } @@ -202,7 +202,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument. return i, nil } -func (m *meter) Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) { +func (m *meter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Int64ObservableGauge(name, options...) } @@ -213,7 +213,7 @@ func (m *meter) Int64ObservableGauge(name string, options ...instrument.Option) return i, nil } -func (m *meter) Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) { +func (m *meter) Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Float64Counter(name, options...) } @@ -224,7 +224,7 @@ func (m *meter) Float64Counter(name string, options ...instrument.Option) (syncf return i, nil } -func (m *meter) Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) { +func (m *meter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Float64UpDownCounter(name, options...) } @@ -235,7 +235,7 @@ func (m *meter) Float64UpDownCounter(name string, options ...instrument.Option) return i, nil } -func (m *meter) Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) { +func (m *meter) Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Float64Histogram(name, options...) } @@ -246,7 +246,7 @@ func (m *meter) Float64Histogram(name string, options ...instrument.Option) (syn return i, nil } -func (m *meter) Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) { +func (m *meter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Float64ObservableCounter(name, options...) } @@ -257,7 +257,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...instrument.Opti return i, nil } -func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) { +func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Float64ObservableUpDownCounter(name, options...) } @@ -268,7 +268,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrumen return i, nil } -func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) { +func (m *meter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { return del.Float64ObservableGauge(name, options...) } diff --git a/metric/internal/global/meter_types_test.go b/metric/internal/global/meter_types_test.go index 6e49ce84f51..7a2680ee45a 100644 --- a/metric/internal/global/meter_types_test.go +++ b/metric/internal/global/meter_types_test.go @@ -55,62 +55,62 @@ type testMeter struct { callbacks []func(context.Context) } -func (m *testMeter) Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) { +func (m *testMeter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) { m.siCount++ return &testCountingIntInstrument{}, nil } -func (m *testMeter) Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) { +func (m *testMeter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) { m.siUDCount++ return &testCountingIntInstrument{}, nil } -func (m *testMeter) Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) { +func (m *testMeter) Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) { m.siHist++ return &testCountingIntInstrument{}, nil } -func (m *testMeter) Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) { +func (m *testMeter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) { m.aiCount++ return &testCountingIntInstrument{}, nil } -func (m *testMeter) Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) { +func (m *testMeter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) { m.aiUDCount++ return &testCountingIntInstrument{}, nil } -func (m *testMeter) Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) { +func (m *testMeter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) { m.aiGauge++ return &testCountingIntInstrument{}, nil } -func (m *testMeter) Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) { +func (m *testMeter) Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) { m.sfCount++ return &testCountingFloatInstrument{}, nil } -func (m *testMeter) Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) { +func (m *testMeter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) { m.sfUDCount++ return &testCountingFloatInstrument{}, nil } -func (m *testMeter) Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) { +func (m *testMeter) Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) { m.sfHist++ return &testCountingFloatInstrument{}, nil } -func (m *testMeter) Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) { +func (m *testMeter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) { m.afCount++ return &testCountingFloatInstrument{}, nil } -func (m *testMeter) Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) { +func (m *testMeter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) { m.afUDCount++ return &testCountingFloatInstrument{}, nil } -func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) { +func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) { m.afGauge++ return &testCountingFloatInstrument{}, nil } diff --git a/metric/meter.go b/metric/meter.go index 604254c10ab..7a042113835 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -44,56 +44,56 @@ type Meter interface { // Int64Counter returns a new instrument identified by name and configured // with options. The instrument is used to synchronously record increasing // int64 measurements during a computational operation. - Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) + Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) // Int64UpDownCounter returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // int64 measurements during a computational operation. - Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) + Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) // Int64Histogram returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // the distribution of int64 measurements during a computational operation. - Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) + Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) // Int64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing int64 measurements once per a measurement collection cycle. - Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) + Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) // Int64ObservableUpDownCounter returns a new instrument identified by name // and configured with options. The instrument is used to asynchronously // record int64 measurements once per a measurement collection cycle. - Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) + Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) // Int64ObservableGauge returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // instantaneous int64 measurements once per a measurement collection // cycle. - Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) + Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) // Float64Counter returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // increasing float64 measurements during a computational operation. - Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) + Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) // Float64UpDownCounter returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // float64 measurements during a computational operation. - Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) + Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) // Float64Histogram returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // the distribution of float64 measurements during a computational // operation. - Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) + Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) // Float64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing float64 measurements once per a measurement collection cycle. - Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) + Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) // Float64ObservableUpDownCounter returns a new instrument identified by // name and configured with options. The instrument is used to // asynchronously record float64 measurements once per a measurement // collection cycle. - Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) + Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) // Float64ObservableGauge returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // instantaneous float64 measurements once per a measurement collection // cycle. - Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) + Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) // RegisterCallback registers f to be called during the collection of a // measurement cycle. diff --git a/metric/noop.go b/metric/noop.go index 0b0a9707d14..8c717f2a92a 100644 --- a/metric/noop.go +++ b/metric/noop.go @@ -43,51 +43,51 @@ func NewNoopMeter() Meter { type noopMeter struct{} -func (noopMeter) Int64Counter(string, ...instrument.Option) (syncint64.Counter, error) { +func (noopMeter) Int64Counter(string, ...instrument.Int64Option) (syncint64.Counter, error) { return nonrecordingSyncInt64Instrument{}, nil } -func (noopMeter) Int64UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) { +func (noopMeter) Int64UpDownCounter(string, ...instrument.Int64Option) (syncint64.UpDownCounter, error) { return nonrecordingSyncInt64Instrument{}, nil } -func (noopMeter) Int64Histogram(string, ...instrument.Option) (syncint64.Histogram, error) { +func (noopMeter) Int64Histogram(string, ...instrument.Int64Option) (syncint64.Histogram, error) { return nonrecordingSyncInt64Instrument{}, nil } -func (noopMeter) Int64ObservableCounter(string, ...instrument.Option) (asyncint64.Counter, error) { +func (noopMeter) Int64ObservableCounter(string, ...instrument.Int64ObserverOption) (asyncint64.Counter, error) { return nonrecordingAsyncInt64Instrument{}, nil } -func (noopMeter) Int64ObservableUpDownCounter(string, ...instrument.Option) (asyncint64.UpDownCounter, error) { +func (noopMeter) Int64ObservableUpDownCounter(string, ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) { return nonrecordingAsyncInt64Instrument{}, nil } -func (noopMeter) Int64ObservableGauge(string, ...instrument.Option) (asyncint64.Gauge, error) { +func (noopMeter) Int64ObservableGauge(string, ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) { return nonrecordingAsyncInt64Instrument{}, nil } -func (noopMeter) Float64Counter(string, ...instrument.Option) (syncfloat64.Counter, error) { +func (noopMeter) Float64Counter(string, ...instrument.Float64Option) (syncfloat64.Counter, error) { return nonrecordingSyncFloat64Instrument{}, nil } -func (noopMeter) Float64UpDownCounter(string, ...instrument.Option) (syncfloat64.UpDownCounter, error) { +func (noopMeter) Float64UpDownCounter(string, ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) { return nonrecordingSyncFloat64Instrument{}, nil } -func (noopMeter) Float64Histogram(string, ...instrument.Option) (syncfloat64.Histogram, error) { +func (noopMeter) Float64Histogram(string, ...instrument.Float64Option) (syncfloat64.Histogram, error) { return nonrecordingSyncFloat64Instrument{}, nil } -func (noopMeter) Float64ObservableCounter(string, ...instrument.Option) (asyncfloat64.Counter, error) { +func (noopMeter) Float64ObservableCounter(string, ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) { return nonrecordingAsyncFloat64Instrument{}, nil } -func (noopMeter) Float64ObservableUpDownCounter(string, ...instrument.Option) (asyncfloat64.UpDownCounter, error) { +func (noopMeter) Float64ObservableUpDownCounter(string, ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) { return nonrecordingAsyncFloat64Instrument{}, nil } -func (noopMeter) Float64ObservableGauge(string, ...instrument.Option) (asyncfloat64.Gauge, error) { +func (noopMeter) Float64ObservableGauge(string, ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) { return nonrecordingAsyncFloat64Instrument{}, nil } @@ -110,15 +110,15 @@ var ( _ asyncfloat64.Gauge = nonrecordingAsyncFloat64Instrument{} ) -func (n nonrecordingAsyncFloat64Instrument) Counter(string, ...instrument.Option) (asyncfloat64.Counter, error) { +func (n nonrecordingAsyncFloat64Instrument) Counter(string, ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) { return n, nil } -func (n nonrecordingAsyncFloat64Instrument) UpDownCounter(string, ...instrument.Option) (asyncfloat64.UpDownCounter, error) { +func (n nonrecordingAsyncFloat64Instrument) UpDownCounter(string, ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) { return n, nil } -func (n nonrecordingAsyncFloat64Instrument) Gauge(string, ...instrument.Option) (asyncfloat64.Gauge, error) { +func (n nonrecordingAsyncFloat64Instrument) Gauge(string, ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) { return n, nil } @@ -136,15 +136,15 @@ var ( _ asyncint64.Gauge = nonrecordingAsyncInt64Instrument{} ) -func (n nonrecordingAsyncInt64Instrument) Counter(string, ...instrument.Option) (asyncint64.Counter, error) { +func (n nonrecordingAsyncInt64Instrument) Counter(string, ...instrument.Int64ObserverOption) (asyncint64.Counter, error) { return n, nil } -func (n nonrecordingAsyncInt64Instrument) UpDownCounter(string, ...instrument.Option) (asyncint64.UpDownCounter, error) { +func (n nonrecordingAsyncInt64Instrument) UpDownCounter(string, ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) { return n, nil } -func (n nonrecordingAsyncInt64Instrument) Gauge(string, ...instrument.Option) (asyncint64.Gauge, error) { +func (n nonrecordingAsyncInt64Instrument) Gauge(string, ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) { return n, nil } @@ -161,15 +161,15 @@ var ( _ syncfloat64.Histogram = nonrecordingSyncFloat64Instrument{} ) -func (n nonrecordingSyncFloat64Instrument) Counter(string, ...instrument.Option) (syncfloat64.Counter, error) { +func (n nonrecordingSyncFloat64Instrument) Counter(string, ...instrument.Float64Option) (syncfloat64.Counter, error) { return n, nil } -func (n nonrecordingSyncFloat64Instrument) UpDownCounter(string, ...instrument.Option) (syncfloat64.UpDownCounter, error) { +func (n nonrecordingSyncFloat64Instrument) UpDownCounter(string, ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) { return n, nil } -func (n nonrecordingSyncFloat64Instrument) Histogram(string, ...instrument.Option) (syncfloat64.Histogram, error) { +func (n nonrecordingSyncFloat64Instrument) Histogram(string, ...instrument.Float64Option) (syncfloat64.Histogram, error) { return n, nil } @@ -191,15 +191,15 @@ var ( _ syncint64.Histogram = nonrecordingSyncInt64Instrument{} ) -func (n nonrecordingSyncInt64Instrument) Counter(string, ...instrument.Option) (syncint64.Counter, error) { +func (n nonrecordingSyncInt64Instrument) Counter(string, ...instrument.Int64Option) (syncint64.Counter, error) { return n, nil } -func (n nonrecordingSyncInt64Instrument) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) { +func (n nonrecordingSyncInt64Instrument) UpDownCounter(string, ...instrument.Int64Option) (syncint64.UpDownCounter, error) { return n, nil } -func (n nonrecordingSyncInt64Instrument) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) { +func (n nonrecordingSyncInt64Instrument) Histogram(string, ...instrument.Int64Option) (syncint64.Histogram, error) { return n, nil } diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go index 0f9850cbc84..be5d760f317 100644 --- a/sdk/metric/internal/aggregator_example_test.go +++ b/sdk/metric/internal/aggregator_example_test.go @@ -31,7 +31,7 @@ type meter struct { aggregations []metricdata.Aggregation } -func (p *meter) Int64Counter(string, ...instrument.Option) (syncint64.Counter, error) { +func (p *meter) Int64Counter(string, ...instrument.Int64Option) (syncint64.Counter, error) { // This is an example of how a meter would create an aggregator for a new // counter. At this point the provider would determine the aggregation and // temporality to used based on the Reader and View configuration. Assume @@ -47,7 +47,7 @@ func (p *meter) Int64Counter(string, ...instrument.Option) (syncint64.Counter, e return count, nil } -func (p *meter) Int64UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) { +func (p *meter) Int64UpDownCounter(string, ...instrument.Int64Option) (syncint64.UpDownCounter, error) { // This is an example of how a meter would create an aggregator for a new // up-down counter. At this point the provider would determine the // aggregation and temporality to used based on the Reader and View @@ -64,7 +64,7 @@ func (p *meter) Int64UpDownCounter(string, ...instrument.Option) (syncint64.UpDo return upDownCount, nil } -func (p *meter) Int64Histogram(string, ...instrument.Option) (syncint64.Histogram, error) { +func (p *meter) Int64Histogram(string, ...instrument.Int64Option) (syncint64.Histogram, error) { // This is an example of how a meter would create an aggregator for a new // histogram. At this point the provider would determine the aggregation // and temporality to used based on the Reader and View configuration. diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index d28f53f0fdb..1ebd0cb8018 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -15,12 +15,15 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( + "context" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" "go.opentelemetry.io/otel/metric/instrument/asyncint64" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" ) @@ -31,8 +34,8 @@ import ( type meter struct { pipes pipelines - instProviderInt64 *instProvider[int64] - instProviderFloat64 *instProvider[float64] + int64IP *instProvider[int64] + float64IP *instProvider[float64] } func newMeter(s instrumentation.Scope, p pipelines) *meter { @@ -46,9 +49,9 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { fc := newInstrumentCache[float64](nil, &viewCache) return &meter{ - pipes: p, - instProviderInt64: newInstProvider(s, p, ic), - instProviderFloat64: newInstProvider(s, p, fc), + pipes: p, + int64IP: newInstProvider(s, p, ic), + float64IP: newInstProvider(s, p, fc), } } @@ -58,85 +61,145 @@ var _ metric.Meter = (*meter)(nil) // Int64Counter returns a new instrument identified by name and configured with // options. The instrument is used to synchronously record increasing int64 // measurements during a computational operation. -func (m *meter) Int64Counter(name string, options ...instrument.Option) (syncint64.Counter, error) { - return m.instProviderInt64.lookup(InstrumentKindCounter, name, options) +func (m *meter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) { + cfg := instrument.NewInt64Config(options...) + const kind = InstrumentKindCounter + return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) } // Int64UpDownCounter returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // int64 measurements during a computational operation. -func (m *meter) Int64UpDownCounter(name string, options ...instrument.Option) (syncint64.UpDownCounter, error) { - return m.instProviderInt64.lookup(InstrumentKindUpDownCounter, name, options) +func (m *meter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (syncint64.UpDownCounter, error) { + cfg := instrument.NewInt64Config(options...) + const kind = InstrumentKindUpDownCounter + return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) } // Int64Histogram returns a new instrument identified by name and configured // with options. The instrument is used to synchronously record the // distribution of int64 measurements during a computational operation. -func (m *meter) Int64Histogram(name string, options ...instrument.Option) (syncint64.Histogram, error) { - return m.instProviderInt64.lookup(InstrumentKindHistogram, name, options) +func (m *meter) Int64Histogram(name string, options ...instrument.Int64Option) (syncint64.Histogram, error) { + cfg := instrument.NewInt64Config(options...) + const kind = InstrumentKindHistogram + return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) } // Int64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing int64 measurements once per a measurement collection cycle. -func (m *meter) Int64ObservableCounter(name string, options ...instrument.Option) (asyncint64.Counter, error) { - return m.instProviderInt64.lookup(InstrumentKindObservableCounter, name, options) +func (m *meter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.Counter, error) { + cfg := instrument.NewInt64ObserverConfig(options...) + const kind = InstrumentKindObservableCounter + p := int64ObservProvider{m.int64IP} + inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + if err != nil { + return nil, err + } + p.registerCallbacks(inst, cfg.Callbacks()) + return inst, nil } // Int64ObservableUpDownCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // int64 measurements once per a measurement collection cycle. -func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncint64.UpDownCounter, error) { - return m.instProviderInt64.lookup(InstrumentKindObservableUpDownCounter, name, options) +func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (asyncint64.UpDownCounter, error) { + cfg := instrument.NewInt64ObserverConfig(options...) + const kind = InstrumentKindObservableUpDownCounter + p := int64ObservProvider{m.int64IP} + inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + if err != nil { + return nil, err + } + p.registerCallbacks(inst, cfg.Callbacks()) + return inst, nil } // Int64ObservableGauge returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // instantaneous int64 measurements once per a measurement collection cycle. -func (m *meter) Int64ObservableGauge(name string, options ...instrument.Option) (asyncint64.Gauge, error) { - return m.instProviderInt64.lookup(InstrumentKindObservableGauge, name, options) +func (m *meter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (asyncint64.Gauge, error) { + cfg := instrument.NewInt64ObserverConfig(options...) + const kind = InstrumentKindObservableGauge + p := int64ObservProvider{m.int64IP} + inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + if err != nil { + return nil, err + } + p.registerCallbacks(inst, cfg.Callbacks()) + return inst, nil } // Float64Counter returns a new instrument identified by name and configured // with options. The instrument is used to synchronously record increasing // float64 measurements during a computational operation. -func (m *meter) Float64Counter(name string, options ...instrument.Option) (syncfloat64.Counter, error) { - return m.instProviderFloat64.lookup(InstrumentKindCounter, name, options) +func (m *meter) Float64Counter(name string, options ...instrument.Float64Option) (syncfloat64.Counter, error) { + cfg := instrument.NewFloat64Config(options...) + const kind = InstrumentKindCounter + return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) } // Float64UpDownCounter returns a new instrument identified by name and // configured with options. The instrument is used to synchronously record // float64 measurements during a computational operation. -func (m *meter) Float64UpDownCounter(name string, options ...instrument.Option) (syncfloat64.UpDownCounter, error) { - return m.instProviderFloat64.lookup(InstrumentKindUpDownCounter, name, options) +func (m *meter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (syncfloat64.UpDownCounter, error) { + cfg := instrument.NewFloat64Config(options...) + const kind = InstrumentKindUpDownCounter + return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) } // Float64Histogram returns a new instrument identified by name and configured // with options. The instrument is used to synchronously record the // distribution of float64 measurements during a computational operation. -func (m *meter) Float64Histogram(name string, options ...instrument.Option) (syncfloat64.Histogram, error) { - return m.instProviderFloat64.lookup(InstrumentKindHistogram, name, options) +func (m *meter) Float64Histogram(name string, options ...instrument.Float64Option) (syncfloat64.Histogram, error) { + cfg := instrument.NewFloat64Config(options...) + const kind = InstrumentKindHistogram + return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) } // Float64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing float64 measurements once per a measurement collection cycle. -func (m *meter) Float64ObservableCounter(name string, options ...instrument.Option) (asyncfloat64.Counter, error) { - return m.instProviderFloat64.lookup(InstrumentKindObservableCounter, name, options) +func (m *meter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Counter, error) { + cfg := instrument.NewFloat64ObserverConfig(options...) + const kind = InstrumentKindObservableCounter + p := float64ObservProvider{m.float64IP} + inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + if err != nil { + return nil, err + } + p.registerCallbacks(inst, cfg.Callbacks()) + return inst, nil } // Float64ObservableUpDownCounter returns a new instrument identified by name // and configured with options. The instrument is used to asynchronously record // float64 measurements once per a measurement collection cycle. -func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Option) (asyncfloat64.UpDownCounter, error) { - return m.instProviderFloat64.lookup(InstrumentKindObservableUpDownCounter, name, options) +func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.UpDownCounter, error) { + cfg := instrument.NewFloat64ObserverConfig(options...) + const kind = InstrumentKindObservableUpDownCounter + p := float64ObservProvider{m.float64IP} + inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + if err != nil { + return nil, err + } + p.registerCallbacks(inst, cfg.Callbacks()) + return inst, nil } // Float64ObservableGauge returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // instantaneous float64 measurements once per a measurement collection cycle. -func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option) (asyncfloat64.Gauge, error) { - return m.instProviderFloat64.lookup(InstrumentKindObservableGauge, name, options) +func (m *meter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (asyncfloat64.Gauge, error) { + cfg := instrument.NewFloat64ObserverConfig(options...) + const kind = InstrumentKindObservableGauge + p := float64ObservProvider{m.float64IP} + inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + if err != nil { + return nil, err + } + p.registerCallbacks(inst, cfg.Callbacks()) + return inst, nil } // RegisterCallback registers the function f to be called when any of the @@ -148,18 +211,18 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callb switch t := inst.(type) { case *instrumentImpl[int64]: if len(t.aggregators) > 0 { - return m.registerCallback(f) + return m.registerMultiCallback(f) } case *instrumentImpl[float64]: if len(t.aggregators) > 0 { - return m.registerCallback(f) + return m.registerMultiCallback(f) } default: // Instrument external to the SDK. For example, an instrument from // the "go.opentelemetry.io/otel/metric/internal/global" package. // // Fail gracefully here, assume a valid instrument. - return m.registerCallback(f) + return m.registerMultiCallback(f) } } // All insts use drop aggregation. @@ -172,30 +235,64 @@ func (noopRegister) Unregister() error { return nil } -func (m *meter) registerCallback(c metric.Callback) (metric.Registration, error) { - return m.pipes.registerCallback(c), nil +func (m *meter) registerMultiCallback(c metric.Callback) (metric.Registration, error) { + return m.pipes.registerMultiCallback(c), nil } // instProvider provides all OpenTelemetry instruments. type instProvider[N int64 | float64] struct { scope instrumentation.Scope + pipes pipelines resolve resolver[N] } func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] { - return &instProvider[N]{scope: s, resolve: newResolver(p, c)} + return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)} } // lookup returns the resolved instrumentImpl. -func (p *instProvider[N]) lookup(kind InstrumentKind, name string, opts []instrument.Option) (*instrumentImpl[N], error) { - cfg := instrument.NewConfig(opts...) - i := Instrument{ +func (p *instProvider[N]) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (*instrumentImpl[N], error) { + inst := Instrument{ Name: name, - Description: cfg.Description(), - Unit: cfg.Unit(), + Description: desc, + Unit: u, Kind: kind, Scope: p.scope, } - aggs, err := p.resolve.Aggregators(i) + aggs, err := p.resolve.Aggregators(inst) return &instrumentImpl[N]{aggregators: aggs}, err } + +type int64ObservProvider struct{ *instProvider[int64] } + +func (p int64ObservProvider) registerCallbacks(inst *instrumentImpl[int64], cBacks []instrument.Int64Callback) { + if inst == nil { + // Drop aggregator. + return + } + + for _, cBack := range cBacks { + p.pipes.registerCallback(p.callback(inst, cBack)) + } +} + +func (p int64ObservProvider) callback(i *instrumentImpl[int64], f instrument.Int64Callback) func(context.Context) error { + return func(ctx context.Context) error { return f(ctx, i) } +} + +type float64ObservProvider struct{ *instProvider[float64] } + +func (p float64ObservProvider) registerCallbacks(inst *instrumentImpl[float64], cBacks []instrument.Float64Callback) { + if inst == nil { + // Drop aggregator. + return + } + + for _, cBack := range cBacks { + p.pipes.registerCallback(p.callback(inst, cBack)) + } +} + +func (p float64ObservProvider) callback(i *instrumentImpl[float64], f instrument.Float64Callback) func(context.Context) error { + return func(ctx context.Context) error { return f(ctx, i) } +} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index fee46e93dac..9568ee3311a 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -167,6 +167,7 @@ func TestCallbackUnregisterConcurrency(t *testing.T) { // Instruments should produce correct ResourceMetrics. func TestMeterCreatesInstruments(t *testing.T) { + attrs := []attribute.KeyValue{attribute.String("name", "alice")} seven := 7.0 testCases := []struct { name string @@ -176,7 +177,11 @@ func TestMeterCreatesInstruments(t *testing.T) { { name: "ObservableInt64Count", fn: func(t *testing.T, m metric.Meter) { - ctr, err := m.Int64ObservableCounter("aint") + cback := func(ctx context.Context, o instrument.Int64Observer) error { + o.Observe(ctx, 4, attrs...) + return nil + } + ctr, err := m.Int64ObservableCounter("aint", instrument.WithInt64Callback(cback)) assert.NoError(t, err) _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) @@ -192,6 +197,7 @@ func TestMeterCreatesInstruments(t *testing.T) { Temporality: metricdata.CumulativeTemporality, IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...), Value: 4}, {Value: 3}, }, }, @@ -200,7 +206,11 @@ func TestMeterCreatesInstruments(t *testing.T) { { name: "ObservableInt64UpDownCount", fn: func(t *testing.T, m metric.Meter) { - ctr, err := m.Int64ObservableUpDownCounter("aint") + cback := func(ctx context.Context, o instrument.Int64Observer) error { + o.Observe(ctx, 4, attrs...) + return nil + } + ctr, err := m.Int64ObservableUpDownCounter("aint", instrument.WithInt64Callback(cback)) assert.NoError(t, err) _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) @@ -216,6 +226,7 @@ func TestMeterCreatesInstruments(t *testing.T) { Temporality: metricdata.CumulativeTemporality, IsMonotonic: false, DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...), Value: 4}, {Value: 11}, }, }, @@ -224,7 +235,11 @@ func TestMeterCreatesInstruments(t *testing.T) { { name: "ObservableInt64Gauge", fn: func(t *testing.T, m metric.Meter) { - gauge, err := m.Int64ObservableGauge("agauge") + cback := func(ctx context.Context, o instrument.Int64Observer) error { + o.Observe(ctx, 4, attrs...) + return nil + } + gauge, err := m.Int64ObservableGauge("agauge", instrument.WithInt64Callback(cback)) assert.NoError(t, err) _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) @@ -238,6 +253,7 @@ func TestMeterCreatesInstruments(t *testing.T) { Name: "agauge", Data: metricdata.Gauge[int64]{ DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...), Value: 4}, {Value: 11}, }, }, @@ -246,7 +262,11 @@ func TestMeterCreatesInstruments(t *testing.T) { { name: "ObservableFloat64Count", fn: func(t *testing.T, m metric.Meter) { - ctr, err := m.Float64ObservableCounter("afloat") + cback := func(ctx context.Context, o instrument.Float64Observer) error { + o.Observe(ctx, 4, attrs...) + return nil + } + ctr, err := m.Float64ObservableCounter("afloat", instrument.WithFloat64Callback(cback)) assert.NoError(t, err) _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) @@ -262,6 +282,7 @@ func TestMeterCreatesInstruments(t *testing.T) { Temporality: metricdata.CumulativeTemporality, IsMonotonic: true, DataPoints: []metricdata.DataPoint[float64]{ + {Attributes: attribute.NewSet(attrs...), Value: 4}, {Value: 3}, }, }, @@ -270,7 +291,11 @@ func TestMeterCreatesInstruments(t *testing.T) { { name: "ObservableFloat64UpDownCount", fn: func(t *testing.T, m metric.Meter) { - ctr, err := m.Float64ObservableUpDownCounter("afloat") + cback := func(ctx context.Context, o instrument.Float64Observer) error { + o.Observe(ctx, 4, attrs...) + return nil + } + ctr, err := m.Float64ObservableUpDownCounter("afloat", instrument.WithFloat64Callback(cback)) assert.NoError(t, err) _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) @@ -286,6 +311,7 @@ func TestMeterCreatesInstruments(t *testing.T) { Temporality: metricdata.CumulativeTemporality, IsMonotonic: false, DataPoints: []metricdata.DataPoint[float64]{ + {Attributes: attribute.NewSet(attrs...), Value: 4}, {Value: 11}, }, }, @@ -294,7 +320,11 @@ func TestMeterCreatesInstruments(t *testing.T) { { name: "ObservableFloat64Gauge", fn: func(t *testing.T, m metric.Meter) { - gauge, err := m.Float64ObservableGauge("agauge") + cback := func(ctx context.Context, o instrument.Float64Observer) error { + o.Observe(ctx, 4, attrs...) + return nil + } + gauge, err := m.Float64ObservableGauge("agauge", instrument.WithFloat64Callback(cback)) assert.NoError(t, err) _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) @@ -308,6 +338,7 @@ func TestMeterCreatesInstruments(t *testing.T) { Name: "agauge", Data: metricdata.Gauge[float64]{ DataPoints: []metricdata.DataPoint[float64]{ + {Attributes: attribute.NewSet(attrs...), Value: 4}, {Value: 11}, }, }, diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 4ebabc0c1d2..e305eb8706c 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -76,8 +76,9 @@ type pipeline struct { views []View sync.Mutex - aggregations map[instrumentation.Scope][]instrumentSync - callbacks list.List + aggregations map[instrumentation.Scope][]instrumentSync + callbacks []func(context.Context) error + multiCallbacks list.List } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -95,14 +96,23 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { p.aggregations[scope] = append(p.aggregations[scope], iSync) } -// addCallback registers a callback to be run when `produce()` is called. -func (p *pipeline) addCallback(c metric.Callback) (unregister func()) { +// addCallback registers a single instrument callback to be run when +// `produce()` is called. +func (p *pipeline) addCallback(cback func(context.Context) error) { p.Lock() defer p.Unlock() - e := p.callbacks.PushBack(c) + p.callbacks = append(p.callbacks, cback) +} + +// addMultiCallback registers a multi-instrument callback to be run when +// `produce()` is called. +func (p *pipeline) addMultiCallback(c metric.Callback) (unregister func()) { + p.Lock() + defer p.Unlock() + e := p.multiCallbacks.PushBack(c) return func() { p.Lock() - p.callbacks.Remove(e) + p.multiCallbacks.Remove(e) p.Unlock() } } @@ -124,7 +134,17 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err p.Lock() defer p.Unlock() - for e := p.callbacks.Front(); e != nil; e = e.Next() { + var errs multierror + for _, c := range p.callbacks { + // TODO make the callbacks parallel. ( #3034 ) + if err := c(ctx); err != nil { + errs.append(err) + } + if err := ctx.Err(); err != nil { + return metricdata.ResourceMetrics{}, err + } + } + for e := p.multiCallbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) f := e.Value.(metric.Callback) f(ctx) @@ -159,7 +179,7 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err return metricdata.ResourceMetrics{ Resource: p.resource, ScopeMetrics: sm, - }, nil + }, errs.errorOrNil() } // inserter facilitates inserting of new instruments from a single scope into a @@ -447,10 +467,16 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli return pipes } -func (p pipelines) registerCallback(c metric.Callback) metric.Registration { +func (p pipelines) registerCallback(cback func(context.Context) error) { + for _, pipe := range p { + pipe.addCallback(cback) + } +} + +func (p pipelines) registerMultiCallback(c metric.Callback) metric.Registration { unregs := make([]func(), len(p)) for i, pipe := range p { - unregs[i] = pipe.addCallback(c) + unregs[i] = pipe.addMultiCallback(c) } return unregisterFuncs(unregs) } diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 2badfe1865e..7fea4c8aba2 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -54,7 +54,7 @@ func TestEmptyPipeline(t *testing.T) { }) require.NotPanics(t, func() { - pipe.addCallback(func(ctx context.Context) {}) + pipe.addMultiCallback(func(ctx context.Context) {}) }) output, err = pipe.produce(context.Background()) @@ -78,7 +78,7 @@ func TestNewPipeline(t *testing.T) { }) require.NotPanics(t, func() { - pipe.addCallback(func(ctx context.Context) {}) + pipe.addMultiCallback(func(ctx context.Context) {}) }) output, err = pipe.produce(context.Background()) @@ -121,7 +121,7 @@ func TestPipelineConcurrency(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - pipe.addCallback(func(ctx context.Context) {}) + pipe.addMultiCallback(func(ctx context.Context) {}) }() } wg.Wait()