Skip to content

Commit e5a18c6

Browse files
committed
wip
Signed-off-by: Toby Lawrence <[email protected]>
1 parent 2bd083e commit e5a18c6

File tree

8 files changed

+210
-168
lines changed

8 files changed

+210
-168
lines changed

lib/vector-core/src/metrics/ddsketch.rs

+23-12
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,19 @@ impl Bin {
159159
}
160160

161161
/// An implementation of [`DDSketch`][ddsketch] that mirrors the implementation from the Datadog agent.
162-
///
162+
///
163163
/// This implementation is subtly different from the open-source implementations of DDSketch, as
164164
/// Datadog made some slight tweaks to configuration values and in-memory layout to optimize it for
165165
/// insertion performance within the agent.
166166
///
167167
/// We've mimiced the agent version of DDSketch here in order to support a future where we can take
168168
/// sketches shipped by the agent, handle them internally, merge them, and so on, without any loss
169169
/// of accuracy, eventually forwarding them to Datadog ourselves.
170-
///
170+
///
171171
/// As such, this implementation is constrained in the same ways: the configuration parameters
172172
/// cannot be changed, the collapsing strategy is fixed, and we support a limited number of methods
173173
/// for inserting into the sketch.
174-
///
174+
///
175175
/// Importantly, we have a special function, again taken from the agent version, to allow us to
176176
/// interpolate histograms, specifically our own aggregated histograms, into a sketch so that we can
177177
/// emit useful default quantiles, rather than having to ship the buckets -- upper bound and count
@@ -365,12 +365,11 @@ impl AgentDDSketch {
365365
let mut keys = Vec::with_capacity(vs.len());
366366
for v in vs {
367367
self.adjust_basic_stats(*v, 1);
368-
keys.push(self.config.key(*v));
368+
keys.push(self.config.key(*v));
369369
}
370370
self.insert_keys(keys);
371371
}
372372

