Skip to content

Commit 56c0587

Browse files
garyrussellartembilan
authored andcommitted
GH-1970: Container Stopping EH Improvement
Add an option to stop the container normally. There might be a case where the container is stopped due to an error, but the user wants the container to remain 'healthy', e.g. to avoid Kubernetes restarting the instance.
1 parent a8d0313 commit 56c0587

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class CommonContainerStoppingErrorHandler extends KafkaExceptionLogLevelA
4040

4141
private final Executor executor;
4242

43+
private boolean stopContainerAbnormally = true;
44+
4345
/**
4446
* Construct an instance with a default {@link SimpleAsyncTaskExecutor}.
4547
*/
@@ -56,6 +58,18 @@ public CommonContainerStoppingErrorHandler(Executor executor) {
5658
this.executor = executor;
5759
}
5860

61+
/**
62+
* Set to false to stop the container normally. By default, the container is stopped
63+
* abnormally, so that {@code container.isInExpectedState()} returns false. If you
64+
* want to container to remain "healthy" when using this error handler, set the
65+
* property to false.
66+
* @param stopContainerAbnormally false for normal stop.
67+
* @since 2.8
68+
*/
69+
public void setStopContainerAbnormally(boolean stopContainerAbnormally) {
70+
this.stopContainerAbnormally = stopContainerAbnormally;
71+
}
72+
5973
@Override
6074
public boolean remainingRecords() {
6175
return true;
@@ -84,8 +98,16 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
8498
}
8599

86100
private void stopContainer(MessageListenerContainer container, Exception thrownException) {
87-
this.executor.execute(() -> container.stopAbnormally(() -> {
88-
}));
101+
this.executor.execute(() -> {
102+
if (this.stopContainerAbnormally) {
103+
container.stopAbnormally(() -> {
104+
});
105+
}
106+
else {
107+
container.stop(() -> {
108+
});
109+
}
110+
});
89111
// isRunning is false before the container.stop() waits for listener thread
90112
try {
91113
ListenerUtils.stoppableSleep(container, 10_000); // NOSONAR

spring-kafka/src/test/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler2Tests.java

+3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public void stopContainerAfterException() throws Exception {
104104
assertThat(this.config.count).isEqualTo(4);
105105
assertThat(this.config.contents.toArray()).isEqualTo(new String[]
106106
{ "foo", "bar", "baz", "qux" });
107+
assertThat(container.isInExpectedState()).isTrue();
107108
}
108109

109110
@Configuration
@@ -202,6 +203,8 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
202203
@Override
203204
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
204205
Consumer<?, ?> consumer, MessageListenerContainer container) {
206+
207+
setStopContainerAbnormally(false);
205208
RuntimeException exception = null;
206209
try {
207210
super.handleRemaining(thrownException, records, consumer, container);

0 commit comments

Comments
 (0)