Skip to content

Commit 1400fbd

Browse files
garyrussellartembilan
authored andcommitted
GH-2438: RetryTopic Destination Partition Select
Resolves #2438 **cherry-pick to 2.9.x**
1 parent 67e0ed1 commit 1400fbd

File tree

3 files changed

+185
-2
lines changed

3 files changed

+185
-2
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+27
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,33 @@ public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
200200
IMPORTANT: When using this configuration approach, the `@EnableKafkaRetryTopic` annotation should not be used to prevent context failing to start due to duplicated beans.
201201
Use the simple `@EnableKafka` annotation instead.
202202

203+
When `autoCreateTopics` is true, the main and retry topics will be created with the specified number of partitions and replication factor.
204+
To override these values for a particular topic (e.g. the main topic or DLT), simply add a `NewTopic` `@Bean` with the required properties; that will override the auto creation properties.
205+
206+
IMPORTANT: By default, records are published to the retry topic(s) using the original partition of the received record.
207+
If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows.
208+
209+
====
210+
[source, java]
211+
----
212+
@EnableKafka
213+
@Configuration
214+
public class Config extends RetryTopicConfigurationSupport {
215+
216+
@Override
217+
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
218+
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
219+
}
220+
221+
...
222+
223+
}
224+
----
225+
====
226+
227+
The parameters to the function are the consumer record and the name of the next topic.
228+
You can return a specific partition number, or `null` to indicate that the `KafkaProducer` should determine the partition.
229+
203230
==== Features
204231

205232
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class DeadLetterPublishingRecovererFactory {
6969

7070
private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;
7171

72+
private BiFunction<ConsumerRecord<?, ?>, String, Integer> partitionResolver = (cr, nextTopic) -> cr.partition();
73+
7274
public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
7375
this.destinationTopicResolver = destinationTopicResolver;
7476
}
@@ -83,6 +85,19 @@ public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Heade
8385
this.headersFunction = headersFunction;
8486
}
8587

88+
/**
89+
* Set a resolver for the partition number to publish to. By default the same partition as
90+
* the consumer record is used. If the resolver returns {@code null} or a negative number, the
91+
* partition is set to null in the producer record and the {@code KafkaProducer} decides which
92+
* partition to publish to.
93+
* @param resolver the resolver.
94+
* @since 2.9.2
95+
*/
96+
public void setPartitionResolver(BiFunction<ConsumerRecord<?, ?>, String, Integer> resolver) {
97+
Assert.notNull(resolver, "'resolver' cannot be null");
98+
this.partitionResolver = resolver;
99+
}
100+
86101
/**
87102
* Add exception type to the default list. By default, the following exceptions will
88103
* not be retried:
@@ -253,13 +268,17 @@ private static String getRecordInfo(ConsumerRecord<?, ?> cr) {
253268
* and if it doesn't it sets -1, to allow the Producer itself to assign a partition to the record.</p>
254269
*
255270
* <p>Subclasses can inherit from this method to override the implementation, if necessary.</p>
271+
* The destination partition can also be customized using {@link #setPartitionResolver(BiFunction)}.
256272
*
257273
* @param cr The original {@link ConsumerRecord}, which is to be forwarded to DLT
258274
* @param nextDestination The next {@link DestinationTopic}, where the consumerRecord is to be forwarded
259-
* @return An instance of {@link TopicPartition}, specifying the topic and partition, where the cr is to be sent
275+
* @return An instance of {@link TopicPartition}, specifying the topic and partition, where the cr is to be sent.
276+
* @see #setPartitionResolver(BiFunction)
260277
*/
261278
protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, final DestinationTopic nextDestination) {
262-
return new TopicPartition(nextDestination.getDestinationName(), cr.partition());
279+
String nextTopic = nextDestination.getDestinationName();
280+
Integer partition = this.partitionResolver.apply(cr, nextTopic);
281+
return new TopicPartition(nextTopic, partition == null ? -1 : partition);
263282
}
264283

