Skip to content

Commit 3b2a2be

Browse files
chore(observability): ensure sent_event and received_event metrics are estimated json size (vectordotdev#17465)
This PR creates a newtype - [`JsonSize`](https://github.com/vectordotdev/vector/blob/stephen/event_json_size/lib/vector-common/src/json_size.rs) that is returned by the `EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of` trait function. The events that emit a `component_received_event_bytes_total` or `component_sent_event_bytes_total` event accept `JsonSize`. This allows us to use the compiler to ensure we are emitting the correct measurement. A number of components needed changing to ensure this worked. --------- Signed-off-by: Stephen Wakely <[email protected]>
1 parent dbd7151 commit 3b2a2be

File tree

87 files changed

+807
-449
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+807
-449
lines changed

lib/vector-common/src/internal_event/events_received.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ crate::registered_event!(
1818
#[allow(clippy::cast_precision_loss)]
1919
self.events_count.record(count as f64);
2020
self.events.increment(count as u64);
21-
self.event_bytes.increment(byte_size as u64);
21+
self.event_bytes.increment(byte_size.get() as u64);
2222
}
2323
);

lib/vector-common/src/internal_event/events_sent.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ crate::registered_event!(
2727

2828
match &self.output {
2929
Some(output) => {
30-
trace!(message = "Events sent.", count = %count, byte_size = %byte_size, output = %output);
30+
trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
3131
}
3232
None => {
33-
trace!(message = "Events sent.", count = %count, byte_size = %byte_size);
33+
trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
3434
}
3535
}
3636

3737
self.events.increment(count as u64);
38-
self.event_bytes.increment(byte_size as u64);
38+
self.event_bytes.increment(byte_size.get() as u64);
3939
}
4040
);
4141

lib/vector-common/src/internal_event/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
1616
pub use prelude::{error_stage, error_type};
1717
pub use service::{CallError, PollReadyError};
1818

