Skip to content

chore(observability): emit component_sent events by source and service #17549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
796def9
Add initial version of Cached
StephenWakely May 25, 2023
b205354
Remove the deadlock from writing to the cache after reading
StephenWakely May 26, 2023
d0c36ed
Insert source and service tag into events
StephenWakely May 26, 2023
ec666b0
Add bytesize count grouped by source and service
StephenWakely May 30, 2023
3e06791
Add EventCountTags trait
StephenWakely May 30, 2023
3a5fe02
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely May 31, 2023
15ea497
Fix merge
StephenWakely May 31, 2023
d66654c
Fix compile errors
StephenWakely May 31, 2023
d1c7df6
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 1, 2023
c895653
Set source and service tags for most other sinks
StephenWakely Jun 1, 2023
d28a809
Register the events with a trait rather than a Fn
StephenWakely Jun 2, 2023
4ade9be
These tests are round trip
StephenWakely Jun 2, 2023
3632fa2
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 5, 2023
4d9c5df
Clippy
StephenWakely Jun 5, 2023
c9640d9
Add event count tags for loki sink
StephenWakely Jun 5, 2023
a758704
RegisterEvent inherits form RegisterInternalEvent
StephenWakely Jun 6, 2023
5cbfee4
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 6, 2023
9eab386
Added telemetry options
StephenWakely Jun 7, 2023
ee50bf5
Only collect configured tags
StephenWakely Jun 8, 2023
9fd7854
Added tests and clippy.
StephenWakely Jun 8, 2023
01d8ad1
Little tidy
StephenWakely Jun 9, 2023
87ca474
TaggedEventsSent doesn't need Output
StephenWakely Jun 9, 2023
ebd10c8
Spelling
StephenWakely Jun 9, 2023
b30a4bb
Remove default impl of take_metadata
StephenWakely Jun 9, 2023
8b74470
Outer event should get tags from inner event type
StephenWakely Jun 9, 2023
b11c7a6
Driver should not consume the metadata
StephenWakely Jun 9, 2023
3327904
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 9, 2023
7a43045
Tags should be an associated type of RegisterEvent
StephenWakely Jun 9, 2023
f43af1b
Feedback from Bruce
StephenWakely Jun 15, 2023
b141f30
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 19, 2023
8ac7020
Set source tag to be Arc<ComponentKey>
StephenWakely Jun 19, 2023
243ec8b
Replace take_metadata with metadata_mut
StephenWakely Jun 21, 2023
574cbae
Add test for telemetry tags to Kafka sink
StephenWakely Jun 21, 2023
73131e6
Fix datadog integration test
StephenWakely Jun 21, 2023
4dee27c
Use Derivative to replace clone with no bounds
StephenWakely Jun 23, 2023
f1af663
Spelling
StephenWakely Jun 23, 2023
8f0cc1b
Feedback from Bruce
StephenWakely Jun 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,9 @@ impl DriverResponse for BasicResponse {
EventStatus::Delivered
}

