Skip to content

Commit 9f805ae

Browse files
authored
GH-1964: Add DelegatingByTopic(De)Serializer
Resolves #1964 Select the `Serializer`/`Deserializer` based on the topic name. * Extract common code; fix removeDelegate; add default; CASE_SENSITIVE can be Boolean.
1 parent cfaa268 commit 9f805ae

File tree

6 files changed

+673
-16
lines changed

6 files changed

+673
-16
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+58-16
Original file line numberDiff line numberDiff line change
@@ -4136,6 +4136,8 @@ Alternatively, as long as you don't use the fluent API to configure properties,
41364136
[[delegating-serialization]]
41374137
===== Delegating Serializer and Deserializer
41384138

4139+
===== Using Headers
4140+
41394141
Version 2.3 introduced the `DelegatingSerializer` and `DelegatingDeserializer`, which allow producing and consuming records with different key and/or value types.
41404142
Producers must set a header `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR` to a selector value that is used to select which serializer to use for the value and `DelegatingSerializer.KEY_SERIALIZATION_SELECTOR` for the key; if a match is not found, an `IllegalStateException` is thrown.
41414143

@@ -4178,15 +4180,55 @@ For another technique to send different types to different topics, see <<routing
41784180
----
41794181
@Bean
41804182
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
4181-
return new DefaultKafkaProducerFactory<>(config,
4182-
null, new DelegatingByTypeSerializer(Map.of(
4183-
byte[].class, new ByteArraySerializer(),
4184-
Bytes.class, new BytesSerializer(),
4185-
String.class, new StringSerializer())));
4183+
return new DefaultKafkaProducerFactory<>(config,
4184+
null, new DelegatingByTypeSerializer(Map.of(
4185+
byte[].class, new ByteArraySerializer(),
4186+
Bytes.class, new BytesSerializer(),
4187+
String.class, new StringSerializer())));
4188+
}
4189+
----
4190+
====
4191+
4192+
===== By Topic
4193+
4194+
Starting with version 2.8, the `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` allow selection of a serializer/deserializer based on the topic name.
4195+
Regex `Pattern` s are used to lookup the instance to use.
4196+
The map can be configured using a constructor, or via properties (a comma delimited list of `pattern:serializer`).
4197+
4198+
====
4199+
[source, java]
4200+
----
4201+
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
4202+
"topic[0-4]:" + ByteArraySerializer.class.getName()
4203+
+ ", topic[5-9]:" + StringSerializer.class.getName());
4204+
...
4205+
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
4206+
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
4207+
+ ", topic[5-9]:" + StringDeserializer.class.getName());
4208+
----
4209+
====
4210+
4211+
Use `KEY_SERIALIZATION_TOPIC_CONFIG` when using this for keys.
4212+
4213+
====
4214+
[source, java]
4215+
----
4216+
@Bean
4217+
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
4218+
return new DefaultKafkaProducerFactory<>(config,
4219+
null,
4220+
new DelegatingByTopicSerializer(Map.of(
4221+
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
4222+
Pattern.compile("topic[5-9]"), new StringSerializer())),
4223+
new JsonSerializer<Object>()); // default
41864224
}
41874225
----
41884226
====
41894227

4228+
You can specify a default serializer/deserializer to use when there is no pattern match using `DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT` and `DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT`.
4229+
4230+
An additional property `DelegatingByTopicSerialization.CASE_SENSITIVE` (default `true`), when set to `false` makes the topic lookup case insensitive.
4231+
41904232
[[retrying-deserialization]]
41914233
===== Retrying Deserializer
41924234

