Skip to content

Commit 2bd083e

Browse files
committed
temp commit
Signed-off-by: Toby Lawrence <[email protected]>
1 parent 36df289 commit 2bd083e

File tree

10 files changed

+424
-5
lines changed

10 files changed

+424
-5
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ logfmt = { version = "0.0.2", default-features = false, optional = true }
239239
lru = { version = "0.6.6", default-features = false, optional = true }
240240
maxminddb = { version = "0.21.0", default-features = false, optional = true }
241241
md-5 = { version = "0.9", optional = true }
242+
mime = { version = "0.3.16", default-features = false }
242243
# make sure to update the external docs when the Lua version changes
243244
mlua = { version = "0.6.3", default-features = false, features = ["lua54", "send", "vendored"], optional = true }
244245
mongodb = { version = "2.0.0", default-features = false, features = ["tokio-runtime"], optional = true }

lib/vector-core/src/metrics/ddsketch.rs

+85-5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,25 @@ impl Bin {
158158
}
159159
}
160160

161+
/// An implementation of [`DDSketch`][ddsketch] that mirrors the implementation from the Datadog agent.
162+
///
163+
/// This implementation is subtly different from the open-source implementations of DDSketch, as
164+
/// Datadog made some slight tweaks to configuration values and in-memory layout to optimize it for
165+
/// insertion performance within the agent.
166+
///
167+
/// We've mimiced the agent version of DDSketch here in order to support a future where we can take
168+
/// sketches shipped by the agent, handle them internally, merge them, and so on, without any loss
169+
/// of accuracy, eventually forwarding them to Datadog ourselves.
170+
///
171+
/// As such, this implementation is constrained in the same ways: the configuration parameters
172+
/// cannot be changed, the collapsing strategy is fixed, and we support a limited number of methods
173+
/// for inserting into the sketch.
174+
///
175+
/// Importantly, we have a special function, again taken from the agent version, to allow us to
176+
/// interpolate histograms, specifically our own aggregated histograms, into a sketch so that we can
177+
/// emit useful default quantiles, rather than having to ship the buckets -- upper bound and count
178+
/// -- to a downstream system that might have no native way to do the same thing, basically
179+
/// providing no value as they have no way to render useful data from them.
161180
pub struct AgentDDSketch {
162181
config: Config,
163182
bins: Vec<Bin>,
@@ -340,6 +359,18 @@ impl AgentDDSketch {
340359
self.insert_keys(vec![key]);
341360
}
342361

362+
pub fn insert_many(&mut self, vs: &[f64]) {
363+
// TODO: this should return a result that makes sure we have enough room to actually add 1
364+
// more sample without hitting `self.config.max_count()`
365+
let mut keys = Vec::with_capacity(vs.len());
366+
for v in vs {
367+
self.adjust_basic_stats(*v, 1);
368+
keys.push(self.config.key(*v));
369+
}
370+
self.insert_keys(keys);
371+
}
372+
373+
343374
pub fn insert_n(&mut self, v: f64, n: u32) {
344375
// TODO: this should return a result that makes sure we have enough room to actually add N
345376
// more samples without hitting `self.config.max_count()`
@@ -711,6 +742,60 @@ mod tests {
711742
assert!((end - max).abs() < FLOATING_POINT_ACCEPTABLE_ERROR);
712743
}
713744

745+
#[test]
746+
fn test_merge() {
747+
let mut all_values = AgentDDSketch::with_agent_defaults();
748+
let mut odd_values = AgentDDSketch::with_agent_defaults();
749+
let mut even_values = AgentDDSketch::with_agent_defaults();
750+
let mut all_values_many = AgentDDSketch::with_agent_defaults();
751+
752+
let mut values = Vec::new();
753+
for i in -50..=50 {
754+
let v = i as f64;
755+
756+
all_values.insert(v);
757+
758+
if i & 1 == 0 {
759+
odd_values.insert(v);
760+
} else {
761+
even_values.insert(v);
762+
}
763+
764+
values.push(v);
765+
}
766+
767+
all_values_many.insert_many(&values);
768+
769+
odd_values.merge(even_values);
770+
let merged_values = odd_values;
771+
772+
// Number of bins should be equal to the number of values we inserted.
773+
assert_eq!(all_values.bin_count(), values.len());
774+
775+
// Values at both ends of the quantile range should be equal.
776+
let low_end = all_values.quantile(0.01).expect("should have estimated value");
777+
let high_end = all_values.quantile(0.99).expect("should have estimated value");
778+
assert_eq!(high_end, -low_end);
779+
780+
let target_bin_count = all_values.bin_count();
781+
for sketch in &[all_values, all_values_many, merged_values] {
782+
assert_eq!(sketch.quantile(0.5), Some(0.0));
783+
assert_eq!(sketch.quantile(0.0), Some(-50.0));
784+
assert_eq!(sketch.quantile(1.0), Some(50.0));
785+
786+
for p in 0..50 {
787+
let q = p as f64 / 100.0;
788+
let positive = sketch.quantile(q + 0.5).expect("should have estimated value");
789+
let negative = -sketch.quantile(0.5 - q).expect("should have estimated value");
790+
791+
assert!((positive - negative).abs() <= 1.0e-6,
792+
"positive vs negative difference too great ({} vs {})", positive, negative);
793+
}
794+
795+
assert_eq!(target_bin_count, sketch.bin_count());
796+
}
797+
}
798+
714799
#[test]
715800
#[cfg(ddsketch_extended)]
716801
fn test_ddsketch_pareto_distribution() {
@@ -867,11 +952,6 @@ mod tests {
867952
max_relative_acc
868953
}
869954

870-
#[test]
871-
fn test_sketch_merge() {
872-
todo!()
873-
}
874-
875955
#[test]
876956
fn test_round_to_even() {
877957
let alike = |a: f64, b: f64| -> bool {

src/sinks/datadog/metrics/config.rs

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
use crate::{config::{DataType, SinkConfig, SinkContext}, event::Event, http::HttpClient, sinks::{Healthcheck, UriParseError, VectorSink, datadog::{Region, healthcheck}, util::{Compression, Concurrency, TowerRequestConfig, batch::{BatchConfig, BatchSettings}, retries::RetryLogic}}};
2+
use chrono:: Utc;
3+
use futures::{stream, FutureExt, SinkExt};
4+
use http::{uri::InvalidUri, Uri};
5+
use serde::{Deserialize, Serialize};
6+
use snafu::{ResultExt, Snafu};
7+
use tower::ServiceBuilder;
8+
use std::{
9+
future::ready,
10+
sync::atomic::AtomicI64,
11+
};
12+
13+
// TODO: revisit our concurrency and batching defaults
14+
const DEFAULT_REQUEST_LIMITS: TowerRequestConfig =
15+
TowerRequestConfig::const_new(Concurrency::None, Concurrency::None).retry_attempts(5);
16+
17+
const DEFAULT_BATCH_SETTINGS: BatchSettings<()> =
18+
BatchSettings::const_default().events(20).timeout(1);
19+
20+
const MAXIMUM_SERIES_PAYLOAD_COMPRESSED_SIZE: usize = 3_200_000;
21+
const MAXIMUM_SERIES_PAYLOAD_SIZE: usize = 62_914_560;
22+
23+
#[derive(Debug, Snafu)]
24+
enum BuildError {
25+
#[snafu(display("Invalid host {:?}: {:?}", host, source))]
26+
InvalidHost { host: String, source: InvalidUri },
27+
}
28+
29+
/// Various metric type-specific API types.
30+
///
31+
/// Each of these corresponds to a specific request path when making a request to the agent API.
32+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
33+
enum DatadogMetricsEndpoint {
34+
Series,
35+
Distribution,
36+
}
37+
38+
pub struct DatadogMetricsRetryLogic;
39+
40+
impl RetryLogic for DatadogMetricsRetryLogic {
41+
type Error = HttpError;
42+
type Response = DatadogMetricsResponse;
43+
44+
fn is_retriable_error(&self, error: &Self::Error) -> bool {
45+
todo!()
46+
}
47+
48+
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
49+
todo!()
50+
}
51+
}
52+
53+
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
54+
#[serde(deny_unknown_fields)]
55+
pub struct DatadogMetricsConfig {
56+
#[serde(alias = "namespace")]
57+
pub default_namespace: Option<String>,
58+
// Deprecated name
59+
#[serde(alias = "host")]
60+
pub endpoint: Option<String>,
61+
// Deprecated, replaced by the site option
62+
pub region: Option<Region>,
63+
pub site: Option<String>,
64+
pub api_key: String,
65+
#[serde(default)]
66+
pub batch: BatchConfig,
67+
#[serde(default)]
68+
pub request: TowerRequestConfig,
69+
#[serde(default = "Compression::gzip_default")]
70+
pub compression: Compression,
71+
}
72+
73+
impl_generate_config_from_default!(DatadogMetricsConfig);
74+
75+
#[async_trait::async_trait]
76+
#[typetag::serde(name = "datadog_metrics")]
77+
impl SinkConfig for DatadogMetricsConfig {
78+
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
79+
let client = HttpClient::new(None, cx.proxy())?;
80+
81+
let client = self.build_client(&cx.proxy)?;
82+
let healthcheck = self.build_healthcheck(client.clone());
83+
let sink = self.build_sink(client, cx)?;
84+
85+
Ok((sink, healthcheck))
86+
}
87+
88+
fn input_type(&self) -> DataType {
89+
DataType::Metric
90+
}
91+
92+
fn sink_type(&self) -> &'static str {
93+
"datadog_metrics"
94+
}
95+
}
96+
97+
impl DatadogMetricsConfig {
98+
/// Copies this `DatadogMetricsConfig` with the API key set to the given value.
99+
pub fn with_api_key<T: Into<String>>(api_key: T) -> Self {
100+
Self {
101+
api_key: api_key.into(),
102+
..Self::default()
103+
}
104+
}
105+
106+
/// Gets the base URI of the Datadog agent API.
107+
///
108+
/// Per the Datadog agent convention, we should include a unique identifier as part of the
109+
/// domain to indicate that these metrics are being submitted by Vector, including the version,
110+
/// likely useful for detecting if a specific version of the agent (Vector, in this case) is
111+
/// doing something wrong, for understanding issues from the API side.
112+
///
113+
/// The `endpoint` configuration field will be used here if it is present.
114+
fn get_base_agent_endpoint(&self) -> String {
115+
self.endpoint.clone().unwrap_or_else(|| {
116+
let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
117+
format!("https://{}-vector.agent.{}", version, self.get_site())
118+
})
119+
}
120+
121+
/// Generates the full URIs to use for the various type-specific metrics endpoints.
122+
fn generate_metric_endpoints(&self) -> crate::Result<Vec<(DatadogMetricsEndpoint, Uri)>> {
123+
let base_uri = self.get_base_metric_endpoint();
124+
let series_endpoint = build_uri(&base_uri, "/api/v1/series")?;
125+
let distribution_endpoint = build_uri(&base_uri, "/api/v1/distribution_points")?;
126+
127+
Ok(vec![
128+
(DatadogMetricsEndpoint::Series, series_endpoint),
129+
(DatadogMetricsEndpoint::Distribution, distribution_endpoint),
130+
])
131+
}
132+
133+
/// Gets the base URI of the Datadog API.
134+
///
135+
/// The `endpoint` configuration field will be used here if it is present.
136+
fn get_api_endpoint(&self) -> String {
137+
self.endpoint
138+
.clone()
139+
.unwrap_or_else(|| format!("https://api.{}", self.get_site()))
140+
}
141+
142+
/// Gets the base domain to use for any calls to Datadog.
143+
///
144+
/// If `site` is not specified, we fallback to `region`, and if that is not specified, we
145+
/// fallback to the Datadog US domain.
146+
fn get_site(&self) -> &str {
147+
self.site.as_deref().unwrap_or_else(|| match self.region {
148+
Some(Region::Eu) => "datadoghq.eu",
149+
None | Some(Region::Us) => "datadoghq.com",
150+
})
151+
}
152+
153+
fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
154+
HttpClient::new(None, proxy)
155+
}
156+
157+
fn build_healthcheck(&self, client: HttpClient) -> Healthcheck {
158+
healthcheck(self.get_api_endpoint(), self.api_key.clone(), client).boxed()
159+
}
160+
161+
fn build_sink(&self, client: HttpClient, cx: SinkContext) -> crate::Result<VectorSink> {
162+
let batch = DEFAULT_BATCH_SETTINGS
163+
.parse_config(self.batch)?;
164+
165+
let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS);
166+
let metric_endpoints = self.generate_metric_endpoints()?;
167+
let service = ServiceBuilder::new()
168+
.settings(request_limits, DatadogMetricsRetryLogic)
169+
.service(DatadogMetricsService::new(client));
170+
171+
let sink = DatadogMetricsSink::new(
172+
cx,
173+
service,
174+
metric_endpoints,
175+
compression: self.compression,
176+
);
177+
178+
Ok(VectorSink::Sink(Box::new(sink)))
179+
}
180+
}
181+
182+
fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
183+
format!("{}{}", host, endpoint)
184+
.parse::<Uri>()
185+
.context(UriParseError)
186+
}
187+
188+
#[cfg(test)]
189+
mod tests {
190+
use super::*;
191+
use crate::{event::metric::Sample, sinks::util::test::load_sink};
192+
193+
#[test]
194+
fn generate_config() {
195+
crate::test_util::test_generate_config::<DatadogConfig>();
196+
}
197+
}

src/sinks/datadog/metrics/mod.rs

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
mod config;
2+
mod request_builder;
3+
mod service;
4+
mod sink;
5+
6+
use crate::config::SinkDescription;
7+
8+
use self::config::DatadogMetricsConfig;
9+
10+
inventory::submit! {
11+
SinkDescription::new::<DatadogMetricsConfig>("datadog_metrics")
12+
}
13+
14+
impl_generate_config_from_default!(DatadogMetricsConfig);

src/sinks/datadog/metrics/request_builder.rs

Whitespace-only changes.

0 commit comments

Comments
 (0)