Skip to content

Commit a055afe

Browse files
authored
atlas: fix expiration for counter batch updates (#1178)
Ensure that if a counter is expired, any existing batch updaters will re-lookup the counter when flushing updates. Otherwise all updates from the batch updater will get ignored.
1 parent e1ae80c commit a055afe

File tree

4 files changed

+118
-2
lines changed

4 files changed

+118
-2
lines changed

spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasCounter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2024 Netflix, Inc.
2+
* Copyright 2014-2025 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,4 +56,10 @@ class AtlasCounter extends AtlasMeter implements Counter {
5656
@Override public double actualCount() {
5757
return value.poll();
5858
}
59+
60+
@Override public Counter.BatchUpdater batchUpdater(int batchSize) {
61+
AtlasCounterBatchUpdater updater = new AtlasCounterBatchUpdater(batchSize);
62+
updater.accept(() -> this);
63+
return updater;
64+
}
5965
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2014-2025 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.spectator.atlas;
17+
18+
import com.netflix.spectator.api.Counter;
19+
20+
import java.util.function.Consumer;
21+
import java.util.function.Supplier;
22+
23+
final class AtlasCounterBatchUpdater implements Counter.BatchUpdater, Consumer<Supplier<Counter>> {
24+
25+
private Supplier<Counter> counterSupplier;
26+
private final int batchSize;
27+
28+
private int count;
29+
private double sum;
30+
31+
AtlasCounterBatchUpdater(int batchSize) {
32+
this.batchSize = batchSize;
33+
this.count = 0;
34+
this.sum = 0.0;
35+
}
36+
37+
@Override
38+
public void accept(Supplier<Counter> counterSupplier) {
39+
this.counterSupplier = counterSupplier;
40+
}
41+
42+
private AtlasCounter getCounter() {
43+
if (counterSupplier != null) {
44+
Counter c = counterSupplier.get();
45+
return (c instanceof AtlasCounter) ? (AtlasCounter) c : null;
46+
}
47+
return null;
48+
}
49+
50+
@Override
51+
public void add(double amount) {
52+
if (Double.isFinite(amount) && amount > 0.0) {
53+
sum += amount;
54+
++count;
55+
if (count >= batchSize) {
56+
flush();
57+
}
58+
}
59+
}
60+
61+
@Override
62+
public void flush() {
63+
AtlasCounter counter = getCounter();
64+
if (counter != null) {
65+
counter.add(sum);
66+
sum = 0.0;
67+
count = 0;
68+
}
69+
}
70+
71+
@Override
72+
public void close() throws Exception {
73+
flush();
74+
}
75+
}

spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasCounterTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 Netflix, Inc.
2+
* Copyright 2014-2025 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
1515
*/
1616
package com.netflix.spectator.atlas;
1717

18+
import com.netflix.spectator.api.Counter;
1819
import com.netflix.spectator.api.Id;
1920
import com.netflix.spectator.api.ManualClock;
2021
import com.netflix.spectator.api.Measurement;
@@ -124,4 +125,24 @@ public void preferStatisticFromTags() {
124125
Id actual = c.measure().iterator().next().id();
125126
Assertions.assertEquals(id.withTag(DsType.rate), actual);
126127
}
128+
129+
@Test
130+
public void batchUpdate() throws Exception {
131+
try (Counter.BatchUpdater b = counter.batchUpdater(2)) {
132+
b.increment();
133+
b.add(Double.POSITIVE_INFINITY);
134+
b.add(Double.NaN);
135+
b.add(-1.0);
136+
clock.setWallTime(step + 1);
137+
Assertions.assertEquals(0, counter.count());
138+
b.increment();
139+
clock.setWallTime(step * 2 + 1);
140+
Assertions.assertEquals(2, counter.count());
141+
b.increment(42);
142+
clock.setWallTime(step * 3 + 1);
143+
Assertions.assertEquals(0, counter.count());
144+
}
145+
clock.setWallTime(step * 4 + 1);
146+
Assertions.assertEquals(42, counter.count());
147+
}
127148
}

spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasRegistryTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.fasterxml.jackson.core.JsonToken;
2121
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
2222
import com.netflix.spectator.api.Clock;
23+
import com.netflix.spectator.api.Counter;
2324
import com.netflix.spectator.api.Gauge;
2425
import com.netflix.spectator.api.Id;
2526
import com.netflix.spectator.api.ManualClock;
@@ -285,6 +286,19 @@ public void flushOnShutdownCounter() {
285286
Assertions.assertEquals(1.0, getValue(payloads.get(1)));
286287
}
287288

289+
@Test
290+
public void batchUpdateExpiration() throws Exception {
291+
Counter.BatchUpdater b = registry.counter("test").batchUpdater(2);
292+
final long step = 10_000L;
293+
final long ttl = step * 6 * 20;
294+
clock.setWallTime(step * ttl);
295+
registry.removeExpiredMeters();
296+
b.increment();
297+
b.flush();
298+
clock.setWallTime(step * (ttl + 1));
299+
Assertions.assertEquals(1, registry.counter("test").count());
300+
}
301+
288302
private double getValue(PublishPayload payload) {
289303
return payload.getMetrics()
290304
.stream()

0 commit comments

Comments
 (0)