Skip to content

Add auto-configuration for Kafka Retry Topics #29812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
Expand All @@ -33,13 +34,18 @@
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.retry.backoff.BackOffPolicyBuilder;
import org.springframework.retry.backoff.SleepingBackOffPolicy;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
Expand All @@ -48,6 +54,7 @@
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Nakul Mishra
* @author Tomaz Fernandes
* @since 1.5.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -137,4 +144,23 @@ public KafkaAdmin kafkaAdmin() {
return kafkaAdmin;
}

@Bean
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
.doNotAutoCreateRetryTopics();
setBackOffPolicy(builder, retryTopic);
return builder.create(kafkaOperations);
}

private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff);
PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0)
.toCall(() -> builder.customBackoff((SleepingBackOffPolicy<?>) BackOffPolicyBuilder.newBuilder()
.delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis())
.multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.kafka;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
Expand All @@ -41,6 +42,7 @@
import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.unit.DataSize;

Expand All @@ -54,6 +56,7 @@
* @author Stephane Nicoll
* @author Artem Bilan
* @author Nakul Mishra
* @author Tomaz Fernandes
* @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
Expand Down Expand Up @@ -94,6 +97,8 @@ public class KafkaProperties {

private final Security security = new Security();

private final Retry retry = new Retry();

public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
Expand Down Expand Up @@ -150,6 +155,10 @@ public Security getSecurity() {
return this.security;
}

public Retry getRetry() {
return this.retry;
}

private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.bootstrapServers != null) {
Expand Down Expand Up @@ -1332,6 +1341,140 @@ public void setOptions(Map<String, String> options) {

}

public static class Retry {

private Topic topic = new Topic();

public Topic getTopic() {
return this.topic.validate();
}

public void setTopic(Topic topic) {
this.topic = topic;
}

/**
* Properties for non-blocking, topic-based retries.
*/
public static class Topic {

private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic.";

private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX
+ "%s should be greater than or equal to %s. Provided value was %s.";

/**
* Whether to enable topic-based retries auto-configuration.
*/
private Boolean enabled;

/**
* The total number of processing attempts made before sending the message to
* the DLT.
*/
private Integer attempts = 3;

/**
* A canonical backoff period. Used as an initial value in the exponential
* case, and as a minimum value in the uniform case.
*/
private Duration delay = Duration.ofSeconds(1);

/**
* If positive, then used as a multiplier for generating the next delay for
* backoff.
*/
private Double multiplier = 0.0;

/**
* The maximum wait between retries. If less than the delay then the default
* of 30 seconds is applied.
*/
private Duration maxDelay = Duration.ZERO;

/**
* In the exponential case, set this to true to have the backoff delays
* randomized.
*/
private Boolean randomBackOff = false;

public Boolean getEnabled() {
return this.enabled;
}

public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}

public Integer getAttempts() {
return this.attempts;
}

public void setAttempts(Integer attempts) {
this.attempts = attempts;
}

public Duration getDelay() {
return this.delay;
}

public Long getDelayMillis() {
return (this.delay != null) ? this.delay.toMillis() : null;
}

public void setDelay(Duration delay) {
this.delay = delay;
}

public Double getMultiplier() {
return this.multiplier;
}

public void setMultiplier(Double multiplier) {
this.multiplier = multiplier;
}

public Duration getMaxDelay() {
return this.maxDelay;
}

public void setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
}

public Long getMaxDelayMillis() {
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
}

public Boolean isRandomBackOff() {
return this.randomBackOff;
}

public void setRandomBackOff(Boolean randomBackOff) {
this.randomBackOff = randomBackOff;
}

private Topic validate() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't mind, but I don't know Spring Boot development process in details, so I'll defer a decision for a proper validation and its place to respective Spring Boot team members.
Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks. Just for reference, it seems we already have some validation in this buildProperties method of the KafkaProperties class, so I thought it seemed ok there. Of course, I'm ok with validating any other way or place.

public Map<String, Object> buildProperties() {
validate();
Properties properties = new Properties();

Thanks for looking into this.

validateProperty("attempts", this.attempts, 1);
validateProperty("delay", this.getDelayMillis(), 0);
validateProperty("multiplier", this.multiplier, 0);
validateProperty("maxDelay", this.getMaxDelayMillis(), 0);
Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(),
"Property " + RETRY_TOPIC_PROPERTIES_PREFIX
+ "randomBackOff should not be true with non-exponential back offs.");
return this;
}

private static void validateProperty(String propertyName, Number providedValue, int minValue) {
Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null.");
Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0,
() -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue));
}

}

}

public static class Security {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.boot.autoconfigure.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand All @@ -41,6 +43,8 @@
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
Expand All @@ -53,12 +57,14 @@
*
* @author Gary Russell
* @author Stephane Nicoll
* @author Tomaz Fernandes
*/
@DisabledOnOs(OS.WINDOWS)
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
class KafkaAutoConfigurationIntegrationTests {

static final String TEST_TOPIC = "testTopic";
static final String TEST_RETRY_TOPIC = "testRetryTopic";

private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";

Expand Down Expand Up @@ -89,6 +95,27 @@ void testEndToEnd() throws Exception {
producer.close();
}

@SuppressWarnings("unchecked")
@Test
void testEndToEndWithRetryTopics() throws Exception {
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms",
"spring.kafka.consumer.auto-offset-reset=earliest");
RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class);
assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay)
.containsExactly(0L, 100L, 200L, 300L, 300L, 0L);
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
template.send(TEST_RETRY_TOPIC, "foo", "bar");
RetryListener listener = this.context.getBean(RetryListener.class);
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo",
"bar");
assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic",
"testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3");
}

@Test
void testStreams() {
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
Expand Down Expand Up @@ -121,6 +148,11 @@ Listener listener() {
return new Listener();
}

@Bean
RetryListener retryListener() {
return new RetryListener();
}

@Bean
NewTopic adminCreated() {
return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build();
Expand Down Expand Up @@ -157,4 +189,38 @@ void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {

}

static class RetryListener {

private final CountDownLatch latch = new CountDownLatch(5);

private final List<String> topics = new ArrayList<>();

private volatile String received;

private volatile String key;

@KafkaListener(topics = TEST_RETRY_TOPIC)
void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
this.received = foo;
this.key = key;
this.topics.add(topic);
this.latch.countDown();
throw new RuntimeException("Test exception");
}

private List<String> getTopics() {
return this.topics;
}

private String getReceived() {
return this.received;
}

private String getKey() {
return this.key;
}

}

}
Loading