Skip to content

Commit 1dd7bb1

Browse files
authored
enhancement(elasticsearch sink): Allow empty data_stream fields (#18193)
enhancement(elasticsearch sink): Allow empty data_stream fields. Add support for ES data streams with an empty namespace or dataset. Closes: #17883. Signed-off-by: Artur Malchanau <[email protected]>
1 parent 91e48f6 commit 1dd7bb1

File tree

2 files changed

+169
-7
lines changed

2 files changed

+169
-7
lines changed

src/sinks/elasticsearch/config.rs

+8-1
Original file line number | Diff line number | Diff line change
@@ -460,7 +460,14 @@ impl DataStreamConfig {
460460
.or_else(|| self.namespace(log))?;
461461
(dtype, dataset, namespace)
462462
};
463-
Some(format!("{}-{}-{}", dtype, dataset, namespace))
463+
464+
let name = [dtype, dataset, namespace]
465+
.into_iter()
466+
.filter(|s| !s.is_empty())
467+
.collect::<Vec<_>>()
468+
.join("-");
469+
470+
Some(name)
464471
}
465472
}
466473

src/sinks/elasticsearch/tests.rs

+161-6
Original file line number | Diff line number | Diff line change
@@ -65,10 +65,25 @@ async fn sets_create_action_when_configured() {
6565
assert_eq!(encoded.len(), encoded_size);
6666
}
6767