@@ -4261,7 +4303,7 @@ For convenience, starting with version 2.3, the framework also provides a `Strin
42614303

42624304
Starting with version 2.7.1, message payload conversion can be delegated to a `spring-messaging` `SmartMessageConverter`; this enables conversion, for example, to be based on the `MessageHeaders.CONTENT_TYPE` header.
42634305

4264-
IMPORTANT: The `KafkaMessageConverter.fromMessage()` method is called for outbound conversion to a `ProducerRecord` with the message payload in the `ProducerRecord.value()` property.
4306+
IMPORTANT: The `KafkaMessageConverter.fromMessage()` method is called for outbound conversion to a `ProducerRecord` with the message payload in the `ProducerRecord.value()` property.
42654307
The `KafkaMessageConverter.toMessage()` method is called for inbound conversion from `ConsumerRecord` with the payload being the `ConsumerRecord.value()` property.
42664308
The `SmartMessageConverter.toMessage()` method is called to create a new outbound `Message<?>` from the `Message` passed to`fromMessage()` (usually by `KafkaTemplate.send(Message<?> msg)`).
42674309
Similarly, in the `KafkaMessageConverter.toMessage()` method, after the converter has created a new `Message<?>` from the `ConsumerRecord`, the `SmartMessageConverter.fromMessage()` method is called and then the final inbound message is created with the newly converted payload.
@@ -5065,13 +5107,13 @@ The error handler can be configured with one or more `RetryListener` s, receivin
50655107
@FunctionalInterface
50665108
public interface RetryListener {
50675109
5068-
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
5110+
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
50695111
5070-
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
5071-
}
5112+
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
5113+
}
50725114
5073-
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
5074-
}
5115+
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
5116+
}
50755117
50765118
}
50775119
----
@@ -5322,13 +5364,13 @@ Starting with version 2.7, the processor can be configured with one or more `Ret
53225364
@FunctionalInterface
53235365
public interface RetryListener {
53245366
5325-
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
5367+
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
53265368
5327-
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
5328-
}
5369+
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
5370+
}
53295371
5330-
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
5331-
}
5372+
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
5373+
}
53325374
53335375
}
53345376
----

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,9 @@ See <<error-handlers>> for more information.
5656
==== Listener Container Changes
5757

5858
The `interceptBeforeTx` container property is now `true` by default.
59+
60+
[[x28-serializers]]
61+
==== Serializer/Deserializer Changes
62+
63+
The `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` are now provided.
64+
See <<delegating-serialization>> for more information.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2019-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import java.util.Map;
20+
import java.util.regex.Pattern;
21+
22+
import org.apache.kafka.common.header.Headers;
23+
import org.apache.kafka.common.serialization.Deserializer;
24+
25+
/**
26+
* A {@link Deserializer} that delegates to other deserializers based on the topic name.
27+
*
28+
* @author Gary Russell
29+
* @since 2.8
30+
*
31+
*/
32+
public class DelegatingByTopicDeserializer extends DelegatingByTopicSerialization<Deserializer<?>>
33+
implements Deserializer<Object> {
34+
35+
/**
36+
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
37+
* with consumer properties.
38+
*/
39+
public DelegatingByTopicDeserializer() {
40+
}
41+
42+
/**
43+
* Construct an instance with the supplied mapping of topic name patterns to delegate
44+
* deserializers.
45+
* @param delegates the map of delegates.
46+
* @param defaultDelegate the default to use when no topic name match.
47+
*/
48+
public DelegatingByTopicDeserializer(Map<Pattern, Deserializer<?>> delegates, Deserializer<?> defaultDelegate) {
49+
super(delegates, defaultDelegate);
50+
}
51+
52+
@Override
53+
public void configure(Map<String, ?> configs, boolean isKey) {
54+
super.configure(configs, isKey);
55+
}
56+
57+
@Override
58+
protected Deserializer<?> configureDelegate(Map<String, ?> configs, boolean isKey, Deserializer<?> delegate) {
59+
delegate.configure(configs, isKey);
60+
return delegate;
61+
}
62+
63+
@Override
64+
protected boolean isInstance(Object delegate) {
65+
return delegate instanceof Deserializer;
66+
}
67+
68+
@Override
69+
public Object deserialize(String topic, byte[] data) {
70+
throw new UnsupportedOperationException();
71+
}
72+
73+
@Override
74+
public Object deserialize(String topic, Headers headers, byte[] data) {
75+
return findDelegate(topic).deserialize(topic, headers, data);
76+
}
77+
78+
}

0 commit comments

Comments
 (0)