Commit 3f6df61

chore(datadog_metrics sink): incrementally encode sketches (vectordotdev#17764)
## Context

When support was added for encoding/sending sketches in vectordotdev#9178, logic was added to handle "splitting" payloads if a metric exceeded the (un)compressed payload limits. As we lacked (at the time) the ability to encode sketch metrics one-by-one, we were forced to collect all of them and then attempt to encode them all at once, which had a tendency to grow the payload size past the (un)compressed payload limits. This "splitting" mechanism allowed us to compensate for that.

However, to avoid getting stuck in pathological loops where payloads were too big and thus required multiple splits (after already attempting at least one split), the logic was configured such that a batch of metrics would only be split once; if the two resulting slices couldn't be encoded without also exceeding the limits, they would be dropped and we would give up trying to split further. Despite the gut feeling during that work that it should be exceedingly rare to ever need to split further, real life has shown otherwise: vectordotdev#13175

## Solution

This PR introduces proper incremental encoding of sketches, which doesn't eliminate the possibility of needing to split (more below) but reduces the likelihood of splitting down to a purely theoretical level. We take advantage of hidden-from-docs methods in `prost` to encode each `SketchPayload` object and append the bytes into a single buffer. This is possible due to how Protocol Buffers works.

Additionally, we now generate "file descriptors" for our compiled Protocol Buffers definitions. We use these to programmatically query the field number of the "sketches" field in the `SketchPayload` message, which is slightly more robust than hardcoding it and hoping it never changes in the future.

In Protocol Buffers, each field in a message is written out such that the field data is preceded by the field number. This is part and parcel of its ability to allow backwards-compatible changes to a definition. Further, for repeated fields -- i.e. `Vec<Sketch>` -- the repetition is expressed simply by writing the same field multiple times rather than needing to write everything together. Practically speaking, this means that we can encode a vector of two messages, or encode those two messages individually, and end up with the same encoded output of `[field N][field data][field N][field data]`.

### Ancillary changes

We've additionally fixed a bug with the "bytes sent" metric reported by the `datadog_metrics` sink, caused by some very tangled and miswired code around how compressed/uncompressed/event byte sizes were shuttled from the request builder logic down to `Driver`. We've also reworked some of the encoder error types to clean them up and simplify things a bit.

## Reviewer notes

### Still needing to handle splits

The encoder still needs to care about splits, in a theoretical sense, because while we can accurately track and avoid ever exceeding the uncompressed payload limit, we can't know the final compressed payload size until we finalize the builder/payload. Currently, the encoder checks whether adding the current metric would cause us to exceed the compressed payload limit, assuming the compressor couldn't actually compress the encoded metric at all. This is a fairly robust check since it tries to optimally account for the overhead of an entirely incompressible payload, but we really want to avoid dropping events if possible, and that's why the splitting code is still in place.
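The "same encoded output" property described above can be demonstrated with a few lines of stdlib-only Rust that hand-write the length-delimited wire format. The helper and the byte strings are illustrative stand-ins, not Vector's actual types (a real `Sketch` would itself be an encoded nested message, and varint lengths can span multiple bytes):

```rust
// Write one length-delimited field: tag byte (field_number << 3 | wire type 2),
// then the payload length, then the payload bytes. For simplicity this assumes
// field_number < 16 and payload length < 128, so both fit in a single byte.
fn encode_len_delimited(field_number: u8, data: &[u8], buf: &mut Vec<u8>) {
    buf.push((field_number << 3) | 2);
    buf.push(data.len() as u8);
    buf.extend_from_slice(data);
}

fn main() {
    let sketch_a = b"sketch-a"; // stand-ins for two already-encoded Sketch messages
    let sketch_b = b"sketch-b";

    // Encoding the repeated field "all at once" is just writing each element in turn...
    let mut batch = Vec::new();
    encode_len_delimited(1, sketch_a, &mut batch);
    encode_len_delimited(1, sketch_b, &mut batch);

    // ...so encoding the elements one at a time into separate buffers and
    // concatenating the results produces byte-identical output.
    let mut first = Vec::new();
    encode_len_delimited(1, sketch_a, &mut first);
    let mut second = Vec::new();
    encode_len_delimited(1, sketch_b, &mut second);
    let incremental = [first, second].concat();

    assert_eq!(batch, incremental);
}
```

This is exactly why appending individually encoded payloads into one buffer is safe: the decoder cannot tell the difference.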
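The pessimistic "assume the compressor couldn't compress at all" check from the reviewer notes can be sketched roughly as follows. The constants and function names here are illustrative assumptions, not the sink's actual implementation; they model a zlib-style stream where incompressible input falls back to stored blocks that each carry a few bytes of framing, plus a small header and trailer:

```rust
// Worst-case size of a zlib-style stream over `uncompressed_len` bytes,
// assuming every block ends up "stored" (zero compression). Illustrative only.
fn worst_case_compressed_size(uncompressed_len: usize) -> usize {
    const HEADER: usize = 2; // zlib-style stream header (assumed)
    const TRAILER: usize = 4; // checksum trailer (assumed)
    const PER_BLOCK_OVERHEAD: usize = 5; // framing per stored block (assumed)
    let blocks = uncompressed_len / (16 * 1024) + 1;
    uncompressed_len + HEADER + TRAILER + blocks * PER_BLOCK_OVERHEAD
}

// Only admit the next encoded metric if, even with zero compression,
// the finalized payload would still fit under the compressed limit.
fn can_fit(written_so_far: usize, next_metric_len: usize, compressed_limit: usize) -> bool {
    worst_case_compressed_size(written_so_far + next_metric_len) <= compressed_limit
}

fn main() {
    // A payload with headroom accepts the metric; a nearly-full one rejects it
    // rather than risk overshooting the compressed limit at finalization.
    assert!(can_fit(1_000, 200, 2_000));
    assert!(!can_fit(1_990, 200, 2_000));
}
```

Because the bound assumes completely incompressible input, any payload admitted by this check is guaranteed to fit; the residual split handling only covers cases this bound cannot anticipate.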
1 parent 671aa79 commit 3f6df61

File tree

13 files changed (+545, -359 lines changed)


Cargo.lock

+12
(Generated file; diff not rendered.)

Cargo.toml

+3-2
```diff
@@ -205,8 +205,9 @@ serde_yaml = { version = "0.9.22", default-features = false }
 rmp-serde = { version = "1.1.1", default-features = false, optional = true }
 rmpv = { version = "1.0.0", default-features = false, features = ["with-serde"], optional = true }
 
-# Prost
+# Prost / Protocol Buffers
 prost = { version = "0.11", default-features = false, features = ["std"] }
+prost-reflect = { version = "0.11", default-features = false, optional = true }
 prost-types = { version = "0.11", default-features = false, optional = true }
 
 # GCP
@@ -673,7 +674,7 @@ sinks-console = []
 sinks-databend = []
 sinks-datadog_events = []
 sinks-datadog_logs = []
-sinks-datadog_metrics = ["protobuf-build"]
+sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
 sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
 sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
 sinks-file = ["dep:async-compression"]
```

LICENSE-3rdparty.csv

+1
```diff
@@ -398,6 +398,7 @@ proc-macro2,https://github.com/dtolnay/proc-macro2,MIT OR Apache-2.0,"David Toln
 proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle
 prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert <[email protected]>, Lucio Franco <[email protected], Tokio Contributors <[email protected]>"
 prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert <[email protected]>, Lucio Franco <[email protected]>, Tokio Contributors <[email protected]>"
+prost-reflect,https://github.com/andrewhickman/prost-reflect,MIT OR Apache-2.0,Andrew Hickman <[email protected]>
 ptr_meta,https://github.com/djkoloski/ptr_meta,MIT,David Koloski <[email protected]>
 pulsar,https://github.com/streamnative/pulsar-rs,MIT OR Apache-2.0,"Colin Stearns <[email protected]>, Kevin Stenerson <[email protected]>, Geoffroy Couprie <[email protected]>"
 quad-rand,https://github.com/not-fl3/quad-rand,MIT,not-fl3 <[email protected]>
```

build.rs

+20-2
```diff
@@ -1,4 +1,11 @@
-use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command};
+use std::{
+    collections::HashSet,
+    env,
+    fs::File,
+    io::Write,
+    path::{Path, PathBuf},
+    process::Command,
+};
 
 struct TrackedEnv {
     tracked: HashSet<String>,
@@ -124,8 +131,19 @@ fn main() {
     println!("cargo:rerun-if-changed=proto/google/rpc/status.proto");
     println!("cargo:rerun-if-changed=proto/vector.proto");
 
+    // Create and store the "file descriptor set" from the compiled Protocol Buffers packages.
+    //
+    // This allows us to use runtime reflection to manually build Protocol Buffers payloads
+    // in a type-safe way, which is necessary for incrementally building certain payloads, like
+    // the ones generated in the `datadog_metrics` sink.
+    let protobuf_fds_path =
+        PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"))
+            .join("protobuf-fds.bin");
+
     let mut prost_build = prost_build::Config::new();
-    prost_build.btree_map(["."]);
+    prost_build
+        .btree_map(["."])
+        .file_descriptor_set_path(protobuf_fds_path);
 
     tonic_build::configure()
         .protoc_arg("--experimental_allow_proto3_optional")
```

src/internal_events/datadog_metrics.rs

+6-8
```diff
@@ -7,19 +7,17 @@ use vector_common::internal_event::{
 };
 
 #[derive(Debug)]
-pub struct DatadogMetricsEncodingError {
-    pub error_message: &'static str,
+pub struct DatadogMetricsEncodingError<'a> {
+    pub reason: &'a str,
     pub error_code: &'static str,
     pub dropped_events: usize,
 }
 
-impl InternalEvent for DatadogMetricsEncodingError {
+impl<'a> InternalEvent for DatadogMetricsEncodingError<'a> {
     fn emit(self) {
-        let reason = "Failed to encode Datadog metrics.";
         error!(
-            message = reason,
-            error = %self.error_message,
-            error_code = %self.error_code,
+            message = self.reason,
+            error_code = self.error_code,
             error_type = error_type::ENCODER_FAILED,
             intentional = "false",
             stage = error_stage::PROCESSING,
@@ -35,7 +33,7 @@ impl InternalEvent for DatadogMetricsEncodingError {
         if self.dropped_events > 0 {
             emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                 count: self.dropped_events,
-                reason,
+                reason: self.reason,
             });
         }
     }
```

src/proto.rs

-5
This file was deleted.

src/proto/mod.rs

+19
```diff
@@ -0,0 +1,19 @@
+#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
+use crate::event::proto as event;
+
+#[cfg(any(feature = "sources-vector", feature = "sinks-vector"))]
+pub mod vector;
+
+#[cfg(feature = "sinks-datadog_metrics")]
+pub mod fds {
+    use once_cell::sync::OnceCell;
+    use prost_reflect::DescriptorPool;
+
+    pub fn protobuf_descriptors() -> &'static DescriptorPool {
+        static PROTOBUF_FDS: OnceCell<DescriptorPool> = OnceCell::new();
+        PROTOBUF_FDS.get_or_init(|| {
+            DescriptorPool::decode(include_bytes!(concat!(env!("OUT_DIR"), "/protobuf-fds.bin")).as_ref())
+                .expect("should not fail to decode protobuf file descriptor set generated from build script")
+        })
+    }
+}
```

src/sinks/datadog/metrics/config.rs

+5
```diff
@@ -59,6 +59,11 @@ impl DatadogMetricsEndpoint {
             DatadogMetricsEndpoint::Sketches => "application/x-protobuf",
         }
     }
+
+    // Gets whether or not this is a series endpoint.
+    pub const fn is_series(self) -> bool {
+        matches!(self, Self::Series)
+    }
 }
 
 /// Maps Datadog metric endpoints to their actual URI.
```
