Skip to content

Commit 4ad3e26

Browse files
committed
Spring-kafka-2 instr & batch consumer feature flag
1 parent 435e391 commit 4ad3e26

File tree

8 files changed

+221
-2
lines changed

8 files changed

+221
-2
lines changed

instrumentation/kafka-clients-spans-consumer-0.11.0.0/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dependencies {
55
}
66

77
jar {
8-
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-spans-consumer-0.11.0.0', 'Enabled': 'false',
8+
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-spans-consumer-0.11.0.0',
99
'Implementation-Title-Alias': 'kafka-clients-spans-consumer' }
1010
}
1111

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.nr.instrumentation.kafka;
2+
3+
import com.newrelic.api.agent.NewRelic;
4+
5+
public class Utils {
6+
public static final boolean DT_CONSUMER_ENABLED = NewRelic.getAgent().getConfig()
7+
.getValue("kafka.spans.distributed_trace.consumer_poll.enabled", false);
8+
}

instrumentation/kafka-clients-spans-consumer-0.11.0.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.newrelic.api.agent.weaver.Weave;
1515
import com.newrelic.api.agent.weaver.Weaver;
1616
import com.nr.instrumentation.kafka.HeadersWrapper;
17+
import com.nr.instrumentation.kafka.Utils;
1718

1819
import java.time.Duration;
1920

@@ -33,7 +34,7 @@ public ConsumerRecords<K, V> poll(final long timeoutMs) {
3334
}
3435

