Skip to content

Commit 0ba3e16

Browse files
authored
GH-1966: Upgrade Kafka Clients to 3.0.0
Resolves #1966 * Update spring-kafka-docs/src/main/asciidoc/kafka.adoc
1 parent da04f48 commit 0ba3e16

File tree

54 files changed

+434
-235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+434
-235
lines changed

build.gradle

+4-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ext {
5959
jaywayJsonPathVersion = '2.4.0'
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.7.2'
62-
kafkaVersion = '2.8.1'
62+
kafkaVersion = '3.0.0'
6363
log4jVersion = '2.14.1'
6464
micrometerVersion = '1.8.0-M3'
6565
mockitoVersion = '3.11.2'
@@ -69,6 +69,7 @@ ext {
6969
springDataVersion = '2021.1.0-M3'
7070
springRetryVersion = '1.3.1'
7171
springVersion = '5.3.10'
72+
zookeeperVersion = '3.6.3'
7273

7374
idPrefix = 'kafka'
7475
}
@@ -341,8 +342,9 @@ project ('spring-kafka-test') {
341342
exclude group: 'org.springframework'
342343
}
343344

345+
api "org.apache.zookeeper:zookeeper:$zookeeperVersion"
344346
api "org.apache.kafka:kafka-clients:$kafkaVersion:test"
345-
api "org.apache.kafka:kafka-streams:$kafkaVersion"
347+
api "org.apache.kafka:kafka-metadata:$kafkaVersion"
346348
api "org.apache.kafka:kafka-streams-test-utils:$kafkaVersion"
347349
api "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
348350
api "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"

spring-kafka-docs/src/main/asciidoc/appendix.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
////
12
[[update-deps]]
23
== Override Spring Boot Dependencies
34

@@ -82,6 +83,7 @@ dependencies {
8283
====
8384

8485
The test scope dependencies are only needed if you are using the embedded Kafka broker in tests.
86+
////
8587
8688
[appendix]
8789
[[history]]

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+3-7
Original file line numberDiff line numberDiff line change
@@ -3555,11 +3555,10 @@ Starting with version 2.6, the default `EOSMode` is `V2`.
35553555

35563556
To configure the container to use mode `ALPHA`, set the container property `EOSMode` to `ALPHA`, to revert to the previous behavior.
35573557

3558-
IMPORTANT: With `V2`, your brokers must be version 2.5 or later, however with `kafka-clients` version 2.6, the producer will automatically fall back to `ALPHA` if the broker does not support `BETA`.
3559-
The `DefaultKafkaProducerFactory` is configured to enable that behavior.
3560-
If your brokers are earlier than 2.5, be sure to leave the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` set to `true` and, if you are using a batch listener, you should set `subBatchPerPartition` to `true`.
3558+
IMPORTANT: With `V2` (default), your brokers must be version 2.5 or later; `kafka-clients` version 3.0, the producer will no longer fall back to `V1`; if the broker does not support `V2`, an exception is thrown.
3559+
If your brokers are earlier than 2.5, you must set the `EOSMode` to `V1`, leave the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` set to `true` and, if you are using a batch listener, you should set `subBatchPerPartition` to `true`.
35613560

3562-
When your brokers are upgraded to 2.5 or later, the producer will automatically use mode `V2`, but the number of producers will remain as before.
3561+
When your brokers are upgraded to 2.5 or later, you should switch the mode to `V2`, but the number of producers will remain as before.
35633562
You can then do a rolling upgrade of your application with `producerPerConsumerPartition` set to `false` to reduce the number of producers; you should also no longer set the `subBatchPerPartition` container property.
35643563

35653564
If your brokers are already 2.5 or newer, you should set the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` property to `false`, to reduce the number of producers needed.
@@ -3570,11 +3569,8 @@ When using `V2` mode, it is no longer necessary to set the `subBatchPerPartition
35703569

35713570
Refer to https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information.
35723571

3573-
IMPORTANT: `kafka-clients` 3.0.0 and later no longer support `V2` (and automatic fallback to `V1`) with brokers earlier than 2.5; you must therefore override the default (`V2`) with `V1` if your brokers are older (or upgrade your brokers).
3574-
35753572
`V1` and `V2` were previously `ALPHA` and `BETA`; they have been changed to align the framework with https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2[KIP-732].
35763573

3577-
35783574
[[interceptors]]
35793575
==== Wiring Spring Beans into Producer/Consumer Interceptors
35803576

spring-kafka-docs/src/main/asciidoc/quick-tour.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ However, the quickest way to get started is to use https://start.spring.io[start
5151

5252
This quick tour works with the following versions:
5353

54-
* Apache Kafka Clients 2.8.0
54+
* Apache Kafka Clients 3.0.0
5555
* Spring Framework 5.3.x
5656
* Minimum Java version: 8
5757

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ For changes in earlier version, see <<history>>.
66
[[x28-kafka-client]]
77
==== Kafka Client Version
88

9-
This version requires the 2.8.0 `kafka-clients
9+
This version requires the 3.0.0 `kafka-clients
10+
11+
IMPORTANT: When using transactions, `kafka-clients` 3.0.0 and later no longer support `EOSMode.V2` (aka `BETA`) (and automatic fallback to `V1` - aka `ALPHA`) with brokers earlier than 2.5; you must therefore override the default `EOSMode` (`V2`) with `V1` if your brokers are older (or upgrade your brokers).
12+
13+
See <<exactly-once>> and https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information.
1014

1115
[[x28-packages]]
1216
==== Package Changes

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.kafka.common.utils.Time;
5656
import org.apache.kafka.common.utils.Utils;
5757
import org.apache.kafka.metadata.BrokerState;
58+
import org.apache.zookeeper.client.ZKClientConfig;
5859
import org.apache.zookeeper.server.NIOServerCnxnFactory;
5960
import org.apache.zookeeper.server.ZooKeeperServer;
6061

@@ -67,6 +68,7 @@
6768
import org.springframework.retry.support.RetryTemplate;
6869
import org.springframework.util.Assert;
6970

71+
import kafka.cluster.EndPoint;
7072
import kafka.common.KafkaException;
7173
import kafka.server.KafkaConfig;
7274
import kafka.server.KafkaServer;
@@ -557,7 +559,7 @@ public void destroy() {
557559
}
558560

559561
private boolean brokerRunning(KafkaServer kafkaServer) {
560-
return !kafkaServer.brokerState().get().equals(BrokerState.NOT_RUNNING);
562+
return !kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING);
561563
}
562564

563565
public Set<String> getTopics() {
@@ -584,7 +586,7 @@ public EmbeddedZookeeper getZookeeper() {
584586
public synchronized ZooKeeperClient getZooKeeperClient() {
585587
if (this.zooKeeperClient == null) {
586588
this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout,
587-
1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK");
589+
1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK", new ZKClientConfig(), "embeddedKafkaZK");
588590
}
589591
return this.zooKeeperClient;
590592
}
@@ -595,7 +597,7 @@ public String getZookeeperConnectionString() {
595597

596598
public BrokerAddress getBrokerAddress(int i) {
597599
KafkaServer kafkaServer = this.kafkaServers.get(i);
598-
return new BrokerAddress(LOOPBACK, kafkaServer.config().port());
600+
return new BrokerAddress(LOOPBACK, kafkaServer.config().listeners().apply(0).port());
599601
}
600602

601603
public BrokerAddress[] getBrokerAddresses() {
@@ -612,7 +614,8 @@ public int getPartitionsPerTopic() {
612614

613615
public void bounce(BrokerAddress brokerAddress) {
614616
for (KafkaServer kafkaServer : getKafkaServers()) {
615-
if (brokerAddress.equals(new BrokerAddress(kafkaServer.config().hostName(), kafkaServer.config().port()))) {
617+
EndPoint endpoint = kafkaServer.config().listeners().apply(0);
618+
if (brokerAddress.equals(new BrokerAddress(endpoint.host(), endpoint.port()))) {
616619
kafkaServer.shutdown();
617620
kafkaServer.awaitShutdown();
618621
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java

-8
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,14 @@ public class KafkaStreamsDefaultConfiguration {
5959
/**
6060
* Bean for the default {@link StreamsBuilderFactoryBean}.
6161
* @param streamsConfigProvider the streams config.
62-
* @param customizerProvider the customizer.
6362
* @param configurerProvider the configurer.
6463
*
6564
* @return the factory bean.
6665
*/
67-
@SuppressWarnings("deprecation")
6866
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
6967
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
7068
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME)
7169
ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider,
72-
ObjectProvider<org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer> customizerProvider,
7370
ObjectProvider<StreamsBuilderFactoryBeanConfigurer> configurerProvider) {
7471

7572
KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable();
@@ -80,11 +77,6 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
8077
configurer.configure(fb);
8178
configuredBy.add(configurer);
8279
});
83-
org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider
84-
.getIfUnique();
85-
if (customizer != null && !configuredBy.contains(customizer)) {
86-
customizer.configure(fb);
87-
}
8880
return fb;
8981
}
9082
else {

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanConfigurer.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@
2828
*
2929
*/
3030
@FunctionalInterface
31-
@SuppressWarnings("deprecation")
32-
public interface StreamsBuilderFactoryBeanConfigurer extends StreamsBuilderFactoryBeanCustomizer, Ordered {
31+
public interface StreamsBuilderFactoryBeanConfigurer extends Ordered {
3332

3433
/**
35-
* Overridden to avoid deprecation warnings.
34+
* Configure the factory bean.
35+
* @param factoryBean the factory bean.
3636
*/
37-
@Override
3837
void configure(StreamsBuilderFactoryBean factoryBean);
3938

4039
@Override

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,6 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
212212
setTransactionIdPrefix(txId);
213213
this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
214214
}
215-
this.configs.put("internal.auto.downgrade.txn.commit", true);
216215
}
217216

218217
private Supplier<Serializer<K>> keySerializerSupplier(

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

+4
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,9 @@ public interface KafkaOperations<K, V> {
208208
* the container will take care of sending the offsets to the transaction.
209209
* @param offsets The offsets.
210210
* @since 1.3
211+
* @deprecated in the 3.0.0 KafkaProducer.
211212
*/
213+
@Deprecated
212214
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);
213215

214216
/**
@@ -220,7 +222,9 @@ public interface KafkaOperations<K, V> {
220222
* @param offsets The offsets.
221223
* @param consumerGroupId the consumer's group.id.
222224
* @since 1.3
225+
* @deprecated in the 3.0.0 KafkaProducer.
223226
*/
227+
@Deprecated
224228
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);
225229

226230
/**

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+3
Original file line numberDiff line numberDiff line change
@@ -546,11 +546,14 @@ public void flush() {
546546

547547

548548
@Override
549+
@Deprecated
549550
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
550551
sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId());
551552
}
552553

554+
@SuppressWarnings("deprecation")
553555
@Override
556+
@Deprecated
554557
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
555558
producerForOffsets().sendOffsetsToTransaction(offsets, consumerGroupId);
556559
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private void checkConfig() {
128128
"A KafkaOperations is required when 'commitRecovered' is true");
129129
}
130130

131-
@SuppressWarnings({ "unchecked", "rawtypes" })
131+
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
132132
@Override
133133
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
134134
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+1
Original file line numberDiff line numberDiff line change
@@ -2689,6 +2689,7 @@ private void sendOffsetsToTransaction() {
26892689
doSendOffsets(this.producer, commits);
26902690
}
26912691

2692+
@SuppressWarnings("deprecation")
26922693
private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
26932694
if (this.eosMode.getMode().equals(EOSMode.V1)) {
26942695
prod.sendOffsetsToTransaction(commits, this.consumerGroupId);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,7 +72,7 @@ public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
7272
}
7373
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) data;
7474
return new ConsumerRecordMetadata(new RecordMetadata(new TopicPartition(record.topic(), record.partition()),
75-
0, record.offset(), record.timestamp(), null, record.serializedKeySize(),
75+
record.offset(), 0, record.timestamp(), record.serializedKeySize(),
7676
record.serializedValueSize()), record.timestampType());
7777
}
7878

spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
21+
import java.util.Optional;
2122

2223
import org.apache.kafka.clients.consumer.ConsumerRecord;
2324
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -77,9 +78,9 @@ public KeyValue<K, R> transform(K key, V value) {
7778
ConsumerRecord<Object, Object> record = new ConsumerRecord<Object, Object>(this.processorContext.topic(),
7879
this.processorContext.partition(), this.processorContext.offset(),
7980
this.processorContext.timestamp(), TimestampType.NO_TIMESTAMP_TYPE,
80-
null, 0, 0,
81+
0, 0,
8182
key, value,
82-
headers);
83+
headers, Optional.empty());
8384
Message<?> message = this.converter.toMessage(record, null, null, null);
8485
message = this.function.exchange(message);
8586
List<String> headerList = new ArrayList<>();

spring-kafka/src/test/java/org/springframework/kafka/config/RecordMessagingMessageListenerAdapterTests.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,8 +21,10 @@
2121
import java.lang.reflect.Method;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.Optional;
2425

2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.apache.kafka.common.header.internals.RecordHeaders;
2628
import org.apache.kafka.common.record.TimestampType;
2729
import org.junit.jupiter.api.Test;
2830

@@ -51,7 +53,7 @@ void testMetaString() throws Exception {
5153
RecordMessagingMessageListenerAdapter<Object, Object> adapter =
5254
(RecordMessagingMessageListenerAdapter<Object, Object>) endpoint.createMessageListener(null, null);
5355
ConsumerRecord<Object, Object> record = new ConsumerRecord<>("topic", 0, 42L, 43L, TimestampType.CREATE_TIME,
54-
0L, 44, 45, null, "foo");
56+
44, 45, null, "foo", new RecordHeaders(), Optional.empty());
5557
adapter.onMessage(record, null, null);
5658
assertThat(bean.string).isEqualTo("foo");
5759
assertThat(bean.metadata.topic()).isEqualTo(record.topic());
@@ -86,7 +88,7 @@ else if (method.getName().equals("listen")) {
8688
adapter.onMessage(new ConsumerRecord<>("topic", 0, 0L, null, 42), null, null);
8789
assertThat(bean.value).isEqualTo(42);
8890
ConsumerRecord<Object, Object> record = new ConsumerRecord<>("topic", 0, 42L, 43L, TimestampType.CREATE_TIME,
89-
0L, 44, 45, null, "foo");
91+
44, 45, null, "foo", new RecordHeaders(), Optional.empty());
9092
adapter.onMessage(record, null, null);
9193
assertThat(bean.string).isEqualTo("foo");
9294
assertThat(bean.metadata.topic()).isEqualTo(record.topic());

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -457,23 +457,23 @@ void configUpdates() {
457457
Map<String, Object> configs = new HashMap<>();
458458
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
459459
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs);
460-
assertThat(pf.getConfigurationProperties()).hasSize(2);
460+
assertThat(pf.getConfigurationProperties()).hasSize(1);
461461
configs.remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
462462
configs.put(ProducerConfig.ACKS_CONFIG, "all");
463463
pf.updateConfigs(configs);
464-
assertThat(pf.getConfigurationProperties()).hasSize(3);
465-
pf.removeConfig(ProducerConfig.ACKS_CONFIG);
466464
assertThat(pf.getConfigurationProperties()).hasSize(2);
465+
pf.removeConfig(ProducerConfig.ACKS_CONFIG);
466+
assertThat(pf.getConfigurationProperties()).hasSize(1);
467467
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
468468
assertThatIllegalArgumentException().isThrownBy(() -> pf.updateConfigs(configs));
469469
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
470470
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
471471
DefaultKafkaProducerFactory pf1 = new DefaultKafkaProducerFactory<>(configs);
472-
assertThat(pf1.getConfigurationProperties()).hasSize(5);
472+
assertThat(pf1.getConfigurationProperties()).hasSize(4);
473473
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId2");
474474
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx2-");
475475
pf1.updateConfigs(configs);
476-
assertThat(pf1.getConfigurationProperties()).hasSize(5);
476+
assertThat(pf1.getConfigurationProperties()).hasSize(4);
477477
assertThat(KafkaTestUtils.getPropertyValue(pf1, "clientIdPrefix")).isEqualTo("clientId2");
478478
assertThat(KafkaTestUtils.getPropertyValue(pf1, "transactionIdPrefix")).isEqualTo("tx2-");
479479
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,15 @@ public void testAddTopicsAndAddPartitions() throws Exception {
9696
int n = 0;
9797
await().until(() -> {
9898
results.putAll(this.admin.describeTopics("foo", "bar"));
99+
TopicDescription foo = results.values().stream()
100+
.filter(tp -> tp.name().equals("foo"))
101+
.findFirst()
102+
.get();
99103
TopicDescription bar = results.values().stream()
100104
.filter(tp -> tp.name().equals("bar"))
101105
.findFirst()
102106
.get();
103-
return bar.partitions().size() != 1;
107+
return foo.partitions().size() == 4 && bar.partitions().size() == 3;
104108
});
105109
results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 4 : 3));
106110
new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", Optional.of(5));

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public void onError(ProducerRecord<Integer, String> producerRecord, RecordMetada
330330
//Drain the topic
331331
KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
332332
pf.destroy();
333-
cpl.onError(records.get(0), new RecordMetadata(new TopicPartition(INT_KEY_TOPIC, -1), 0L, 0L, 0L, 0L, 0, 0),
333+
cpl.onError(records.get(0), new RecordMetadata(new TopicPartition(INT_KEY_TOPIC, -1), 0L, 0, 0L, 0, 0),
334334
new RuntimeException("x"));
335335
assertThat(onErrorDelegateCalls.get()).isEqualTo(2);
336336
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public class KafkaTemplateTransactionTests {
9898

9999
private final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker();
100100

101+
@SuppressWarnings("deprecation")
101102
@Test
102103
public void testLocalTransaction() {
103104
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

0 commit comments

Comments
 (0)