Skip to content

Commit f80469d

Browse files
Apply suggestions from code review
1 parent 165ac33 commit f80469d

File tree

6 files changed

+154
-188
lines changed

6 files changed

+154
-188
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

+49-15
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,15 @@
3939
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
4040
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
4141
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
42-
import org.springframework.kafka.support.JavaUtils;
4342
import org.springframework.kafka.support.LoggingProducerListener;
4443
import org.springframework.kafka.support.ProducerListener;
4544
import org.springframework.kafka.support.converter.RecordMessageConverter;
4645
import org.springframework.kafka.transaction.KafkaTransactionManager;
46+
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
47+
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
48+
import org.springframework.retry.backoff.FixedBackOffPolicy;
49+
import org.springframework.retry.backoff.SleepingBackOffPolicy;
50+
import org.springframework.retry.backoff.UniformRandomBackOffPolicy;
4751

4852
/**
4953
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -118,20 +122,6 @@ public ProducerListener<Object, Object> kafkaProducerListener() {
118122
return new KafkaTransactionManager<>(producerFactory);
119123
}
120124

121-
@Bean
122-
@ConditionalOnProperty(name = "spring.kafka.retry-topic.enabled")
123-
@ConditionalOnMissingBean
124-
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
125-
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance();
126-
KafkaProperties.RetryTopic retryTopic = this.properties.getRetryTopic();
127-
JavaUtils.INSTANCE.acceptIfNotNull(retryTopic.getAttempts(), builder::maxAttempts).acceptIfNotNull(
128-
retryTopic.getBackOff(),
129-
(backOff) -> builder
130-
.customBackoff(KafkaAutoConfigurationUtils.createBackOffFrom(backOff.getDelayMillis(),
131-
backOff.getMaxDelayMillis(), backOff.getMultiplier(), backOff.isRandom())));
132-
return builder.create(kafkaOperations);
133-
}
134-
135125
@Bean
136126
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
137127
@ConditionalOnMissingBean
@@ -156,4 +146,48 @@ public KafkaAdmin kafkaAdmin() {
156146
return kafkaAdmin;
157147
}
158148

149+
@Bean
150+
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
151+
@ConditionalOnMissingBean
152+
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations<Object, Object> kafkaOperations) {
153+
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance();
154+
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
155+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
156+
map.from(retryTopic.getAttempts()).to(builder::maxAttempts);
157+
map.from(createBackOffFor(retryTopic)).to(builder::customBackoff);
158+
return builder.create(kafkaOperations);
159+
}
160+
161+
private SleepingBackOffPolicy<?> createBackOffFor(KafkaProperties.Retry.Topic properties) {
162+
Long min = properties.getDelayMillis();
163+
Long max = properties.getMaxDelayMillis();
164+
Double multiplier = properties.getMultiplier();
165+
Boolean isRandom = properties.isRandomBackOff();
166+
if (min == null && max == null && multiplier == null && isRandom == null) {
167+
return null;
168+
}
169+
if (multiplier != null && multiplier > 0) {
170+
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
171+
if (isRandom != null && isRandom) {
172+
policy = new ExponentialRandomBackOffPolicy();
173+
}
174+
policy.setInitialInterval((min != null) ? min : ExponentialBackOffPolicy.DEFAULT_INITIAL_INTERVAL);
175+
policy.setMultiplier(multiplier);
176+
policy.setMaxInterval(
177+
(max != null && min != null && max > min) ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);
178+
return policy;
179+
}
180+
if (max != null && min != null && max > min) {
181+
UniformRandomBackOffPolicy policy = new UniformRandomBackOffPolicy();
182+
policy.setMinBackOffPeriod(min);
183+
policy.setMaxBackOffPeriod(max);
184+
return policy;
185+
}
186+
FixedBackOffPolicy policy = new FixedBackOffPolicy();
187+
if (min != null) {
188+
policy.setBackOffPeriod(min);
189+
}
190+
return policy;
191+
}
192+
159193
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationUtils.java

-62
This file was deleted.

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

+64-27
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class KafkaProperties {
9494

9595
private final Security security = new Security();
9696

97-
private final RetryTopic retryTopic = new RetryTopic();
97+
private final Retry retry = new Retry();
9898

9999
public List<String> getBootstrapServers() {
100100
return this.bootstrapServers;
@@ -152,8 +152,8 @@ public Security getSecurity() {
152152
return this.security;
153153
}
154154

155-
public RetryTopic getRetryTopic() {
156-
return this.retryTopic;
155+
public Retry getRetry() {
156+
return this.retry;
157157
}
158158

159159
private Map<String, Object> buildCommonProperties() {
@@ -1321,37 +1321,74 @@ public void setOptions(Map<String, String> options) {
13211321

13221322
}
13231323

1324-
public static class RetryTopic {
1324+
public static class Retry {
13251325

1326-
private Integer attempts;
1326+
private Topic topic = new Topic();
13271327

1328-
private BackOff backOff;
1329-
1330-
public Integer getAttempts() {
1331-
return this.attempts;
1328+
public Topic getTopic() {
1329+
return this.topic;
13321330
}
13331331

1334-
public void setAttempts(Integer attempts) {
1335-
this.attempts = attempts;
1332+
public void setTopic(Topic topic) {
1333+
this.topic = topic;
13361334
}
13371335

1338-
public BackOff getBackOff() {
1339-
return this.backOff;
1340-
}
1336+
/**
1337+
* Properties for non-blocking, topic-based retries.
1338+
*/
1339+
public static class Topic {
13411340

1342-
public void setBackOff(BackOff backOff) {
1343-
this.backOff = backOff;
1344-
}
1341+
/**
1342+
* Whether to enable topic-based retries auto-configuration.
1343+
*/
1344+
private Boolean enabled;
13451345

1346-
public static class BackOff {
1346+
/**
1347+
* The total number of processing attempts made before sending the message to
1348+
* the DLT.
1349+
*/
1350+
private Integer attempts;
13471351

1352+
/**
1353+
* A canonical backoff period. Used as an initial value in the exponential
1354+
* case, and as a minimum value in the uniform case.
1355+
*/
13481356
private Duration delay;
13491357

1358+
/**
1359+
* If positive, then used as a multiplier for generating the next delay for
1360+
* backoff.
1361+
*/
13501362
private Double multiplier;
13511363

1364+
/**
1365+
* The maximimum wait between retries. If less than the delay then the default
1366+
* of 30 seconds is applied.
1367+
*/
13521368
private Duration maxDelay;
13531369

1354-
private Boolean random;
1370+
/**
1371+
* In the exponential case (multiplier() > 0) set this to true to have the
1372+
* backoff delays randomized, so that the maximum delay is multiplier times
1373+
* the previous delay and the distribution is uniform between the two values.
1374+
*/
1375+
private Boolean randomBackOff;
1376+
1377+
public Boolean getEnabled() {
1378+
return this.enabled;
1379+
}
1380+
1381+
public void setEnabled(Boolean enabled) {
1382+
this.enabled = enabled;
1383+
}
1384+
1385+
public Integer getAttempts() {
1386+
return this.attempts;
1387+
}
1388+
1389+
public void setAttempts(Integer attempts) {
1390+
this.attempts = attempts;
1391+
}
13551392

13561393
public Duration getDelay() {
13571394
return this.delay;
@@ -1377,20 +1414,20 @@ public Duration getMaxDelay() {
13771414
return this.maxDelay;
13781415
}
13791416

1380-
public Long getMaxDelayMillis() {
1381-
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
1382-
}
1383-
13841417
public void setMaxDelay(Duration maxDelay) {
13851418
this.maxDelay = maxDelay;
13861419
}
13871420

1388-
public Boolean isRandom() {
1389-
return this.random;
1421+
public Long getMaxDelayMillis() {
1422+
return (this.maxDelay != null) ? this.maxDelay.toMillis() : null;
1423+
}
1424+
1425+
public Boolean isRandomBackOff() {
1426+
return this.randomBackOff;
13901427
}
13911428

1392-
public void setRandom(Boolean random) {
1393-
this.random = random;
1429+
public void setRandomBackOff(Boolean randomBackOff) {
1430+
this.randomBackOff = randomBackOff;
13941431
}
13951432

13961433
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,22 @@ void testEndToEnd() throws Exception {
9797
@Test
9898
void testEndToEndWithRetryTopics() throws Exception {
9999
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
100-
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry-topic.enabled=true",
101-
"spring.kafka.retry-topic.attempts=4", "spring.kafka.retry-topic.back-off.delay=100ms",
102-
"spring.kafka.retry-topic.back-off.multiplier=2", "spring.kafka.retry-topic.back-off.max-delay=300ms",
100+
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true",
101+
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
102+
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms",
103103
"spring.kafka.consumer.auto-offset-reset=earliest");
104104
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
105105
template.send(TEST_RETRY_TOPIC, "foo", "bar");
106106
RetryListener listener = this.context.getBean(RetryListener.class);
107107
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
108108
assertThat(listener.key).isEqualTo("foo");
109109
assertThat(listener.received).isEqualTo("bar");
110-
assertThat(listener.topics.size()).isEqualTo(4);
110+
assertThat(listener.topics.size()).isEqualTo(5);
111111
assertThat(listener.topics.get(0)).isEqualTo("testRetryTopic");
112112
assertThat(listener.topics.get(1)).isEqualTo("testRetryTopic-retry-100");
113113
assertThat(listener.topics.get(2)).isEqualTo("testRetryTopic-retry-200");
114-
assertThat(listener.topics.get(3)).isEqualTo("testRetryTopic-retry-300");
114+
assertThat(listener.topics.get(3)).isEqualTo("testRetryTopic-retry-300-0");
115+
assertThat(listener.topics.get(4)).isEqualTo("testRetryTopic-retry-300-1");
115116
}
116117

117118
@Test
@@ -189,7 +190,7 @@ void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
189190

190191
static class RetryListener {
191192

192-
private final CountDownLatch latch = new CountDownLatch(4);
193+
private final CountDownLatch latch = new CountDownLatch(5);
193194

194195
private final List<String> topics = new ArrayList<>();
195196

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -321,12 +321,11 @@ void streamsWithCustomKafkaConfiguration() {
321321
}
322322

323323
@Test
324-
void retryTopicConfiguration() {
324+
void retryTopicConfigurationWithExponentialBackOff() {
325325
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
326-
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry-topic.enabled=true",
327-
"spring.kafka.retry-topic.attempts=5", "spring.kafka.retry-topic.back-off.delay=100ms",
328-
"spring.kafka.retry-topic.back-off.multiplier=2", "spring.kafka.retry-topic.back-off.max-delay=300ms")
329-
.run((context) -> {
326+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
327+
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
328+
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms").run((context) -> {
330329
RetryTopicConfiguration config = context.getBean(RetryTopicConfiguration.class);
331330
List<DestinationTopic.Properties> properties = config.getDestinationTopicProperties();
332331
assertThat(properties.size()).isEqualTo(6);
@@ -339,6 +338,36 @@ void retryTopicConfiguration() {
339338
});
340339
}
341340

341+
@Test
342+
void retryTopicConfigurationWithDefaultBackOff() {
343+
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
344+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true")
345+
.run((context) -> {
346+
RetryTopicConfiguration config = context.getBean(RetryTopicConfiguration.class);
347+
List<DestinationTopic.Properties> properties = config.getDestinationTopicProperties();
348+
assertThat(properties.size()).isEqualTo(4);
349+
assertThat(properties.get(0).delay()).isEqualTo(0);
350+
assertThat(properties.get(1).delay()).isEqualTo(1000);
351+
assertThat(properties.get(2).delay()).isEqualTo(1000);
352+
assertThat(properties.get(3).delay()).isEqualTo(0);
353+
});
354+
}
355+
356+
@Test
357+
void retryTopicConfigurationWithFixedBackOff() {
358+
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
359+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
360+
"spring.kafka.retry.topic.attempts=3", "spring.kafka.retry.topic.delay=2s").run((context) -> {
361+
RetryTopicConfiguration config = context.getBean(RetryTopicConfiguration.class);
362+
List<DestinationTopic.Properties> properties = config.getDestinationTopicProperties();
363+
assertThat(properties.size()).isEqualTo(4);
364+
assertThat(properties.get(0).delay()).isEqualTo(0);
365+
assertThat(properties.get(1).delay()).isEqualTo(2000);
366+
assertThat(properties.get(2).delay()).isEqualTo(2000);
367+
assertThat(properties.get(3).delay()).isEqualTo(0);
368+
});
369+
}
370+
342371
@SuppressWarnings("unchecked")
343372
@Test
344373
void streamsWithSeveralStreamsBuilderFactoryBeans() {

0 commit comments

Comments
 (0)