Skip to content

Commit 61ca213

Browse files
authored
Configure kafka metrics reporter as class (#10855)
1 parent dcf859a commit 61ca213

File tree

4 files changed

+54
-8
lines changed

4 files changed

+54
-8
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsUtil.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.instrumentation.kafka.internal.MetricsReporterList;
910
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
1011
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
1112
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
1213
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
13-
import java.util.ArrayList;
1414
import java.util.List;
1515
import java.util.Map;
1616
import org.apache.kafka.clients.CommonClientConfigs;
@@ -34,21 +34,24 @@ public static void enhanceConfig(Map<? super String, Object> config) {
3434
}
3535
config.merge(
3636
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
37-
OpenTelemetryMetricsReporter.class.getName(),
37+
MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class),
3838
(class1, class2) -> {
3939
// class1 is either a class name or List of class names or classes
4040
if (class1 instanceof List) {
41-
List<Object> result = new ArrayList<>();
41+
List<Object> result = new MetricsReporterList<>();
4242
result.addAll((List<Object>) class1);
43-
result.add(class2);
43+
result.addAll((List<Object>) class2);
4444
return result;
4545
} else if (class1 instanceof String) {
4646
String className1 = (String) class1;
4747
if (className1.isEmpty()) {
4848
return class2;
4949
}
5050
}
51-
return class1 + "," + class2;
51+
List<Object> result = new MetricsReporterList<>();
52+
result.add(class1);
53+
result.addAll((List<Object>) class2);
54+
return result;
5255
});
5356
config.put(
5457
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ protected Map<String, Object> producerConfig() {
117117
producerConfig.merge(
118118
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
119119
TestMetricsReporter.class.getName(),
120-
(o, o2) -> o + "," + o2);
120+
AbstractOpenTelemetryMetricsReporterTest::mergeValue);
121121
return producerConfig;
122122
}
123123

@@ -134,10 +134,18 @@ protected Map<String, Object> consumerConfig() {
134134
consumerConfig.merge(
135135
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
136136
TestMetricsReporter.class.getName(),
137-
(o, o2) -> o + "," + o2);
137+
AbstractOpenTelemetryMetricsReporterTest::mergeValue);
138138
return consumerConfig;
139139
}
140140

141+
@SuppressWarnings("unchecked")
142+
private static Object mergeValue(Object o1, Object o2) {
143+
List<Object> result = new MetricsReporterList<>();
144+
result.addAll((List<Object>) o1);
145+
result.add(o2);
146+
return result;
147+
}
148+
141149
@Test
142150
void noDuplicateMetricsReporter() {
143151
List<MetricsReporter> producerMetricsReporters = getMetricsReporters(producer);

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
2323
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
2424
import io.opentelemetry.instrumentation.kafka.internal.KafkaUtil;
25+
import io.opentelemetry.instrumentation.kafka.internal.MetricsReporterList;
2526
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
2627
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
2728
import io.opentelemetry.instrumentation.kafka.internal.TracingList;
@@ -196,7 +197,7 @@ <K, V> ConsumerRecords<K, V> addTracing(
196197
Map<String, Object> config = new HashMap<>();
197198
config.put(
198199
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
199-
OpenTelemetryMetricsReporter.class.getName());
200+
MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class));
200201
config.put(
201202
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
202203
new OpenTelemetrySupplier(openTelemetry));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafka.internal;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
11+
/**
12+
* List implementation that can be used to hold metrics reporters in kafka configuration without
13+
* breaking serialization. When this list is serialized it removes OpenTelemetryMetricsReporter to
14+
* ensure that the configuration can be deserialized even when the instrumentation is not present.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*/
19+
public class MetricsReporterList<T> extends ArrayList<T> {
20+
private static final long serialVersionUID = 1L;
21+
22+
public static <T> List<T> singletonList(T o) {
23+
List<T> list = new MetricsReporterList<>();
24+
list.add(o);
25+
return list;
26+
}
27+
28+
private Object writeReplace() {
29+
// serialize to plain ArrayList that does not contain OpenTelemetryMetricsReporter
30+
List<Object> result = new ArrayList<>();
31+
this.stream().filter(x -> x != OpenTelemetryMetricsReporter.class).forEach(result::add);
32+
return result;
33+
}
34+
}

0 commit comments

Comments
 (0)