Skip to content

Commit 9ee3c6c

Browse files
committed
checkpoint
Signed-off-by: Jean Mertz <[email protected]>
1 parent f384eb1 commit 9ee3c6c

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

src/sinks/console/config.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ impl GenerateConfig for ConsoleSinkConfig {
6060

6161
#[async_trait::async_trait]
6262
impl SinkConfig for ConsoleSinkConfig {
63-
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
63+
async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
64+
let sources_details = ctx.sources_details.clone();
6465
let transformer = self.encoding.transformer();
6566
let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?;
6667
let encoder = Encoder::<Framer>::new(framer, serializer);
@@ -70,11 +71,13 @@ impl SinkConfig for ConsoleSinkConfig {
7071
output: io::stdout(),
7172
transformer,
7273
encoder,
74+
sources_details,
7375
}),
7476
Target::Stderr => VectorSink::from_event_streamsink(WriterSink {
7577
output: io::stderr(),
7678
transformer,
7779
encoder,
80+
sources_details,
7881
}),
7982
};
8083

src/sinks/console/sink.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use codecs::encoding::Framer;
44
use futures::{stream::BoxStream, StreamExt};
55
use tokio::{io, io::AsyncWriteExt};
66
use tokio_util::codec::Encoder as _;
7+
use vector_common::config::SourceDetails;
78
use vector_core::{
89
internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol},
910
EstimatedJsonEncodedSizeOf,
@@ -19,6 +20,7 @@ pub struct WriterSink<T> {
1920
pub output: T,
2021
pub transformer: Transformer,
2122
pub encoder: Encoder<Framer>,
23+
pub sources_details: Vec<SourceDetails>,
2224
}
2325

2426
#[async_trait]
@@ -30,6 +32,11 @@ where
3032
let bytes_sent = register!(BytesSent::from(Protocol("console".into(),)));
3133
while let Some(mut event) = input.next().await {
3234
let event_byte_size = event.estimated_json_encoded_size_of();
35+
let source = event
36+
.metadata()
37+
.source_id()
38+
.and_then(|id| self.sources_details.get(id).map(|details| details.key.id()));
39+
3340
self.transformer.transform(&mut event);
3441

3542
let finalizers = event.take_finalizers();
@@ -54,7 +61,7 @@ where
5461
byte_size: event_byte_size,
5562
count: 1,
5663
output: None,
57-
source: None,
64+
source,
5865
});
5966
bytes_sent.emit(ByteSize(bytes.len()));
6067
}

0 commit comments

Comments
 (0)