Skip to content

Commit 31e5eea

Browse files
authored
Closing a kafka producer/consumer should not disable metrics from other consumers/producers (#11975)
1 parent f7f7d39 commit 31e5eea

File tree

2 files changed

+46
-9
lines changed

2 files changed

+46
-9
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Optional;
3333
import java.util.Random;
3434
import java.util.Set;
35+
import java.util.concurrent.CopyOnWriteArrayList;
3536
import org.apache.kafka.clients.CommonClientConfigs;
3637
import org.apache.kafka.clients.consumer.ConsumerConfig;
3738
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -70,6 +71,13 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
7071
private static KafkaProducer<byte[], byte[]> producer;
7172
private static KafkaConsumer<byte[], byte[]> consumer;
7273

74+
private static final List<OpenTelemetryMetricsReporter> metricsReporters =
75+
new CopyOnWriteArrayList<>();
76+
77+
static {
78+
OpenTelemetryMetricsReporter.setListener(metricsReporters::add);
79+
}
80+
7381
@BeforeEach
7482
void beforeAll() {
7583
// only start the kafka container the first time this runs
@@ -90,14 +98,16 @@ void beforeAll() {
9098

9199
@AfterAll
92100
static void afterAll() {
93-
kafka.stop();
94101
producer.close();
95102
consumer.close();
103+
kafka.stop();
96104
}
97105

98106
@AfterEach
99107
void tearDown() {
100-
OpenTelemetryMetricsReporter.resetForTest();
108+
for (OpenTelemetryMetricsReporter metricsReporter : metricsReporters) {
109+
metricsReporter.resetForTest();
110+
}
101111
}
102112

103113
protected abstract InstrumentationExtension testing();
@@ -186,6 +196,14 @@ private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> met
186196

187197
@Test
188198
void observeMetrics() {
199+
// Firstly create new producer and consumer and close them. This is done tp verify that metrics
200+
// are still produced after closing one producer/consumer. See
201+
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11880
202+
KafkaProducer<byte[], byte[]> producer2 = new KafkaProducer<>(producerConfig());
203+
KafkaConsumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(consumerConfig());
204+
producer2.close();
205+
consumer2.close();
206+
189207
produceRecords();
190208
consumeRecords();
191209

@@ -405,7 +423,9 @@ private static void printMappingTable() {
405423
Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
406424
TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
407425
List<RegisteredObservable> registeredObservables =
408-
OpenTelemetryMetricsReporter.getRegisteredObservables();
426+
metricsReporters.stream()
427+
.flatMap(metricsReporter -> metricsReporter.getRegisteredObservables().stream())
428+
.collect(toList());
409429
// Iterate through groups in alpha order
410430
for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
411431
List<KafkaMetricId> kafkaMetricIds =

instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,35 @@ public final class OpenTelemetryMetricsReporter implements MetricsReporter {
4141

4242
private static final Logger logger =
4343
Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
44-
private volatile Meter meter;
44+
private static volatile Listener listener;
4545

46-
private static final Object lock = new Object();
46+
private volatile Meter meter;
47+
private final Object lock = new Object();
4748

4849
@GuardedBy("lock")
49-
private static final List<RegisteredObservable> registeredObservables = new ArrayList<>();
50+
private final List<RegisteredObservable> registeredObservables = new ArrayList<>();
5051

5152
/**
5253
* Reset for test by resetting the {@link #meter} to {@code null} and closing all registered
5354
* instruments.
5455
*/
55-
static void resetForTest() {
56+
void resetForTest() {
5657
closeAllInstruments();
5758
}
5859

5960
// Visible for test
60-
static List<RegisteredObservable> getRegisteredObservables() {
61+
List<RegisteredObservable> getRegisteredObservables() {
6162
synchronized (lock) {
6263
return new ArrayList<>(registeredObservables);
6364
}
6465
}
6566

67+
public OpenTelemetryMetricsReporter() {
68+
if (listener != null) {
69+
listener.metricsReporterCreated(this);
70+
}
71+
}
72+
6673
@Override
6774
public void init(List<KafkaMetric> metrics) {
6875
metrics.forEach(this::metricChange);
@@ -131,7 +138,7 @@ public void close() {
131138
closeAllInstruments();
132139
}
133140

134-
private static void closeAllInstruments() {
141+
private void closeAllInstruments() {
135142
synchronized (lock) {
136143
for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
137144
closeInstrument(it.next().getObservable());
@@ -177,4 +184,14 @@ private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> re
177184
}
178185
return (T) value;
179186
}
187+
188+
// Visible for test
189+
static void setListener(Listener listener) {
190+
OpenTelemetryMetricsReporter.listener = listener;
191+
}
192+
193+
// used for testing
194+
interface Listener {
195+
void metricsReporterCreated(OpenTelemetryMetricsReporter metricsReporter);
196+
}
180197
}

0 commit comments

Comments
 (0)