Skip to content

Commit e29038a

Browse files
committed
anotha one
Signed-off-by: Toby Lawrence <[email protected]>
1 parent 38be785 commit e29038a

File tree

2 files changed

+11
-16
lines changed

2 files changed

+11
-16
lines changed

src/sinks/datadog/metrics/sink.rs

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ where
6161
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
6262
let sink = input
6363
.filter_map(|event| ready(event.try_into_metric()))
64+
.normalize(DatadogMetricNormalizer)
6465
.batched(DatadogMetricsTypePartitioner, self.batch_settings);
6566
//.into_driver(self.service, self.acker);
6667

src/sinks/util/normalizer.rs

+10-16
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,22 @@
1-
use std::{pin::Pin, task::{Context, Poll, ready}};
1+
use std::{pin::Pin, task::{Context, Poll}};
22

3-
use futures_util::Stream;
3+
use futures_util::{Stream, ready};
44
use vector_core::event::Metric;
55

66
use super::buffer::metrics::{MetricNormalize, MetricNormalizer};
77

8-
pub struct Normalizer<St, N>
9-
where
10-
N: MetricNormalize,
11-
{
8+
pub struct Normalizer<St, N> {
129
stream: St,
13-
normalizer: MetricNormalizer<N>,
10+
normalizer: N,
11+
metric_set: MetricSet,
1412
}
1513

16-
impl<St, N> Normalizer<St, N>
17-
where
18-
N: MetricNormalize,
19-
{
20-
pub fn new(stream: St) -> Normalizer<St, N>
21-
where
22-
N: MetricNormalize,
23-
{
14+
impl<St, N> Normalizer<St, N> {
15+
pub fn new(stream: St, normalizer: N) -> Self {
2416
Self {
2517
stream,
26-
normalizer: MetricNormalizer::default(),
18+
normalizer,
19+
metric_set: MetricSet::default(),
2720
}
2821
}
2922
}
@@ -37,5 +30,6 @@ where
3730

3831
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3932
let metric = ready!(self.stream.poll_next(cx));
33+
let result = N::apply_state(&mut self.metric_set, metric);
4034
}
4135
}

0 commit comments

Comments
 (0)