Skip to content

Commit df87df7

Browse files
authored
Merge pull request vectordotdev#374 from answerbook/dominic/LOG-18535-temp-revert
Revert "Merge pull request vectordotdev#370 from answerbook/dominic/LOG-18535"
2 parents 61e3f80 + 73a8b28 commit df87df7

10 files changed

+8
-1742
lines changed

Cargo.lock

-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

-3
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,6 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix"
335335
# make sure to update the external docs when the Lua version changes
336336
mlua = { version = "0.8.9", default-features = false, features = ["lua54", "send", "vendored"], optional = true }
337337

338-
# MEZMO: added dependency for s3-sink file consolidation
339-
gethostname = "0.4.3"
340-
341338
[target.'cfg(windows)'.dependencies]
342339
windows-service = "0.6.0"
343340

src/sinks/aws_s3/config.rs

+1-60
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,6 @@ use crate::{
3333
tls::TlsConfig,
3434
};
3535

36-
// MEZMO: added dependencies for s3-sink file consolidation
37-
use crate::sinks::aws_s3::file_consolidator_async::{
38-
FileConsolidationConfig, FileConsolidatorAsync,
39-
};
40-
use gethostname::gethostname;
41-
4236
/// Configuration for the `aws_s3` sink.
4337
#[configurable_component(sink("aws_s3"))]
4438
#[derive(Clone, Debug)]
@@ -139,11 +133,6 @@ pub struct S3SinkConfig {
139133
skip_serializing_if = "crate::serde::skip_serializing_if_default"
140134
)]
141135
pub acknowledgements: AcknowledgementsConfig,
142-
143-
// MEZMO: added configuration for s3-sink file consolidation
144-
#[configurable(derived)]
145-
#[serde(default)]
146-
pub file_consolidation_config: FileConsolidationConfig,
147136
}
148137

149138
pub(super) fn default_key_prefix() -> String {
@@ -171,7 +160,6 @@ impl GenerateConfig for S3SinkConfig {
171160
tls: Some(TlsConfig::default()),
172161
auth: AwsAuthentication::default(),
173162
acknowledgements: Default::default(),
174-
file_consolidation_config: Default::default(),
175163
})
176164
.unwrap()
177165
}
@@ -240,15 +228,7 @@ impl S3SinkConfig {
240228
compression: self.compression,
241229
};
242230

243-
// MEZMO: added new file consolidation process for S3 sinks
244-
let consolidation_process = self.build_consolidation_process(cx.proxy);
245-
let sink = S3Sink::new(
246-
service,
247-
request_options,
248-
partitioner,
249-
batch_settings,
250-
consolidation_process,
251-
);
231+
let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
252232

253233
Ok(VectorSink::from_event_streamsink(sink))
254234
}
@@ -264,45 +244,6 @@ impl S3SinkConfig {
264244
pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
265245
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await
266246
}
267-
268-
// MEZMO: added process to define setup for s3-sink file consolidation
269-
fn build_consolidation_process(&self, proxy: ProxyConfig) -> Option<FileConsolidatorAsync> {
270-
// we can perform consolidation assuming that the process itself is requested via the configuration
271-
// we only want to handle this process on the primary instance of the statefulset
272-
// so we don't have to worry about contention between instances of sinks
273-
let host_name = gethostname().into_string().unwrap();
274-
if !host_name.ends_with("-0") || !self.file_consolidation_config.enabled {
275-
info!(
276-
message = "S3 sink file consolidation process disabled",
277-
host_name,
278-
config.enabled = self.file_consolidation_config.enabled,
279-
);
280-
return None;
281-
} else {
282-
info!(
283-
message = "S3 sink file consolidation enabled",
284-
host_name,
285-
config.enabled = self.file_consolidation_config.enabled,
286-
);
287-
}
288-
289-
// build the S3 client and config so we can return a new FileConsolidator
290-
let region_or_endpoint = &self.region;
291-
let endpoint = region_or_endpoint.endpoint().unwrap_or_default();
292-
let region = region_or_endpoint.region();
293-
294-
let consolidator = FileConsolidatorAsync::new(
295-
self.auth.clone(),
296-
region.clone(),
297-
endpoint.clone(),
298-
proxy.clone(),
299-
self.tls.clone(),
300-
self.file_consolidation_config,
301-
self.bucket.clone(),
302-
self.key_prefix.clone(),
303-
);
304-
Some(consolidator)
305-
}
306247
}
307248

308249
#[cfg(test)]

0 commit comments

Comments
 (0)