Skip to content

Commit ec0147e

Browse files
committed
spring-projectsGH-1704: Interceptor Improvements
Resolves spring-projects#1704 - Extract `ThreadStateProcessor` and `ConsumerAwareThreadStateProcessor` - Avoid all the if/else tests around calling the interceptor common methods - Add `afterRecord` to the record interceptor so sleuth can defer cleaning up until after the error handler.
1 parent 0e0f615 commit ec0147e

10 files changed

+127
-90
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323

2424
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
25+
import org.springframework.kafka.support.ThreadStateProcessor;
2526

2627
/**
2728
* Invoked by a listener container with remaining, unprocessed, records
@@ -39,7 +40,7 @@
3940
*
4041
*/
4142
@FunctionalInterface
42-
public interface AfterRollbackProcessor<K, V> {
43+
public interface AfterRollbackProcessor<K, V> extends ThreadStateProcessor {
4344

4445
/**
4546
* Process the remaining records. Recoverable will be true if the container is
@@ -63,14 +64,6 @@ public interface AfterRollbackProcessor<K, V> {
6364
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
6465
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
6566

66-
/**
67-
* Optional method to clear thread state; will be called just before a consumer
68-
* thread terminates.
69-
* @since 2.2
70-
*/
71-
default void clearThreadState() {
72-
}
73-
7467
/**
7568
* Return true to invoke
7669
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
*
3333
*/
3434
@FunctionalInterface
35-
public interface BatchInterceptor<K, V> extends BeforeAfterPollProcessor<K, V> {
35+
public interface BatchInterceptor<K, V> extends ConsumerAwareThreadStateProcessor {
3636

3737
/**
3838
* Perform some action on the records or return a different one. If null is returned

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.clients.consumer.ConsumerRecord;
2424
import org.apache.kafka.clients.consumer.ConsumerRecords;
2525

26+
import org.springframework.kafka.support.ThreadStateProcessor;
2627
import org.springframework.kafka.support.TopicPartitionOffset;
2728

2829
/**
@@ -33,7 +34,7 @@
3334
* @since 2.8
3435
*
3536
*/
36-
public interface CommonErrorHandler extends DeliveryAttemptAware {
37+
public interface CommonErrorHandler extends DeliveryAttemptAware, ThreadStateProcessor {
3738

3839
/**
3940
* Return false if this error handler should only receive the current failed record;
@@ -125,13 +126,6 @@ default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
125126
return 0;
126127
}
127128

128-
/**
129-
* Optional method to clear thread state; will be called just before a consumer
130-
* thread terminates.
131-
*/
132-
default void clearThreadState() {
133-
}
134-
135129
/**
136130
* Return true if the offset should be committed for a handled error (no exception
137131
* thrown).

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer
7575
}
7676

7777
@Override
78-
public void beforePoll(Consumer<K, V> consumer) {
79-
this.delegates.forEach(del -> del.beforePoll(consumer));
78+
public void setupThreadState(Consumer<?, ?> consumer) {
79+
this.delegates.forEach(del -> del.setupThreadState(consumer));
8080
}
8181

8282
@Override
83-
public void clearThreadState(Consumer<K, V> consumer) {
83+
public void clearThreadState(Consumer<?, ?> consumer) {
8484
this.delegates.forEach(del -> del.clearThreadState(consumer));
8585
}
8686

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,13 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
7878
}
7979

8080
@Override
81-
public void beforePoll(Consumer<K, V> consumer) {
82-
this.delegates.forEach(del -> del.beforePoll(consumer));
81+
public void setupThreadState(Consumer<?, ?> consumer) {
82+
this.delegates.forEach(del -> del.setupThreadState(consumer));
8383
}
8484

8585
@Override
86-
public void clearThreadState(Consumer<K, V> consumer) {
86+
public void clearThreadState(Consumer<?, ?> consumer) {
8787
this.delegates.forEach(del -> del.clearThreadState(consumer));
8888
}
89+
8990
}

spring-kafka/src/main/java/org/springframework/kafka/listener/BeforeAfterPollProcessor.java renamed to spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareThreadStateProcessor.java

+11-20
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,34 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020

21+
import org.springframework.kafka.support.ThreadStateProcessor;
22+
2123
/**
22-
* An interceptor for consumer poll operation.
23-
*
24-
* @param <K> the key type.
25-
* @param <V> the value type.
24+
* A {@link ThreadStateProcessor} involving a {@link Consumer}.
2625
*
27-
* @author Gary Russell
2826
* @author Karol Dowbecki
29-
* @author Artem Bilan
27+
* @author Gary Russell
3028
* @since 2.8
3129
*
3230
*/
33-
public interface BeforeAfterPollProcessor<K, V> {
31+
public interface ConsumerAwareThreadStateProcessor extends ThreadStateProcessor {
3432

3533
/**
36-
* Called before consumer is polled.
37-
* <p>
38-
* It can be used to set up thread-bound resources which will be available for the
39-
* entire duration of the consumer poll operation e.g. logging with MDC.
40-
* </p>
34+
* Call to set up thread-bound resources which will be available for the
35+
* entire duration of enclosed operation involving a {@link Consumer}.
4136
*
4237
* @param consumer the consumer.
4338
*/
44-
default void beforePoll(Consumer<K, V> consumer) {
39+
default void setupThreadState(Consumer<?, ?> consumer) {
4540
}
4641

4742
/**
48-
* Called after records were processed by listener and error handler.
49-
* <p>
50-
* It can be used to clear thread-bound resources which were set up in {@link #beforePoll(Consumer)}.
51-
* This is the last method called by the {@link MessageListenerContainer} before the next record
52-
* processing cycle starts.
53-
* </p>
43+
* Call to clear thread-bound resources which were set up in
44+
* {@link #beforePoll(Consumer)}.
5445
*
5546
* @param consumer the consumer.
5647
*/
57-
default void clearThreadState(Consumer<K, V> consumer) {
48+
default void clearThreadState(Consumer<?, ?> consumer) {
5849
}
5950

6051
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+27-32
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
632632

633633
private final BatchInterceptor<K, V> commonBatchInterceptor = getBatchInterceptor();
634634

635+
private final ConsumerAwareThreadStateProcessor pollThreadStateProcessor;
636+
635637
private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback();
636638

637639
private final long maxPollInterval;
@@ -746,12 +748,14 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
746748
this.batchListener = (BatchMessageListener<K, V>) listener;
747749
this.isBatchListener = true;
748750
this.wantsFullRecords = this.batchListener.wantsPollResult();
751+
this.pollThreadStateProcessor = setUpPollProcessor(true);
749752
}
750753
else if (listener instanceof MessageListener) {
751754
this.listener = (MessageListener<K, V>) listener;
752755
this.batchListener = null;
753756
this.isBatchListener = false;
754757
this.wantsFullRecords = false;
758+
this.pollThreadStateProcessor = setUpPollProcessor(false);
755759
}
756760
else {
757761
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
@@ -802,6 +806,19 @@ else if (listener instanceof MessageListener) {
802806
this.pausedPartitions = new HashSet<>();
803807
}
804808

809+
@Nullable
810+
private ConsumerAwareThreadStateProcessor setUpPollProcessor(boolean batch) {
811+
if (batch) {
812+
if (this.commonBatchInterceptor != null) {
813+
return this.commonBatchInterceptor;
814+
}
815+
}
816+
else if (this.commonRecordInterceptor != null) {
817+
return this.commonRecordInterceptor;
818+
}
819+
return null;
820+
}
821+
805822
@Nullable
806823
private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
807824
CommonErrorHandler common = getCommonErrorHandler();
@@ -1314,22 +1331,8 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
13141331
}
13151332

13161333
private void clearThreadState() {
1317-
if (this.isBatchListener) {
1318-
interceptClearThreadState(this.commonBatchInterceptor);
1319-
}
1320-
else {
1321-
interceptClearThreadState(this.commonRecordInterceptor);
1322-
}
1323-
}
1324-
1325-
private void interceptClearThreadState(BeforeAfterPollProcessor<K, V> processor) {
1326-
if (processor != null) {
1327-
try {
1328-
processor.clearThreadState(this.consumer);
1329-
}
1330-
catch (Exception e) {
1331-
this.logger.error(e, "BeforeAfterPollProcessor.clearThreadState threw an exception");
1332-
}
1334+
if (this.pollThreadStateProcessor != null) {
1335+
this.pollThreadStateProcessor.clearThreadState(this.consumer);
13331336
}
13341337
}
13351338

@@ -1480,22 +1483,8 @@ private ConsumerRecords<K, V> pollConsumer() {
14801483
}
14811484

14821485
private void beforePoll() {
1483-
if (this.isBatchListener) {
1484-
interceptBeforePoll(this.commonBatchInterceptor);
1485-
}
1486-
else {
1487-
interceptBeforePoll(this.commonRecordInterceptor);
1488-
}
1489-
}
1490-
1491-
private void interceptBeforePoll(BeforeAfterPollProcessor<K, V> processor) {
1492-
if (processor != null) {
1493-
try {
1494-
processor.beforePoll(this.consumer);
1495-
}
1496-
catch (Exception e) {
1497-
this.logger.error(e, "BeforeAfterPollProcessor.beforePoll threw an exception");
1498-
}
1486+
if (this.pollThreadStateProcessor != null) {
1487+
this.pollThreadStateProcessor.setupThreadState(this.consumer);
14991488
}
15001489
}
15011490

@@ -2294,6 +2283,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
22942283
TransactionSupport.clearTransactionIdSuffix();
22952284
}
22962285
}
2286+
if (this.commonRecordInterceptor != null) {
2287+
this.commonRecordInterceptor.afterRecord(record, this.consumer);
2288+
}
22972289
if (this.nackSleep >= 0) {
22982290
handleNack(records, record);
22992291
break;
@@ -2374,6 +2366,9 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
23742366
}
23752367
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
23762368
doInvokeRecordListener(record, iterator);
2369+
if (this.commonRecordInterceptor != null) {
2370+
this.commonRecordInterceptor.afterRecord(record, this.consumer);
2371+
}
23772372
if (this.nackSleep >= 0) {
23782373
handleNack(records, record);
23792374
break;

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*
3434
*/
3535
@FunctionalInterface
36-
public interface RecordInterceptor<K, V> extends BeforeAfterPollProcessor<K, V> {
36+
public interface RecordInterceptor<K, V> extends ConsumerAwareThreadStateProcessor {
3737

3838
/**
3939
* Perform some action on the record or return a different one. If null is returned
@@ -81,4 +81,15 @@ default void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
8181
default void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
8282
}
8383

84+
/**
85+
* Called when processing the record is complete either
86+
* {@link #success(ConsumerRecord, Consumer)} or
87+
* {@link #failure(ConsumerRecord, Exception, Consumer)}.
88+
* @param record the record.
89+
* @param consumer the consumer.
90+
* @since 2.8
91+
*/
92+
default void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
93+
}
94+
8495
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
/**
20+
* A general interface for managing thread-bound resources.
21+
*
22+
* @author Gary Russell
23+
* @since 2.8
24+
*
25+
*/
26+
public interface ThreadStateProcessor {
27+
28+
/**
29+
* Call to set up thread-bound resources which will be available until
30+
* {@link #clearThreadState()} is called.
31+
*/
32+
default void setupThreadState() {
33+
}
34+
35+
/**
36+
* Call to clear thread-bound resources which were set up in {@link #beforePoll()}
37+
* or by other operations that store thread state.
38+
*/
39+
default void clearThreadState() {
40+
}
41+
42+
}

0 commit comments

Comments
 (0)