Skip to content

Commit 6c6d0a7

Browse files
authored
atlas: support dropping data in rollup policy (#1133)
Add an operation to the rule that can be used to indicate data should be dropped entirely.
1 parent dda652e commit 6c6d0a7

File tree

3 files changed

+75
-5
lines changed

3 files changed

+75
-5
lines changed

spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/RollupPolicy.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,22 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<Rule> rules)
7272
return Rollups.fromRules(commonTags, rules);
7373
}
7474

75+
/** Operation associated with a rule. */
76+
enum Operation {
77+
/** Rollup data by removing specified dimensions. */
78+
ROLLUP,
79+
80+
/** Drop the data that matches the query. */
81+
DROP
82+
}
83+
7584
/**
7685
* Rule for matching a set of measurements and removing specified dimensions.
7786
*/
7887
final class Rule {
7988
private final String query;
8089
private final List<String> rollup;
90+
private final Operation operation;
8191

8292
/**
8393
* Create a new instance.
@@ -86,10 +96,25 @@ final class Rule {
8696
* Atlas query expression that indicates the set of measurements matching this rule.
8797
* @param rollup
8898
* Set of dimensions to remove from the matching measurements.
99+
* @param operation
100+
* Operation to perform if there is a match to the query.
89101
*/
90-
public Rule(String query, List<String> rollup) {
102+
public Rule(String query, List<String> rollup, Operation operation) {
91103
this.query = Preconditions.checkNotNull(query, "query");
92104
this.rollup = Preconditions.checkNotNull(rollup, "rollup");
105+
this.operation = Preconditions.checkNotNull(operation, "operation");
106+
}
107+
108+
/**
109+
* Create a new instance.
110+
*
111+
* @param query
112+
* Atlas query expression that indicates the set of measurements matching this rule.
113+
* @param rollup
114+
* Set of dimensions to remove from the matching measurements.
115+
*/
116+
public Rule(String query, List<String> rollup) {
117+
this(query, rollup, Operation.ROLLUP);
93118
}
94119

95120
/** Return the query expression string. */
@@ -102,18 +127,24 @@ public List<String> rollup() {
102127
return rollup;
103128
}
104129

130+
/** Return the operation to perform if the query matches. */
131+
public Operation operation() {
132+
return operation;
133+
}
134+
105135
@Override
106136
public boolean equals(Object o) {
107137
if (this == o) return true;
108138
if (!(o instanceof Rule)) return false;
109139
Rule rule = (Rule) o;
110140
return query.equals(rule.query)
111-
&& rollup.equals(rule.rollup);
141+
&& rollup.equals(rule.rollup)
142+
&& operation == rule.operation;
112143
}
113144

114145
@Override
115146
public int hashCode() {
116-
return Objects.hash(query, rollup);
147+
return Objects.hash(query, rollup, operation);
117148
}
118149
}
119150

spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/Rollups.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,16 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.
5858
for (Measurement m : ms) {
5959
List<RollupPolicy.Rule> matches = index.findMatches(m.id());
6060
if (matches.isEmpty()) {
61-
// No matches for the id, but we sill need to treat as an aggregate because
61+
// No matches for the id, but we still need to treat as an aggregate because
6262
// rollup on another id could cause a collision
6363
Map<Id, Aggregator> idMap = aggregates.computeIfAbsent(commonTags, k -> new HashMap<>());
6464
updateAggregate(idMap, m.id(), m);
6565
} else {
66+
// Skip measurement if one of the rules indicates it should be dropped
67+
if (shouldDrop(matches)) {
68+
continue;
69+
}
70+
6671
// For matching rules, find dimensions from common tags and others that are part
6772
// of the id
6873
Set<String> commonDimensions = new HashSet<>();
@@ -77,7 +82,7 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.
7782
}
7883
}
7984

80-
// Peform rollup by removing the dimensions
85+
// Perform rollup by removing the dimensions
8186
Map<String, String> tags = commonDimensions.isEmpty()
8287
? commonTags
8388
: rollup(commonTags, commonDimensions);
@@ -98,6 +103,15 @@ static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.
98103
};
99104
}
100105

106+
private static boolean shouldDrop(List<RollupPolicy.Rule> rules) {
107+
for (RollupPolicy.Rule rule : rules) {
108+
if (rule.operation() == RollupPolicy.Operation.DROP) {
109+
return true;
110+
}
111+
}
112+
return false;
113+
}
114+
101115
private static Map<String, String> rollup(Map<String, String> tags, Set<String> dimensions) {
102116
Map<String, String> tmp = new HashMap<>(tags);
103117
for (String dimension : dimensions) {

spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/RollupsTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,29 @@ public void fromRulesMulti() {
271271
}
272272
}
273273
}
274+
275+
@Test
276+
public void fromRulesDrop() {
277+
registry.counter("notDropped").increment();
278+
for (int i = 0; i < 10; ++i) {
279+
registry.counter("test", "i", "" + i).increment();
280+
}
281+
clock.setWallTime(5000);
282+
List<Measurement> input = registry.measurements().collect(Collectors.toList());
283+
List<RollupPolicy.Rule> rules = new ArrayList<>();
284+
rules.add(new RollupPolicy.Rule("i,:has", list(), RollupPolicy.Operation.DROP));
285+
RollupPolicy policy = Rollups.fromRules(map("app", "foo", "node", "i-123"), rules);
286+
287+
List<RollupPolicy.Result> results = policy.apply(input);
288+
Assertions.assertEquals(1, results.size());
289+
for (RollupPolicy.Result result : results) {
290+
Assertions.assertEquals(1, result.measurements().size());
291+
String name = result.measurements().get(0).id().name();
292+
if ("notDropped".equals(name)) {
293+
Assertions.assertEquals(map("app", "foo", "node", "i-123"), result.commonTags());
294+
} else {
295+
Assertions.fail("data not dropped");
296+
}
297+
}
298+
}
274299
}

0 commit comments

Comments
 (0)