Skip to content

Commit 013ca8e

Browse files
committed
adjust transform trait for multiple output events
1 parent a703de8 commit 013ca8e

File tree

2 files changed

+14
-5
lines changed

2 files changed

+14
-5
lines changed

src/topology/builder.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use super::fanout::{self, Fanout};
22
use crate::buffers;
3-
use futures::prelude::*;
4-
use futures::{sync::mpsc, Future};
5-
use std::collections::HashMap;
6-
use std::time::Duration;
3+
use futures::{sync::mpsc, Future, Stream};
4+
use std::{collections::HashMap, time::Duration};
75
use stream_cancel::{Trigger, Tripwire};
86
use tokio::util::FutureExt;
97
use tokio_trace_futures::Instrument;
@@ -74,7 +72,12 @@ pub fn build_pieces(config: &super::Config) -> Result<(Pieces, Vec<String>), Vec
7472
let (output, control) = Fanout::new();
7573

7674
let task = input_rx
77-
.filter_map(move |event| transform.transform(event))
75+
.map(move |event| {
76+
let mut output = Vec::with_capacity(1);
77+
transform.transform_into(&mut output, event);
78+
futures::stream::iter_ok(output.into_iter())
79+
})
80+
.flatten()
7881
.forward(output)
7982
.map(|_| ());
8083
let task: Task = Box::new(task);

src/transforms/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,10 @@ pub mod tokenizer;
1313

1414
pub trait Transform: Send {
1515
fn transform(&mut self, event: Event) -> Option<Event>;
16+
17+
fn transform_into(&mut self, output: &mut Vec<Event>, event: Event) {
18+
if let Some(transformed) = self.transform(event) {
19+
output.push(transformed);
20+
}
21+
}
1622
}

0 commit comments

Comments
 (0)