
Commit c21f892

chore(flush on shutdown): validate s3 sink flushes (vectordotdev#17667)
Adding a regression test for the related issue: vectordotdev#11405
1 parent bebac21

File tree

1 file changed: +74 -0 lines changed


src/sinks/aws_s3/integration_tests.rs

Lines changed: 74 additions & 0 deletions
@@ -406,6 +406,80 @@ async fn s3_healthchecks_invalid_bucket() {
     .is_err());
 }
 
+#[tokio::test]
+async fn s3_flush_on_exhaustion() {
+    let cx = SinkContext::new_test();
+
+    let bucket = uuid::Uuid::new_v4().to_string();
+    create_bucket(&bucket, false).await;
+
+    // batch size of ten events, timeout of ten seconds
+    let config = {
+        let mut batch = BatchConfig::default();
+        batch.max_events = Some(10);
+        batch.timeout_secs = Some(10.0);
+
+        S3SinkConfig {
+            bucket: bucket.to_string(),
+            key_prefix: random_string(10) + "/date=%F",
+            filename_time_format: default_filename_time_format(),
+            filename_append_uuid: true,
+            filename_extension: None,
+            options: S3Options::default(),
+            region: RegionOrEndpoint::with_both("minio", s3_address()),
+            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
+            compression: Compression::None,
+            batch,
+            request: TowerRequestConfig::default(),
+            tls: Default::default(),
+            auth: Default::default(),
+            acknowledgements: Default::default(),
+        }
+    };
+    let prefix = config.key_prefix.clone();
+    let service = config.create_service(&cx.globals.proxy).await.unwrap();
+    let sink = config.build_processor(service).unwrap();
+
+    let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size)
+
+    let events = lines.clone().into_iter().enumerate().map(|(i, line)| {
+        let mut e = LogEvent::from(line);
+        let i = if i < 10 {
+            1
+        } else if i < 20 {
+            2
+        } else {
+            3
+        };
+        e.insert("i", i.to_string());
+        Event::from(e)
+    });
+
+    // Here, we validate that the s3 sink flushes when its source stream is exhausted
+    // by giving it a number of inputs less than the batch size, verifying that the
+    // outputs for the in-flight batch are flushed. By timing out in 3 seconds with a
+    // flush period of ten seconds, we verify that the flush is triggered *at stream
+    // completion* and not because of periodic flushing.
+    assert!(tokio::time::timeout(
+        Duration::from_secs(3),
+        run_and_assert_sink_compliance(sink, stream::iter(events), &AWS_SINK_TAGS)
+    )
+    .await
+    .is_ok());
+
+    let keys = get_keys(&bucket, prefix).await;
+    assert_eq!(keys.len(), 1);
+
+    let mut response_lines: Vec<String> = Vec::new();
+    let mut key_stream = stream::iter(keys);
+    while let Some(key) = key_stream.next().await {
+        let obj = get_object(&bucket, key).await;
+        response_lines.append(&mut get_lines(obj).await);
+    }
+
+    assert_eq!(lines, response_lines); // if all events are received, and lines.len() < batch size, then a flush was performed.
+}
+
 async fn client() -> S3Client {
     let auth = AwsAuthentication::test_auth();
     let region = RegionOrEndpoint::with_both("minio", s3_address());
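For readers outside the codebase, the property this test pins down is the batch-and-flush-on-stream-end pattern: a partially filled batch must be written out when the input stream terminates, not only when the batch fills up or its timer fires. A minimal, self-contained sketch of that pattern (an illustration only, not Vector's sink implementation; `flush` and `MAX_BATCH` are made-up names) could look like this:

use futures::{stream, StreamExt};

const MAX_BATCH: usize = 10;

// Write out the current batch, if any, and start a new one.
fn flush(batch: &mut Vec<String>, written: &mut Vec<Vec<String>>) {
    if !batch.is_empty() {
        written.push(std::mem::take(batch));
    }
}

#[tokio::main]
async fn main() {
    // Two events -- fewer than MAX_BATCH, mirroring the test above.
    let mut input = stream::iter(vec!["a".to_string(), "b".to_string()]);
    let mut batch = Vec::new();
    let mut written = Vec::new();

    while let Some(event) = input.next().await {
        batch.push(event);
        if batch.len() >= MAX_BATCH {
            flush(&mut batch, &mut written); // size-triggered flush
        }
    }

    // The stream is exhausted: flush the in-flight partial batch rather
    // than leaving it to a timer that will never fire again.
    flush(&mut batch, &mut written);

    assert_eq!(written.len(), 1); // one output object, like keys.len() == 1 above
    assert_eq!(written[0].len(), 2); // both events survived the final flush
}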
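The 3-second timeout is what separates a flush triggered by stream completion from the sink's 10-second periodic flush: the assertion can only pass if the flush happened well before the timer would have fired. Isolated from the sink machinery, the check is just tokio::time::timeout wrapped in an assert!; a toy version of the same idea:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-in for running the sink to completion: finishes quickly,
    // as a shutdown-triggered flush should.
    let work = tokio::time::sleep(Duration::from_millis(100));

    // If completion depended on a 10-second periodic flush instead,
    // this 3-second bound would elapse first and is_ok() would be false.
    assert!(tokio::time::timeout(Duration::from_secs(3), work)
        .await
        .is_ok());
}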
