Skip to content

Commit e531a0f

Browse files
garyrussellartembilan
authored andcommitted
GH-2395: RetryListener - Add Batch Methods
Resolves #2395 Previously, `RetryListener` only supported record listeners; add methods for retrying batches. **cherry-pick to 2.9.x, 2.8.x**
1 parent deaded0 commit e531a0f

File tree

6 files changed

+148
-19
lines changed

6 files changed

+148
-19
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+10
Original file line numberDiff line numberDiff line change
@@ -5279,6 +5279,7 @@ public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
52795279
====
52805280

52815281
The error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress.
5282+
Starting with version 2.8.10, methods for batch listeners were added.
52825283

52835284
====
52845285
[source, java]
@@ -5294,6 +5295,15 @@ public interface RetryListener {
52945295
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
52955296
}
52965297
5298+
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
5299+
}
5300+
5301+
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
5302+
}
5303+
5304+
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
5305+
}
5306+
52975307
}
52985308
----
52995309
====

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

+43-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20+
import java.util.List;
2021
import java.util.Set;
2122
import java.util.function.BiConsumer;
2223

@@ -27,6 +28,7 @@
2728
import org.springframework.core.log.LogAccessor;
2829
import org.springframework.kafka.KafkaException;
2930
import org.springframework.kafka.support.KafkaUtils;
31+
import org.springframework.lang.Nullable;
3032
import org.springframework.util.backoff.BackOff;
3133
import org.springframework.util.backoff.BackOffExecution;
3234

@@ -39,9 +41,28 @@
3941
*/
4042
public final class ErrorHandlingUtils {
4143

44+
private static final ThreadLocal<List<RetryListener>> retryListeners = new ThreadLocal<>();
45+
4246
private ErrorHandlingUtils() {
4347
}
4448

49+
/**
50+
* Set the retry listeners.
51+
* @param listeners the listeners.
52+
* @since 2.8.10
53+
*/
54+
public static void setRetryListeners(List<RetryListener> listeners) {
55+
retryListeners.set(listeners);
56+
}
57+
58+
/**
59+
* Clear the retry listeners.
60+
* @since 2.8.10
61+
*/
62+
public static void clearRetryListeners() {
63+
retryListeners.remove();
64+
}
65+
4566
/**
4667
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
4768
* consumer, wait for the next back off, then call the listener. When retries are
@@ -67,6 +88,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
6788
String failed = null;
6889
Set<TopicPartition> assignment = consumer.assignment();
6990
consumer.pause(assignment);
91+
List<RetryListener> listeners = retryListeners.get();
92+
int attempt = 1;
93+
listen(listeners, records, thrownException, attempt++);
7094
if (container instanceof KafkaMessageListenerContainer) {
7195
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
7296
}
@@ -88,20 +112,27 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
88112
invokeListener.run();
89113
return;
90114
}
91-
catch (Exception e) {
115+
catch (Exception ex) {
116+
listen(listeners, records, ex, attempt++);
92117
if (failed == null) {
93118
failed = recordsToString(records);
94119
}
95120
String toLog = failed;
96-
logger.debug(e, () -> "Retry failed for: " + toLog);
121+
logger.debug(ex, () -> "Retry failed for: " + toLog);
97122
}
98123
nextBackOff = execution.nextBackOff();
99124
}
100125
try {
101126
recoverer.accept(records, thrownException);
127+
if (listeners != null) {
128+
listeners.forEach(listener -> listener.recovered(records, thrownException));
129+
}
102130
}
103-
catch (Exception e) {
104-
logger.error(e, () -> "Recoverer threw an exception; re-seeking batch");
131+
catch (Exception ex) {
132+
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
133+
if (listeners != null) {
134+
listeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
135+
}
105136
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
106137
}
107138
}
@@ -114,6 +145,14 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
114145
}
115146
}
116147

148+
private static void listen(@Nullable List<RetryListener> listeners, ConsumerRecords<?, ?> records,
149+
Exception thrownException, int attempt) {
150+
151+
if (listeners != null) {
152+
listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt));
153+
}
154+
}
155+
117156
/**
118157
* Represent the records as a comma-delimited String of {@code topic-part@offset}.
119158
* @param records the records.

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

+25-13
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,6 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
6969
this(recoverer, backOff, null, fallbackHandler);
7070
}
7171

72-
/**
73-
* Return the fallback batch error handler.
74-
* @return the handler.
75-
* @since 2.8.8
76-
*/
77-
protected CommonErrorHandler getFallbackBatchHandler() {
78-
return this.fallbackBatchHandler;
79-
}
80-
81-
8272
/**
8373
* Construct an instance with the provided properties.
8474
* @param recoverer the recoverer.
@@ -94,6 +84,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
9484
this.fallbackBatchHandler = fallbackHandler;
9585
}
9686

87+
/**
88+
* Return the fallback batch error handler.
89+
* @return the handler.
90+
* @since 2.8.8
91+
*/
92+
protected CommonErrorHandler getFallbackBatchHandler() {
93+
return this.fallbackBatchHandler;
94+
}
95+
9796
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
9897
MessageListenerContainer container, Runnable invokeListener) {
9998

@@ -105,17 +104,18 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
105104

106105
BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
107106
if (batchListenerFailedException == null) {
108-
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch");
109-
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
107+
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-delivering full batch");
108+
fallback(thrownException, data, consumer, container, invokeListener);
110109
}
111110
else {
111+
getRetryListeners().forEach(listener -> listener.failedDelivery(data, thrownException, 1));
112112
ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
113113
int index = record != null ? findIndex(data, record) : batchListenerFailedException.getIndex();
114114
if (index < 0 || index >= data.count()) {
115115
this.logger.warn(batchListenerFailedException, () ->
116116
String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
117117
record.topic(), record.partition(), record.offset()));
118-
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
118+
fallback(thrownException, data, consumer, container, invokeListener);
119119
}
120120
else {
121121
return seekOrRecover(thrownException, data, consumer, container, index);
@@ -124,6 +124,18 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
124124
return ConsumerRecords.empty();
125125
}
126126

127+
private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
128+
MessageListenerContainer container, Runnable invokeListener) {
129+
130+
ErrorHandlingUtils.setRetryListeners(getRetryListeners());
131+
try {
132+
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
133+
}
134+
finally {
135+
ErrorHandlingUtils.clearRetryListeners();
136+
}
137+
}
138+
127139
private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
128140
if (record == null) {
129141
return -1;

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

+11
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.ArrayList;
20+
import java.util.Arrays;
1921
import java.util.List;
2022
import java.util.function.BiConsumer;
2123
import java.util.function.BiFunction;
@@ -54,6 +56,8 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
5456

5557
private final FailedRecordTracker failureTracker;
5658

59+
private final List<RetryListener> retryListeners = new ArrayList<>();
60+
5761
private boolean commitRecovered;
5862

5963
private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
@@ -130,7 +134,14 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
130134
* @since 2.7
131135
*/
132136
public void setRetryListeners(RetryListener... listeners) {
137+
Assert.noNullElements(listeners, "'listeners' cannot have null elements");
133138
this.failureTracker.setRetryListeners(listeners);
139+
this.retryListeners.clear();
140+
this.retryListeners.addAll(Arrays.asList(listeners));
141+
}
142+
143+
protected List<RetryListener> getRetryListeners() {
144+
return this.retryListeners;
134145
}
135146

136147
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.listener;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2021

2122
/**
2223
* A listener for retry activity.
@@ -53,4 +54,31 @@ default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
5354
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
5455
}
5556

57+
/**
58+
* Called after a delivery failed for batch records.
59+
* @param records the records.
60+
* @param ex the exception.
61+
* @param deliveryAttempt the delivery attempt, if available.
62+
* @since 2.8.10
63+
*/
64+
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
65+
}
66+
67+
/**
68+
* Called after a failing record was successfully recovered.
69+
* @param records the record.
70+
* @param ex the exception.
71+
*/
72+
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
73+
}
74+
75+
/**
76+
* Called after a recovery attempt failed.
77+
* @param records the record.
78+
* @param original the original exception causing the recovery attempt.
79+
* @param failure the exception thrown by the recoverer.
80+
*/
81+
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
82+
}
83+
5684
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,10 +19,12 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.BDDMockito.given;
2324
import static org.mockito.BDDMockito.willAnswer;
2425
import static org.mockito.Mockito.inOrder;
2526
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.times;
2628
import static org.mockito.Mockito.verify;
2729