3536
private void nrAcceptDtHeaders(ConsumerRecords<K, V> records) {
36-
if (AgentBridge.getAgent().getTransaction(false) != null) {
37+
if (Utils.DT_CONSUMER_ENABLED && AgentBridge.getAgent().getTransaction(false) != null) {
3738
for (ConsumerRecord<?, ?> record : records) {
3839
Headers dtHeaders = new HeadersWrapper(record.headers());
3940
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, dtHeaders);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
dependencies {
2+
implementation(project(":agent-bridge"))
3+
implementation("org.springframework.kafka:spring-kafka:2.9.13")
4+
5+
}
6+
7+
jar {
8+
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.spring-kafka-2.0.0' }
9+
}
10+
verifyInstrumentation {
11+
passesOnly 'org.springframework.kafka:spring-kafka:[2.0.0.RELEASE,)'
12+
excludeRegex 'org.springframework.kafka:spring-kafka:.*M[0-9]*$'
13+
}
14+
15+
site {
16+
title 'Spring Kafka'
17+
type 'Messaging'
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
*
3+
* * Copyright 2025 New Relic Corporation. All rights reserved.
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package com.nr.instrumentation.spring.kafka;
9+
10+
import com.newrelic.api.agent.HeaderType;
11+
import com.newrelic.api.agent.Headers;
12+
import org.apache.kafka.common.header.Header;
13+
14+
import java.util.ArrayList;
15+
import java.util.Collection;
16+
import java.util.HashSet;
17+
import java.util.Iterator;
18+
import java.util.Objects;
19+
20+
public class HeadersWrapper implements Headers {
21+
22+
private final org.apache.kafka.common.header.Headers delegate;
23+
24+
public HeadersWrapper(org.apache.kafka.common.header.Headers headers) {
25+
this.delegate = headers;
26+
}
27+
28+
@Override
29+
public HeaderType getHeaderType() {
30+
return HeaderType.MESSAGE;
31+
}
32+
33+
@Override
34+
public String getHeader(String name) {
35+
String value = null;
36+
Iterator<Header> iterator = delegate.headers(name).iterator();
37+
if (iterator.hasNext()) {
38+
byte[] bytes = iterator.next().value();
39+
if (bytes != null) {
40+
value = new String(bytes);
41+
}
42+
}
43+
return value;
44+
}
45+
46+
@Override
47+
public Collection<String> getHeaders(String name) {
48+
Collection<String> headers = new ArrayList<>();
49+
Iterator<Header> iterator = delegate.headers(name).iterator();
50+
while (iterator.hasNext()) {
51+
byte[] bytes = iterator.next().value();
52+
if (bytes != null) {
53+
headers.add(new String(bytes));
54+
}
55+
}
56+
return headers;
57+
}
58+
59+
@Override
60+
public void setHeader(String name, String value) {
61+
delegate.remove(name);
62+
delegate.add(name, value.getBytes());
63+
}
64+
65+
@Override
66+
public void addHeader(String name, String value) {
67+
delegate.add(name, value.getBytes());
68+
}
69+
70+
@Override
71+
public Collection<String> getHeaderNames() {
72+
Collection<String> headerNames = new HashSet<>();
73+
for(Header header : delegate) {
74+
headerNames.add(header.key());
75+
}
76+
return headerNames;
77+
}
78+
79+
@Override
80+
public boolean containsHeader(String name) {
81+
for(Header header : delegate) {
82+
if (Objects.equals(name,header.key())) {
83+
return true;
84+
}
85+
}
86+
return false;
87+
}
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.nr.instrumentation.spring.kafka;
2+
3+
import com.newrelic.agent.bridge.AgentBridge;
4+
import com.newrelic.api.agent.DestinationType;
5+
import com.newrelic.api.agent.InboundHeaders;
6+
import com.newrelic.api.agent.MessageConsumeParameters;
7+
import com.newrelic.api.agent.NewRelic;
8+
import com.newrelic.api.agent.TransactionNamePriority;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
import org.apache.kafka.clients.consumer.ConsumerRecords;
11+
12+
import java.util.Map;
13+
14+
public class SpringKafkaUtil {
15+
16+
public static Map<Object, Boolean> recordsCache = AgentBridge.collectionFactory.createConcurrentWeakKeyedMap();
17+
18+
public static String CATEGORY = "Message";
19+
public static String LIBRARY = "Kafka";
20+
21+
private static final boolean DT_CONSUMER_ENABLED = NewRelic.getAgent().getConfig()
22+
.getValue("kafka.spans.distributed_trace.consumer_poll.enabled", false);
23+
24+
public static <T> void processConsume(T record) {
25+
if (recordsCache.containsKey(record)) {
26+
return;
27+
}
28+
if (record instanceof ConsumerRecord) {
29+
processConsumeRecord((ConsumerRecord<?, ?>) record);
30+
} else if (record instanceof ConsumerRecords<?, ?>) {
31+
processConsumeBatch((ConsumerRecords<?, ?>) record);
32+
}
33+
recordsCache.put(record, true);
34+
}
35+
36+
private static <K,V> void processConsumeRecord(ConsumerRecord<K, V> record) {
37+
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH,
38+
false, CATEGORY, "Kafka/Listen/Topic/Named", record.topic());
39+
40+
HeadersWrapper inboundHeaders = new HeadersWrapper(record.headers());
41+
reportExternalConsume(record, inboundHeaders);
42+
}
43+
44+
private static <K,V> void processConsumeBatch(ConsumerRecords<K,V> records) {
45+
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_HIGH,
46+
false, CATEGORY, "Kafka/Listen/Batch");
47+
48+
for (ConsumerRecord<?, ?> record : records) {
49+
HeadersWrapper inboundHeaders = DT_CONSUMER_ENABLED ? new HeadersWrapper(record.headers()) : null;
50+
reportExternalConsume(record, inboundHeaders);
51+
break;
52+
}
53+
}
54+
55+
private static <K,V> void reportExternalConsume(ConsumerRecord<K,V> record, HeadersWrapper inboundHeaders) {
56+
MessageConsumeParameters params = MessageConsumeParameters.library(LIBRARY)
57+
.destinationType(DestinationType.NAMED_TOPIC)
58+
.destinationName(record.topic())
59+
.inboundHeaders(inboundHeaders)
60+
.build();
61+
NewRelic.getAgent().getTracedMethod().reportAsExternal(params);
62+
}
63+
64+
65+
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.springframework.kafka.listener;
2+
3+
import com.newrelic.api.agent.Trace;
4+
import com.newrelic.api.agent.weaver.MatchType;
5+
import com.newrelic.api.agent.weaver.Weave;
6+
import com.newrelic.api.agent.weaver.Weaver;
7+
import com.nr.instrumentation.spring.kafka.SpringKafkaUtil;
8+
import org.apache.kafka.clients.consumer.Consumer;
9+
import org.springframework.kafka.support.Acknowledgment;
10+
11+
@Weave(originalName = "org.springframework.kafka.listener.GenericMessageListener", type= MatchType.Interface)
12+
public class GenericMessageListener_Instrumentation<T> {
13+
14+
@Trace(dispatcher = true)
15+
public void onMessage(T data) {
16+
SpringKafkaUtil.processConsume(data);
17+
Weaver.callOriginal();
18+
}
19+
20+
@Trace(dispatcher = true)
21+
public void onMessage(T data, Acknowledgment acknowledgment) {
22+
SpringKafkaUtil.processConsume(data);
23+
Weaver.callOriginal();
24+
}
25+
26+
@Trace(dispatcher = true)
27+
public void onMessage(T data, Consumer<?, ?> consumer) {
28+
SpringKafkaUtil.processConsume(data);
29+
Weaver.callOriginal();
30+
}
31+
32+
@Trace(dispatcher = true)
33+
public void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
34+
SpringKafkaUtil.processConsume(data);
35+
Weaver.callOriginal();
36+
}
37+
}

settings.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ include 'instrumentation:spring-boot-actuator-3.0.0'
382382
include 'instrumentation:spring-cache-3.1.0'
383383
include 'instrumentation:spring-jms-2'
384384
include 'instrumentation:spring-jms-3'
385+
include 'instrumentation:spring-kafka-2.0.0'
385386
include 'instrumentation:spring-webclient-5.0'
386387
include 'instrumentation:spring-webclient-6.0'
387388
include 'instrumentation:spring-webflux-6.1.0'

0 commit comments

Comments
 (0)