Skip to content

Commit cc94ea5

Browse files
committed
feat(redis sink): support input based on encoding type
1 parent 96bc594 commit cc94ea5

File tree

4 files changed

+131
-5
lines changed

4 files changed

+131
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The redis sink now supports any input event type that the configured encoding supports. It previously only supported log events.
2+
3+
authors: ynachi

src/sinks/redis/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl SinkConfig for RedisSinkConfig {
146146
}
147147

148148
fn input(&self) -> Input {
149-
Input::new(self.encoding.config().input_type() & DataType::Log)
149+
Input::new(self.encoding.config().input_type())
150150
}
151151

152152
fn acknowledgements(&self) -> &AcknowledgementsConfig {

src/sinks/redis/integration_tests.rs

+97
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use vector_lib::{
77
event::LogEvent,
88
};
99

10+
use crate::event::{Event, Metric, MetricKind, MetricValue};
11+
1012
use super::config::{DataTypeConfig, ListOption, Method, RedisSinkConfig};
1113
use crate::{
1214
sinks::prelude::*,
@@ -290,3 +292,98 @@ async fn redis_sink_channel_data_volume_tags() {
290292
}
291293
}
292294
}
295+
296+
#[tokio::test]
297+
async fn redis_sink_metrics() {
298+
trace_init();
299+
300+
let key = Template::try_from(format!("test-metrics-{}", random_string(10)))
301+
.expect("should not fail to create key template");
302+
debug!("Test key name: {}.", key);
303+
let num_events = 1000;
304+
debug!("Test events num: {}.", num_events);
305+
306+
let cnf = RedisSinkConfig {
307+
endpoint: redis_server(),
308+
key: key.clone(),
309+
encoding: JsonSerializerConfig::default().into(),
310+
data_type: DataTypeConfig::List,
311+
list_option: Some(ListOption {
312+
method: Method::RPush,
313+
}),
314+
batch: BatchConfig::default(),
315+
request: TowerRequestConfig {
316+
rate_limit_num: u64::MAX,
317+
..Default::default()
318+
},
319+
acknowledgements: Default::default(),
320+
};
321+
322+
// Create a mix of counter and gauge metrics
323+
let mut events: Vec<Event> = Vec::new();
324+
for i in 0..num_events {
325+
let metric = if i % 2 == 0 {
326+
// Counter metrics
327+
Metric::new(
328+
format!("counter_{}", i),
329+
MetricKind::Absolute,
330+
MetricValue::Counter { value: i as f64 },
331+
)
332+
} else {
333+
// Gauge metrics
334+
Metric::new(
335+
format!("gauge_{}", i),
336+
MetricKind::Absolute,
337+
MetricValue::Gauge { value: i as f64 },
338+
)
339+
};
340+
events.push(metric.into());
341+
}
342+
let input = stream::iter(events.clone().into_iter().map(Into::into));
343+
344+
// Publish events
345+
let cnf2 = cnf.clone();
346+
assert_sink_compliance(&SINK_TAGS, async move {
347+
let cx = SinkContext::default();
348+
let (sink, _healthcheck) = cnf2.build(cx).await.unwrap();
349+
sink.run(input).await
350+
})
351+
.await
352+
.expect("Running sink failed");
353+
354+
// Verify metrics were stored correctly
355+
let mut conn = cnf.build_client().await.unwrap();
356+
357+
let key_exists: bool = conn.exists(key.to_string()).await.unwrap();
358+
debug!("Test key: {} exists: {}.", key, key_exists);
359+
assert!(key_exists);
360+
361+
let llen: usize = conn.llen(key.clone().to_string()).await.unwrap();
362+
debug!("Test key: {} len: {}.", key, llen);
363+
assert_eq!(llen, num_events);
364+
365+
// Verify the content of each metric
366+
for i in 0..num_events {
367+
let original_event = events.get(i).unwrap().as_metric();
368+
let payload: (String, String) = conn.blpop(key.clone().to_string(), 2000.0).await.unwrap();
369+
let val = payload.1;
370+
371+
// Parse the JSON and verify key metric properties
372+
let json: serde_json::Value = serde_json::from_str(&val).unwrap();
373+
374+
if i % 2 == 0 {
375+
// Counter metrics
376+
assert_eq!(json["name"], format!("counter_{}", i));
377+
assert_eq!(json["kind"], "absolute");
378+
assert_eq!(json["counter"]["value"], i as f64);
379+
} else {
380+
// Gauge metrics
381+
assert_eq!(json["name"], format!("gauge_{}", i));
382+
assert_eq!(json["kind"], "absolute");
383+
assert_eq!(json["gauge"]["value"], i as f64);
384+
}
385+
386+
// Verify that the name matches what we expect
387+
assert_eq!(json["name"].as_str().unwrap(), original_event.name());
388+
}
389+
}

src/sinks/redis/tests.rs

+30-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashMap;
22

33
use vector_lib::codecs::{JsonSerializerConfig, TextSerializerConfig};
4-
use vector_lib::event::LogEvent;
4+
use vector_lib::event::{LogEvent, Metric, MetricKind, MetricValue};
55
use vector_lib::request_metadata::GroupedCountByteSize;
66

77
use super::{config::RedisSinkConfig, request_builder::encode_event};
@@ -16,7 +16,7 @@ fn generate_config() {
1616
}
1717

1818
#[test]
19-
fn redis_event_json() {
19+
fn redis_log_event_json() {
2020
let msg = "hello_world".to_owned();
2121
let mut byte_size = GroupedCountByteSize::new_untagged();
2222
let mut evt = LogEvent::from(msg.clone());
@@ -35,7 +35,7 @@ fn redis_event_json() {
3535
}
3636

3737
#[test]
38-
fn redis_event_text() {
38+
fn redis_log_event_text() {
3939
let msg = "hello_world".to_owned();
4040
let evt = LogEvent::from(msg.clone());
4141
let mut byte_size = GroupedCountByteSize::new_untagged();
@@ -52,7 +52,7 @@ fn redis_event_text() {
5252
}
5353

5454
#[test]
55-
fn redis_encode_event() {
55+
fn redis_log_encode_event() {
5656
let msg = "hello_world";
5757
let mut evt = LogEvent::from(msg);
5858
let mut byte_size = GroupedCountByteSize::new_untagged();
@@ -71,3 +71,29 @@ fn redis_encode_event() {
7171
let map: HashMap<String, String> = serde_json::from_slice(&result[..]).unwrap();
7272
assert!(!map.contains_key("key"));
7373
}
74+
75+
#[test]
76+
fn redis_metric_encode_event() {
77+
let mut byte_size = GroupedCountByteSize::new_untagged();
78+
let metric = Metric::new(
79+
"test_counter",
80+
MetricKind::Absolute,
81+
MetricValue::Counter { value: 42.0 },
82+
);
83+
84+
let result = encode_event(
85+
metric.into(),
86+
"metrics.counter".to_string(),
87+
&Default::default(),
88+
&mut Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
89+
&mut byte_size,
90+
)
91+
.unwrap()
92+
.value;
93+
94+
let json: serde_json::Value = serde_json::from_slice(&result).unwrap();
95+
96+
assert_eq!(json["name"], "test_counter");
97+
assert_eq!(json["kind"], "absolute");
98+
assert_eq!(json["counter"]["value"], 42.0);
99+
}

0 commit comments

Comments
 (0)