20
20
package org .apache .heron .metricsmgr .sink ;
21
21
22
22
import java .io .IOException ;
23
+ import java .util .ArrayList ;
23
24
import java .util .HashMap ;
25
+ import java .util .List ;
24
26
import java .util .Map ;
27
+ import java .util .Set ;
28
+ import java .util .TreeMap ;
29
+ import java .util .concurrent .atomic .AtomicBoolean ;
30
+ import java .util .concurrent .atomic .AtomicReference ;
25
31
import java .util .logging .Level ;
26
32
import java .util .logging .Logger ;
33
+ import java .util .regex .Matcher ;
27
34
import java .util .regex .Pattern ;
28
35
29
36
import com .google .common .cache .Cache ;
33
40
import org .apache .heron .spi .metricsmgr .metrics .MetricsRecord ;
34
41
import org .apache .heron .spi .metricsmgr .sink .SinkContext ;
35
42
43
+ import static java .lang .String .format ;
44
+ import static org .apache .heron .metricsmgr .sink .PrometheusSink .Prometheus .sanitizeMetricName ;
45
+
36
46
/**
37
47
* A web sink that exposes and endpoint that Prometheus can scrape
38
48
*
@@ -57,6 +67,7 @@ public class PrometheusSink extends AbstractWebSink {
57
67
58
68
// This is the cache that is used to serve the metrics
59
69
private Cache <String , Map <String , Double >> metricsCache ;
70
+ private List <Rule > rules = new ArrayList <Rule >();
60
71
61
72
private String cluster ;
62
73
private String role ;
@@ -66,13 +77,91 @@ public PrometheusSink() {
66
77
super ();
67
78
}
68
79
80
+ private enum Type {
81
+ COUNTER ,
82
+ GAUGE ,
83
+ SUMMARY ,
84
+ HISTOGRAM ,
85
+ UNTYPED ,
86
+ }
87
+
88
+ private static class Rule {
89
+ public Pattern pattern ;
90
+ public String name ;
91
+ public String value ;
92
+ public Double valueFactor = 1.0 ;
93
+ public String help ;
94
+ public boolean attrNameSnakeCase ;
95
+ public Type type = Type .UNTYPED ;
96
+ public ArrayList <String > labelNames ;
97
+ public ArrayList <String > labelValues ;
98
+ }
99
+
69
100
@ Override
70
101
void initialize (Map <String , Object > configuration , SinkContext context ) {
71
102
metricsCache = createCache ();
72
103
73
104
cluster = context .getCluster ();
74
105
role = context .getRole ();
75
106
environment = context .getEnvironment ();
107
+
108
+ if (configuration .containsKey ("rules" )) {
109
+ List <Map <String , Object >> configRules = (List <Map <String , Object >>)
110
+ configuration .get ("rules" );
111
+ for (Map <String , Object > ruleObject : configRules ) {
112
+ Rule rule = new Rule ();
113
+ rules .add (rule );
114
+ if (ruleObject .containsKey ("pattern" )) {
115
+ rule .pattern = Pattern .compile ("^.*(?:" + (String ) ruleObject .get ("pattern" ) + ").*$" );
116
+ }
117
+ if (ruleObject .containsKey ("name" )) {
118
+ rule .name = (String ) ruleObject .get ("name" );
119
+ }
120
+ if (ruleObject .containsKey ("value" )) {
121
+ rule .value = String .valueOf (ruleObject .get ("value" ));
122
+ }
123
+ if (ruleObject .containsKey ("valueFactor" )) {
124
+ String valueFactor = String .valueOf (ruleObject .get ("valueFactor" ));
125
+ try {
126
+ rule .valueFactor = Double .valueOf (valueFactor );
127
+ } catch (NumberFormatException e ) {
128
+ // use default value
129
+ }
130
+ }
131
+ if (ruleObject .containsKey ("attrNameSnakeCase" )) {
132
+ rule .attrNameSnakeCase = (Boolean ) ruleObject .get ("attrNameSnakeCase" );
133
+ }
134
+ if (ruleObject .containsKey ("type" )) {
135
+ rule .type = Type .valueOf ((String ) ruleObject .get ("type" ));
136
+ }
137
+ if (ruleObject .containsKey ("help" )) {
138
+ rule .help = (String ) ruleObject .get ("help" );
139
+ }
140
+ if (ruleObject .containsKey ("labels" )) {
141
+ TreeMap labels = new TreeMap ((Map <String , Object >) ruleObject .get ("labels" ));
142
+ rule .labelNames = new ArrayList <String >();
143
+ rule .labelValues = new ArrayList <String >();
144
+ for (Map .Entry <String , Object > entry : (Set <Map .Entry <String , Object >>) labels
145
+ .entrySet ()) {
146
+ rule .labelNames .add (entry .getKey ());
147
+ rule .labelValues .add ((String ) entry .getValue ());
148
+ }
149
+ }
150
+
151
+ // Validation.
152
+ if ((rule .labelNames != null || rule .help != null ) && rule .name == null ) {
153
+ throw new IllegalArgumentException ("Must provide name, if help or labels are given: "
154
+ + ruleObject );
155
+ }
156
+ if (rule .name != null && rule .pattern == null ) {
157
+ throw new IllegalArgumentException ("Must provide pattern, if name is given: "
158
+ + ruleObject );
159
+ }
160
+ }
161
+ } else {
162
+ // Default to a single default rule.
163
+ rules .add (new Rule ());
164
+ }
76
165
}
77
166
78
167
@ Override
@@ -82,6 +171,9 @@ byte[] generateResponse() throws IOException {
82
171
final StringBuilder sb = new StringBuilder ();
83
172
84
173
metrics .forEach ((String source , Map <String , Double > sourceMetrics ) -> {
174
+ // Map the labels.
175
+ final Map <String , String > labelKV = new TreeMap <String , String >();
176
+
85
177
String [] sources = source .split ("/" );
86
178
String topology = sources [0 ];
87
179
String component = sources [1 ];
@@ -96,6 +188,18 @@ byte[] generateResponse() throws IOException {
96
188
final String clusterRoleEnv = hasClusterRoleEnvironment (c , r , e )
97
189
? String .format ("%s/%s/%s" , c , r , e ) : null ;
98
190
191
+ labelKV .put ("topology" , topology );
192
+ labelKV .put ("component" , component );
193
+ labelKV .put ("instance_id" , instance );
194
+
195
+ if (clusterRoleEnv != null ) {
196
+ labelKV .put ("cluster_role_env" , clusterRoleEnv );
197
+ }
198
+
199
+ if (componentType != null ) {
200
+ labelKV .put ("component_type" , componentType );
201
+ }
202
+
99
203
sourceMetrics .forEach ((String metric , Double value ) -> {
100
204
101
205
// some stream manager metrics in heron contain a instance id as part of the metric name
@@ -104,46 +208,79 @@ byte[] generateResponse() throws IOException {
104
208
// __time_spent_back_pressure_by_compid/container_1_exclaim1_1
105
209
// TODO convert to small classes for less string manipulation
106
210
final String metricName ;
107
- final String metricInstanceId ;
108
211
if (componentIsStreamManger ) {
109
212
final boolean metricHasInstanceId = metric .contains ("_by_" );
110
213
final String [] metricParts = metric .split ("/" );
111
214
if (metricHasInstanceId && metricParts .length == 3 ) {
112
- metricName = String . format ( "%s_%s" , metricParts [0 ], metricParts [2 ]);
113
- metricInstanceId = metricParts [1 ];
215
+ metricName = splitTargetInstance ( metricParts [0 ], metricParts [2 ], labelKV );
216
+ labelKV . put ( "metric_instance_id" , metricParts [1 ]) ;
114
217
} else if (metricHasInstanceId && metricParts .length == 2 ) {
115
- metricName = metricParts [0 ];
116
- metricInstanceId = metricParts [1 ];
218
+ metricName = splitTargetInstance (metricParts [0 ], null , labelKV );
219
+ labelKV .put ("metric_instance_id" , metricParts [1 ]);
220
+ } else if (metricParts .length == 2 ) {
221
+ metricName = splitTargetInstance (metricParts [0 ], metricParts [1 ], labelKV );
117
222
} else {
118
- metricName = metric ;
119
- metricInstanceId = null ;
223
+ metricName = splitTargetInstance (metric , null , labelKV );
120
224
}
121
-
122
225
} else {
123
- metricName = metric ;
124
- metricInstanceId = null ;
125
- }
126
-
127
- String exportedMetricName = String .format ("%s_%s" , HERON_PREFIX ,
128
- metricName .replace ("__" , "" ).toLowerCase ());
129
- sb .append (Prometheus .sanitizeMetricName (exportedMetricName ))
130
- .append ("{" )
131
- .append ("topology=\" " ).append (topology ).append ("\" ," )
132
- .append ("component=\" " ).append (component ).append ("\" ," )
133
- .append ("instance_id=\" " ).append (instance ).append ("\" " );
134
-
135
- if (clusterRoleEnv != null ) {
136
- sb .append (",cluster_role_env=\" " ).append (clusterRoleEnv ).append ("\" " );
137
- }
138
-
139
- if (componentType != null ) {
140
- sb .append (",component_type=\" " ).append (componentType ).append ("\" " );
141
- }
142
-
143
- if (metricInstanceId != null ) {
144
- sb .append (",metric_instance_id=\" " ).append (metricInstanceId ).append ("\" " );
226
+ final AtomicReference <String > name = new AtomicReference <>(sanitizeMetricName (metric ));
227
+ rules .forEach (rule -> {
228
+ String ruleName = name .get ();
229
+ Matcher matcher = null ;
230
+ if (rule .pattern != null ) {
231
+ matcher = rule .pattern .matcher (metric );
232
+ if (!matcher .matches ()) {
233
+ return ;
234
+ }
235
+ }
236
+
237
+ // If there's no name provided, use default export format.
238
+ if (rule .name == null || rule .name .isEmpty ()) {
239
+ // nothing
240
+ } else {
241
+ // Matcher is set below here due to validation in the constructor.
242
+ ruleName = sanitizeMetricName (matcher .replaceAll (rule .name ));
243
+ if (ruleName .isEmpty ()) {
244
+ return ;
245
+ }
246
+ }
247
+ if (rule .attrNameSnakeCase ) {
248
+ name .set (toSnakeAndLowerCase (ruleName ));
249
+ } else {
250
+ name .set (ruleName .toLowerCase ());
251
+ }
252
+ if (rule .labelNames != null ) {
253
+ for (int i = 0 ; i < rule .labelNames .size (); i ++) {
254
+ final String unsafeLabelName = rule .labelNames .get (i );
255
+ final String labelValReplacement = rule .labelValues .get (i );
256
+ String labelName = sanitizeMetricName (matcher .replaceAll (unsafeLabelName ));
257
+ String labelValue = matcher .replaceAll (labelValReplacement );
258
+ labelName = labelName .toLowerCase ();
259
+ if (!labelName .isEmpty () && !labelValue .isEmpty ()) {
260
+ labelKV .put (labelName , labelValue );
261
+ }
262
+ }
263
+ }
264
+ });
265
+ metricName = name .get ();
145
266
}
146
267
268
+ // TODO Type, Help
269
+ String exportedMetricName = format ("%s_%s" , HERON_PREFIX ,
270
+ metricName
271
+ .replace ("__" , "" )
272
+ .toLowerCase ());
273
+ sb .append (sanitizeMetricName (exportedMetricName ))
274
+ .append ("{" );
275
+ final AtomicBoolean isFirst = new AtomicBoolean (true );
276
+ labelKV .forEach ((k , v ) -> {
277
+ // Add labels
278
+ if (!isFirst .get ()) {
279
+ sb .append (',' );
280
+ }
281
+ sb .append (format ("%s=\" %s\" " , k , v ));
282
+ isFirst .set (false );
283
+ });
147
284
sb .append ("} " )
148
285
.append (Prometheus .doubleToGoString (value ))
149
286
.append (" " ).append (currentTimeMillis ())
@@ -154,6 +291,45 @@ byte[] generateResponse() throws IOException {
154
291
return sb .toString ().getBytes ();
155
292
}
156
293
294
+ private static final Pattern SPLIT_TARGET = Pattern .compile ("__(?<name>\\ w+)"
295
+ + "_(?<target>(?<instance>\\ w+)-\\ d+)" );
296
+ private static final Pattern DIGIT = Pattern .compile ("[0-9]+" );
297
+
298
+ private String splitTargetInstance (String part1 , String part2 , Map <String , String > labelKV ) {
299
+ if (part2 != null ) {
300
+ if (DIGIT .matcher (part2 ).matches ()) {
301
+ labelKV .put ("metric_instance_id" , part2 );
302
+ return part1 ;
303
+ }
304
+ final Matcher m = SPLIT_TARGET .matcher (part1 );
305
+ if (m .matches ()) {
306
+ labelKV .put ("metric_instance_id" , m .group ("target" ));
307
+ return String .format ("%s_%s_%s" , m .group ("name" ), m .group ("instance" ), part2 );
308
+ }
309
+ return String .format ("%s_%s" , part1 , part2 );
310
+ }
311
+ return part1 ;
312
+ }
313
+
314
+ static String toSnakeAndLowerCase (String attrName ) {
315
+ if (attrName == null || attrName .isEmpty ()) {
316
+ return attrName ;
317
+ }
318
+ char firstChar = attrName .subSequence (0 , 1 ).charAt (0 );
319
+ boolean prevCharIsUpperCaseOrUnderscore = Character .isUpperCase (firstChar ) || firstChar == '_' ;
320
+ StringBuilder resultBuilder = new StringBuilder (attrName .length ())
321
+ .append (Character .toLowerCase (firstChar ));
322
+ for (char attrChar : attrName .substring (1 ).toCharArray ()) {
323
+ boolean charIsUpperCase = Character .isUpperCase (attrChar );
324
+ if (!prevCharIsUpperCaseOrUnderscore && charIsUpperCase ) {
325
+ resultBuilder .append ("_" );
326
+ }
327
+ resultBuilder .append (Character .toLowerCase (attrChar ));
328
+ prevCharIsUpperCaseOrUnderscore = charIsUpperCase || attrChar == '_' ;
329
+ }
330
+ return resultBuilder .toString ();
331
+ }
332
+
157
333
@ Override
158
334
public void processRecord (MetricsRecord record ) {
159
335
final String [] sources = MetricsUtil .splitRecordSource (record );
0 commit comments