Skip to content

Commit dcf7f9a

Browse files
chore(observability): emit component_sent events by source and service (vectordotdev#17549)
Closes vectordotdev#17580 Closes vectordotdev#17581 This is still in draft until I can get the following done. - [ ] ~~There are way more clones that I am happy with here, especially since this is in a hot path. These need reducing.~~ The remaining clones that I would like to remove are in the `get_tags` functions. This didn't seem trivial, and given the fairly positive regression numbers, I think it should be ok to defer for now. - [x] Function documentation. - [ ] Currently source schemas aren't being attached to the event at runtime, so the service meaning can't be retrieved. That won't work until this has been done. This will be a separate PR - vectordotdev#17692 - [x] I've only tested this with the kafka sink so far. I think it should work with all Stream sinks without needing any further modification - but further testing is needed. - [x] Tests. A bunch of tests need writing. - [x] The Vector source tests are failing I think because we now have `EventsSent` and `TaggedEventsSent` which both emit `component_sent_event` events and the test framework doesn't like this. This needs fixed. - [ ] We will need to review every sink to ensure they work with this. All the stream based sinks should, but the others are highly likely to need some work. --------- Signed-off-by: Stephen Wakely <[email protected]>
1 parent 6a6b42b commit dcf7f9a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1387
-501
lines changed

docs/tutorials/sinks/2_http_sink.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -366,9 +366,9 @@ impl DriverResponse for BasicResponse {
366366
EventStatus::Delivered
367367
}
368368

369-
fn events_sent(&self) -> CountByteSize {
369+
fn events_sent(&self) -> RequestCountByteSize {
370370
// (events count, byte size)
371-
CountByteSize(1, self.byte_size)
371+
CountByteSize(1, self.byte_size).into()
372372
}
373373
}
374374
```

lib/vector-common/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ bytes = { version = "1.4.0", default-features = false, optional = true }
4646
chrono-tz = { version = "0.8.2", default-features = false, features = ["serde"] }
4747
chrono = { version = "0.4", default-features = false, optional = true, features = ["clock"] }
4848
crossbeam-utils = { version = "0.8.16", default-features = false }
49-
derivative = "2.1.3"
49+
derivative = { version = "2.2.0", default-features = false }
5050
futures = { version = "0.3.28", default-features = false, features = ["std"] }
5151
indexmap = { version = "~1.9.3", default-features = false }
5252
metrics = "0.21.0"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use std::{
2+
collections::BTreeMap,
3+
sync::{Arc, RwLock},
4+
};
5+
6+
use derivative::Derivative;
7+
8+
use super::{InternalEventHandle, RegisterInternalEvent};
9+
10+
/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on
11+
/// values contained within the events. These tags can't be determined in advance.
12+
///
13+
/// Metrics need to be registered and the handle needs to be held onto in order to
14+
/// prevent them from expiring and being dropped (this would result in the counter
15+
/// resetting to zero).
16+
/// `CachedEvent` is used to maintain a store of these registered metrics. When a
17+
/// new event is emitted for a previously unseen set of tags an event is registered
18+
/// and stored in the cache.
19+
#[derive(Derivative)]
20+
#[derivative(Clone(bound = ""), Default(bound = ""))]
21+
pub struct RegisteredEventCache<Event: RegisterTaggedInternalEvent> {
22+
cache: Arc<
23+
RwLock<
24+
BTreeMap<
25+
<Event as RegisterTaggedInternalEvent>::Tags,
26+
<Event as RegisterInternalEvent>::Handle,
27+
>,
28+
>,
29+
>,
30+
}
31+
32+
/// This trait must be implemented by events that emit dynamic tags. `register` must
33+
/// be implemented to register an event based on the set of tags passed.
34+
pub trait RegisterTaggedInternalEvent: RegisterInternalEvent {
35+
/// The type that will contain the data necessary to extract the tags
36+
/// that will be used when registering the event.
37+
type Tags;
38+
39+
fn register(tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
40+
}
41+
42+
impl<Event, EventHandle, Data, Tags> RegisteredEventCache<Event>
43+
where
44+
Data: Sized,
45+
EventHandle: InternalEventHandle<Data = Data>,
46+
Tags: Ord + Clone,
47+
Event: RegisterInternalEvent<Handle = EventHandle> + RegisterTaggedInternalEvent<Tags = Tags>,
48+
{
49+
/// Emits the event with the given tags.
50+
/// It will register the event and store in the cache if this has not already
51+
/// been done.
52+
///
53+
/// # Panics
54+
///
55+
/// This will panic if the lock is poisoned.
56+
pub fn emit(&self, tags: &Tags, value: Data) {
57+
let read = self.cache.read().unwrap();
58+
if let Some(event) = read.get(tags) {
59+
event.emit(value);
60+
} else {
61+
let event = <Event as RegisterTaggedInternalEvent>::register(tags.clone());
62+
event.emit(value);
63+
64+
// Ensure the read lock is dropped so we can write.
65+
drop(read);
66+
self.cache.write().unwrap().insert(tags.clone(), event);
67+
}
68+
}
69+
}

lib/vector-common/src/internal_event/events_sent.rs

+64-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
use std::sync::Arc;
2+
13
use metrics::{register_counter, Counter};
24
use tracing::trace;
35

4-
use super::{CountByteSize, Output, SharedString};
6+
use crate::{config::ComponentKey, request_metadata::EventCountTags};
7+
8+
use super::{CountByteSize, OptionalTag, Output, SharedString};
59

610
pub const DEFAULT_OUTPUT: &str = "_default";
711

@@ -44,3 +48,62 @@ impl From<Output> for EventsSent {
4448
Self { output: output.0 }
4549
}
4650
}
51+
52+
/// Makes a list of the tags to use with the events sent event.
53+
fn make_tags(
54+
source: &OptionalTag<Arc<ComponentKey>>,
55+
service: &OptionalTag<String>,
56+
) -> Vec<(&'static str, String)> {
57+
let mut tags = Vec::new();
58+
if let OptionalTag::Specified(tag) = source {
59+
tags.push((
60+
"source",
61+
tag.as_ref()
62+
.map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
63+
));
64+
}
65+
66+
if let OptionalTag::Specified(tag) = service {
67+
tags.push(("service", tag.clone().unwrap_or("-".to_string())));
68+
}
69+
70+
tags
71+
}
72+
73+
crate::registered_event!(
74+
TaggedEventsSent {
75+
source: OptionalTag<Arc<ComponentKey>>,
76+
service: OptionalTag<String>,
77+
} => {
78+
events: Counter = {
79+
register_counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
80+
},
81+
event_bytes: Counter = {
82+
register_counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
83+
},
84+
}
85+
86+
fn emit(&self, data: CountByteSize) {
87+
let CountByteSize(count, byte_size) = data;
88+
trace!(message = "Events sent.", %count, %byte_size);
89+
90+
self.events.increment(count as u64);
91+
self.event_bytes.increment(byte_size.get() as u64);
92+
}
93+
94+
fn register(tags: EventCountTags) {
95+
super::register(TaggedEventsSent::new(
96+
tags,
97+
))
98+
}
99+
);
100+
101+
impl TaggedEventsSent {
102+
#[must_use]
103+
pub fn new(tags: EventCountTags) -> Self {
104+
Self {
105+
source: tags.source,
106+
service: tags.service,
107+
}
108+
}
109+
}

lib/vector-common/src/internal_event/mod.rs

+38-2
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
mod bytes_received;
22
mod bytes_sent;
3+
mod cached_event;
34
pub mod component_events_dropped;
45
mod events_received;
56
mod events_sent;
7+
mod optional_tag;
68
mod prelude;
79
pub mod service;
810

11+
use std::ops::{Add, AddAssign};
12+
913
pub use metrics::SharedString;
1014

1115
pub use bytes_received::BytesReceived;
1216
pub use bytes_sent::BytesSent;
17+
#[allow(clippy::module_name_repetitions)]
18+
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};
1319
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
1420
pub use events_received::EventsReceived;
15-
pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
21+
pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT};
22+
pub use optional_tag::OptionalTag;
1623
pub use prelude::{error_stage, error_type};
1724
pub use service::{CallError, PollReadyError};
1825

@@ -109,9 +116,24 @@ pub struct ByteSize(pub usize);
109116
pub struct Count(pub usize);
110117

111118
/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`.
112-
#[derive(Clone, Copy)]
119+
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
113120
pub struct CountByteSize(pub usize, pub JsonSize);
114121

122+
impl AddAssign for CountByteSize {
123+
fn add_assign(&mut self, rhs: Self) {
124+
self.0 += rhs.0;
125+
self.1 += rhs.1;
126+
}
127+
}
128+
129+
impl Add<CountByteSize> for CountByteSize {
130+
type Output = CountByteSize;
131+
132+
fn add(self, rhs: CountByteSize) -> Self::Output {
133+
CountByteSize(self.0 + rhs.0, self.1 + rhs.1)
134+
}
135+
}
136+
115137
// Wrapper types used to hold parameters for registering events
116138

117139
pub struct Output(pub Option<SharedString>);
@@ -196,6 +218,9 @@ macro_rules! registered_event {
196218

197219
fn emit(&$slf:ident, $data_name:ident: $data:ident)
198220
$emit_body:block
221+
222+
$(fn register($tags_name:ident: $tags:ty)
223+
$register_body:block)?
199224
) => {
200225
paste::paste!{
201226
#[derive(Clone)]
@@ -223,6 +248,17 @@ macro_rules! registered_event {
223248
fn emit(&$slf, $data_name: $data)
224249
$emit_body
225250
}
251+
252+
$(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event {
253+
type Tags = $tags;
254+
255+
fn register(
256+
$tags_name: $tags,
257+
) -> <TaggedEventsSent as super::RegisterInternalEvent>::Handle {
258+
$register_body
259+
}
260+
})?
261+
226262
}
227263
};
228264
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/// The user can configure whether a tag should be emitted. If they configure it to
2+
/// be emitted, but the value doesn't exist - we should emit the tag but with a value
3+
/// of `-`.
4+
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
5+
pub enum OptionalTag<T> {
6+
Ignored,
7+
Specified(Option<T>),
8+
}
9+
10+
impl<T> From<Option<T>> for OptionalTag<T> {
11+
fn from(value: Option<T>) -> Self {
12+
Self::Specified(value)
13+
}
14+
}

0 commit comments

Comments
 (0)