Skip to content

Commit 0da801f

Browse files
authored
GH-2404: Refactor FallbackBatchErrorHandler
Resolves #2404 - implement `CommonErrorHandler` instead of using an adapter - populate its retry listeners and log level if needed - remove listeners thread local in `ErrorHandlingUtils`, needed for previous implementation - remove final use of `SeekToCurrentBatchErrorHandler` - used in the fallback if recovery fails or the thread is interrupted while sleeping - remove `SeekToCurrentBatchErrorHandler` and its tests * Add CTOR to private inner class.
1 parent 884a43d commit 0da801f

9 files changed

+85
-512
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff
107107
}
108108

109109
private static CommonErrorHandler createFallback(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
110-
return new ErrorHandlerAdapter(new FallbackBatchErrorHandler(backOff, recoverer));
110+
return new FallbackBatchErrorHandler(backOff, recoverer);
111111
}
112112

113113
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+8-34
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.springframework.core.log.LogAccessor;
2929
import org.springframework.kafka.KafkaException;
3030
import org.springframework.kafka.support.KafkaUtils;
31-
import org.springframework.lang.Nullable;
3231
import org.springframework.util.backoff.BackOff;
3332
import org.springframework.util.backoff.BackOffExecution;
3433

@@ -41,28 +40,9 @@
4140
*/
4241
public final class ErrorHandlingUtils {
4342

44-
private static final ThreadLocal<List<RetryListener>> retryListeners = new ThreadLocal<>();
45-
4643
private ErrorHandlingUtils() {
4744
}
4845

49-
/**
50-
* Set the retry listeners.
51-
* @param listeners the listeners.
52-
* @since 2.8.10
53-
*/
54-
public static void setRetryListeners(List<RetryListener> listeners) {
55-
retryListeners.set(listeners);
56-
}
57-
58-
/**
59-
* Clear the retry listeners.
60-
* @since 2.8.10
61-
*/
62-
public static void clearRetryListeners() {
63-
retryListeners.remove();
64-
}
65-
6646
/**
6747
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
6848
* consumer, wait for the next back off, then call the listener. When retries are
@@ -77,20 +57,20 @@ public static void clearRetryListeners() {
7757
* @param recoverer the recoverer.
7858
* @param logger the logger.
7959
* @param logLevel the log level.
60+
* @param retryListeners the retry listeners.
8061
*/
8162
public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
8263
MessageListenerContainer container, Runnable invokeListener, BackOff backOff,
8364
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
84-
KafkaException.Level logLevel) {
65+
KafkaException.Level logLevel, List<RetryListener> retryListeners) {
8566

8667
BackOffExecution execution = backOff.start();
8768
long nextBackOff = execution.nextBackOff();
8869
String failed = null;
8970
Set<TopicPartition> assignment = consumer.assignment();
9071
consumer.pause(assignment);
91-
List<RetryListener> listeners = retryListeners.get();
9272
int attempt = 1;
93-
listen(listeners, records, thrownException, attempt++);
73+
listen(retryListeners, records, thrownException, attempt++);
9474
if (container instanceof KafkaMessageListenerContainer) {
9575
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
9676
}
@@ -113,7 +93,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
11393
return;
11494
}
11595
catch (Exception ex) {
116-
listen(listeners, records, ex, attempt++);
96+
listen(retryListeners, records, ex, attempt++);
11797
if (failed == null) {
11898
failed = recordsToString(records);
11999
}
@@ -124,15 +104,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
124104
}
125105
try {
126106
recoverer.accept(records, thrownException);
127-
if (listeners != null) {
128-
listeners.forEach(listener -> listener.recovered(records, thrownException));
129-
}
107+
retryListeners.forEach(listener -> listener.recovered(records, thrownException));
130108
}
131109
catch (Exception ex) {
132110
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
133-
if (listeners != null) {
134-
listeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
135-
}
111+
retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
136112
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
137113
}
138114
}
@@ -145,12 +121,10 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
145121
}
146122
}
147123

148-
private static void listen(@Nullable List<RetryListener> listeners, ConsumerRecords<?, ?> records,
124+
private static void listen(List<RetryListener> listeners, ConsumerRecords<?, ?> records,
149125
Exception thrownException, int attempt) {
150126

151-
if (listeners != null) {
152-
listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt));
153-
}
127+
listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt));
154128
}
155129

156130
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

+18-7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.common.TopicPartition;
3636

3737
import org.springframework.kafka.KafkaException;
38+
import org.springframework.kafka.KafkaException.Level;
3839
import org.springframework.lang.Nullable;
3940
import org.springframework.util.backoff.BackOff;
4041

@@ -84,6 +85,22 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
8485
this.fallbackBatchHandler = fallbackHandler;
8586
}
8687