fn events_sent(&self) -> CountByteSize {
fn events_sent(&self) -> RequestCountByteSize {
// (events count, byte size)
CountByteSize(1, self.byte_size)
CountByteSize(1, self.byte_size).into()
}
}
```
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ bytes = { version = "1.4.0", default-features = false, optional = true }
chrono-tz = { version = "0.8.2", default-features = false, features = ["serde"] }
chrono = { version = "0.4", default-features = false, optional = true, features = ["clock"] }
crossbeam-utils = { version = "0.8.16", default-features = false }
derivative = "2.1.3"
derivative = { version = "2.2.0", default-features = false }
futures = { version = "0.3.28", default-features = false, features = ["std"] }
indexmap = { version = "~1.9.3", default-features = false }
metrics = "0.21.0"
Expand Down
69 changes: 69 additions & 0 deletions lib/vector-common/src/internal_event/cached_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};

use derivative::Derivative;

use super::{InternalEventHandle, RegisterInternalEvent};

/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on
/// values contained within the events. These tags can't be determined in advance.
///
/// Metrics need to be registered and the handle needs to be held onto in order to
/// prevent them from expiring and being dropped (this would result in the counter
/// resetting to zero).
/// `CachedEvent` is used to maintain a store of these registered metrics. When a
/// new event is emitted for a previously unseen set of tags an event is registered
/// and stored in the cache.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Default(bound = ""))]
pub struct RegisteredEventCache<Event: RegisterTaggedInternalEvent> {
cache: Arc<
RwLock<
BTreeMap<
<Event as RegisterTaggedInternalEvent>::Tags,
<Event as RegisterInternalEvent>::Handle,
>,
>,
>,
}

/// This trait must be implemented by events that emit dynamic tags. `register` must
/// be implemented to register an event based on the set of tags passed.
pub trait RegisterTaggedInternalEvent: RegisterInternalEvent {
/// The type that will contain the data necessary to extract the tags
/// that will be used when registering the event.
type Tags;

fn register(tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
}

impl<Event, EventHandle, Data, Tags> RegisteredEventCache<Event>
where
Data: Sized,
EventHandle: InternalEventHandle<Data = Data>,
Tags: Ord + Clone,
Event: RegisterInternalEvent<Handle = EventHandle> + RegisterTaggedInternalEvent<Tags = Tags>,
{
/// Emits the event with the given tags.
/// It will register the event and store in the cache if this has not already
/// been done.
///
/// # Panics
///
/// This will panic if the lock is poisoned.
pub fn emit(&self, tags: &Tags, value: Data) {
let read = self.cache.read().unwrap();
if let Some(event) = read.get(tags) {
event.emit(value);
} else {
let event = <Event as RegisterTaggedInternalEvent>::register(tags.clone());
event.emit(value);

// Ensure the read lock is dropped so we can write.
drop(read);
self.cache.write().unwrap().insert(tags.clone(), event);
}
}
}
65 changes: 64 additions & 1 deletion lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::sync::Arc;

use metrics::{register_counter, Counter};
use tracing::trace;

use super::{CountByteSize, Output, SharedString};
use crate::{config::ComponentKey, request_metadata::EventCountTags};

use super::{CountByteSize, OptionalTag, Output, SharedString};

pub const DEFAULT_OUTPUT: &str = "_default";

Expand Down Expand Up @@ -44,3 +48,62 @@ impl From<Output> for EventsSent {
Self { output: output.0 }
}
}

/// Makes a list of the tags to use with the events sent event.
fn make_tags(
source: &OptionalTag<Arc<ComponentKey>>,
service: &OptionalTag<String>,
) -> Vec<(&'static str, String)> {
let mut tags = Vec::new();
if let OptionalTag::Specified(tag) = source {
tags.push((
"source",
tag.as_ref()
.map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
));
}

if let OptionalTag::Specified(tag) = service {
tags.push(("service", tag.clone().unwrap_or("-".to_string())));
}

tags
}

crate::registered_event!(
TaggedEventsSent {
source: OptionalTag<Arc<ComponentKey>>,
service: OptionalTag<String>,
} => {
events: Counter = {
register_counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
},
event_bytes: Counter = {
register_counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
},
Comment on lines +78 to +83
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too bad the macro doesn't let you make one call to make_tags and reuse it for both of these counters. Good thing it's just a setup function.

}

fn emit(&self, data: CountByteSize) {
let CountByteSize(count, byte_size) = data;
trace!(message = "Events sent.", %count, %byte_size);

self.events.increment(count as u64);
self.event_bytes.increment(byte_size.get() as u64);
}

fn register(tags: EventCountTags) {
super::register(TaggedEventsSent::new(
tags,
))
}
);

impl TaggedEventsSent {
#[must_use]
pub fn new(tags: EventCountTags) -> Self {
Self {
source: tags.source,
service: tags.service,
}
}
}
40 changes: 38 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
mod bytes_received;
mod bytes_sent;
mod cached_event;
pub mod component_events_dropped;
mod events_received;
mod events_sent;
mod optional_tag;
mod prelude;
pub mod service;

use std::ops::{Add, AddAssign};

pub use metrics::SharedString;

pub use bytes_received::BytesReceived;
pub use bytes_sent::BytesSent;
#[allow(clippy::module_name_repetitions)]
pub use cached_event::{RegisterTaggedInternalEvent, RegisteredEventCache};
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
pub use events_received::EventsReceived;
pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT};
pub use optional_tag::OptionalTag;
pub use prelude::{error_stage, error_type};
pub use service::{CallError, PollReadyError};

Expand Down Expand Up @@ -109,9 +116,24 @@ pub struct ByteSize(pub usize);
pub struct Count(pub usize);

/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CountByteSize(pub usize, pub JsonSize);

impl AddAssign for CountByteSize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting here because the struct name didn't actually change in this PR, but: it's kind of unfortunate we never changed this to CountJsonByteSize or CountJsonSize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like an easy enough find and replace fix?

fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0;
self.1 += rhs.1;
}
}

impl Add<CountByteSize> for CountByteSize {
type Output = CountByteSize;

fn add(self, rhs: CountByteSize) -> Self::Output {
CountByteSize(self.0 + rhs.0, self.1 + rhs.1)
}
}

// Wrapper types used to hold parameters for registering events

pub struct Output(pub Option<SharedString>);
Expand Down Expand Up @@ -196,6 +218,9 @@ macro_rules! registered_event {

fn emit(&$slf:ident, $data_name:ident: $data:ident)
$emit_body:block

$(fn register($tags_name:ident: $tags:ty)
$register_body:block)?
) => {
paste::paste!{
#[derive(Clone)]
Expand Down Expand Up @@ -223,6 +248,17 @@ macro_rules! registered_event {
fn emit(&$slf, $data_name: $data)
$emit_body
}

$(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event {
type Tags = $tags;

fn register(
$tags_name: $tags,
) -> <TaggedEventsSent as super::RegisterInternalEvent>::Handle {
$register_body
}
})?

}
};
}
14 changes: 14 additions & 0 deletions lib/vector-common/src/internal_event/optional_tag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/// The user can configure whether a tag should be emitted. If they configure it to
/// be emitted, but the value doesn't exist - we should emit the tag but with a value
/// of `-`.
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub enum OptionalTag<T> {
Ignored,
Specified(Option<T>),
}

impl<T> From<Option<T>> for OptionalTag<T> {
fn from(value: Option<T>) -> Self {
Self::Specified(value)
}
}
Loading