265284
private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.BDDMockito.willAnswer;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.verify;
24+
25+
import java.util.Map;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.function.Consumer;
30+
31+
import org.apache.kafka.clients.producer.ProducerRecord;
32+
import org.junit.jupiter.api.Test;
33+
import org.mockito.ArgumentCaptor;
34+
35+
import org.springframework.beans.factory.annotation.Autowired;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.kafka.annotation.EnableKafka;
39+
import org.springframework.kafka.annotation.KafkaListener;
40+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
41+
import org.springframework.kafka.core.ConsumerFactory;
42+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
43+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
44+
import org.springframework.kafka.core.KafkaOperations;
45+
import org.springframework.kafka.core.KafkaTemplate;
46+
import org.springframework.kafka.support.SendResult;
47+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
48+
import org.springframework.kafka.test.context.EmbeddedKafka;
49+
import org.springframework.kafka.test.utils.KafkaTestUtils;
50+
import org.springframework.scheduling.TaskScheduler;
51+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
52+
import org.springframework.test.annotation.DirtiesContext;
53+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
54+
55+
/**
56+
* @author Gary Russell
57+
* @since 2.9.2
58+
*
59+
*/
60+
@DirtiesContext
61+
@SpringJUnitConfig
62+
@EmbeddedKafka(topics = "partition.resolver.tests")
63+
public class PartitionResolverTests extends AbstractRetryTopicIntegrationTests {
64+
65+
@Test
66+
void testNullPartition(@Autowired KafkaOperations<Integer, String> template,
67+
@Autowired EmbeddedKafkaBroker broker, @Autowired Config config) throws InterruptedException {
68+
69+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
70+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
71+
KafkaTemplate<Integer, String> kt = new KafkaTemplate<>(pf);
72+
kt.send("partition.resolver.tests", 1, null, "test");
73+
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
74+
@SuppressWarnings("unchecked")
75+
ArgumentCaptor<ProducerRecord<Integer, String>> captor = ArgumentCaptor.forClass(ProducerRecord.class);
76+
verify(template).send(captor.capture());
77+
assertThat(captor.getValue().partition()).isNull();
78+
}
79+
80+
@EnableKafka
81+
@Configuration
82+
public static class Config extends RetryTopicConfigurationSupport {
83+
84+
final CountDownLatch latch = new CountDownLatch(1);
85+
86+
@Override
87+
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
88+
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
89+
}
90+
91+
@Bean
92+
RetryTopicConfiguration myRetryTopic(KafkaOperations<Integer, String> template) {
93+
return RetryTopicConfigurationBuilder
94+
.newInstance()
95+
.create(template);
96+
}
97+
98+
@SuppressWarnings("unchecked")
99+
@Bean
100+
KafkaOperations<Integer, String> template() {
101+
KafkaOperations<Integer, String> mock = mock(KafkaOperations.class);
102+
CompletableFuture<SendResult<Integer, String>> future = new CompletableFuture<>();
103+
future.complete(mock(SendResult.class));
104+
willAnswer(inv -> {
105+
latch.countDown();
106+
return future;
107+
}).given(mock).send(any(ProducerRecord.class));
108+
return mock;
109+
}
110+
111+
@KafkaListener(topics = "partition.resolver.tests")
112+
void listen(String in) {
113+
throw new RuntimeException("test");
114+
}
115+
116+
@Bean
117+
ConsumerFactory<Integer, String> cf(EmbeddedKafkaBroker broker) {
118+
Map<String, Object> props = KafkaTestUtils.consumerProps("prt", "false", broker);
119+
return new DefaultKafkaConsumerFactory<>(props);
120+
}
121+
122+
@Bean
123+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
124+
ConsumerFactory<Integer, String> cf) {
125+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
126+
factory.setConsumerFactory(cf);
127+
return factory;
128+
}
129+
130+
@Bean
131+
TaskScheduler taskScheduler() {
132+
return new ThreadPoolTaskScheduler();
133+
}
134+
135+
}
136+
137+
}

0 commit comments

Comments
 (0)