Skip to content

Commit f67a09e

Browse files
committed
revert unrelated changes to DD metrics sink
1 parent 577efa9 commit f67a09e

File tree

2 files changed

+41
-77
lines changed

2 files changed

+41
-77
lines changed

src/sinks/datadog/metrics/sink.rs

+40-76
Original file line numberDiff line numberDiff line change
@@ -179,16 +179,17 @@ fn collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Me
179179
let mut idx = 0;
180180
let now_ts = Utc::now().timestamp();
181181

182-
// For each metric, see if it's a counter. If so, we check the rest of the metrics _after_ it to
183-
// see if they share the same series _and_ timestamp, when converted to a Unix timestamp. If
184-
// they match, we take that counter's value and merge it with our "current" counter metric, and
185-
// then drop the secondary one from the vector.
182+
// For each metric, see if it's a counter. If so, we check the rest of the metrics
183+
// _after_ it to see if they share the same series _and_ timestamp, when converted
184+
// to a Unix timestamp. If they match, we take that counter's value and merge it
185+
// with our "current" counter metric, and then drop the secondary one from the
186+
// vector.
186187
//
187188
// For any non-counter, we simply ignore it and leave it as-is.
188189
while idx < metrics.len() {
189-
let outer_idx = idx;
190-
let outer_counter_ts = match metrics[outer_idx].value() {
191-
MetricValue::Counter { .. } => metrics[outer_idx]
190+
let curr_idx = idx;
191+
let counter_ts = match metrics[curr_idx].value() {
192+
MetricValue::Counter { .. } => metrics[curr_idx]
192193
.data()
193194
.timestamp()
194195
.map(|dt| dt.timestamp())
@@ -203,75 +204,62 @@ fn collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Me
203204
let mut accumulated_value = 0.0;
204205
let mut accumulated_finalizers = EventFinalizers::default();
205206

206-
// Now go through each metric _after_ the current one to see if it matches the current
207-
// metric: is a counter, with the same name and timestamp. If it is, we accumulate its value
208-
// and then remove it.
207+
// Now go through each metric _after_ the current one to see if it matches the
208+
// current metric: is a counter, with the same name and timestamp. If it is, we
209+
// accumulate its value and then remove it.
209210
//
210211
// Otherwise, we skip it.
211-
let mut should_advance_outer = true;
212+
let mut is_disjoint = false;
212213
let mut had_match = false;
213-
let mut inner_idx = outer_idx + 1;
214+
let mut inner_idx = curr_idx + 1;
214215
while inner_idx < metrics.len() {
215-
let mut should_advance_inner = true;
216+
let mut should_advance = true;
216217
if let MetricValue::Counter { value } = metrics[inner_idx].value() {
217-
let counters_match = {
218-
let outer_counter_series = metrics[outer_idx].series();
219-
let inner_counter_series = metrics[inner_idx].series();
220-
let inner_counter_ts = metrics[inner_idx]
221-
.data()
222-
.timestamp()
223-
.map(|dt| dt.timestamp())
224-
.unwrap_or(now_ts);
225-
226-
outer_counter_series == inner_counter_series
227-
&& outer_counter_ts == inner_counter_ts
228-
};
229-
230-
if counters_match {
218+
let other_counter_ts = metrics[inner_idx]
219+
.data()
220+
.timestamp()
221+
.map(|dt| dt.timestamp())
222+
.unwrap_or(now_ts);
223+
if metrics[curr_idx].series() == metrics[inner_idx].series()
224+
&& counter_ts == other_counter_ts
225+
{
231226
had_match = true;
232227

233-
// Collapse this counter by accumulating its value, and its finalizers, and
234-
// removing it from the original vector of metrics.
228+
// Collapse this counter by accumulating its value, and its
229+
// finalizers, and removing it from the original vector of metrics.
235230
accumulated_value += *value;
236231

237232
let mut old_metric = metrics.swap_remove(inner_idx);
238233
accumulated_finalizers.merge(old_metric.metadata_mut().take_finalizers());
239-
240-
// We don't advance the inner loop index because since we just moved a
241-
// yet-unseen metric into the slot of the metric we just collapsed, advancing
242-
// past the current inner index would cause the inner loop to never evaluate the
243-
// moved metric.
244-
should_advance_inner = false;
234+
should_advance = false;
245235
} else {
246-
// We hit a counter that _doesn't_ match, but since we need to ensure we
247-
// evaluate all counters, we stop ourselves from advancing the outer loop index
248-
// for the remainder of the inner loop.
249-
should_advance_outer = false;
236+
// We hit a counter that _doesn't_ match, but we can't just skip
237+
// it because we also need to evaluate it against all the
238+
// counters that come after it, so we only increment the index
239+
// for this inner loop.
240+
//
241+
// As well, we mark ourselves to stop incrementing the outer
242+
// index if we find more counters to accumulate, because we've
243+
// hit a disjoint counter here. While we may be continuing to
244+
// shrink the count of remaining metrics from accumulating,
245+
// we have to ensure this counter we just visited is visited by
246+
// the outer loop.
247+
is_disjoint = true;
250248
}
251249
}
252250

253-
// If we didn't consume/accumulate a counter in this iteration, we advance our inner
254-
// loop index. Likewise, we can only advance the outer loop index if we advanced the
255-
// inner loop index because we don't want the outer loop to skip any metrics that we
256-
// moved when accumulating.
257-
if should_advance_inner {
251+
if should_advance {
258252
inner_idx += 1;
259253

260-
if should_advance_outer {
261-
// We advance `idx`, which is where `outer_idx` is derived from, rather than
262-
// `outer_idx` directly, since we need `outer_idx` to be stable so that the
263-
// logic below has a stable index to get the mutable reference to the outer
264-
// counter.
254+
if !is_disjoint {
265255
idx += 1;
266256
}
267257
}
268258
}
269259

270260
// If we had matches during the accumulator phase, update our original counter.
271261
if had_match {
272-
let metric = metrics
273-
.get_mut(outer_idx)
274-
.expect("current index must exist");
262+
let metric = metrics.get_mut(curr_idx).expect("current index must exist");
275263
match metric.value_mut() {
276264
MetricValue::Counter { value } => {
277265
*value += accumulated_value;
@@ -385,30 +373,6 @@ mod tests {
385373
assert_eq!(expected, actual);
386374
}
387375

388-
#[test]
389-
fn collapse_identical_metrics_counter_mixed() {
390-
let counter_value = 42.0;
391-
let input = vec![
392-
create_counter("basic1", counter_value),
393-
create_counter("basic2", counter_value),
394-
create_counter("basic3", counter_value),
395-
create_counter("basic1", counter_value),
396-
create_counter("basic4", counter_value),
397-
create_counter("basic3", counter_value),
398-
create_counter("basic1", counter_value),
399-
];
400-
401-
let expected = vec![
402-
create_counter("basic1", counter_value * 3.0),
403-
create_counter("basic2", counter_value),
404-
create_counter("basic3", counter_value * 2.0),
405-
create_counter("basic4", counter_value),
406-
];
407-
let actual = collapse_counters_by_series_and_timestamp(input);
408-
409-
assert_eq!(expected, actual);
410-
}
411-
412376
#[derive(Eq, Ord, PartialEq, PartialOrd)]
413377
struct MetricCollapseSort {
414378
metric_type: &'static str,

src/sinks/util/service/net/udp.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct UdpConnectorConfig {
2020
/// The address _must_ include a port.
2121
address: HostAndPort,
2222

23-
/// The size of the socket's send buffer, in bytes.
23+
/// The size of the socket's send buffer.
2424
///
2525
/// If set, the value of the setting is passed via the `SO_SNDBUF` option.
2626
#[configurable(metadata(docs::type_unit = "bytes"))]

0 commit comments

Comments
 (0)