File tree 5 files changed +52
-5
lines changed
lib/vector-core/src/transform
5 files changed +52
-5
lines changed Original file line number Diff line number Diff line change
1
+ ## [ 1.32.1] ( https://github.com/answerbook/vector/compare/v1.32.0...v1.32.1 ) (2023-12-22)
2
+
3
+
4
+ ### Bug Fixes
5
+
6
+ * Use buffer ref to account for event size in transforms [ 9842893] ( https://github.com/answerbook/vector/commit/9842893dd0db567c2604a64c181dee3833fce55b ) - Jorge Bay [ LOG-18897] ( https://logdna.atlassian.net/browse/LOG-18897 )
7
+
8
+ # [ 1.32.0] ( https://github.com/answerbook/vector/compare/v1.31.0...v1.32.0 ) (2023-12-21)
9
+
10
+
11
+ ### Features
12
+
13
+ * ** s3-sink** : file consolidation off default [ fb46e73] ( https://github.com/answerbook/vector/commit/fb46e7359b442466069e1fc89d24254821b2a869 ) - dominic-mcallister-logdna [ LOG-18535] ( https://logdna.atlassian.net/browse/LOG-18535 )
14
+
15
+
16
+ ### Miscellaneous
17
+
18
+ * Merge pull request #378 from answerbook/dominic/LOG-18535-defaultoff [ af67c9e] ( https://github.com/answerbook/vector/commit/af67c9e1af1bc4bb6d5add3ea88888709f500f38 ) - GitHub [ LOG-18535] ( https://logdna.atlassian.net/browse/LOG-18535 )
19
+
1
20
# [ 1.31.0] ( https://github.com/answerbook/vector/compare/v1.30.0...v1.31.0 ) (2023-12-20)
2
21
3
22
Original file line number Diff line number Diff line change @@ -272,14 +272,16 @@ impl TransformOutputs {
272
272
usage_tracker : & dyn OutputUsageTracker ,
273
273
) -> Result < ( ) , Box < dyn error:: Error + Send + Sync > > {
274
274
if let Some ( primary) = self . primary_output . as_mut ( ) {
275
- let send_buf = buf. primary_buffer . as_mut ( ) . expect ( "mismatched outputs" ) ;
276
- Self :: send_single_buffer ( send_buf, primary) . await ?;
277
-
275
+ // Use the reference of the buffer FIRST to get the original value
276
+ // to calculate counts/sizes
278
277
let usage_profile = buf. primary_buffer . as_ref ( ) . map_or ( Default :: default ( ) , |o| {
279
278
o. 0 . iter ( )
280
279
. map ( |a| usage_tracker. get_size_and_profile ( a) )
281
280
. sum ( )
282
281
} ) ;
282
+
283
+ let send_buf = buf. primary_buffer . as_mut ( ) . expect ( "mismatched outputs" ) ;
284
+ Self :: send_single_buffer ( send_buf, primary) . await ?;
283
285
// We only want to track the primary transform output.
284
286
// Named outputs are for stuff like route/swimlanes that we don't want to track atm.
285
287
// We only want to capture the traffic of the remap transform after the node representing
Original file line number Diff line number Diff line change 1
1
{
2
2
"name" : " vector" ,
3
- "version" : " 1.31.0 " ,
3
+ "version" : " 1.32.1 " ,
4
4
"description" : " Vector is a high-performance, end-to-end (agent & aggregator) observability data pipeline" ,
5
5
"repository" : {
6
6
"type" : " git" ,
Original file line number Diff line number Diff line change @@ -88,6 +88,11 @@ impl FileConsolidatorAsync {
88
88
}
89
89
90
90
pub fn start ( & mut self ) -> bool {
91
+ // default situation so the config isn't enabled
92
+ if !self . file_consolidation_config . enabled {
93
+ return false ;
94
+ }
95
+
91
96
if self . join_handle . is_some ( ) {
92
97
info ! (
93
98
message =
@@ -192,6 +197,11 @@ impl FileConsolidatorAsync {
192
197
}
193
198
194
199
pub fn stop ( & mut self ) -> bool {
200
+ // default situation so the config isn't enabled
201
+ if !self . file_consolidation_config . enabled {
202
+ return false ;
203
+ }
204
+
195
205
info ! (
196
206
message = "Triggering shutdown for S3 file consolidation" ,
197
207
bucket = self . bucket,
Original file line number Diff line number Diff line change @@ -144,7 +144,7 @@ async fn s3_message_objects_not_reshaped_because_of_env() {
144
144
}
145
145
146
146
#[ tokio:: test]
147
- async fn s3_file_consolidator_run ( ) {
147
+ async fn s3_file_consolidator_enabled_run ( ) {
148
148
let _cx = SinkContext :: new_test ( ) ;
149
149
let bucket = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
150
150
@@ -179,6 +179,22 @@ async fn s3_file_consolidator_run() {
179
179
assert_eq ! ( stopped, true , "stopped true" ) ;
180
180
}
181
181
182
+ #[ tokio:: test]
183
+ async fn s3_file_consolidator_disabled_run ( ) {
184
+ let _cx = SinkContext :: new_test ( ) ;
185
+
186
+ // testing the default scenario where the consolidator is disabled
187
+ let mut fc: FileConsolidatorAsync = Default :: default ( ) ;
188
+
189
+ let started = fc. start ( ) ;
190
+ assert ! ( !started, "started false" ) ;
191
+
192
+ thread:: sleep ( time:: Duration :: from_millis ( 1000 ) ) ;
193
+
194
+ let stopped = fc. stop ( ) ;
195
+ assert ! ( !stopped, "stopped false" ) ;
196
+ }
197
+
182
198
#[ tokio:: test]
183
199
async fn s3_file_consolidation_process_no_files ( ) {
184
200
let _cx = SinkContext :: new_test ( ) ;
You can’t perform that action at this time.
0 commit comments