Skip to content

Commit f04e9ec

Browse files
authored
feat: Use builder pattern for constructing Metric Streams (#2984)
1 parent 7cfe8cd commit f04e9ec

File tree

7 files changed

+329
-98
lines changed

7 files changed

+329
-98
lines changed

examples/metrics-advanced/src/main.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
99
let my_view_rename_and_unit = |i: &Instrument| {
1010
if i.name == "my_histogram" {
1111
Some(
12-
Stream::new()
13-
.name("my_histogram_renamed")
14-
.unit("milliseconds"),
12+
Stream::builder()
13+
.with_name("my_histogram_renamed")
14+
.with_unit("milliseconds")
15+
.build()
16+
.unwrap(),
1517
)
1618
} else {
1719
None
@@ -21,7 +23,11 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
2123
// for example 2
2224
let my_view_change_cardinality = |i: &Instrument| {
2325
if i.name == "my_second_histogram" {
24-
Some(Stream::new().cardinality_limit(2))
26+
// Note: If Stream is invalid, build() will return an error. By
27+
// calling `.ok()`, any such error is ignored and treated as if the
28+
// view does not match the instrument. If this is not the desired
29+
// behavior, consider handling the error explicitly.
30+
Stream::builder().with_cardinality_limit(2).build().ok()
2531
} else {
2632
None
2733
}

opentelemetry-sdk/CHANGELOG.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,17 @@ also modified to suppress telemetry before invoking exporters.
4444
behind feature flag "experimental_metrics_custom_reader".
4545
[#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)
4646

47-
- TODO: Placeholder for View related changelog. Polish this after all
48-
- The `Stream` struct now has its public fields hidden.
47+
- TODO: Placeholder for View related changelog. Polish this after all changs done
4948
- Core view functionality is now available by default—users can change the
5049
name, unit, description, and cardinality limit of a metric via views without
5150
enabling the `spec_unstable_metrics_views` feature flag. Advanced view
5251
features, such as custom aggregation or attribute filtering, still require
5352
the `spec_unstable_metrics_views` feature.
54-
- TODO: Add Stream::builder() pattern change, validation when done.
53+
- Introduced a builder pattern for `Stream` creation to use with "Views".
54+
- Added `StreamBuilder` struct with methods to configure stream properties
55+
- Added `Stream::builder()` method that returns a new `StreamBuilder`
56+
- `StreamBuilder::build()` returns `Result<Stream, Box<dyn Error>>` enabling
57+
proper validation
5558

5659
- *Breaking* `Aggregation` enum moved behind feature flag
5760
"spec_unstable_metrics_views". This was only required when using Views.

opentelemetry-sdk/benches/metric.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,10 @@ fn counters(c: &mut Criterion) {
223223
Some(
224224
new_view(
225225
Instrument::new().name("*"),
226-
Stream::new().allowed_attribute_keys([Key::new("K")]),
226+
Stream::builder()
227+
.with_allowed_attribute_keys([Key::new("K")])
228+
.build()
229+
.unwrap(),
227230
)
228231
.unwrap(),
229232
),
@@ -273,10 +276,13 @@ fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
273276
let view = Some(
274277
new_view(
275278
Instrument::new().name("histogram_*"),
276-
Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
277-
boundaries: bounds.iter().map(|&x| x as f64).collect(),
278-
record_min_max: true,
279-
}),
279+
Stream::builder()
280+
.with_aggregation(Aggregation::ExplicitBucketHistogram {
281+
boundaries: bounds.iter().map(|&x| x as f64).collect(),
282+
record_min_max: true,
283+
})
284+
.build()
285+
.unwrap(),
280286
)
281287
.unwrap(),
282288
);

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 152 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{borrow::Cow, collections::HashSet, sync::Arc};
1+
use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};
22

33
use opentelemetry::{
44
metrics::{AsyncInstrument, SyncInstrument},
@@ -73,10 +73,13 @@ impl InstrumentKind {
7373
/// Instruments can be used as criteria for views.
7474
///
7575
/// ```
76-
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
76+
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream, StreamBuilder};
7777
///
7878
/// let criteria = Instrument::new().name("counter_*");
79-
/// let mask = Stream::new().aggregation(Aggregation::Sum);
79+
/// let mask = Stream::builder()
80+
/// .with_aggregation(Aggregation::Sum)
81+
/// .build()
82+
/// .unwrap();
8083
///
8184
/// let view = new_view(criteria, mask);
8285
/// # drop(view);
@@ -169,71 +172,60 @@ impl Instrument {
169172
}
170173
}
171174

172-
/// Describes the stream of data an instrument produces.
175+
/// A builder for creating Stream objects.
173176
///
174177
/// # Example
175178
///
176-
/// Streams can be used as masks in views.
177-
///
178179
/// ```
179-
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
180-
///
181-
/// let criteria = Instrument::new().name("counter_*");
182-
/// let mask = Stream::new().aggregation(Aggregation::Sum);
180+
/// use opentelemetry_sdk::metrics::{Aggregation, Stream};
181+
/// use opentelemetry::Key;
183182
///
184-
/// let view = new_view(criteria, mask);
185-
/// # drop(view);
183+
/// let stream = Stream::builder()
184+
/// .with_name("my_stream")
185+
/// .with_aggregation(Aggregation::Sum)
186+
/// .with_cardinality_limit(100)
187+
/// .build()
188+
/// .unwrap();
186189
/// ```
187190
#[derive(Default, Debug)]
188191
#[non_exhaustive]
189-
#[allow(unreachable_pub)]
190-
pub struct Stream {
191-
/// The human-readable identifier of the stream.
192-
pub(crate) name: Option<Cow<'static, str>>,
193-
/// Describes the purpose of the data.
194-
pub(crate) description: Option<Cow<'static, str>>,
195-
/// the unit of measurement recorded.
196-
pub(crate) unit: Option<Cow<'static, str>>,
197-
/// Aggregation the stream uses for an instrument.
198-
pub(crate) aggregation: Option<Aggregation>,
199-
/// An allow-list of attribute keys that will be preserved for the stream.
200-
///
201-
/// Any attribute recorded for the stream with a key not in this set will be
202-
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
203-
/// attributes will be kept.
204-
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
205-
206-
/// Cardinality limit for the stream.
207-
pub(crate) cardinality_limit: Option<usize>,
192+
pub struct StreamBuilder {
193+
name: Option<Cow<'static, str>>,
194+
description: Option<Cow<'static, str>>,
195+
unit: Option<Cow<'static, str>>,
196+
aggregation: Option<Aggregation>,
197+
allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
198+
cardinality_limit: Option<usize>,
208199
}
209200

210-
impl Stream {
211-
/// Create a new stream with empty values.
212-
pub fn new() -> Self {
213-
Stream::default()
201+
impl StreamBuilder {
202+
/// Create a new stream builder with default values.
203+
pub(crate) fn new() -> Self {
204+
StreamBuilder::default()
214205
}
215206

216-
/// Set the stream name.
217-
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
207+
/// Set the stream name. If this is not set, name provide while creating the instrument will be used.
208+
pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
218209
self.name = Some(name.into());
219210
self
220211
}
221212

222-
/// Set the stream description.
223-
pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
213+
/// Set the stream description. If this is not set, description provided while creating the instrument will be used.
214+
pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
224215
self.description = Some(description.into());
225216
self
226217
}
227218

228-
/// Set the stream unit.
229-
pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
219+
/// Set the stream unit. If this is not set, unit provided while creating the instrument will be used.
220+
pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
230221
self.unit = Some(unit.into());
231222
self
232223
}
233224

234225
#[cfg(feature = "spec_unstable_metrics_views")]
235-
/// Set the stream aggregation.
236-
pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
226+
/// Set the stream aggregation. This is used to customize the aggregation.
227+
/// If not set, the default aggregation based on the instrument kind will be used.
228+
pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
237229
self.aggregation = Some(aggregation);
238230
self
239231
}
@@ -242,18 +234,130 @@ impl Stream {
242234
/// Set the stream allowed attribute keys.
243235
///
244236
/// Any attribute recorded for the stream with a key not in this set will be
245-
/// dropped. If this set is empty all attributes will be dropped.
246-
pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
237+
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
238+
/// attributes will be kept.
239+
pub fn with_allowed_attribute_keys(
240+
mut self,
241+
attribute_keys: impl IntoIterator<Item = Key>,
242+
) -> Self {
247243
self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
248-
249244
self
250245
}
251246

252-
/// Set the stream cardinality limit.
253-
pub fn cardinality_limit(mut self, limit: usize) -> Self {
247+
/// Set the stream cardinality limit. If this is not set, the default limit of 2000 will be used.
248+
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
254249
self.cardinality_limit = Some(limit);
255250
self
256251
}
252+
253+
/// Build a new Stream instance using the configuration in this builder.
254+
///
255+
/// # Returns
256+
///
257+
/// A Result containing the new Stream instance or an error if the build failed.
258+
pub fn build(self) -> Result<Stream, Box<dyn Error>> {
259+
// TODO: Add same validation as already done while
260+
// creating instruments. It is better to move validation logic
261+
// to a common helper and call it from both places.
262+
// The current implementations does a basic validation
263+
// only to close the overall API design.
264+
265+
// if name is provided, it must not be empty
266+
if let Some(name) = &self.name {
267+
if name.is_empty() {
268+
return Err("Stream name must not be empty".into());
269+
}
270+
}
271+
272+
// if cardinality limit is provided, it must be greater than 0
273+
if let Some(limit) = self.cardinality_limit {
274+
if limit == 0 {
275+
return Err("Cardinality limit must be greater than 0".into());
276+
}
277+
}
278+
279+
// If the aggregation is set to ExplicitBucketHistogram, validate the bucket boundaries.
280+
if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
281+
validate_bucket_boundaries(boundaries)?;
282+
}
283+
284+
Ok(Stream {
285+
name: self.name,
286+
description: self.description,
287+
unit: self.unit,
288+
aggregation: self.aggregation,
289+
allowed_attribute_keys: self.allowed_attribute_keys,
290+
cardinality_limit: self.cardinality_limit,
291+
})
292+
}
293+
}
294+
295+
fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
296+
// Validate boundaries do not contain f64::NAN, f64::INFINITY, or f64::NEG_INFINITY
297+
for boundary in boundaries {
298+
if boundary.is_nan() || boundary.is_infinite() {
299+
return Err(
300+
"Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string(),
301+
);
302+
}
303+
}
304+
305+
// validate that buckets are sorted and non-duplicate
306+
for i in 1..boundaries.len() {
307+
if boundaries[i] <= boundaries[i - 1] {
308+
return Err("Bucket boundaries must be sorted and non-duplicate".to_string());
309+
}
310+
}
311+
312+
Ok(())
313+
}
314+
315+
/// Describes the stream of data an instrument produces.
316+
///
317+
/// # Example
318+
///
319+
/// Streams can be used as masks in views.
320+
///
321+
/// ```
322+
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
323+
///
324+
/// let criteria = Instrument::new().name("counter_*");
325+
/// let mask = Stream::builder()
326+
/// .with_aggregation(Aggregation::Sum)
327+
/// .build()
328+
/// .unwrap();
329+
///
330+
/// let view = new_view(criteria, mask);
331+
/// # drop(view);
332+
/// ```
333+
#[derive(Default, Debug)]
334+
#[non_exhaustive]
335+
#[allow(unreachable_pub)]
336+
pub struct Stream {
337+
/// The human-readable identifier of the stream.
338+
pub(crate) name: Option<Cow<'static, str>>,
339+
/// Describes the purpose of the data.
340+
pub(crate) description: Option<Cow<'static, str>>,
341+
/// the unit of measurement recorded.
342+
pub(crate) unit: Option<Cow<'static, str>>,
343+
/// Aggregation the stream uses for an instrument.
344+
pub(crate) aggregation: Option<Aggregation>,
345+
/// An allow-list of attribute keys that will be preserved for the stream.
346+
///
347+
/// Any attribute recorded for the stream with a key not in this set will be
348+
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
349+
/// attributes will be kept.
350+
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
351+
352+
/// Cardinality limit for the stream.
353+
pub(crate) cardinality_limit: Option<usize>,
354+
}
355+
356+
impl Stream {
357+
/// Create a new stream builder with default values.
358+
pub fn builder() -> StreamBuilder {
359+
StreamBuilder::new()
360+
}
257361
}
258362

259363
/// The identifying properties of an instrument.

0 commit comments

Comments
 (0)