Skip to content

Commit 5001396

Browse files
garyrussellartembilan
authored andcommitted
GH-1957: Align EOSMode with KIP-732
Resolves #1957
1 parent f49db5e commit 5001396

File tree

8 files changed

+89
-41
lines changed

8 files changed

+89
-41
lines changed

spring-kafka-docs/src/main/asciidoc/appendix.adoc

+4-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
== Override Spring Boot Dependencies
33

44
When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot's dependency management.
5-
If you wish to use a different version of `kafka-clients` or `kafka-streams`, such as 2.8.0, you need to override all of the associated dependencies.
5+
If you wish to use a different version of `kafka-clients` or `kafka-streams`, such as 3.0.0, you need to override all of the associated dependencies.
66
This is especially true when using the embedded Kafka broker in `spring-kafka-test`.
77

8-
IMPORTANT: Backwards compatibility is not supported for all Boot versions; Spring for Apache Kafka 2.7 has been tested with Spring Boot 2.4 and 2.5.
8+
IMPORTANT: Backwards compatibility is not supported for all Boot versions; Spring for Apache Kafka 2.8 has been tested with Spring Boot 2.5 and 2.6.
9+
10+
IMPORTANT: When using the 3.0.0 clients, the default `EOSMode.V2` is not supported unless the broker is 2.5 or higher; if you wish to use the 3.0.0 clients with an earlier broker, you must change the container's `EOSMode` to `V1`.
911

1012
====
1113
[source, xml, subs="+attributes", role="primary"]

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+17-10
Original file line numberDiff line numberDiff line change
@@ -2462,7 +2462,7 @@ The default executor creates threads named `<name>-C-n`; with the `KafkaMessageL
24622462
|See <<delivery-header>>.
24632463

24642464
|[[eosMode]]<<eosMode,`eosMode`>>
2465-
|`BETA`
2465+
|`V2`
24662466
|Exactly Once Semantics mode; see <<exactly-once>>.
24672467

24682468
|[[fixTxOffsets]]<<fixTxOffsets,`fixTxOffsets`>>
@@ -3534,33 +3534,40 @@ This means that, for a `read->process-write` sequence, it is guaranteed that the
35343534

35353535
Spring for Apache Kafka version 2.5 and later supports two EOS modes:
35363536

3537-
* `ALPHA` - aka `transactional.id` fencing (since version 0.11.0.0)
3538-
* `BETA` - aka fetch-offset-request fencing (since version 2.5)
3537+
* `ALPHA` - alias for `V1` (deprecated)
3538+
* `BETA` - alias for `V2` (deprecated)
3539+
* `V1` - aka `transactional.id` fencing (since version 0.11.0.0)
3540+
* `V2` - aka fetch-offset-request fencing (since version 2.5)
35393541

3540-
With mode `ALPHA`, the producer is "fenced" if another instance with the same `transactional.id` is started.
3542+
With mode `V1`, the producer is "fenced" if another instance with the same `transactional.id` is started.
35413543
Spring manages this by using a `Producer` for each `group.id/topic/partition`; when a rebalance occurs a new instance will use the same `transactional.id` and the old producer is fenced.
35423544

3543-
With mode `BETA`, it is not necessary to have a producer for each `group.id/topic/partition` because consumer metadata is sent along with the offsets to the transaction and the broker can determine if the producer is fenced using that information instead.
3545+
With mode `V2`, it is not necessary to have a producer for each `group.id/topic/partition` because consumer metadata is sent along with the offsets to the transaction and the broker can determine if the producer is fenced using that information instead.
35443546

3545-
Starting with version 2.6, the default `EOSMode` is `BETA`.
3547+
Starting with version 2.6, the default `EOSMode` is `V2`.
35463548

35473549
To configure the container to use mode `ALPHA`, set the container property `EOSMode` to `ALPHA`, to revert to the previous behavior.
35483550

3549-
IMPORTANT: With `BETA`, your brokers must be version 2.5 or later, however with `kafka-clients` version 2.6, the producer will automatically fall back to `ALPHA` if the broker does not support `BETA`.
3551+
IMPORTANT: With `V2`, your brokers must be version 2.5 or later, however with `kafka-clients` version 2.6, the producer will automatically fall back to `ALPHA` if the broker does not support `BETA`.
35503552
The `DefaultKafkaProducerFactory` is configured to enable that behavior.
35513553
If your brokers are earlier than 2.5, be sure to leave the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` set to `true` and, if you are using a batch listener, you should set `subBatchPerPartition` to `true`.
35523554