373-
374373
pub fn insert_n(&mut self, v: f64, n: u32) {
375374
// TODO: this should return a result that makes sure we have enough room to actually add N
376375
// more samples without hitting `self.config.max_count()`
@@ -773,8 +772,12 @@ mod tests {
773772
assert_eq!(all_values.bin_count(), values.len());
774773

775774
// Values at both ends of the quantile range should be equal.
776-
let low_end = all_values.quantile(0.01).expect("should have estimated value");
777-
let high_end = all_values.quantile(0.99).expect("should have estimated value");
775+
let low_end = all_values
776+
.quantile(0.01)
777+
.expect("should have estimated value");
778+
let high_end = all_values
779+
.quantile(0.99)
780+
.expect("should have estimated value");
778781
assert_eq!(high_end, -low_end);
779782

780783
let target_bin_count = all_values.bin_count();
@@ -785,11 +788,19 @@ mod tests {
785788

786789
for p in 0..50 {
787790
let q = p as f64 / 100.0;
788-
let positive = sketch.quantile(q + 0.5).expect("should have estimated value");
789-
let negative = -sketch.quantile(0.5 - q).expect("should have estimated value");
790-
791-
assert!((positive - negative).abs() <= 1.0e-6,
792-
"positive vs negative difference too great ({} vs {})", positive, negative);
791+
let positive = sketch
792+
.quantile(q + 0.5)
793+
.expect("should have estimated value");
794+
let negative = -sketch
795+
.quantile(0.5 - q)
796+
.expect("should have estimated value");
797+
798+
assert!(
799+
(positive - negative).abs() <= 1.0e-6,
800+
"positive vs negative difference too great ({} vs {})",
801+
positive,
802+
negative
803+
);
793804
}
794805

795806
assert_eq!(target_bin_count, sketch.bin_count());

src/config/datadog.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{ComponentKey, Config, SinkOuter, SourceOuter};
22
use crate::{
3-
sinks::datadog::metrics::DatadogConfig, sources::internal_metrics::InternalMetricsConfig,
3+
sinks::datadog::metrics::DatadogMetricsConfig, sources::internal_metrics::InternalMetricsConfig,
44
};
55
use serde::{Deserialize, Serialize};
66
use std::env;
@@ -75,7 +75,7 @@ pub fn try_attach(config: &mut Config) -> bool {
7575
);
7676

7777
// Create a Datadog metrics sink to consume and emit internal + host metrics.
78-
let datadog_metrics = DatadogConfig::from_api_key(api_key);
78+
let datadog_metrics = DatadogMetricsConfig::from_api_key(api_key);
7979

8080
config.sinks.insert(
8181
datadog_metrics_id,

src/sinks/datadog/metrics/config.rs

+85-92
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,34 @@
1-
use crate::{config::{DataType, SinkConfig, SinkContext}, event::Event, http::HttpClient, sinks::{Healthcheck, UriParseError, VectorSink, datadog::{Region, healthcheck}, util::{Compression, Concurrency, TowerRequestConfig, batch::{BatchConfig, BatchSettings}, retries::RetryLogic}}};
2-
use chrono:: Utc;
3-
use futures::{stream, FutureExt, SinkExt};
1+
use crate::{
2+
config::{DataType, SinkConfig, SinkContext},
3+
http::HttpClient,
4+
sinks::{
5+
datadog::{healthcheck, Region},
6+
util::{
7+
batch::{BatchConfig, BatchSettings},
8+
retries::RetryLogic,
9+
Compression, Concurrency, ServiceBuilderExt, TowerRequestConfig,
10+
},
11+
Healthcheck, UriParseError, VectorSink,
12+
},
13+
};
14+
use futures::FutureExt;
415
use http::{uri::InvalidUri, Uri};
516
use serde::{Deserialize, Serialize};
617
use snafu::{ResultExt, Snafu};
718
use tower::ServiceBuilder;
8-
use std::{
9-
future::ready,
10-
sync::atomic::AtomicI64,
19+
use vector_core::config::proxy::ProxyConfig;
20+
21+
use super::{
22+
service::{DatadogMetricsRetryLogic, DatadogMetricsService},
23+
sink::DatadogMetricsSink,
1124
};
1225

1326
// TODO: revisit our concurrency and batching defaults
1427
const DEFAULT_REQUEST_LIMITS: TowerRequestConfig =
15-
TowerRequestConfig::const_new(Concurrency::None, Concurrency::None).retry_attempts(5);
28+
TowerRequestConfig::const_new(Concurrency::None, Concurrency::None).retry_attempts(5);
1629

1730
const DEFAULT_BATCH_SETTINGS: BatchSettings<()> =
18-
BatchSettings::const_default().events(20).timeout(1);
31+
BatchSettings::const_default().events(20).timeout(1);
1932

2033
const MAXIMUM_SERIES_PAYLOAD_COMPRESSED_SIZE: usize = 3_200_000;
2134
const MAXIMUM_SERIES_PAYLOAD_SIZE: usize = 62_914_560;
@@ -27,29 +40,14 @@ enum BuildError {
2740
}
2841

2942
/// Various metric type-specific API types.
30-
///
43+
///
3144
/// Each of these corresponds to a specific request path when making a request to the agent API.
3245
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
33-
enum DatadogMetricsEndpoint {
46+
pub enum DatadogMetricsEndpoint {
3447
Series,
3548
Distribution,
3649
}
3750

38-
pub struct DatadogMetricsRetryLogic;
39-
40-
impl RetryLogic for DatadogMetricsRetryLogic {
41-
type Error = HttpError;
42-
type Response = DatadogMetricsResponse;
43-
44-
fn is_retriable_error(&self, error: &Self::Error) -> bool {
45-
todo!()
46-
}
47-
48-
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
49-
todo!()
50-
}
51-
}
52-
5351
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
5452
#[serde(deny_unknown_fields)]
5553
pub struct DatadogMetricsConfig {
@@ -66,8 +64,8 @@ pub struct DatadogMetricsConfig {
6664
pub batch: BatchConfig,
6765
#[serde(default)]
6866
pub request: TowerRequestConfig,
69-
#[serde(default = "Compression::gzip_default")]
70-
pub compression: Compression,
67+
#[serde(default = "Compression::gzip_default")]
68+
pub compression: Compression,
7169
}
7270

7371
impl_generate_config_from_default!(DatadogMetricsConfig);
@@ -78,12 +76,12 @@ impl SinkConfig for DatadogMetricsConfig {
7876
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
7977
let client = HttpClient::new(None, cx.proxy())?;
8078

81-
let client = self.build_client(&cx.proxy)?;
82-
let healthcheck = self.build_healthcheck(client.clone());
83-
let sink = self.build_sink(client, cx)?;
79+
let client = self.build_client(&cx.proxy)?;
80+
let healthcheck = self.build_healthcheck(client.clone());
81+
let sink = self.build_sink(client, cx)?;
8482

85-
Ok((sink, healthcheck))
86-
}
83+
Ok((sink, healthcheck))
84+
}
8785

8886
fn input_type(&self) -> DataType {
8987
DataType::Metric
@@ -95,103 +93,98 @@ impl SinkConfig for DatadogMetricsConfig {
9593
}
9694

9795
impl DatadogMetricsConfig {
98-
/// Copies this `DatadogMetricsConfig` with the API key set to the given value.
99-
pub fn with_api_key<T: Into<String>>(api_key: T) -> Self {
96+
/// Creates a default [`DatadogMetricsConfig`] with the given API key.
97+
pub fn from_api_key<T: Into<String>>(api_key: T) -> Self {
10098
Self {
10199
api_key: api_key.into(),
102100
..Self::default()
103101
}
104102
}
105103

106-
/// Gets the base URI of the Datadog agent API.
107-
///
108-
/// Per the Datadog agent convention, we should include a unique identifier as part of the
109-
/// domain to indicate that these metrics are being submitted by Vector, including the version,
110-
/// likely useful for detecting if a specific version of the agent (Vector, in this case) is
111-
/// doing something wrong, for understanding issues from the API side.
112-
///
113-
/// The `endpoint` configuration field will be used here if it is present.
104+
/// Gets the base URI of the Datadog agent API.
105+
///
106+
/// Per the Datadog agent convention, we should include a unique identifier as part of the
107+
/// domain to indicate that these metrics are being submitted by Vector, including the version,
108+
/// likely useful for detecting if a specific version of the agent (Vector, in this case) is
109+
/// doing something wrong, for understanding issues from the API side.
110+
///
111+
/// The `endpoint` configuration field will be used here if it is present.
114112
fn get_base_agent_endpoint(&self) -> String {
115113
self.endpoint.clone().unwrap_or_else(|| {
116114
let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
117115
format!("https://{}-vector.agent.{}", version, self.get_site())
118116
})
119117
}
120118

121-
/// Generates the full URIs to use for the various type-specific metrics endpoints.
122-
fn generate_metric_endpoints(&self) -> crate::Result<Vec<(DatadogMetricsEndpoint, Uri)>> {
123-
let base_uri = self.get_base_metric_endpoint();
124-
let series_endpoint = build_uri(&base_uri, "/api/v1/series")?;
125-
let distribution_endpoint = build_uri(&base_uri, "/api/v1/distribution_points")?;
126-
127-
Ok(vec![
128-
(DatadogMetricsEndpoint::Series, series_endpoint),
129-
(DatadogMetricsEndpoint::Distribution, distribution_endpoint),
130-
])
131-
}
132-
133-
/// Gets the base URI of the Datadog API.
134-
///
135-
/// The `endpoint` configuration field will be used here if it is present.
119+
/// Generates the full URIs to use for the various type-specific metrics endpoints.
120+
fn generate_metric_endpoints(&self) -> crate::Result<Vec<(DatadogMetricsEndpoint, Uri)>> {
121+
let base_uri = self.get_base_agent_endpoint();
122+
let series_endpoint = build_uri(&base_uri, "/api/v1/series")?;
123+
let distribution_endpoint = build_uri(&base_uri, "/api/v1/distribution_points")?;
124+
125+
Ok(vec![
126+
(DatadogMetricsEndpoint::Series, series_endpoint),
127+
(DatadogMetricsEndpoint::Distribution, distribution_endpoint),
128+
])
129+
}
130+
131+
/// Gets the base URI of the Datadog API.
132+
///
133+
/// The `endpoint` configuration field will be used here if it is present.
136134
fn get_api_endpoint(&self) -> String {
137135
self.endpoint
138136
.clone()
139137
.unwrap_or_else(|| format!("https://api.{}", self.get_site()))
140138
}
141139

142-
/// Gets the base domain to use for any calls to Datadog.
143-
///
144-
/// If `site` is not specified, we fallback to `region`, and if that is not specified, we
145-
/// fallback to the Datadog US domain.
140+
/// Gets the base domain to use for any calls to Datadog.
141+
///
142+
/// If `site` is not specified, we fallback to `region`, and if that is not specified, we
143+
/// fallback to the Datadog US domain.
146144
fn get_site(&self) -> &str {
147145
self.site.as_deref().unwrap_or_else(|| match self.region {
148146
Some(Region::Eu) => "datadoghq.eu",
149147
None | Some(Region::Us) => "datadoghq.com",
150148
})
151149
}
152150

153-
fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
154-
HttpClient::new(None, proxy)
155-
}
156-
157-
fn build_healthcheck(&self, client: HttpClient) -> Healthcheck {
158-
healthcheck(self.get_api_endpoint(), self.api_key.clone(), client).boxed()
159-
}
160-
161-
fn build_sink(&self, client: HttpClient, cx: SinkContext) -> crate::Result<VectorSink> {
162-
let batch = DEFAULT_BATCH_SETTINGS
163-
.parse_config(self.batch)?;
164-
165-
let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS);
166-
let metric_endpoints = self.generate_metric_endpoints()?;
167-
let service = ServiceBuilder::new()
168-
.settings(request_limits, DatadogMetricsRetryLogic)
169-
.service(DatadogMetricsService::new(client));
170-
171-
let sink = DatadogMetricsSink::new(
172-
cx,
173-
service,
174-
metric_endpoints,
175-
compression: self.compression,
176-
);
177-
178-
Ok(VectorSink::Sink(Box::new(sink)))
179-
}
151+
fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
152+
let client = HttpClient::new(None, proxy)?;
153+
Ok(client)
154+
}
155+
156+
fn build_healthcheck(&self, client: HttpClient) -> Healthcheck {
157+
healthcheck(self.get_api_endpoint(), self.api_key.clone(), client).boxed()
158+
}
159+
160+
fn build_sink(&self, client: HttpClient, cx: SinkContext) -> crate::Result<VectorSink> {
161+
let batch = DEFAULT_BATCH_SETTINGS.parse_config(self.batch)?;
162+
163+
let request_limits = self.request.unwrap_with(&DEFAULT_REQUEST_LIMITS);
164+
let metric_endpoints = self.generate_metric_endpoints()?;
165+
let service = ServiceBuilder::new()
166+
.settings(request_limits, DatadogMetricsRetryLogic)
167+
.service(DatadogMetricsService::new(client));
168+
169+
let sink = DatadogMetricsSink::new(cx, service, metric_endpoints, self.compression);
170+
171+
Ok(VectorSink::Stream(Box::new(sink)))
172+
}
180173
}
181174

182175
fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
183-
format!("{}{}", host, endpoint)
176+
let result = format!("{}{}", host, endpoint)
184177
.parse::<Uri>()
185-
.context(UriParseError)
178+
.context(UriParseError)?;
179+
Ok(result)
186180
}
187181

188182
#[cfg(test)]
189183
mod tests {
190184
use super::*;
191-
use crate::{event::metric::Sample, sinks::util::test::load_sink};
192185

193186
#[test]
194187
fn generate_config() {
195-
crate::test_util::test_generate_config::<DatadogConfig>();
188+
crate::test_util::test_generate_config::<DatadogMetricsConfig>();
196189
}
197190
}

src/sinks/datadog/metrics/mod.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ mod sink;
55

66
use crate::config::SinkDescription;
77

8-
use self::config::DatadogMetricsConfig;
8+
pub use self::config::DatadogMetricsConfig;
99

1010
inventory::submit! {
1111
SinkDescription::new::<DatadogMetricsConfig>("datadog_metrics")
1212
}
13-
14-
impl_generate_config_from_default!(DatadogMetricsConfig);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

0 commit comments

Comments
 (0)