68-
fn data_stream_body() -> BTreeMap<String, Value> {
68+
fn data_stream_body(
69+
dtype: Option<String>,
70+
dataset: Option<String>,
71+
namespace: Option<String>,
72+
) -> BTreeMap<String, Value> {
6973
let mut ds = BTreeMap::<String, Value>::new();
70-
ds.insert("type".into(), Value::from("synthetics"));
71-
ds.insert("dataset".into(), Value::from("testing"));
74+
75+
if let Some(dtype) = dtype {
76+
ds.insert("type".into(), Value::from(dtype));
77+
}
78+
79+
if let Some(dataset) = dataset {
80+
ds.insert("dataset".into(), Value::from(dataset));
81+
}
82+
83+
if let Some(namespace) = namespace {
84+
ds.insert("namespace".into(), Value::from(namespace));
85+
}
86+
7287
ds
7388
}
7489

@@ -100,7 +115,14 @@ async fn encode_datastream_mode() {
100115
.single()
101116
.expect("invalid timestamp"),
102117
);
103-
log.insert("data_stream", data_stream_body());
118+
log.insert(
119+
"data_stream",
120+
data_stream_body(
121+
Some("synthetics".to_string()),
122+
Some("testing".to_string()),
123+
None,
124+
),
125+
);
104126

105127
let mut encoded = vec![];
106128
let (encoded_size, _json_size) = es
@@ -143,7 +165,14 @@ async fn encode_datastream_mode_no_routing() {
143165
let es = ElasticsearchCommon::parse_single(&config).await.unwrap();
144166

145167
let mut log = LogEvent::from("hello there");
146-
log.insert("data_stream", data_stream_body());
168+
log.insert(
169+
"data_stream",
170+
data_stream_body(
171+
Some("synthetics".to_string()),
172+
Some("testing".to_string()),
173+
None,
174+
),
175+
);
147176
log.insert(
148177
(
149178
lookup::PathPrefix::Event,
@@ -287,7 +316,14 @@ async fn encode_datastream_mode_no_sync() {
287316
let es = ElasticsearchCommon::parse_single(&config).await.unwrap();
288317

289318
let mut log = LogEvent::from("hello there");
290-
log.insert("data_stream", data_stream_body());
319+
log.insert(
320+
"data_stream",
321+
data_stream_body(
322+
Some("synthetics".to_string()),
323+
Some("testing".to_string()),
324+
None,
325+
),
326+
);
291327
log.insert(
292328
(
293329
lookup::PathPrefix::Event,
@@ -389,3 +425,122 @@ async fn allows_using_only_fields() {
389425
assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected);
390426
assert_eq!(encoded.len(), encoded_size);
391427
}
428+
429+
#[tokio::test]
430+
async fn datastream_index_name() {
431+
#[derive(Clone, Debug)]
432+
struct TestCase {
433+
dtype: Option<String>,
434+
namespace: Option<String>,
435+
dataset: Option<String>,
436+
want: String,
437+
}
438+
439+
let config = ElasticsearchConfig {
440+
bulk: BulkConfig {
441+
index: parse_template("vector"),
442+
..Default::default()
443+
},
444+
endpoints: vec![String::from("https://example.com")],
445+
mode: ElasticsearchMode::DataStream,
446+
api_version: ElasticsearchApiVersion::V6,
447+
..Default::default()
448+
};
449+
let es = ElasticsearchCommon::parse_single(&config).await.unwrap();
450+
451+
let test_cases = [
452+
TestCase {
453+
dtype: Some("type".to_string()),
454+
dataset: Some("dataset".to_string()),
455+
namespace: Some("namespace".to_string()),
456+
want: "type-dataset-namespace".to_string(),
457+
},
458+
TestCase {
459+
dtype: Some("type".to_string()),
460+
dataset: Some("".to_string()),
461+
namespace: Some("namespace".to_string()),
462+
want: "type-namespace".to_string(),
463+
},
464+
TestCase {
465+
dtype: Some("type".to_string()),
466+
dataset: None,
467+
namespace: Some("namespace".to_string()),
468+
want: "type-generic-namespace".to_string(),
469+
},
470+
TestCase {
471+
dtype: Some("type".to_string()),
472+
dataset: Some("".to_string()),
473+
namespace: Some("".to_string()),
474+
want: "type".to_string(),
475+
},
476+
TestCase {
477+
dtype: Some("type".to_string()),
478+
dataset: None,
479+
namespace: None,
480+
want: "type-generic-default".to_string(),
481+
},
482+
TestCase {
483+
dtype: Some("".to_string()),
484+
dataset: Some("".to_string()),
485+
namespace: Some("".to_string()),
486+
want: "".to_string(),
487+
},
488+
TestCase {
489+
dtype: None,
490+
dataset: None,
491+
namespace: None,
492+
want: "logs-generic-default".to_string(),
493+
},
494+
TestCase {
495+
dtype: Some("".to_string()),
496+
dataset: Some("dataset".to_string()),
497+
namespace: Some("namespace".to_string()),
498+
want: "dataset-namespace".to_string(),
499+
},
500+
TestCase {
501+
dtype: None,
502+
dataset: Some("dataset".to_string()),
503+
namespace: Some("namespace".to_string()),
504+
want: "logs-dataset-namespace".to_string(),
505+
},
506+
TestCase {
507+
dtype: Some("".to_string()),
508+
dataset: Some("".to_string()),
509+
namespace: Some("namespace".to_string()),
510+
want: "namespace".to_string(),
511+
},
512+
TestCase {
513+
dtype: None,
514+
dataset: None,
515+
namespace: Some("namespace".to_string()),
516+
want: "logs-generic-namespace".to_string(),
517+
},
518+
TestCase {
519+
dtype: Some("".to_string()),
520+
dataset: Some("dataset".to_string()),
521+
namespace: Some("".to_string()),
522+
want: "dataset".to_string(),
523+
},
524+
TestCase {
525+
dtype: None,
526+
dataset: Some("dataset".to_string()),
527+
namespace: None,
528+
want: "logs-dataset-default".to_string(),
529+
},
530+
];
531+
532+
for test_case in test_cases {
533+
let mut log = LogEvent::from("hello there");
534+
log.insert(
535+
"data_stream",
536+
data_stream_body(
537+
test_case.dtype.clone(),
538+
test_case.dataset.clone(),
539+
test_case.namespace.clone(),
540+
),
541+
);
542+
543+
let processed_event = process_log(log, &es.mode, &None, &config.encoding).unwrap();
544+
assert_eq!(processed_event.index, test_case.want, "{test_case:?}");
545+
}
546+
}

0 commit comments

Comments
 (0)