88+
@Override
89+
public void setRetryListeners(RetryListener... listeners) {
90+
super.setRetryListeners(listeners);
91+
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
92+
handler.setRetryListeners(listeners);
93+
}
94+
}
95+
96+
@Override
97+
public void setLogLevel(Level logLevel) {
98+
super.setLogLevel(logLevel);
99+
if (this.fallbackBatchHandler instanceof KafkaExceptionLogLevelAware handler) {
100+
handler.setLogLevel(logLevel);
101+
}
102+
}
103+
87104
/**
88105
* Return the fallback batch error handler.
89106
* @return the handler.
@@ -127,13 +144,7 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
127144
private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
128145
MessageListenerContainer container, Runnable invokeListener) {
129146

130-
ErrorHandlingUtils.setRetryListeners(getRetryListeners());
131-
try {
132-
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
133-
}
134-
finally {
135-
ErrorHandlingUtils.clearRetryListeners();
136-
}
147+
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
137148
}
138149

139150
private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

+50-9
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,23 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.ArrayList;
20+
import java.util.Arrays;
1921
import java.util.Collection;
22+
import java.util.LinkedHashMap;
23+
import java.util.List;
2024
import java.util.function.BiConsumer;
25+
import java.util.stream.Collectors;
2126

2227
import org.apache.commons.logging.LogFactory;
2328
import org.apache.kafka.clients.consumer.Consumer;
2429
import org.apache.kafka.clients.consumer.ConsumerRecords;
2530
import org.apache.kafka.common.TopicPartition;
2631

2732
import org.springframework.core.log.LogAccessor;
33+
import org.springframework.kafka.KafkaException;
2834
import org.springframework.lang.Nullable;
35+
import org.springframework.util.Assert;
2936
import org.springframework.util.backoff.BackOff;
3037
import org.springframework.util.backoff.FixedBackOff;
3138

@@ -41,23 +48,22 @@
4148
* @since 2.3.7
4249
*
4350
*/
44-
@SuppressWarnings("deprecation")
45-
class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
46-
implements ListenerInvokingBatchErrorHandler {
51+
class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware implements CommonErrorHandler {
4752

4853
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
4954

5055
private final BackOff backOff;
5156

5257
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
5358

54-
@SuppressWarnings("deprecation")
55-
private final CommonErrorHandler seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler());
56-
57-
private boolean ackAfterHandle = true;
59+
private final CommonErrorHandler seeker = new SeekAfterRecoverFailsOrInterrupted();
5860

5961
private final ThreadLocal<Boolean> retrying = ThreadLocal.withInitial(() -> false);
6062

63+
private final List<RetryListener> retryListeners = new ArrayList<>();
64+
65+
private boolean ackAfterHandle = true;
66+
6167
/**
6268
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
6369
* a 5 second back off).
@@ -85,6 +91,18 @@ class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
8591
};
8692
}
8793

94+
/**
95+
* Set one or more {@link RetryListener} to receive notifications of retries and
96+
* recovery.
97+
* @param listeners the listeners.
98+
* @since 3.0
99+
*/
100+
public void setRetryListeners(RetryListener... listeners) {
101+
Assert.noNullElements(listeners, "'listeners' cannot have null elements");
102+
this.retryListeners.clear();
103+
this.retryListeners.addAll(Arrays.asList(listeners));
104+
}
105+
88106
@Override
89107
public boolean isAckAfterHandle() {
90108
return this.ackAfterHandle;
@@ -96,7 +114,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
96114
}
97115

98116
@Override
99-
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
117+
public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
100118
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
101119

102120
if (records == null || records.count() == 0) {
@@ -106,13 +124,14 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
106124
this.retrying.set(true);
107125
try {
108126
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
109-
this.seeker, this.recoverer, this.logger, getLogLevel());
127+
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners);
110128
}
111129
finally {
112130
this.retrying.set(false);
113131
}
114132
}
115133

134+
@Override
116135
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
117136
Runnable publishPause) {
118137

@@ -122,4 +141,26 @@ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartit
122141
}
123142
}
124143

144+
private final class SeekAfterRecoverFailsOrInterrupted implements CommonErrorHandler {
145+
146+
SeekAfterRecoverFailsOrInterrupted() {
147+
}
148+
149+
@Override
150+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
151+
MessageListenerContainer container, Runnable invokeListener) {
152+
153+
data.partitions()
154+
.stream()
155+
.collect(
156+
Collectors.toMap(tp -> tp,
157+
tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
158+
.forEach(consumer::seek);
159+
160+
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
161+
162+
}
163+
164+
}
165+
125166
}

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java

-89
This file was deleted.

0 commit comments

Comments
 (0)