-
Notifications
You must be signed in to change notification settings - Fork 1.7k
chore(observability): emit component_sent
events by source
and service
#17549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
796def9
b205354
d0c36ed
ec666b0
3e06791
3a5fe02
15ea497
d66654c
d1c7df6
c895653
d28a809
4ade9be
3632fa2
4d9c5df
c9640d9
a758704
5cbfee4
9eab386
ee50bf5
9fd7854
01d8ad1
87ca474
ebd10c8
b30a4bb
8b74470
b11c7a6
3327904
7a43045
f43af1b
b141f30
8ac7020
243ec8b
574cbae
73131e6
4dee27c
f1af663
8f0cc1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
use std::{ | ||
collections::BTreeMap, | ||
sync::{Arc, RwLock}, | ||
}; | ||
|
||
use derivative::Derivative; | ||
|
||
use super::{InternalEventHandle, RegisterInternalEvent}; | ||
|
||
/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on | ||
/// values contained within the events. These tags can't be determined in advance. | ||
/// | ||
/// Metrics need to be registered and the handle needs to be held onto in order to | ||
/// prevent them from expiring and being dropped (this would result in the counter | ||
/// resetting to zero). | ||
/// `CachedEvent` is used to maintain a store of these registered metrics. When a | ||
/// new event is emitted for a previously unseen set of tags an event is registered | ||
/// and stored in the cache. | ||
#[derive(Derivative)] | ||
#[derivative(Clone(bound = ""), Default(bound = ""))] | ||
pub struct RegisteredEventCache<Event: RegisterTaggedInternalEvent> { | ||
cache: Arc< | ||
RwLock< | ||
BTreeMap< | ||
<Event as RegisterTaggedInternalEvent>::Tags, | ||
<Event as RegisterInternalEvent>::Handle, | ||
>, | ||
>, | ||
>, | ||
} | ||
|
||
/// This trait must be implemented by events that emit dynamic tags. `register` must | ||
/// be implemented to register an event based on the set of tags passed. | ||
pub trait RegisterTaggedInternalEvent: RegisterInternalEvent { | ||
/// The type that will contain the data necessary to extract the tags | ||
/// that will be used when registering the event. | ||
type Tags; | ||
|
||
fn register(tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle; | ||
} | ||
|
||
impl<Event, EventHandle, Data, Tags> RegisteredEventCache<Event> | ||
where | ||
Data: Sized, | ||
EventHandle: InternalEventHandle<Data = Data>, | ||
Tags: Ord + Clone, | ||
Event: RegisterInternalEvent<Handle = EventHandle> + RegisterTaggedInternalEvent<Tags = Tags>, | ||
{ | ||
/// Emits the event with the given tags. | ||
/// It will register the event and store in the cache if this has not already | ||
/// been done. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This will panic if the lock is poisoned. | ||
pub fn emit(&self, tags: &Tags, value: Data) { | ||
let read = self.cache.read().unwrap(); | ||
if let Some(event) = read.get(tags) { | ||
event.emit(value); | ||
} else { | ||
let event = <Event as RegisterTaggedInternalEvent>::register(tags.clone()); | ||
event.emit(value); | ||
|
||
// Ensure the read lock is dropped so we can write. | ||
drop(read); | ||
self.cache.write().unwrap().insert(tags.clone(), event); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,25 @@ | ||
mod bytes_received; | ||
mod bytes_sent; | ||
mod cached_event; | ||
pub mod component_events_dropped; | ||
mod events_received; | ||
mod events_sent; | ||
mod optional_tag; | ||
mod prelude; | ||
pub mod service; | ||
|
||
use std::ops::{Add, AddAssign}; | ||
|
||
pub use metrics::SharedString; | ||
|
||
pub use bytes_received::BytesReceived; | ||
pub use bytes_sent::BytesSent; | ||
#[allow(clippy::module_name_repetitions)] | ||
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache}; | ||
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL}; | ||
pub use events_received::EventsReceived; | ||
pub use events_sent::{EventsSent, DEFAULT_OUTPUT}; | ||
pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT}; | ||
pub use optional_tag::OptionalTag; | ||
pub use prelude::{error_stage, error_type}; | ||
pub use service::{CallError, PollReadyError}; | ||
|
||
|
@@ -109,9 +116,24 @@ pub struct ByteSize(pub usize); | |
pub struct Count(pub usize); | ||
|
||
/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`. | ||
#[derive(Clone, Copy)] | ||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] | ||
pub struct CountByteSize(pub usize, pub JsonSize); | ||
|
||
impl AddAssign for CountByteSize { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Commenting here because the struct name didn't actually change in this PR, but: it's kind of unfortunate we never changed this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feels like an easy enough find and replace fix? |
||
fn add_assign(&mut self, rhs: Self) { | ||
self.0 += rhs.0; | ||
self.1 += rhs.1; | ||
} | ||
} | ||
|
||
impl Add<CountByteSize> for CountByteSize { | ||
type Output = CountByteSize; | ||
|
||
fn add(self, rhs: CountByteSize) -> Self::Output { | ||
CountByteSize(self.0 + rhs.0, self.1 + rhs.1) | ||
} | ||
} | ||
|
||
// Wrapper types used to hold parameters for registering events | ||
|
||
pub struct Output(pub Option<SharedString>); | ||
|
@@ -196,6 +218,9 @@ macro_rules! registered_event { | |
|
||
fn emit(&$slf:ident, $data_name:ident: $data:ident) | ||
$emit_body:block | ||
|
||
$(fn register($tags_name:ident: $tags:ty) | ||
$register_body:block)? | ||
) => { | ||
paste::paste!{ | ||
#[derive(Clone)] | ||
|
@@ -223,6 +248,17 @@ macro_rules! registered_event { | |
fn emit(&$slf, $data_name: $data) | ||
$emit_body | ||
} | ||
|
||
$(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event { | ||
type Tags = $tags; | ||
|
||
fn register( | ||
$tags_name: $tags, | ||
) -> <TaggedEventsSent as super::RegisterInternalEvent>::Handle { | ||
$register_body | ||
} | ||
})? | ||
|
||
} | ||
}; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/// The user can configure whether a tag should be emitted. If they configure it to | ||
/// be emitted, but the value doesn't exist - we should emit the tag but with a value | ||
/// of `-`. | ||
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)] | ||
pub enum OptionalTag<T> { | ||
Ignored, | ||
Specified(Option<T>), | ||
} | ||
|
||
impl<T> From<Option<T>> for OptionalTag<T> { | ||
fn from(value: Option<T>) -> Self { | ||
Self::Specified(value) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's too bad the macro doesn't let you make one call to
make_tags
and reuse it for both of these counters. Good thing it's just a setup function.