Skip to content

Commit 8bb279f

Browse files
authored
Fix spring kafka interceptor wrappers not delegating some methods (#10935)
1 parent 080f857 commit 8bb279f

File tree

3 files changed

+43
-1
lines changed

3 files changed

+43
-1
lines changed

instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ dependencies {
1010

1111
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
1212

13-
compileOnly("org.springframework.kafka:spring-kafka:2.7.0")
13+
// compiling against 2.8.0 to use methods that are not present in 2.7
14+
compileOnly("org.springframework.kafka:spring-kafka:2.8.0")
1415

1516
testImplementation(project(":instrumentation:spring:spring-kafka-2.7:testing"))
1617
testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library"))

instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java

+17
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
1313
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
1414
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
15+
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
1516
import java.lang.ref.WeakReference;
1617
import javax.annotation.Nullable;
1718
import org.apache.kafka.clients.consumer.Consumer;
@@ -94,4 +95,20 @@ private void end(ConsumerRecords<K, V> records, @Nullable Throwable error) {
9495
lastProcessed.set(new WeakReference<>(records));
9596
}
9697
}
98+
99+
@NoMuzzle // method was added in 2.8.0
100+
@Override
101+
public void setupThreadState(Consumer<?, ?> consumer) {
102+
if (decorated != null) {
103+
decorated.setupThreadState(consumer);
104+
}
105+
}
106+
107+
@NoMuzzle // method was added in 2.8.0
108+
@Override
109+
public void clearThreadState(Consumer<?, ?> consumer) {
110+
if (decorated != null) {
111+
decorated.clearThreadState(consumer);
112+
}
113+
}
97114
}

instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java

+24
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,28 @@ private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
9292
processInstrumenter.end(state.context(), request, null, error);
9393
}
9494
}
95+
96+
@NoMuzzle // method was added in 2.8.0
97+
@Override
98+
public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
99+
if (decorated != null) {
100+
decorated.afterRecord(record, consumer);
101+
}
102+
}
103+
104+
@NoMuzzle // method was added in 2.8.0
105+
@Override
106+
public void setupThreadState(Consumer<?, ?> consumer) {
107+
if (decorated != null) {
108+
decorated.setupThreadState(consumer);
109+
}
110+
}
111+
112+
@NoMuzzle // method was added in 2.8.0
113+
@Override
114+
public void clearThreadState(Consumer<?, ?> consumer) {
115+
if (decorated != null) {
116+
decorated.clearThreadState(consumer);
117+
}
118+
}
95119
}

0 commit comments

Comments
 (0)