Skip to content

Commit 6287d85

Browse files
authored
GH-1974: Fix Default Task Executor
Resolves #1974 A new `SimpleAsyncTaskExecutor` was created for each child container when the container was stopped/started. Reuse the same container for each child; handle an increase to `concurrency` between stop and start. * Fix unnecessary bean name null check. * Fix previous commit.
1 parent 8f15307 commit 6287d85

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import org.springframework.context.ApplicationContext;
3535
import org.springframework.context.ApplicationEventPublisher;
36+
import org.springframework.core.task.AsyncListenableTaskExecutor;
37+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3638
import org.springframework.kafka.core.ConsumerFactory;
3739
import org.springframework.kafka.support.TopicPartitionOffset;
3840
import org.springframework.lang.Nullable;
@@ -60,6 +62,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
6062

6163
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
6264

65+
private final List<AsyncListenableTaskExecutor> executors = new ArrayList<>();
66+
6367
private int concurrency = 1;
6468

6569
private boolean alwaysClientIdSuffix = true;
@@ -210,7 +214,8 @@ protected void doStart() {
210214

211215
private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
212216
String beanName = getBeanName();
213-
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + index);
217+
beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
218+
container.setBeanName(beanName);
214219
ApplicationContext applicationContext = getApplicationContext();
215220
if (applicationContext != null) {
216221
container.setApplicationContext(applicationContext);
@@ -230,6 +235,17 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
230235
stopAbnormally(() -> {
231236
});
232237
});
238+
AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
239+
if (exec == null) {
240+
if ((this.executors.size() > index)) {
241+
exec = this.executors.get(index);
242+
}
243+
else {
244+
exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
245+
this.executors.add(exec);
246+
}
247+
container.getContainerProperties().setConsumerTaskExecutor(exec);
248+
}
233249
}
234250

235251
private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties,

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@
7979
ConcurrentMessageListenerContainerTests.topic6, ConcurrentMessageListenerContainerTests.topic7,
8080
ConcurrentMessageListenerContainerTests.topic8, ConcurrentMessageListenerContainerTests.topic9,
8181
ConcurrentMessageListenerContainerTests.topic10, ConcurrentMessageListenerContainerTests.topic11,
82-
ConcurrentMessageListenerContainerTests.topic12 })
82+
ConcurrentMessageListenerContainerTests.topic12 },
83+
brokerProperties = "group.initial.rebalance.delay.ms:500")
8384
public class ConcurrentMessageListenerContainerTests {
8485

8586
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
@@ -321,6 +322,19 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
321322
latch.countDown();
322323
});
323324

325+
Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
326+
CountDownLatch rebalLatch = new CountDownLatch(5);
327+
containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
328+
329+
@Override
330+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
331+
if (listenerThreadNames.add(Thread.currentThread().getName())) {
332+
rebalLatch.countDown();
333+
}
334+
}
335+
336+
});
337+
324338
ConcurrentMessageListenerContainer<Integer, String> container =
325339
new ConcurrentMessageListenerContainer<>(cf, containerProps);
326340
container.setConcurrency(2);
@@ -340,8 +354,15 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
340354
template.flush();
341355
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
342356
container.stop();
343-
this.logger.info("Stop manual");
357+
container.setConcurrency(3);
358+
container.start();
359+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
360+
container.stop();
361+
assertThat(listenerThreadNames)
362+
.extracting(str -> str.substring(str.length() - 5))
363+
.containsExactlyInAnyOrder("0-C-1", "1-C-1", "0-C-2", "1-C-2", "2-C-1");
344364
assertThat(overrides.get().getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isEqualTo("false");
365+
this.logger.info("Stop manual");
345366
}
346367

347368
@Test

0 commit comments

Comments
 (0)