1
+ use std:: collections:: HashMap ;
1
2
use vector_lib:: config:: LegacyKey ;
2
3
use vrl:: event_path;
3
4
4
5
use crate :: {
5
6
conditions:: Condition ,
6
7
event:: Event ,
7
8
internal_events:: SampleEventDiscarded ,
9
+ sinks:: prelude:: TemplateRenderingError ,
10
+ template:: Template ,
8
11
transforms:: { FunctionTransform , OutputBuffer } ,
9
12
} ;
10
13
@@ -13,26 +16,29 @@ pub struct Sample {
13
16
name : String ,
14
17
rate : u64 ,
15
18
key_field : Option < String > ,
19
+ group_by : Option < Template > ,
16
20
exclude : Option < Condition > ,
17
- count : u64 ,
21
+ counter : HashMap < Option < String > , u64 > ,
18
22
}
19
23
20
24
impl Sample {
21
25
// This function is dead code when the feature flag `transforms-impl-sample` is specified but not
22
26
// `transforms-sample`.
23
27
#![ allow( dead_code) ]
24
- pub const fn new (
28
+ pub fn new (
25
29
name : String ,
26
30
rate : u64 ,
27
31
key_field : Option < String > ,
32
+ group_by : Option < Template > ,
28
33
exclude : Option < Condition > ,
29
34
) -> Self {
30
35
Self {
31
36
name,
32
37
rate,
33
38
key_field,
39
+ group_by,
34
40
exclude,
35
- count : 0 ,
41
+ counter : HashMap :: new ( ) ,
36
42
}
37
43
}
38
44
}
@@ -69,13 +75,42 @@ impl FunctionTransform for Sample {
69
75
} )
70
76
. map ( |v| v. to_string_lossy ( ) ) ;
71
77
78
+ // Fetch actual field value if group_by option is set.
79
+ let group_by_key = self . group_by . as_ref ( ) . and_then ( |group_by| match & event {
80
+ Event :: Log ( event) => group_by
81
+ . render_string ( event)
82
+ . map_err ( |error| {
83
+ emit ! ( TemplateRenderingError {
84
+ error,
85
+ field: Some ( "group_by" ) ,
86
+ drop_event: false ,
87
+ } )
88
+ } )
89
+ . ok ( ) ,
90
+ Event :: Trace ( event) => group_by
91
+ . render_string ( event)
92
+ . map_err ( |error| {
93
+ emit ! ( TemplateRenderingError {
94
+ error,
95
+ field: Some ( "group_by" ) ,
96
+ drop_event: false ,
97
+ } )
98
+ } )
99
+ . ok ( ) ,
100
+ Event :: Metric ( _) => panic ! ( "component can never receive metric events" ) ,
101
+ } ) ;
102
+
103
+ let counter_value: u64 = * self . counter . entry ( group_by_key. clone ( ) ) . or_default ( ) ;
104
+
72
105
let num = if let Some ( value) = value {
73
106
seahash:: hash ( value. as_bytes ( ) )
74
107
} else {
75
- self . count
108
+ counter_value
76
109
} ;
77
110
78
- self . count = ( self . count + 1 ) % self . rate ;
111
+ // reset counter for particular key, or default key if group_by option isn't provided
112
+ let increment: u64 = ( counter_value + 1 ) % self . rate ;
113
+ self . counter . insert ( group_by_key. clone ( ) , increment) ;
79
114
80
115
if num % self . rate == 0 {
81
116
match event {
@@ -134,6 +169,7 @@ mod tests {
134
169
"sample" . to_string ( ) ,
135
170
2 ,
136
171
log_schema ( ) . message_key ( ) . map ( ToString :: to_string) ,
172
+ None ,
137
173
Some ( condition_contains (
138
174
log_schema ( ) . message_key ( ) . unwrap ( ) . to_string ( ) . as_str ( ) ,
139
175
"na" ,
@@ -156,6 +192,7 @@ mod tests {
156
192
"sample" . to_string ( ) ,
157
193
25 ,
158
194
log_schema ( ) . message_key ( ) . map ( ToString :: to_string) ,
195
+ None ,
159
196
Some ( condition_contains (
160
197
log_schema ( ) . message_key ( ) . unwrap ( ) . to_string ( ) . as_str ( ) ,
161
198
"na" ,
@@ -181,6 +218,7 @@ mod tests {
181
218
"sample" . to_string ( ) ,
182
219
2 ,
183
220
log_schema ( ) . message_key ( ) . map ( ToString :: to_string) ,
221
+ None ,
184
222
Some ( condition_contains (
185
223
log_schema ( ) . message_key ( ) . unwrap ( ) . to_string ( ) . as_str ( ) ,
186
224
"na" ,
@@ -216,6 +254,7 @@ mod tests {
216
254
"sample" . to_string ( ) ,
217
255
0 ,
218
256
key_field. clone ( ) ,
257
+ None ,
219
258
Some ( condition_contains (
220
259
log_schema ( ) . message_key ( ) . unwrap ( ) . to_string ( ) . as_str ( ) ,
221
260
"important" ,
@@ -232,6 +271,33 @@ mod tests {
232
271
}
233
272
}
234
273
274
+ #[ test]
275
+ fn handles_group_by ( ) {
276
+ for group_by in & [ None , Some ( Template :: try_from ( "{{ other_field }}" ) . unwrap ( ) ) ] {
277
+ let mut event = Event :: Log ( LogEvent :: from ( "nananana" ) ) ;
278
+ let log = event. as_mut_log ( ) ;
279
+ log. insert ( "other_field" , "foo" ) ;
280
+ let mut sampler = Sample :: new (
281
+ "sample" . to_string ( ) ,
282
+ 0 ,
283
+ log_schema ( ) . message_key ( ) . map ( ToString :: to_string) ,
284
+ group_by. clone ( ) ,
285
+ Some ( condition_contains (
286
+ log_schema ( ) . message_key ( ) . unwrap ( ) . to_string ( ) . as_str ( ) ,
287
+ "na" ,
288
+ ) ) ,
289
+ ) ;
290
+ let iterations = 0 ..1000 ;
291
+ let total_passed = iterations
292
+ . filter_map ( |_| {
293
+ transform_one ( & mut sampler, event. clone ( ) )
294
+ . map ( |result| assert_eq ! ( result, event) )
295
+ } )
296
+ . count ( ) ;
297
+ assert_eq ! ( total_passed, 1000 ) ;
298
+ }
299
+ }
300
+
235
301
#[ test]
236
302
fn handles_key_field ( ) {
237
303
for key_field in & [ None , Some ( "other_field" . into ( ) ) ] {
@@ -242,6 +308,7 @@ mod tests {
242
308
"sample" . to_string ( ) ,
243
309
0 ,
244
310
key_field. clone ( ) ,
311
+ None ,
245
312
Some ( condition_contains ( "other_field" , "foo" ) ) ,
246
313
) ;
247
314
let iterations = 0 ..1000 ;
@@ -264,6 +331,7 @@ mod tests {
264
331
"sample" . to_string ( ) ,
265
332
10 ,
266
333
key_field. clone ( ) ,
334
+ None ,
267
335
Some ( condition_contains ( & message_key, "na" ) ) ,
268
336
) ;
269
337
let passing = events
@@ -278,6 +346,7 @@ mod tests {
278
346
"sample" . to_string ( ) ,
279
347
25 ,
280
348
key_field. clone ( ) ,
349
+ None ,
281
350
Some ( condition_contains ( & message_key, "na" ) ) ,
282
351
) ;
283
352
let passing = events
@@ -292,6 +361,7 @@ mod tests {
292
361
"sample" . to_string ( ) ,
293
362
25 ,
294
363
key_field. clone ( ) ,
364
+ None ,
295
365
Some ( condition_contains ( & message_key, "na" ) ) ,
296
366
) ;
297
367
let event = Event :: Log ( LogEvent :: from ( "nananana" ) ) ;
@@ -304,7 +374,7 @@ mod tests {
304
374
fn handles_trace_event ( ) {
305
375
let event: TraceEvent = LogEvent :: from ( "trace" ) . into ( ) ;
306
376
let trace = Event :: Trace ( event) ;
307
- let mut sampler = Sample :: new ( "sample" . to_string ( ) , 2 , None , None ) ;
377
+ let mut sampler = Sample :: new ( "sample" . to_string ( ) , 2 , None , None , None ) ;
308
378
let iterations = 0 ..2 ;
309
379
let total_passed = iterations
310
380
. filter_map ( |_| transform_one ( & mut sampler, trace. clone ( ) ) )
0 commit comments