Skip to content

Commit 86f6448

Browse files
authored
GH-2432: Fix Retryable Topic Provisioning
Resolves #2432 Don't provision an individual retry topic bean, if there is already a `NewTopic` bean with the same topic name. **cherry-pick to 2.9.x, 2.8.x** * Fix `TopicForRetry` removal logic; include `NewTopics` beans in logic. * Improve test.
1 parent 6536f3c commit 86f6448

File tree

4 files changed

+121
-7
lines changed

4 files changed

+121
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

+40-4
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import java.util.Collection;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.Iterator;
2526
import java.util.LinkedList;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Map.Entry;
2830
import java.util.Optional;
2931
import java.util.concurrent.ExecutionException;
3032
import java.util.concurrent.TimeUnit;
3133
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicInteger;
3235
import java.util.stream.Collectors;
3336

3437
import org.apache.commons.logging.LogFactory;
@@ -57,6 +60,7 @@
5760
import org.springframework.context.ApplicationContextAware;
5861
import org.springframework.core.log.LogAccessor;
5962
import org.springframework.kafka.KafkaException;
63+
import org.springframework.kafka.support.TopicForRetryable;
6064
import org.springframework.lang.Nullable;
6165

6266
/**
@@ -181,10 +185,7 @@ public void afterSingletonsInstantiated() {
181185
* @see #setAutoCreate(boolean)
182186
*/
183187
public final boolean initialize() {
184-
Collection<NewTopic> newTopics = new ArrayList<>(
185-
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());
186-
Collection<NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values();
187-
wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics()));
188+
Collection<NewTopic> newTopics = newTopics();
188189
if (newTopics.size() > 0) {
189190
AdminClient adminClient = null;
190191
try {
@@ -225,6 +226,41 @@ public final boolean initialize() {
225226
return false;
226227
}
227228

229+
/*
230+
* Remove any TopicForRetryable bean if there is also a NewTopic with the same topic name.
231+
*/
232+
private Collection<NewTopic> newTopics() {
233+
Map<String, NewTopic> newTopicsMap = new HashMap<>(
234+
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
235+
Map<String, NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
236+
AtomicInteger count = new AtomicInteger();
237+
wrappers.forEach((name, newTopics) -> {
238+
newTopics.getNewTopics().forEach(nt -> newTopicsMap.put(name + "#" + count.getAndIncrement(), nt));
239+
});
240+
Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream()
241+
.filter(entry -> entry.getValue() instanceof TopicForRetryable)
242+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
243+
for (Entry<String, NewTopic> entry : topicsForRetry.entrySet()) {
244+
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
245+
boolean remove = false;
246+
while (iterator.hasNext()) {
247+
Entry<String, NewTopic> nt = iterator.next();
248+
// if we have a NewTopic and TopicForRetry with the same name, remove the latter
249+
if (nt.getValue().name().equals(entry.getValue().name())
250+
&& !(nt.getValue() instanceof TopicForRetryable)) {
251+
252+
remove = true;
253+
break;
254+
}
255+
}
256+
if (remove) {
257+
newTopicsMap.remove(entry.getKey());
258+
}
259+
}
260+
Collection<NewTopic> newTopics = new ArrayList<>(newTopicsMap.values());
261+
return newTopics;
262+
}
263+
228264
@Override
229265
@Nullable
230266
public String clusterId() {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.function.Consumer;
2222

2323
import org.apache.commons.logging.LogFactory;
24-
import org.apache.kafka.clients.admin.NewTopic;
2524
import org.apache.kafka.clients.consumer.ConsumerRecord;
2625

2726
import org.springframework.beans.BeansException;
@@ -37,6 +36,7 @@
3736
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
3837
import org.springframework.kafka.support.EndpointHandlerMethod;
3938
import org.springframework.kafka.support.KafkaUtils;
39+
import org.springframework.kafka.support.TopicForRetryable;
4040
import org.springframework.lang.Nullable;
4141

4242

@@ -363,7 +363,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
363363
String beanName = topic + "-topicRegistrationBean";
364364
if (!bf.containsBean(beanName)) {
365365
bf.registerSingleton(beanName,
366-
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
366+
new TopicForRetryable(topic, config.getNumPartitions(), config.getReplicationFactor()));
367367
}
368368
}
369369
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import org.apache.kafka.clients.admin.NewTopic;
20+
21+
/**
22+
* Marker to indicate this {@link NewTopic} is for retryable topics; admin will ignore these if
23+
* a regular {@link NewTopic} exist.
24+
*
25+
* @author Gary Russell
26+
* @since 2.8.10
27+
*
28+
*/
29+
public class TopicForRetryable extends NewTopic {
30+
31+
/**
32+
* Create an instance with the provided properties.
33+
* @param topic the topic.
34+
* @param numPartitions the partitions.
35+
* @param replicationFactor the replication factor.
36+
*/
37+
public TopicForRetryable(String topic, int numPartitions, short replicationFactor) {
38+
super(topic, numPartitions, replicationFactor);
39+
}
40+
41+
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.fail;
2121

22+
import java.lang.reflect.Method;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
25+
import java.util.Collection;
2426
import java.util.Collections;
2527
import java.util.HashMap;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.concurrent.CountDownLatch;
2931
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.concurrent.atomic.AtomicReference;
3034

3135
import org.apache.kafka.clients.admin.AdminClientConfig;
36+
import org.apache.kafka.clients.admin.NewTopic;
37+
import org.apache.kafka.clients.admin.TopicDescription;
3238
import org.apache.kafka.clients.consumer.ConsumerConfig;
3339
import org.apache.kafka.clients.producer.ProducerConfig;
3440
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -47,10 +53,12 @@
4753
import org.springframework.kafka.annotation.RetryableTopic;
4854
import org.springframework.kafka.annotation.TopicPartition;
4955
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
56+
import org.springframework.kafka.config.TopicBuilder;
5057
import org.springframework.kafka.core.ConsumerFactory;
5158
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5259
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
5360
import org.springframework.kafka.core.KafkaAdmin;
61+
import org.springframework.kafka.core.KafkaAdmin.NewTopics;
5462
import org.springframework.kafka.core.KafkaTemplate;
5563
import org.springframework.kafka.core.ProducerFactory;
5664
import org.springframework.kafka.listener.ContainerProperties;
@@ -133,11 +141,30 @@ void shouldRetrySecondTopic() {
133141
}
134142

135143
@Test
136-
void shouldRetryThirdTopicWithTimeout() {
144+
void shouldRetryThirdTopicWithTimeout(@Autowired KafkaAdmin admin) throws Exception {
137145
logger.debug("Sending message to topic " + THIRD_TOPIC);
138146
kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3");
139147
assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue();
140148
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
149+
Map<String, TopicDescription> topics = admin.describeTopics(THIRD_TOPIC, THIRD_TOPIC + "-dlt", FOURTH_TOPIC);
150+
assertThat(topics.get(THIRD_TOPIC).partitions()).hasSize(2);
151+
assertThat(topics.get(THIRD_TOPIC + "-dlt").partitions()).hasSize(3);
152+
assertThat(topics.get(FOURTH_TOPIC).partitions()).hasSize(2);
153+
AtomicReference<Method> method = new AtomicReference<>();
154+
org.springframework.util.ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> {
155+
m.setAccessible(true);
156+
method.set(m);
157+
}, m -> m.getName().equals("newTopics"));
158+
@SuppressWarnings("unchecked")
159+
Collection<NewTopic> weededTopics = (Collection<NewTopic>) method.get().invoke(admin);
160+
AtomicInteger weeded = new AtomicInteger();
161+
weededTopics.forEach(topic -> {
162+
if (topic.name().equals(THIRD_TOPIC) || topic.name().equals(FOURTH_TOPIC)) {
163+
assertThat(topic).isExactlyInstanceOf(NewTopic.class);
164+
weeded.incrementAndGet();
165+
}
166+
});
167+
assertThat(weeded.get()).isEqualTo(2);
141168
}
142169

143170
@Test
@@ -527,6 +554,16 @@ public KafkaAdmin kafkaAdmin() {
527554
return new KafkaAdmin(configs);
528555
}
529556

557+
@Bean
558+
public NewTopic topic() {
559+
return TopicBuilder.name(THIRD_TOPIC).partitions(2).replicas(1).build();
560+
}
561+
562+
@Bean
563+
public NewTopics topics() {
564+
return new NewTopics(TopicBuilder.name(FOURTH_TOPIC).partitions(2).replicas(1).build());
565+
}
566+
530567
@Bean
531568
public ConsumerFactory<String, String> consumerFactory() {
532569
Map<String, Object> props = new HashMap<>();

0 commit comments

Comments
 (0)