2830
import java.time.Duration;
@@ -205,6 +207,33 @@ records, mockConsumer, mock(MessageListenerContainer.class), () -> { }))
205207
verify(mockConsumer).seek(tp, 0L);
206208
}
207209

210+
@SuppressWarnings({ "unchecked", "rawtypes" })
211+
@Test
212+
void fallbackListener() {
213+
Consumer mockConsumer = mock(Consumer.class);
214+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
215+
DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2));
216+
RetryListener retryListener = mock(RetryListener.class);
217+
beh.setRetryListeners(retryListener);
218+
TopicPartition tp = new TopicPartition("foo", 0);
219+
ConsumerRecords<?, ?> records = new ConsumerRecords(Collections.singletonMap(tp,
220+
List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
221+
new RecordHeaders(), Optional.empty()),
222+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
223+
new RecordHeaders(), Optional.empty()))));
224+
MessageListenerContainer container = mock(MessageListenerContainer.class);
225+
given(container.isRunning()).willReturn(true);
226+
beh.handleBatch(new ListenerExecutionFailedException("test"),
227+
records, mockConsumer, container, () -> {
228+
throw new ListenerExecutionFailedException("test");
229+
});
230+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1));
231+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(2));
232+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(3));
233+
verify(recoverer, times(2)).accept(any(), any()); // each record in batch
234+
verify(retryListener).recovered(any(ConsumerRecords.class), any());
235+
}
236+
208237
@Configuration
209238
@EnableKafka
210239
public static class Config {

0 commit comments

Comments
 (0)