Skip to content

Commit a39ba04

Browse files
feat(redis source): Use OptionalValuePath for redis_key (#15615)
1 parent 67f8d35 commit a39ba04

File tree

1 file changed

+19
-18
lines changed

1 file changed

+19
-18
lines changed

src/sources/redis/mod.rs

+19-18
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@ use codecs::{
55
StreamDecodingError,
66
};
77
use futures::StreamExt;
8-
use lookup::{lookup_v2::BorrowedSegment, owned_value_path, path};
8+
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
99
use snafu::{ResultExt, Snafu};
1010
use tokio_util::codec::FramedRead;
1111
use value::Kind;
1212
use vector_common::internal_event::{
1313
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
1414
};
1515
use vector_config::{configurable_component, NamedComponent};
16-
use vector_core::config::{LegacyKey, LogNamespace};
17-
use vector_core::EstimatedJsonEncodedSizeOf;
16+
use vector_core::{
17+
config::{LegacyKey, LogNamespace},
18+
EstimatedJsonEncodedSizeOf,
19+
};
1820

1921
use crate::{
2022
codecs::{Decoder, DecodingConfig},
@@ -116,7 +118,7 @@ pub struct RedisSourceConfig {
116118
/// The value will be the Redis key that the event was read from.
117119
///
118120
/// By default, this is not set and the field will not be automatically added.
119-
redis_key: Option<String>,
121+
redis_key: Option<OptionalValuePath>,
120122

121123
#[configurable(derived)]
122124
#[serde(default = "default_framing_message_based")]
@@ -158,6 +160,7 @@ impl SourceConfig for RedisSourceConfig {
158160
if self.key.is_empty() {
159161
return Err("`key` cannot be empty.".into());
160162
}
163+
let redis_key = self.redis_key.clone().and_then(|k| k.path);
161164

162165
let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
163166
let connection_info = ConnectionInfo::from(client.get_connection_info());
@@ -173,7 +176,7 @@ impl SourceConfig for RedisSourceConfig {
173176
bytes_received: bytes_received.clone(),
174177
events_received: events_received.clone(),
175178
key: self.key.clone(),
176-
redis_key: self.redis_key.clone(),
179+
redis_key,
177180
decoder,
178181
cx,
179182
log_namespace,
@@ -193,8 +196,8 @@ impl SourceConfig for RedisSourceConfig {
193196

194197
let redis_key_path = self
195198
.redis_key
196-
.as_ref()
197-
.map(|x| owned_value_path!(x))
199+
.clone()
200+
.and_then(|k| k.path)
198201
.map(LegacyKey::InsertIfEmpty);
199202

200203
let schema_definition = self
@@ -222,7 +225,7 @@ pub(self) struct InputHandler {
222225
pub bytes_received: Registered<BytesReceived>,
223226
pub events_received: Registered<EventsReceived>,
224227
pub key: String,
225-
pub redis_key: Option<String>,
228+
pub redis_key: Option<OwnedValuePath>,
226229
pub decoder: Decoder,
227230
pub log_namespace: LogNamespace,
228231
pub cx: SourceContext,
@@ -257,15 +260,10 @@ impl InputHandler {
257260
now,
258261
);
259262

260-
let redis_key_path = self
261-
.redis_key
262-
.as_deref()
263-
.map(|x| [BorrowedSegment::from(x)]);
264-
265263
self.log_namespace.insert_source_metadata(
266264
RedisSourceConfig::NAME,
267265
log,
268-
redis_key_path.as_ref().map(LegacyKey::InsertIfEmpty),
266+
self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
269267
path!("key"),
270268
self.key.as_str(),
271269
);
@@ -307,10 +305,13 @@ mod integration_test {
307305
use redis::AsyncCommands;
308306

309307
use super::*;
310-
use crate::config::log_schema;
311-
use crate::test_util::components::{run_and_assert_source_compliance_n, SOURCE_TAGS};
312308
use crate::{
313-
test_util::{collect_n, random_string},
309+
config::log_schema,
310+
test_util::{
311+
collect_n,
312+
components::{run_and_assert_source_compliance_n, SOURCE_TAGS},
313+
random_string,
314+
},
314315
SourceSender,
315316
};
316317

@@ -369,7 +370,7 @@ mod integration_test {
369370
}),
370371
url: REDIS_SERVER.to_owned(),
371372
key: key.clone(),
372-
redis_key: Some("remapped_key".into()),
373+
redis_key: Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
373374
framing: default_framing_message_based(),
374375
decoding: default_decoding(),
375376
log_namespace: Some(true),

0 commit comments

Comments
 (0)