Skip to content

Commit e55ee49

Browse files
committed
temp commit
Signed-off-by: Toby Lawrence <[email protected]>
1 parent e29038a commit e55ee49

File tree

14 files changed

+273
-235
lines changed

14 files changed

+273
-235
lines changed

lib/vector-core/src/event/metric.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl ByteSizeOf for MetricName {
8484
}
8585
}
8686

87-
#[derive(Clone, Debug, Deserialize, Getters, PartialEq, Serialize)]
87+
#[derive(Clone, Debug, Deserialize, Getters, MutGetters, PartialEq, Serialize)]
8888
pub struct MetricData {
8989
#[getset(get = "pub")]
9090
#[serde(skip_serializing_if = "Option::is_none")]
@@ -93,7 +93,7 @@ pub struct MetricData {
9393
#[getset(get = "pub")]
9494
pub kind: MetricKind,
9595

96-
#[getset(get = "pub")]
96+
#[getset(get = "pub", get_mut = "pub")]
9797
#[serde(flatten)]
9898
pub value: MetricValue,
9999
}

lib/vector-core/src/metrics/ddsketch.rs

+64-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::{cmp, mem};
22

33
use core_common::byte_size_of::ByteSizeOf;
4+
use ordered_float::OrderedFloat;
45
use serde::{Deserialize, Serialize};
56

7+
use crate::event::metric::Bucket;
8+
69
const AGENT_DEFAULT_BIN_LIMIT: u16 = 4096;
710
const AGENT_DEFAULT_EPS: f64 = 1.0 / 128.0;
811
const AGENT_DEFAULT_MIN_VALUE: f64 = 1.0e-9;
@@ -294,6 +297,7 @@ impl AgentDDSketch {
294297
self.is_empty().then(|| self.avg)
295298
}
296299

300+
/// Clears the sketch, removing all bins and resetting all statistics.
297301
pub fn clear(&mut self) {
298302
self.count = 0;
299303
self.min = f64::MAX;
@@ -459,7 +463,7 @@ impl AgentDDSketch {
459463
self.insert_key_counts(vec![(key, n)]);
460464
}
461465

462-
pub fn insert_interpolate(&mut self, lower: f64, upper: f64, count: u32) {
466+
fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u32) {
463467
// Find the keys for the bins where the lower bound and upper bound would end up, and
464468
// collect all of the keys in between, inclusive.
465469
let lower_key = self.config.key(lower);
@@ -516,6 +520,29 @@ impl AgentDDSketch {
516520
self.insert_key_counts(key_counts);
517521
}
518522

523+
pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) {
524+
// Buckets need to be sorted from lowest to highest so that we can properly calculate the
525+
// rolling lower/upper bounds.
526+
buckets.sort_by(|a, b| {
527+
let oa = OrderedFloat(a.upper_limit);
528+
let ob = OrderedFloat(b.upper_limit);
529+
530+
oa.cmp(&ob)
531+
});
532+
533+
let mut lower = 0.0;
534+
535+
for bucket in buckets {
536+
let mut upper = bucket.upper_limit;
537+
if upper.is_sign_positive() && upper.is_infinite() {
538+
upper = lower;
539+
}
540+
541+
self.insert_interpolate_bucket(lower, upper, bucket.count);
542+
lower = bucket.upper_limit;
543+
}
544+
}
545+
519546
pub fn quantile(&self, q: f64) -> Option<f64> {
520547
if self.count == 0 {
521548
return None;
@@ -860,10 +887,24 @@ fn round_to_even(v: f64) -> f64 {
860887

861888
#[cfg(test)]
862889
mod tests {
890+
use crate::{event::metric::Bucket, metrics::handle::Histogram};
891+
863892
use super::{round_to_even, AgentDDSketch, Config, AGENT_DEFAULT_EPS};
864893

865894
const FLOATING_POINT_ACCEPTABLE_ERROR: f64 = 1.0e-10;
866895

896+
static HISTO_VALUES: &[u64] = &[
897+
104221, 10206, 32436, 121686, 92848, 83685, 23739, 15122, 50491, 88507, 48318, 28004,
898+
29576, 8735, 77693, 33965, 88047, 7592, 64138, 59966, 117956, 112525, 41743, 82790, 27084,
899+
26967, 75008, 10752, 96636, 97150, 60768, 33411, 24746, 91872, 59057, 48329, 16756, 100459,
900+
117640, 59244, 107584, 124303, 32368, 109940, 106353, 90452, 84471, 39086, 91119, 89680,
901+
41339, 23329, 25629, 98156, 97002, 9538, 73671, 112586, 101616, 70719, 117291, 90043,
902+
10713, 49195, 60656, 60887, 47332, 113675, 8371, 42619, 33489, 108629, 70501, 84355, 24576,
903+
34468, 76756, 110706, 42854, 83841, 120751, 66494, 65210, 70244, 118529, 28021, 51603,
904+
96315, 92364, 59120, 118968, 5484, 91790, 45171, 102756, 29673, 85303, 108322, 122793,
905+
88373,
906+
];
907+
867908
#[cfg(ddsketch_extended)]
868909
fn generate_pareto_distribution() -> Vec<OrderedFloat<f64>> {
869910
use ordered_float::OrderedFloat;
@@ -1093,6 +1134,28 @@ mod tests {
10931134
test_relative_accuracy(config, AGENT_DEFAULT_EPS, min_value, max_value)
10941135
}
10951136

1137+
#[test]
1138+
fn test_histogram_interpolation() {
1139+
let mut histo_sketch = AgentDDSketch::with_agent_defaults();
1140+
assert!(histo_sketch.is_empty());
1141+
1142+
let histo = Histogram::new();
1143+
for num in HISTO_VALUES {
1144+
histo.record((*num as f64) / 10_000.0);
1145+
}
1146+
1147+
let buckets = histo
1148+
.buckets()
1149+
.map(|(ub, n)| Bucket {
1150+
upper_limit: ub,
1151+
count: n,
1152+
})
1153+
.collect::<Vec<_>>();
1154+
histo_sketch.insert_interpolate_buckets(buckets);
1155+
1156+
assert!(!histo_sketch.is_empty());
1157+
}
1158+
10961159
fn test_relative_accuracy(config: Config, rel_acc: f64, min_value: f32, max_value: f32) {
10971160
let max_observed_rel_acc = check_max_relative_accuracy(config, min_value, max_value);
10981161
assert!(

lib/vector-core/src/metrics/histogram.rs

-177
This file was deleted.

lib/vector-core/src/metrics/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
mod ddsketch;
22
mod handle;
3-
mod histogram;
43
mod label_filter;
54
mod recorder;
65

@@ -9,7 +8,6 @@ use std::sync::Arc;
98
use crate::event::Metric;
109
pub use crate::metrics::ddsketch::{AgentDDSketch, BinMap};
1110
pub use crate::metrics::handle::{Counter, Handle};
12-
pub use crate::metrics::histogram::AgentDDSketchHistogram;
1311
use crate::metrics::label_filter::VectorLabelFilter;
1412
use crate::metrics::recorder::VectorRecorder;
1513
use metrics::Key;
+39-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,48 @@
1-
use vector_core::event::Metric;
1+
use std::mem;
2+
3+
use vector_core::{event::{Metric, MetricKind, MetricValue, metric::MetricSketch}, metrics::AgentDDSketch};
24

35
use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};
46

57
pub struct DatadogMetricsNormalizer;
68

79
impl MetricNormalize for DatadogMetricsNormalizer {
810
fn apply_state(state: &mut MetricSet, metric: Metric) -> Option<Metric> {
9-
todo!()
11+
// We primarily care about making sure that counters are incremental, and that gauges are
12+
// always absolute. For other metric kinds, we want them to be incremental.
13+
match &metric.value() {
14+
MetricValue::Counter { .. } => state.make_incremental(metric),
15+
MetricValue::Gauge { .. } => state.make_absolute(metric),
16+
MetricValue::AggregatedHistogram { .. } => {
17+
// Sketches should be sent to Datadog in an incremental fashion, so we need to
18+
// incrementalize the aggregated histogram first and then generate a sketch from it.
19+
state.make_incremental(metric)
20+
.map(|metric| {
21+
let (series, data, metadata) = metric.into_parts();
22+
23+
let sketch = match data.value_mut() {
24+
MetricValue::AggregatedHistogram { buckets, .. } => {
25+
let delta_buckets = mem::replace(buckets, Vec::new());
26+
let sketch = AgentDDSketch::with_agent_defaults();
27+
sketch.insert_interpolate_buckets(delta_buckets);
28+
sketch
29+
},
30+
// We should never get back a different metric value simply from converting
31+
// between absolute and incremental.
32+
_ => unreachable!(),
33+
};
34+
35+
let _ = mem::replace(data.value_mut(), MetricValue::Sketch {
36+
sketch: MetricSketch::AgentDDSketch(sketch),
37+
});
38+
39+
Metric::from_parts(series, data, metadata)
40+
})
41+
},
42+
_ => match metric.kind() {
43+
MetricKind::Absolute => state.make_incremental(metric),
44+
MetricKind::Incremental => Some(metric),
45+
}
46+
}
1047
}
1148
}

0 commit comments

Comments
 (0)