19+
use crate::json_size::JsonSize;
20+
1921
pub trait InternalEvent: Sized {
2022
fn emit(self);
2123

@@ -106,9 +108,9 @@ pub struct ByteSize(pub usize);
106108
#[derive(Clone, Copy)]
107109
pub struct Count(pub usize);
108110

109-
/// Holds the tuple `(count_of_events, size_of_events_in_bytes)`.
111+
/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`.
110112
#[derive(Clone, Copy)]
111-
pub struct CountByteSize(pub usize, pub usize);
113+
pub struct CountByteSize(pub usize, pub JsonSize);
112114

113115
// Wrapper types used to hold parameters for registering events
114116

lib/vector-common/src/json_size.rs

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::{
2+
fmt,
3+
iter::Sum,
4+
ops::{Add, AddAssign, Sub},
5+
};
6+
7+
/// A newtype for the JSON size of an event.
8+
/// Used to emit the `component_received_event_bytes_total` and
9+
/// `component_sent_event_bytes_total` metrics.
10+
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
11+
pub struct JsonSize(usize);
12+
13+
impl fmt::Display for JsonSize {
14+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
15+
write!(f, "{}", self.0)
16+
}
17+
}
18+
19+
impl Sub for JsonSize {
20+
type Output = JsonSize;
21+
22+
#[inline]
23+
fn sub(mut self, rhs: Self) -> Self::Output {
24+
self.0 -= rhs.0;
25+
self
26+
}
27+
}
28+
29+
impl Add for JsonSize {
30+
type Output = JsonSize;
31+
32+
#[inline]
33+
fn add(mut self, rhs: Self) -> Self::Output {
34+
self.0 += rhs.0;
35+
self
36+
}
37+
}
38+
39+
impl AddAssign for JsonSize {
40+
#[inline]
41+
fn add_assign(&mut self, rhs: Self) {
42+
self.0 += rhs.0;
43+
}
44+
}
45+
46+
impl Sum for JsonSize {
47+
#[inline]
48+
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
49+
let mut accum = 0;
50+
for val in iter {
51+
accum += val.get();
52+
}
53+
54+
JsonSize::new(accum)
55+
}
56+
}
57+
58+
impl From<usize> for JsonSize {
59+
#[inline]
60+
fn from(value: usize) -> Self {
61+
Self(value)
62+
}
63+
}
64+
65+
impl JsonSize {
66+
/// Create a new instance with the specified size.
67+
#[must_use]
68+
#[inline]
69+
pub const fn new(size: usize) -> Self {
70+
Self(size)
71+
}
72+
73+
/// Create a new instance with size 0.
74+
#[must_use]
75+
#[inline]
76+
pub const fn zero() -> Self {
77+
Self(0)
78+
}
79+
80+
/// Returns the contained size.
81+
#[must_use]
82+
#[inline]
83+
pub fn get(&self) -> usize {
84+
self.0
85+
}
86+
}
87+
88+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
89+
#[allow(clippy::module_name_repetitions)]
90+
pub struct NonZeroJsonSize(JsonSize);
91+
92+
impl NonZeroJsonSize {
93+
#[must_use]
94+
#[inline]
95+
pub fn new(size: JsonSize) -> Option<Self> {
96+
(size.0 > 0).then_some(NonZeroJsonSize(size))
97+
}
98+
}
99+
100+
impl From<NonZeroJsonSize> for JsonSize {
101+
#[inline]
102+
fn from(value: NonZeroJsonSize) -> Self {
103+
value.0
104+
}
105+
}

lib/vector-common/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ pub use vrl::btreemap;
1818
#[cfg(feature = "byte_size_of")]
1919
pub mod byte_size_of;
2020

21+
pub mod json_size;
22+
2123
pub mod config;
2224

2325
#[cfg(feature = "conversion")]

lib/vector-common/src/request_metadata.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::ops::Add;
22

3+
use crate::json_size::JsonSize;
4+
35
/// Metadata for batch requests.
46
#[derive(Clone, Copy, Debug, Default)]
57
pub struct RequestMetadata {
@@ -8,7 +10,7 @@ pub struct RequestMetadata {
810
/// Size, in bytes, of the in-memory representation of all events in this batch request.
911
events_byte_size: usize,
1012
/// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request.
11-
events_estimated_json_encoded_byte_size: usize,
13+
events_estimated_json_encoded_byte_size: JsonSize,
1214
/// Uncompressed size, in bytes, of the encoded events in this batch request.
1315
request_encoded_size: usize,
1416
/// On-the-wire size, in bytes, of the batch request itself after compression, etc.
@@ -25,7 +27,7 @@ impl RequestMetadata {
2527
events_byte_size: usize,
2628
request_encoded_size: usize,
2729
request_wire_size: usize,
28-
events_estimated_json_encoded_byte_size: usize,
30+
events_estimated_json_encoded_byte_size: JsonSize,
2931
) -> Self {
3032
Self {
3133
event_count,
@@ -47,7 +49,7 @@ impl RequestMetadata {
4749
}
4850

4951
#[must_use]
50-
pub const fn events_estimated_json_encoded_byte_size(&self) -> usize {
52+
pub const fn events_estimated_json_encoded_byte_size(&self) -> JsonSize {
5153
self.events_estimated_json_encoded_byte_size
5254
}
5355

@@ -64,7 +66,7 @@ impl RequestMetadata {
6466
/// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided.
6567
#[must_use]
6668
pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
67-
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, 0);
69+
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, JsonSize::zero());
6870

6971
for metadata in metadata_iter {
7072
metadata_sum = metadata_sum + &metadata;

lib/vector-core/src/event/array.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use futures::{stream, Stream};
88
#[cfg(test)]
99
use quickcheck::{Arbitrary, Gen};
1010
use vector_buffers::EventCount;
11-
use vector_common::finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable};
11+
use vector_common::{
12+
finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
13+
json_size::JsonSize,
14+
};
1215

1316
use super::{
1417
EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
@@ -253,7 +256,7 @@ impl ByteSizeOf for EventArray {
253256
}
254257

255258
impl EstimatedJsonEncodedSizeOf for EventArray {
256-
fn estimated_json_encoded_size_of(&self) -> usize {
259+
fn estimated_json_encoded_size_of(&self) -> JsonSize {
257260
match self {
258261
Self::Logs(v) => v.estimated_json_encoded_size_of(),
259262
Self::Traces(v) => v.estimated_json_encoded_size_of(),

0 commit comments

Comments
 (0)