Skip to content

Add an option to disable automatic kafka interceptor configuration in spring starter #12833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,55 @@

package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import java.lang.reflect.Field;
import java.util.function.Supplier;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;

class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor {

private final ObjectProvider<OpenTelemetry> openTelemetryProvider;
private final ObjectProvider<ConfigProperties> configPropertiesProvider;
private final Supplier<SpringKafkaTelemetry> springKafkaTelemetry;

ConcurrentKafkaListenerContainerFactoryPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
this.openTelemetryProvider = openTelemetryProvider;
this.configPropertiesProvider = configPropertiesProvider;
Supplier<SpringKafkaTelemetry> springKafkaTelemetry) {
this.springKafkaTelemetry = springKafkaTelemetry;
}

@SuppressWarnings("unchecked")
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) {
return bean;
}

ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<?, ?>) bean;
SpringKafkaTelemetry springKafkaTelemetry =
SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<Object, Object>) bean;
SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get();

// use reflection to read existing values to avoid overwriting user configured interceptors
BatchInterceptor<Object, Object> batchInterceptor =
readField(listenerContainerFactory, "batchInterceptor", BatchInterceptor.class);
RecordInterceptor<Object, Object> recordInterceptor =
readField(listenerContainerFactory, "recordInterceptor", RecordInterceptor.class);
listenerContainerFactory.setBatchInterceptor(
springKafkaTelemetry.createBatchInterceptor(batchInterceptor));
listenerContainerFactory.setRecordInterceptor(
springKafkaTelemetry.createRecordInterceptor(recordInterceptor));

return listenerContainerFactory;
}

private static <T> T readField(Object container, String filedName, Class<T> fieldType) {
try {
Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField(filedName);
field.setAccessible(true);
return fieldType.cast(field.get(container));
} catch (Exception exception) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.ConditionalOnEnabledInstrumentation;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -33,13 +35,29 @@ DefaultKafkaProducerFactoryCustomizer otelKafkaProducerFactoryCustomizer(
return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap);
}

@Bean
static SpringKafkaTelemetry getTelemetry(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
}

// static to avoid "is not eligible for getting processed by all BeanPostProcessors" warning
@Bean
@ConditionalOnProperty(
name = "otel.instrumentation.kafka.autoconfigure-interceptor",
havingValue = "true",
matchIfMissing = true)
static ConcurrentKafkaListenerContainerFactoryPostProcessor
otelKafkaListenerContainerFactoryBeanPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(
openTelemetryProvider, configPropertiesProvider);
() -> getTelemetry(openTelemetryProvider, configPropertiesProvider));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@
"description": "Enable the capture of experimental Kafka span attributes.",
"defaultValue": false
},
{
"name": "otel.instrumentation.kafka.autoconfigure-interceptor",
"type": "java.lang.Boolean",
"description": "Enable automatic configuration of tracing interceptors on <code>ConcurrentKafkaListenerContainerFactory</code> using a <code>BeanPostProcessor</code>. You may disable this if you wish to manually configure these interceptors.",
"defaultValue": true
},
{
"name": "otel.instrumentation.mongo.enabled",
"type": "java.lang.Boolean",
Expand Down
Loading