3553-
When your brokers are upgraded to 2.5 or later, the producer will automatically switch to using mode `BETA`, but the number of producers will remain as before.
3555+
When your brokers are upgraded to 2.5 or later, the producer will automatically use mode `V2`, but the number of producers will remain as before.
35543556
You can then do a rolling upgrade of your application with `producerPerConsumerPartition` set to `false` to reduce the number of producers; you should also no longer set the `subBatchPerPartition` container property.
35553557

35563558
If your brokers are already 2.5 or newer, you should set the `DefaultKafkaProducerFactory` `producerPerConsumerPartition` property to `false`, to reduce the number of producers needed.
35573559

3558-
IMPORTANT: When using `EOSMode.BETA` with `producerPerConsumerPartition=false` the `transactional.id` must be unique across all application instances.
3560+
IMPORTANT: When using `EOSMode.V2` with `producerPerConsumerPartition=false` the `transactional.id` must be unique across all application instances.
35593561

3560-
When using `BETA` mode, it is no longer necessary to set the `subBatchPerPartition` to `true`; it will default to `false` when the `EOSMode` is `BETA`.
3562+
When using `V2` mode, it is no longer necessary to set the `subBatchPerPartition` to `true`; it will default to `false` when the `EOSMode` is `V2`.
35613563

35623564
Refer to https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information.
35633565

3566+
IMPORTANT: `kafka-clients` 3.0.0 and later no longer support `V2` (and automatic fallback to `V1`) with brokers earlier than 2.5; you must therefore override the default (`V2`) with `V1` if your brokers are older (or upgrade your brokers).
3567+
3568+
`V1` and `V2` were previously `ALPHA` and `BETA`; they have been changed to align the framework with https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2[KIP-732].
3569+
3570+
35643571
[[interceptors]]
35653572
==== Wiring Spring Beans into Producer/Consumer Interceptors
35663573

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

+46-7
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,49 @@ public enum EOSMode {
149149
/**
150150
* 'transactional.id' fencing (0.11 - 2.4 brokers).
151151
*/
152-
ALPHA,
152+
V1,
153153

154154
/**
155155
* fetch-offset-request fencing (2.5+ brokers).
156156
*/
157-
BETA,
157+
V2,
158+
159+
/**
160+
* 'transactional.id' fencing (0.11 - 2.4 brokers).
161+
* @deprecated in favor of {@link #V1}.
162+
*/
163+
@Deprecated
164+
ALPHA(V1),
165+
166+
/**
167+
* fetch-offset-request fencing (2.5+ brokers).
168+
* @deprecated in favor of {@link #V2}.
169+
*/
170+
@Deprecated
171+
BETA(V2);
172+
173+
174+
final EOSMode mode;
175+
176+
EOSMode() {
177+
this.mode = this;
178+
}
179+
180+
/**
181+
* Create an alias.
182+
* @param v22 the mode for which this is an alias.
183+
*/
184+
EOSMode(EOSMode v12) {
185+
this.mode = v12;
186+
}
187+
188+
/**
189+
* Return the mode or the aliased mode.
190+
* @return the mode.
191+
*/
192+
public EOSMode getMode() {
193+
return this.mode;
194+
}
158195

159196
}
160197

