diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 874e6b5604d4d..5113e38536bc1 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1,9 +1,7 @@ use super::fanout::{self, Fanout}; use crate::buffers; -use futures::prelude::*; -use futures::{sync::mpsc, Future}; -use std::collections::HashMap; -use std::time::Duration; +use futures::{sync::mpsc, Future, Stream}; +use std::{collections::HashMap, time::Duration}; use stream_cancel::{Trigger, Tripwire}; use tokio::util::FutureExt; use tokio_trace_futures::Instrument; @@ -74,7 +72,12 @@ pub fn build_pieces(config: &super::Config) -> Result<(Pieces, Vec), Vec let (output, control) = Fanout::new(); let task = input_rx - .filter_map(move |event| transform.transform(event)) + .map(move |event| { + let mut output = Vec::with_capacity(1); + transform.transform_into(&mut output, event); + futures::stream::iter_ok(output.into_iter()) + }) + .flatten() .forward(output) .map(|_| ()); let task: Task = Box::new(task); diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index e4a2cbc0bf087..4eabd2d8f291b 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -90,7 +90,14 @@ impl LogToMetric { } impl Transform for LogToMetric { + // Only used in tests fn transform(&mut self, event: Event) -> Option { + let mut output = Vec::new(); + self.transform_into(&mut output, event); + output.pop() + } + + fn transform_into(&mut self, output: &mut Vec, event: Event) { let event = event.into_log(); for metric in self.config.metrics.iter() { @@ -99,17 +106,16 @@ impl Transform for LogToMetric { if let Some(val) = event.get(&counter.field) { if counter.increment_by_value { if let Ok(val) = val.to_string_lossy().parse::() { - return Some(Event::Metric(Metric::Counter { + output.push(Event::Metric(Metric::Counter { name: counter.sanitized_name.to_string(), val: val as u32, sampling: None, })); } else { trace!("failed to parse counter value"); - return None; } } else { - return Some(Event::Metric(Metric::Counter { + output.push(Event::Metric(Metric::Counter { name: counter.sanitized_name.to_string(), val: 1, sampling: None, @@ -120,21 +126,18 @@ impl Transform for LogToMetric { MetricConfig::Gauge(gauge) => { if let Some(val) = event.get(&gauge.field) { if let Ok(val) = val.to_string_lossy().parse() { - return Some(Event::Metric(Metric::Gauge { + output.push(Event::Metric(Metric::Gauge { name: gauge.sanitized_name.to_string(), val, direction: None, })); } else { trace!("failed to parse gauge value"); - return None; } } } } } - - None } } @@ -328,4 +331,51 @@ mod tests { let mut transform = LogToMetric::new(&config); assert!(transform.transform(log).is_none()); } + + #[test] + fn multiple_metrics() { + let config: LogToMetricConfig = toml::from_str( + r##" + [[metrics]] + type = "counter" + field = "status" + labels = {status = "#{event.status}", host = "#{event.host}"} + + [[metrics]] + type = "counter" + field = "backtrace" + name = "exception_total" + labels = {host = "#{event.host}"} + "##, + ) + .unwrap(); + + let mut log = Event::from("i am a log"); + log.as_mut_log() + .insert_explicit("status".into(), "42".into()); + log.as_mut_log() + .insert_explicit("backtrace".into(), "message".into()); + + let mut transform = LogToMetric::new(&config); + + let mut output = Vec::new(); + transform.transform_into(&mut output, log); + assert_eq!(2, output.len()); + assert_eq!( + output.pop().unwrap().into_metric(), + Metric::Counter { + name: "exception_total".into(), + val: 1, + sampling: None + } + ); + assert_eq!( + output.pop().unwrap().into_metric(), + Metric::Counter { + name: "status_total".into(), + val: 1, + sampling: None + } + ); + } } diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index 09b8d16fa68fa..19754c782a5a7 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -13,4 +13,10 @@ pub mod tokenizer; pub trait Transform: Send { fn transform(&mut self, event: Event) -> Option; + + fn transform_into(&mut self, output: &mut Vec, event: Event) { + if let Some(transformed) = self.transform(event) { + output.push(transformed); + } + } }