Skip to content

Commit 239e350

Browse files
authored
GH-2423: Upgrade to Kafka 3.3.1, Streams Proc. API
Resolves #2423 Upgrade Apache Kafka version to 3.3.1. Migrate transformers to the new processor API. * Update docs.
1 parent f408490 commit 239e350

File tree

14 files changed

+257
-39
lines changed

14 files changed

+257
-39
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ ext {
6464
jaywayJsonPathVersion = '2.6.0'
6565
junit4Version = '4.13.2'
6666
junitJupiterVersion = '5.9.0'
67-
kafkaVersion = '3.2.3'
67+
kafkaVersion = '3.3.1'
6868
log4jVersion = '2.18.0'
6969
micrometerVersion = '1.10.0-M6'
7070
micrometerTracingVersion = '1.0.0-M8'

spring-kafka-docs/src/main/asciidoc/appendix.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ If you wish to use a different version of `kafka-clients` or `kafka-streams`, an
99
.Maven
1010
----
1111
<properties>
12-
<kafka.version>3.2.3</kafka.version>
12+
<kafka.version>3.3.1</kafka.version>
1313
</properties>
1414
1515
<dependency>

spring-kafka-docs/src/main/asciidoc/quick-tour.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ However, the quickest way to get started is to use https://start.spring.io[start
5151

5252
This quick tour works with the following versions:
5353

54-
* Apache Kafka Clients 3.2.x
54+
* Apache Kafka Clients 3.3.x
5555
* Spring Framework 6.0.x
5656
* Minimum Java version: 17
5757

spring-kafka-docs/src/main/asciidoc/streams.adoc

+13-11
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,13 @@ Starting with version 2.7, the default is to never clean up local state.
239239
[[streams-header-enricher]]
240240
==== Header Enricher
241241

242-
Version 2.3 added the `HeaderEnricher` implementation of `Transformer`.
242+
Version 3.0 added the `HeaderEnricherProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `HeaderEnricher` which implemented the deprecated `Transformer` interface.
243243
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:
244244

245-
* `context` - the `ProcessorContext`, allowing access to the current record metadata
245+
* `record` - the `org.apache.kafka.streams.processor.api.Record` (`key`, `value`, `timestamp`, `headers`)
246246
* `key` - the key of the current record
247247
* `value` - the value of the current record
248+
* `context` - the `ProcessorContext`, allowing access to the current record metadata
248249

249250
The expressions must return a `byte[]` or a `String` (which will be converted to `byte[]` using `UTF-8`).
250251

@@ -253,18 +254,18 @@ To use the enricher within a stream:
253254
====
254255
[source, java]
255256
----
256-
.transform(() -> enricher)
257+
.process(() -> new HeaderEnricherProcessor(expressions))
257258
----
258259
====
259260

260-
The transformer does not change the `key` or `value`; it simply adds headers.
261+
The processor does not change the `key` or `value`; it simply adds headers.
261262

262-
IMPORTANT: If your stream is multi-threaded, you need a new instance for each record.
263+
IMPORTANT: You need a new instance for each record.
263264

264265
====
265266
[source, java]
266267
----
267-
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))
268+
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
268269
----
269270
====
270271

@@ -276,19 +277,20 @@ Here is a simple example, adding one literal header and one variable:
276277
Map<String, Expression> headers = new HashMap<>();
277278
headers.put("header1", new LiteralExpression("value1"));
278279
SpelExpressionParser parser = new SpelExpressionParser();
279-
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
280-
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
280+
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
281+
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
281282
KStream<String, String> stream = builder.stream(INPUT);
282283
stream
283-
.transform(() -> enricher)
284+
.process(() -> supplier)
284285
.to(OUTPUT);
285286
----
286287
====
287288

288289
[[streams-messaging]]
289-
==== `MessagingTransformer`
290+
==== `MessagingProcessor`
290291

291-
Version 2.3 added the `MessagingTransformer` this allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
292+
Version 3.0 added the `MessagingProcessor` extension of `ContextualProcessor`; providing the same functionality as the deprecated `MessagingTransformer` which implemented the deprecated `Transformer` interface.
293+
This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
292294
The transformer requires an implementation of `MessagingFunction`.
293295

294296
====

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ For changes in earlier version, see <<history>>.
66
[[x30-kafka-client]]
77
==== Kafka Client Version
88

9-
This version requires the 3.2.0 `kafka-clients`.
9+
This version requires the 3.3.1 `kafka-clients`.
1010

1111
[[x30-eos]]
1212
==== Exactly Once Semantics

spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHints.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import org.apache.kafka.clients.consumer.StickyAssignor;
2828
import org.apache.kafka.clients.producer.Producer;
2929
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
30-
import org.apache.kafka.clients.producer.UniformStickyPartitioner;
31-
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
3230
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
3331
import org.apache.kafka.common.protocol.Message;
3432
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -97,6 +95,7 @@
9795
*/
9896
public class KafkaRuntimeHints implements RuntimeHintsRegistrar {
9997

98+
@SuppressWarnings("deprecation")
10099
@Override
101100
public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
102101
ReflectionHints reflectionHints = hints.reflection();
@@ -147,9 +146,9 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
147146
RoundRobinAssignor.class,
148147
StickyAssignor.class,
149148
// standard partitioners
150-
DefaultPartitioner.class,
149+
org.apache.kafka.clients.producer.internals.DefaultPartitioner.class,
151150
RoundRobinPartitioner.class,
152-
UniformStickyPartitioner.class,
151+
org.apache.kafka.clients.producer.UniformStickyPartitioner.class,
153152
// standard serialization
154153
ByteArrayDeserializer.class,
155154
ByteArraySerializer.class,

spring-kafka/src/main/java/org/springframework/kafka/streams/HeaderEnricher.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
*
3737
* @author Gary Russell
3838
* @since 2.3
39+
* @deprecated in favor of {@link HeaderEnricherProcessor}.
3940
*
4041
*/
42+
@Deprecated
4143
public class HeaderEnricher<K, V> implements Transformer<K, V, KeyValue<K, V>> {
4244

4345
private final Map<String, Expression> headerExpressions = new HashMap<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2019-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.kafka.common.header.Headers;
24+
import org.apache.kafka.common.header.internals.RecordHeader;
25+
import org.apache.kafka.streams.processor.api.ContextualProcessor;
26+
import org.apache.kafka.streams.processor.api.ProcessorContext;
27+
import org.apache.kafka.streams.processor.api.Record;
28+
29+
import org.springframework.expression.Expression;
30+
31+
/**
32+
* Manipulate the headers.
33+
*
34+
* @param <K> the input key type.
35+
* @param <V> the input value type.
36+
*
37+
* @author Gary Russell
38+
* @since 3.0
39+
*
40+
*/
41+
public class HeaderEnricherProcessor<K, V> extends ContextualProcessor<K, V, K, V> {
42+
43+
private final Map<String, Expression> headerExpressions = new HashMap<>();
44+
45+
/**
46+
* Construct an instance with the provided header expressions.
47+
* @param headerExpressions the header expressions; name:expression.
48+
*/
49+
public HeaderEnricherProcessor(Map<String, Expression> headerExpressions) {
50+
this.headerExpressions.putAll(headerExpressions);
51+
}
52+
53+
@Override
54+
public void process(Record<K, V> record) {
55+
Headers headers = record.headers();
56+
Container<K, V> container = new Container<>(context(), record.key(), record.value(), record);
57+
this.headerExpressions.forEach((name, expression) -> {
58+
Object headerValue = expression.getValue(container);
59+
if (headerValue instanceof String) {
60+
headerValue = ((String) headerValue).getBytes(StandardCharsets.UTF_8);
61+
}
62+
else if (!(headerValue instanceof byte[])) {
63+
throw new IllegalStateException("Invalid header value type: " + headerValue.getClass());
64+
}
65+
headers.add(new RecordHeader(name, (byte[]) headerValue));
66+
});
67+
context().forward(record);
68+
}
69+
70+
@Override
71+
public void close() {
72+
// NO-OP
73+
}
74+
75+
/**
76+
* Container object for SpEL evaluation.
77+
*
78+
* @param <K> the key type.
79+
* @param <V> the value type.
80+
*
81+
*/
82+
public static final class Container<K, V> {
83+
84+
private final ProcessorContext<K, V> context;
85+
86+
private final K key;
87+
88+
private final V value;
89+
90+
private final Record record;
91+
92+
Container(ProcessorContext<K, V> context, K key, V value, Record record) {
93+
this.context = context;
94+
this.key = key;
95+
this.value = value;
96+
this.record = record;
97+
}
98+
99+
public ProcessorContext getContext() {
100+
return this.context;
101+
}
102+
103+
public K getKey() {
104+
return this.key;
105+
}
106+
107+
public V getValue() {
108+
return this.value;
109+
}
110+
111+
public Record getRecord() {
112+
return this.record;
113+
}
114+
115+
}
116+
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2019-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams.messaging;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Optional;
22+
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.apache.kafka.common.header.Headers;
26+
import org.apache.kafka.common.record.TimestampType;
27+
import org.apache.kafka.streams.kstream.Transformer;
28+
import org.apache.kafka.streams.processor.api.ContextualProcessor;
29+
import org.apache.kafka.streams.processor.api.ProcessorContext;
30+
import org.apache.kafka.streams.processor.api.Record;
31+
import org.apache.kafka.streams.processor.api.RecordMetadata;
32+
33+
import org.springframework.kafka.support.KafkaHeaders;
34+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
35+
import org.springframework.messaging.Message;
36+
import org.springframework.util.Assert;
37+
38+
/**
39+
* A {@link Transformer} implementation that invokes a {@link MessagingFunction}
40+
* converting to/from spring-messaging {@link Message}. Can be used, for example,
41+
* to invoke a Spring Integration flow.
42+
*
43+
* @param <Kin> the input key type.
44+
* @param <Vin> the input value type.
45+
* @param <Kout> the output key type.
46+
* @param <Vout> the output value type.
47+
*
48+
* @author Gary Russell
49+
* @since 2.3
50+
*
51+
*/
52+
public class MessagingProcessor<Kin, Vin, Kout, Vout> extends ContextualProcessor<Kin, Vin, Kout, Vout> {
53+
54+
private final MessagingFunction function;
55+
56+
private final MessagingMessageConverter converter;
57+
58+
/**
59+
* Construct an instance with the provided function and converter.
60+
* @param function the function.
61+
* @param converter the converter.
62+
*/
63+
public MessagingProcessor(MessagingFunction function, MessagingMessageConverter converter) {
64+
Assert.notNull(function, "'function' cannot be null");
65+
Assert.notNull(converter, "'converter' cannot be null");
66+
this.function = function;
67+
this.converter = converter;
68+
}
69+
70+
@SuppressWarnings("unchecked")
71+
@Override
72+
public void process(Record<Kin, Vin> record) {
73+
ProcessorContext<Kout, Vout> context = context();
74+
RecordMetadata meta = context.recordMetadata().orElse(null);
75+
Assert.state(meta != null, "No record metadata present");
76+
Headers headers = record.headers();
77+
ConsumerRecord<Object, Object> rebuilt = new ConsumerRecord<Object, Object>(meta.topic(),
78+
meta.partition(), meta.offset(),
79+
record.timestamp(), TimestampType.NO_TIMESTAMP_TYPE,
80+
0, 0,
81+
record.key(), record.value(),
82+
headers, Optional.empty());
83+
Message<?> message = this.converter.toMessage(rebuilt, null, null, null);
84+
message = this.function.exchange(message);
85+
List<String> headerList = new ArrayList<>();
86+
headers.forEach(header -> headerList.add(header.key()));
87+
headerList.forEach(name -> headers.remove(name));
88+
ProducerRecord<?, ?> fromMessage = this.converter.fromMessage(message, "dummy");
89+
fromMessage.headers().forEach(header -> {
90+
if (!header.key().equals(KafkaHeaders.TOPIC)) {
91+
headers.add(header);
92+
}
93+
});
94+
context.forward(new Record<>((Kout) message.getHeaders().get(KafkaHeaders.KEY), (Vout) message.getPayload(),
95+
record.timestamp(), headers));
96+
}
97+
98+
@Override
99+
public void close() {
100+
// NO-OP
101+
}
102+
103+
}

spring-kafka/src/main/java/org/springframework/kafka/streams/messaging/MessagingTransformer.java

+2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@
4444
*
4545
* @author Gary Russell
4646
* @since 2.3
47+
* @deprecated in favor of {@link MessagingProcessor}.
4748
*
4849
*/
50+
@Deprecated
4951
public class MessagingTransformer<K, V, R> implements Transformer<K, V, KeyValue<K, R>> {
5052

5153
private final MessagingFunction function;

0 commit comments

Comments
 (0)