@@ -275,7 +312,7 @@ public enum EOSMode {
275312

276313
private boolean deliveryAttemptHeader;
277314

278-
private EOSMode eosMode = EOSMode.BETA;
315+
private EOSMode eosMode = EOSMode.V2;
279316

280317
private TransactionDefinition transactionDefinition;
281318

@@ -755,12 +792,14 @@ public EOSMode getEosMode() {
755792
}
756793

757794
/**
758-
* Set the exactly once semantics mode. When {@link EOSMode#ALPHA} a producer per
795+
* Set the exactly once semantics mode. When {@link EOSMode#V1} a producer per
759796
* group/topic/partition is used (enabling 'transactional.id fencing`).
760-
* {@link EOSMode#BETA} enables fetch-offset-request fencing, and requires brokers 2.5
761-
* or later. With the 2.6 client, the default is now BETA because the 2.6 client can
797+
* {@link EOSMode#V2} enables fetch-offset-request fencing, and requires brokers 2.5
798+
* or later. With the 2.6 client, the default is now V2 because the 2.6 client can
762799
* automatically fall back to ALPHA.
763-
* @param eosMode the mode; default BETA.
800+
* IMPORTANT the 3.0 clients cannot be used with {@link EOSMode#V2} unless the broker
801+
* is 2.5 or higher.
802+
* @param eosMode the mode; default V2.
764803
* @since 2.5
765804
*/
766805
public void setEosMode(EOSMode eosMode) {

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
137137
getRecoveryStrategy((List) records, exception), container, this.logger)
138138
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
139139
ConsumerRecord<K, V> skipped = records.get(0);
140-
if (EOSMode.ALPHA.equals(eosMode)) {
140+
if (EOSMode.V1.equals(eosMode.getMode())) {
141141
this.kafkaTemplate.sendOffsetsToTransaction(
142142
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
143143
new OffsetAndMetadata(skipped.offset() + 1)));

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,7 @@ private boolean setupSubBatchPerPartition() {
905905
if (this.transactionManager == null) {
906906
return false;
907907
}
908-
return this.eosMode.equals(EOSMode.ALPHA);
908+
return this.eosMode.getMode().equals(EOSMode.V1);
909909
}
910910

911911
@Nullable
@@ -2689,7 +2689,7 @@ private void sendOffsetsToTransaction() {
26892689
}
26902690

26912691
private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
2692-
if (this.eosMode.equals(EOSMode.ALPHA)) {
2692+
if (this.eosMode.getMode().equals(EOSMode.V1)) {
26932693
prod.sendOffsetsToTransaction(commits, this.consumerGroupId);
26942694
}
26952695
else {

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,18 @@ void testClassifier() {
7575
Consumer<String, String> consumer = mock(Consumer.class);
7676
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
7777
MessageListenerContainer container = mock(MessageListenerContainer.class);
78-
processor.process(records, consumer, container, illegalState, true, EOSMode.ALPHA);
78+
processor.process(records, consumer, container, illegalState, true, EOSMode.V1);
7979
processor.process(records, consumer, container,
80-
new DeserializationException("intended", null, false, illegalState), true, EOSMode.ALPHA);
80+
new DeserializationException("intended", null, false, illegalState), true, EOSMode.V1);
8181
verify(template).sendOffsetsToTransaction(anyMap());
8282
verify(template, never()).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
8383
assertThat(recovered.get()).isSameAs(record1);
8484
processor.addNotRetryableExceptions(IllegalStateException.class);
8585
recovered.set(null);
8686
recovererShouldFail.set(true);
87-
processor.process(records, consumer, container, illegalState, true, EOSMode.ALPHA);
87+
processor.process(records, consumer, container, illegalState, true, EOSMode.V1);
8888
verify(template, times(1)).sendOffsetsToTransaction(anyMap()); // recovery failed
89-
processor.process(records, consumer, container, illegalState, true, EOSMode.BETA);
89+
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
9090
verify(template, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
9191
assertThat(recovered.get()).isSameAs(record1);
9292
InOrder inOrder = inOrder(consumer);
@@ -123,12 +123,12 @@ void testBatchBackOff() {
123123
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
124124
MessageListenerContainer container = mock(MessageListenerContainer.class);
125125
given(container.isRunning()).willReturn(true);
126-
processor.process(records, consumer, container, illegalState, false, EOSMode.BETA);
127-
processor.process(records, consumer, container, illegalState, false, EOSMode.BETA);
126+
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
127+
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
128128
verify(backOff, times(2)).start();
129129
verify(execution.get(), times(2)).nextBackOff();
130130
processor.clearThreadState();
131-
processor.process(records, consumer, container, illegalState, false, EOSMode.BETA);
131+
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
132132
verify(backOff, times(3)).start();
133133
}
134134

@@ -144,7 +144,7 @@ void testEarlyExitBackOff() {
144144
MessageListenerContainer container = mock(MessageListenerContainer.class);
145145
given(container.isRunning()).willReturn(false);
146146
long t1 = System.currentTimeMillis();
147-
processor.process(records, consumer, container, illegalState, true, EOSMode.BETA);
147+
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
148148
assertThat(System.currentTimeMillis() < t1 + 5_000);
149149
}
150150

@@ -161,7 +161,7 @@ void testNoEarlyExitBackOff() {
161161
MessageListenerContainer container = mock(MessageListenerContainer.class);
162162
given(container.isRunning()).willReturn(true);
163163
long t1 = System.currentTimeMillis();
164-
processor.process(records, consumer, container, illegalState, true, EOSMode.BETA);
164+
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
165165
assertThat(System.currentTimeMillis() >= t1 + 200);
166166
}
167167

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -156,7 +156,7 @@ void defaults() {
156156
containerProps = new ContainerProperties("sbpp");
157157
containerProps.setMessageListener(mock(MessageListener.class));
158158
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
159-
containerProps.setEosMode(EOSMode.ALPHA);
159+
containerProps.setEosMode(EOSMode.V1);
160160
container = new KafkaMessageListenerContainer<>(cf, containerProps);
161161
container.start();
162162
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))
@@ -166,7 +166,7 @@ void defaults() {
166166
containerProps = new ContainerProperties("sbpp");
167167
containerProps.setMessageListener(mock(MessageListener.class));
168168
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
169-
containerProps.setEosMode(EOSMode.BETA);
169+
containerProps.setEosMode(EOSMode.V2);
170170
container = new KafkaMessageListenerContainer<>(cf, containerProps);
171171
container.start();
172172
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition"))

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -146,27 +146,27 @@ public static void setup() {
146146

147147
@Test
148148
public void testConsumeAndProduceTransactionKTM() throws Exception {
149-
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.ALPHA);
149+
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.V1);
150150
}
151151

152152
@Test
153153
public void testConsumeAndProduceTransactionHandleError() throws Exception {
154-
testConsumeAndProduceTransactionGuts(true, AckMode.RECORD, EOSMode.ALPHA);
154+
testConsumeAndProduceTransactionGuts(true, AckMode.RECORD, EOSMode.V1);
155155
}
156156

157157
@Test
158158
public void testConsumeAndProduceTransactionKTMManual() throws Exception {
159-
testConsumeAndProduceTransactionGuts(false, AckMode.MANUAL_IMMEDIATE, EOSMode.ALPHA);
159+
testConsumeAndProduceTransactionGuts(false, AckMode.MANUAL_IMMEDIATE, EOSMode.V1);
160160
}
161161

162162
@Test
163163
public void testConsumeAndProduceTransactionKTM_BETA() throws Exception {
164-
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.BETA);
164+
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.V1);
165165
}
166166

167167
@Test
168168
public void testConsumeAndProduceTransactionStopWhenFenced() throws Exception {
169-
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.BETA, true);
169+
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.V2, true);
170170
}
171171

172172
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -272,7 +272,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
272272
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
273273
InOrder inOrder = inOrder(producer);
274274
inOrder.verify(producer).beginTransaction();
275-
if (eosMode.equals(EOSMode.ALPHA)) {
275+
if (eosMode.equals(EOSMode.V1)) {
276276
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
277277
new OffsetAndMetadata(0)), "group");
278278
}
@@ -291,7 +291,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
291291
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
292292
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
293293
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
294-
if (eosMode.equals(EOSMode.ALPHA)) {
294+
if (eosMode.equals(EOSMode.V1)) {
295295
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
296296
new OffsetAndMetadata(1)), "group");
297297
}

0 commit comments

Comments
 (0)