Skip to content

Commit 61c0ae8

Browse files
authored
feat(appsignal sink): Normalize metrics (vectordotdev#18217)
* feat(appsignal sink): Normalize metrics Implement a normaliser for the AppSignal sink to convert absolute counter metrics to incremental counters, and incremental gauges to absolute gauges. The AppSignal API ignores absolute counters and incremental gauges, so this change adds support for absolute counters and incremental gauges. This normaliser is inspired by the DataDog normaliser. * Refactor metric normalizer tests Move the methods that generate test metrics and the code that compares metric inputs and normalized outputs to the `test_util` metrics module. Previously these test helpers were duplicated across DataDog, StatsD and AppSignal's metric sinks. Rename the `run_comparisons` method to `assert_normalize`, as it asserts the results of running a normalizer's `.normalize` method. Move the duplicated test implementations to the `test_util::metrics` module, in a separate tests sub-module, and make them generic over the normalizer. Use these test definitions in the DataDog, StatsD and AppSignal's metric sink tests. * Fix integration tests Since the AppSignal sink now normalises counters from absolute to incremental, absolute counters that are only emitted once do not result in an outgoing HTTP request being emitted by the sink. Address this by emitting the absolute counters in the tests at least twice. This also implicitly tests the metrics' normalisation.
1 parent 71343bd commit 61c0ae8

File tree

7 files changed

+476
-552
lines changed

7 files changed

+476
-552
lines changed

src/sinks/appsignal/integration_tests.rs

+34-18
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,23 @@ async fn metrics_real_endpoint() {
9898
#[tokio::test]
9999
async fn metrics_shape() {
100100
let events: Vec<_> = (0..5)
101-
.map(|index| {
102-
Event::Metric(Metric::new(
103-
format!("counter_{}", index),
104-
MetricKind::Absolute,
105-
MetricValue::Counter {
106-
value: index as f64,
107-
},
108-
))
101+
.flat_map(|index| {
102+
vec![
103+
Event::Metric(Metric::new(
104+
format!("counter_{}", index),
105+
MetricKind::Absolute,
106+
MetricValue::Counter {
107+
value: index as f64,
108+
},
109+
)),
110+
Event::Metric(Metric::new(
111+
format!("counter_{}", index),
112+
MetricKind::Absolute,
113+
MetricValue::Counter {
114+
value: (index + index) as f64,
115+
},
116+
)),
117+
]
109118
})
110119
.collect();
111120
let api_key = push_api_key();
@@ -146,11 +155,11 @@ async fn metrics_shape() {
146155
.collect();
147156
assert_eq!(
148157
vec![
149-
("counter_0", "absolute", 0.0),
150-
("counter_1", "absolute", 1.0),
151-
("counter_2", "absolute", 2.0),
152-
("counter_3", "absolute", 3.0),
153-
("counter_4", "absolute", 4.0),
158+
("counter_0", "incremental", 0.0),
159+
("counter_1", "incremental", 1.0),
160+
("counter_2", "incremental", 2.0),
161+
("counter_3", "incremental", 3.0),
162+
("counter_4", "incremental", 4.0),
154163
],
155164
metrics
156165
);
@@ -231,11 +240,18 @@ async fn error_scenario_real_endpoint() {
231240

232241
let (sink, _) = config.build(cx).await.unwrap();
233242
let (batch, receiver) = BatchNotifier::new_with_receiver();
234-
let events = vec![Event::Metric(Metric::new(
235-
"counter",
236-
MetricKind::Absolute,
237-
MetricValue::Counter { value: 1.0 },
238-
))];
243+
let events = vec![
244+
Event::Metric(Metric::new(
245+
"counter",
246+
MetricKind::Absolute,
247+
MetricValue::Counter { value: 1.0 },
248+
)),
249+
Event::Metric(Metric::new(
250+
"counter",
251+
MetricKind::Absolute,
252+
MetricValue::Counter { value: 2.0 },
253+
)),
254+
];
239255
let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
240256

241257
sink.run(stream).await.unwrap();

src/sinks/appsignal/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
99
mod config;
1010
mod encoder;
11+
mod normalizer;
1112
mod request_builder;
1213
mod service;
1314
mod sink;

src/sinks/appsignal/normalizer.rs

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use vector_core::event::{Metric, MetricValue};
2+
3+
use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};
4+
5+
#[derive(Default)]
6+
pub(crate) struct AppsignalMetricsNormalizer;
7+
8+
impl MetricNormalize for AppsignalMetricsNormalizer {
9+
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
10+
// We only care about making sure that counters are incremental, and that gauges are
11+
// always absolute. Other metric types are currently unsupported.
12+
match &metric.value() {
13+
// We always send counters as incremental and gauges as absolute. Realistically, any
14+
// system sending an incremental gauge update is kind of doing it wrong, but alas.
15+
MetricValue::Counter { .. } => state.make_incremental(metric),
16+
MetricValue::Gauge { .. } => state.make_absolute(metric),
17+
// Otherwise, send it through as-is.
18+
_ => Some(metric),
19+
}
20+
}
21+
}
22+
23+
#[cfg(test)]
24+
mod tests {
25+
use std::collections::BTreeSet;
26+
27+
use crate::event::{Metric, MetricKind, MetricValue};
28+
29+
use super::AppsignalMetricsNormalizer;
30+
use crate::test_util::metrics::{assert_normalize, tests};
31+
32+
#[test]
33+
fn absolute_counter() {
34+
tests::absolute_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
35+
}
36+
37+
#[test]
38+
fn incremental_counter() {
39+
tests::incremental_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
40+
}
41+
42+
#[test]
43+
fn mixed_counter() {
44+
tests::mixed_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
45+
}
46+
47+
#[test]
48+
fn absolute_gauge() {
49+
tests::absolute_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
50+
}
51+
52+
#[test]
53+
fn incremental_gauge() {
54+
tests::incremental_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
55+
}
56+
57+
#[test]
58+
fn mixed_gauge() {
59+
tests::mixed_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
60+
}
61+
62+
#[test]
63+
fn other_metrics() {
64+
let metric = Metric::new(
65+
"set",
66+
MetricKind::Incremental,
67+
MetricValue::Set {
68+
values: BTreeSet::new(),
69+
},
70+
);
71+
72+
assert_normalize(
73+
AppsignalMetricsNormalizer,
74+
vec![metric.clone()],
75+
vec![Some(metric)],
76+
);
77+
}
78+
}

src/sinks/appsignal/sink.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use futures::{stream::BoxStream, StreamExt};
2+
use futures_util::future::ready;
23
use tower::{Service, ServiceBuilder};
34
use vector_core::{
45
event::Event,
@@ -7,12 +8,14 @@ use vector_core::{
78
};
89

910
use crate::{
10-
codecs::Transformer, internal_events::SinkRequestBuildError,
11-
sinks::util::builder::SinkBuilderExt, sinks::util::Compression,
11+
codecs::Transformer,
12+
internal_events::SinkRequestBuildError,
13+
sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
1214
};
1315

1416
use super::{
1517
encoder::AppsignalEncoder,
18+
normalizer::AppsignalMetricsNormalizer,
1619
request_builder::{AppsignalRequest, AppsignalRequestBuilder},
1720
};
1821

@@ -32,8 +35,16 @@ where
3235
{
3336
pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
3437
let service = ServiceBuilder::new().service(self.service);
38+
let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();
3539

3640
input
41+
.filter_map(move |event| {
42+
ready(if let Event::Metric(metric) = event {
43+
normalizer.normalize(metric).map(Event::Metric)
44+
} else {
45+
Some(event)
46+
})
47+
})
3748
.batched(self.batch_settings.into_byte_size_config())
3849
.request_builder(
3950
None,

0 commit comments

Comments
 (0)