
Commit afd52e0

GH-1948: KafkaTemplate Receive Multiple Records
Resolves #1948

* Remove `@Nullable`.
* Move receive methods with no timeout to interface defaults.
1 parent 39eefc8 · commit afd52e0

4 files changed: +98 −24 lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+10 −3
@@ -2378,18 +2378,25 @@ The `Collection` beans will be removed in a future release.
 
 This section covers how to use `KafkaTemplate` to receive messages.
 
-Starting with version 2.8, the template has two `receive()` methods:
+Starting with version 2.8, the template has four `receive()` methods:
 
 ====
-[source, jvava]
+[source, java]
 ----
 ConsumerRecord<K, V> receive(String topic, int partition, long offset);
 
 ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
+
+ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
+
+ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
 ----
 ====
 
-As you can see, you need to know the partition and offset of the record you need to retrieve; a new `Consumer` is created (and closed) for each operation.
+As you can see, you need to know the partition and offset of the record(s) you need to retrieve; a new `Consumer` is created (and closed) for each operation.
+
+With the last two methods, each record is retrieved individually and the results assembled into a `ConsumerRecords` object.
+When creating the `TopicPartitionOffset` s for the request, only positive, absolute offsets are supported.
 
 [[container-props]]
 ==== Listener Container Properties
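For orientation, a minimal usage sketch of the new collection-based variant, assuming a `KafkaTemplate<String, String>` named `template` whose consumer factory has been set; the topic, offsets, and variable names are illustrative, not part of the commit:

// Fetch specific records in one call; as the docs above note, a new
// Consumer is created (and closed) for the whole operation.
ConsumerRecords<String, String> records = template.receive(List.of(
		new TopicPartitionOffset("my-topic", 0, 0L),	// absolute, positive offsets only
		new TopicPartitionOffset("my-topic", 1, 5L)));
records.forEach(rec ->
		System.out.println(rec.partition() + "@" + rec.offset() + ": " + rec.value()));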

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

+27 −1
@@ -17,11 +17,13 @@
 package org.springframework.kafka.core;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -31,6 +33,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
 import org.springframework.util.concurrent.ListenableFuture;
@@ -284,7 +287,9 @@ default ProducerFactory<K, V> getProducerFactory() {
	 * @see #DEFAULT_POLL_TIMEOUT
	 */
	@Nullable
-	ConsumerRecord<K, V> receive(String topic, int partition, long offset);
+	default ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
+		return receive(topic, partition, offset, DEFAULT_POLL_TIMEOUT);
+	}
 
	/**
	 * Receive a single record.
@@ -298,6 +303,27 @@ default ProducerFactory<K, V> getProducerFactory() {
	@Nullable
	ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
 
+	/**
+	 * Receive multiple records with the default poll timeout (5 seconds). Only
+	 * absolute, positive offsets are supported.
+	 * @param requested a collection of record requests (topic/partition/offset).
+	 * @return the records.
+	 * @since 2.8
+	 * @see #DEFAULT_POLL_TIMEOUT
+	 */
+	default ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested) {
+		return receive(requested, DEFAULT_POLL_TIMEOUT);
+	}
+
+	/**
+	 * Receive multiple records. Only absolute, positive offsets are supported.
+	 * @param requested a collection of record requests (topic/partition/offset).
+	 * @param pollTimeout the timeout.
+	 * @return the records.
+	 * @since 2.8
+	 */
+	ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
+
	/**
	 * A callback for executing arbitrary operations on the {@link Producer}.
	 * @param <K> the key type.
spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+36 −14
@@ -17,8 +17,11 @@
 package org.springframework.kafka.core;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -54,6 +57,7 @@
 import org.springframework.kafka.support.LoggingProducerListener;
 import org.springframework.kafka.support.ProducerListener;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.TransactionSupport;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -558,29 +562,47 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
		producerForOffsets().sendOffsetsToTransaction(offsets, groupMetadata);
	}
 
-
	@Override
	@Nullable
-	public ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
-		return receive(topic, partition, offset, DEFAULT_POLL_TIMEOUT);
+	public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
+		Properties props = oneOnly();
+		try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
+			TopicPartition topicPartition = new TopicPartition(topic, partition);
+			return receiveOne(topicPartition, offset, pollTimeout, consumer);
+		}
	}
 
	@Override
-	@Nullable
-	public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
+	public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
+		Properties props = oneOnly();
+		Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
+		try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
+			requested.forEach(tpo -> {
+				ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
+				records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>()).add(one);
+			});
+			return new ConsumerRecords<>(records);
+		}
+	}
+
+	private Properties oneOnly() {
		Assert.notNull(this.consumerFactory, "A consumerFactory is required");
		Properties props = new Properties();
		props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
-		try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
-			TopicPartition topicPartition = new TopicPartition(topic, partition);
-			consumer.assign(Collections.singletonList(topicPartition));
-			consumer.seek(topicPartition, offset);
-			ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
-			if (records.count() == 1) {
-				return records.iterator().next();
-			}
-			return null;
+		return props;
+	}
+
+	@Nullable
+	private ConsumerRecord<K, V> receiveOne(TopicPartition topicPartition, long offset, Duration pollTimeout,
+			Consumer<K, V> consumer) {
+
+		consumer.assign(Collections.singletonList(topicPartition));
+		consumer.seek(topicPartition, offset);
+		ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
+		if (records.count() == 1) {
+			return records.iterator().next();
		}
+		return null;
	}
 
	private Producer<K, V> producerForOffsets() {
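For reference, the per-request work done by `receiveOne` reduces to this plain-consumer sequence; a minimal sketch, assuming a `Consumer<String, String> consumer` already created with `max.poll.records=1` as `oneOnly()` arranges (topic, partition, and offset are illustrative):

// Each requested (topic, partition, offset) costs an assign + seek + poll.
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, 42L);
// max.poll.records=1 caps the poll at the single sought record.
ConsumerRecords<String, String> polled = consumer.poll(Duration.ofSeconds(5));
ConsumerRecord<String, String> one = polled.count() == 1 ? polled.iterator().next() : null;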

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

+25 −6
@@ -35,6 +35,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +45,7 @@
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -72,6 +74,7 @@
 import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.kafka.support.ProducerListener;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
@@ -144,9 +147,9 @@ void testTemplate() {
		received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
		assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
 
-		template.send(INT_KEY_TOPIC, 0, null, "qux");
+		template.send(INT_KEY_TOPIC, 1, null, "qux");
		received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
-		assertThat(received).has(allOf(keyValue(null, "qux"), partition(0)));
+		assertThat(received).has(allOf(keyValue(null, "qux"), partition(1)));
 
		template.send(MessageBuilder.withPayload("fiz")
				.setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC)
@@ -157,11 +160,11 @@ void testTemplate() {
		assertThat(received).has(allOf(keyValue(2, "fiz"), partition(0)));
 
		template.send(MessageBuilder.withPayload("buz")
-				.setHeader(KafkaHeaders.PARTITION_ID, 0)
+				.setHeader(KafkaHeaders.PARTITION_ID, 1)
				.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
				.build());
		received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
-		assertThat(received).has(allOf(keyValue(2, "buz"), partition(0)));
+		assertThat(received).has(allOf(keyValue(2, "buz"), partition(1)));
 
		Map<MetricName, ? extends Metric> metrics = template.execute(Producer::metrics);
		assertThat(metrics).isNotNull();
@@ -173,10 +176,26 @@ void testTemplate() {
		assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
		template.setConsumerFactory(
				new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
-		ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 0, received.offset());
-		assertThat(receive).has(allOf(keyValue(2, "buz"), partition(0)))
+		ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 1, received.offset());
+		assertThat(receive).has(allOf(keyValue(2, "buz"), partition(1)))
				.extracting(rec -> rec.offset())
				.isEqualTo(received.offset());
+		ConsumerRecords<Integer, String> records = template.receive(List.of(
+				new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L),
+				new TopicPartitionOffset(INT_KEY_TOPIC, 0, 1L),
+				new TopicPartitionOffset(INT_KEY_TOPIC, 0, 0L),
+				new TopicPartitionOffset(INT_KEY_TOPIC, 1, 0L)));
+		assertThat(records.count()).isEqualTo(4);
+		Set<TopicPartition> partitions2 = records.partitions();
+		assertThat(partitions2).containsExactly(
+				new TopicPartition(INT_KEY_TOPIC, 1),
+				new TopicPartition(INT_KEY_TOPIC, 0));
+		assertThat(records.records(new TopicPartition(INT_KEY_TOPIC, 1)))
+				.extracting(rec -> rec.offset())
+				.containsExactly(1L, 0L);
+		assertThat(records.records(new TopicPartition(INT_KEY_TOPIC, 0)))
+				.extracting(rec -> rec.offset())
+				.containsExactly(1L, 0L);
		pf.destroy();
	}
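Note: the `containsExactly` ordering asserted above follows from the `LinkedHashMap` grouping in `KafkaTemplate`: partitions appear in the order they were first requested (1, then 0), and each partition's records keep their request order (offsets 1, then 0).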
