Skip to content

Commit 39eefc8

Browse files
garyrussellartembilan
authored andcommitted
Fix package tangles
1 parent 5001396 commit 39eefc8

25 files changed

+149
-104
lines changed

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+11-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,17 @@ For changes in earlier version, see <<history>>.
66
[[x28-kafka-client]]
77
==== Kafka Client Version
88

9-
This version requires the 2.8.0 `kafka-clients`.
9+
This version requires the 2.8.0 `kafka-clients
10+
11+
[[x28-packages]]
12+
==== Package Changes
13+
14+
Classes and interfaces related to type mapping have been moved from `...support.converter` to `...support.mapping`.
15+
16+
* `AbstractJavaTypeMapper`
17+
* `ClassMapper`
18+
* `DefaultJackson2JavaTypeMapper`
19+
* `Jackson2JavaTypeMapper`
1020

1121
[[x28-ooo-commits]]
1222
==== Out of Order Manual Commits

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public abstract class AbstractKafkaBackOffManagerFactory
3434
implements KafkaBackOffManagerFactory, ApplicationContextAware {
3535

3636
private ApplicationContext applicationContext;
37+
3738
private ListenerContainerRegistry listenerContainerRegistry;
3839

3940
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.springframework.core.log.LogAccessor;
3131
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3232
import org.springframework.core.task.TaskExecutor;
33-
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3433
import org.springframework.kafka.event.ListenerContainerIdleEvent;
3534

3635
/**
@@ -46,7 +45,7 @@ public class ContainerGroupSequencer implements ApplicationContextAware,
4645

4746
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ContainerGroupSequencer.class));
4847

49-
private final KafkaListenerEndpointRegistry registry;
48+
private final ListenerContainerRegistry registry;
5049

5150
private final long defaultIdleEventInterval;
5251

@@ -78,7 +77,7 @@ public class ContainerGroupSequencer implements ApplicationContextAware,
7877
* @param defaultIdleEventInterval the idle event interval if not already set.
7978
* @param containerGroups The list of container groups, in order.
8079
*/
81-
public ContainerGroupSequencer(KafkaListenerEndpointRegistry registry, long defaultIdleEventInterval,
80+
public ContainerGroupSequencer(ListenerContainerRegistry registry, long defaultIdleEventInterval,
8281
String... containerGroups) {
8382

8483
this.registry = registry;

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import org.springframework.kafka.support.KafkaUtils;
5050
import org.springframework.kafka.support.SendResult;
5151
import org.springframework.kafka.support.serializer.DeserializationException;
52-
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
52+
import org.springframework.kafka.support.serializer.SerializationUtils;
5353
import org.springframework.lang.Nullable;
5454
import org.springframework.util.Assert;
5555
import org.springframework.util.ObjectUtils;
@@ -319,9 +319,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
319319
tp = checkPartition(tp, consumer);
320320
}
321321
DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
322-
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
322+
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
323323
DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
324-
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
324+
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
325325
Headers headers = new RecordHeaders(record.headers().toArray());
326326
addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
327327
ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
@@ -336,13 +336,13 @@ private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception excepti
336336

337337
if (kDeserEx != null) {
338338
if (!this.retainExceptionHeader) {
339-
headers.remove(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER);
339+
headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
340340
}
341341
addExceptionInfoHeaders(headers, kDeserEx, true);
342342
}
343343
if (vDeserEx != null) {
344344
if (!this.retainExceptionHeader) {
345-
headers.remove(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
345+
headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
346346
}
347347
addExceptionInfoHeaders(headers, vDeserEx, false);
348348
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.springframework.kafka.support.micrometer.MicrometerHolder;
105105
import org.springframework.kafka.support.serializer.DeserializationException;
106106
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
107+
import org.springframework.kafka.support.serializer.SerializationUtils;
107108
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
108109
import org.springframework.lang.Nullable;
109110
import org.springframework.scheduling.SchedulingAwareRunnable;
@@ -2516,10 +2517,10 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
25162517
throw (DeserializationException) record.key();
25172518
}
25182519
if (record.value() == null && this.checkNullValueForExceptions) {
2519-
checkDeser(record, ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
2520+
checkDeser(record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
25202521
}
25212522
if (record.key() == null && this.checkNullKeyForExceptions) {
2522-
checkDeser(record, ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER);
2523+
checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
25232524
}
25242525
if (this.deliveryAttemptAware != null) {
25252526
byte[] buff = new byte[4]; // NOSONAR (magic #)

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import org.springframework.kafka.support.KafkaHeaders;
5050
import org.springframework.kafka.support.TopicPartitionOffset;
5151
import org.springframework.kafka.support.serializer.DeserializationException;
52-
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
52+
import org.springframework.kafka.support.serializer.SerializationUtils;
5353
import org.springframework.lang.Nullable;
5454
import org.springframework.messaging.Message;
5555
import org.springframework.scheduling.TaskScheduler;
@@ -504,8 +504,8 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
504504
* deserialization; null otherwise. If you need to determine whether it was the key or
505505
* value, call
506506
* {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
507-
* with {@link ErrorHandlingDeserializer#KEY_DESERIALIZER_EXCEPTION_HEADER} and
508-
* {@link ErrorHandlingDeserializer#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
507+
* with {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and
508+
* {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
509509
* @param record the record.
510510
* @param logger a {@link LogAccessor}.
511511
* @return the {@link DeserializationException} or {@code null}.
@@ -514,14 +514,14 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
514514
@Nullable
515515
public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
516516
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
517-
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
517+
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
518518
if (exception != null) {
519519
logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
520520
+ record.partition() + "@" + record.offset());
521521
return exception;
522522
}
523523
exception = ListenerUtils.getExceptionFromHeader(record,
524-
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
524+
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
525525
if (exception != null) {
526526
logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
527527
+ record.partition() + "@" + record.offset());

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.springframework.kafka.support.KafkaHeaderMapper;
3939
import org.springframework.kafka.support.KafkaHeaders;
4040
import org.springframework.kafka.support.KafkaNull;
41-
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
41+
import org.springframework.kafka.support.serializer.SerializationUtils;
4242
import org.springframework.lang.Nullable;
4343
import org.springframework.messaging.Message;
4444
import org.springframework.messaging.support.MessageBuilder;
@@ -253,7 +253,7 @@ else if (record.value() instanceof String) {
253253
original = ((String) record.value()).getBytes(StandardCharsets.UTF_8);
254254
}
255255
if (original != null) {
256-
ErrorHandlingDeserializer.deserializationException(record.headers(), original, ex, false);
256+
SerializationUtils.deserializationException(record.headers(), original, ex, false);
257257
conversionFailures.add(ex);
258258
return null;
259259
}

spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626

2727
import org.springframework.kafka.support.JacksonUtils;
2828
import org.springframework.kafka.support.KafkaNull;
29-
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
29+
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
30+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
31+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper.TypePrecedence;
3032
import org.springframework.messaging.Message;
3133
import org.springframework.util.Assert;
3234

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MappingJacksonParameterizedConverter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.apache.kafka.common.utils.Bytes;
2424

2525
import org.springframework.kafka.support.KafkaHeaders;
26-
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
26+
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
27+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
28+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper.TypePrecedence;
2729
import org.springframework.lang.Nullable;
2830
import org.springframework.messaging.Message;
2931
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.support.converter;
17+
package org.springframework.kafka.support.mapping;
1818

1919
import java.nio.charset.StandardCharsets;
2020
import java.util.Collections;

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ClassMapper.java renamed to spring-kafka/src/main/java/org/springframework/kafka/support/mapping/ClassMapper.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.support.converter;
17+
package org.springframework.kafka.support.mapping;
1818

1919
import org.apache.kafka.common.header.Headers;
2020

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.support.converter;
17+
package org.springframework.kafka.support.mapping;
1818

1919
import java.util.Arrays;
2020
import java.util.LinkedHashSet;
+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.support.converter;
17+
package org.springframework.kafka.support.mapping;
1818

1919
import org.apache.kafka.common.header.Headers;
2020

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes related to type mapping.
3+
*/
4+
package org.springframework.kafka.support.mapping;

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.java

+11-43
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,10 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19-
import java.io.ByteArrayOutputStream;
20-
import java.io.IOException;
21-
import java.io.ObjectOutputStream;
2219
import java.util.Map;
2320
import java.util.function.Function;
2421

2522
import org.apache.kafka.common.header.Headers;
26-
import org.apache.kafka.common.header.internals.RecordHeader;
2723
import org.apache.kafka.common.serialization.Deserializer;
2824

2925
import org.springframework.util.Assert;
@@ -46,18 +42,25 @@ public class ErrorHandlingDeserializer<T> implements Deserializer<T> {
4642

4743
/**
4844
* Header name for deserialization exceptions.
45+
* @deprecated in favor of {@link SerializationUtils#DESERIALIZER_EXCEPTION_HEADER_PREFIX}.
4946
*/
50-
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException";
47+
@Deprecated
48+
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER_PREFIX =
49+
SerializationUtils.DESERIALIZER_EXCEPTION_HEADER_PREFIX;
5150

5251
/**
5352
* Header name for deserialization exceptions.
53+
* @deprecated in favor of {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER}.
5454
*/
55-
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = KEY_DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key";
55+
@Deprecated
56+
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER;
5657

5758
/**
5859
* Header name for deserialization exceptions.
60+
* @deprecated in favor of {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER}.
5961
*/
60-
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = KEY_DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value";
62+
@Deprecated
63+
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER;
6164

6265
/**
6366
* Supplier for a T when deserialization fails.
@@ -188,7 +191,7 @@ public T deserialize(String topic, Headers headers, byte[] data) {
188191
return this.delegate.deserialize(topic, headers, data);
189192
}
190193
catch (Exception e) {
191-
deserializationException(headers, data, e, this.isForKey);
194+
SerializationUtils.deserializationException(headers, data, e, this.isForKey);
192195
return recoverFromSupplier(topic, headers, data, e);
193196
}
194197
}
@@ -211,39 +214,4 @@ public void close() {
211214
}
212215
}
213216

214-
/**
215-
* Populate the record headers with a serialized {@link DeserializationException}.
216-
* @param headers the headers.
217-
* @param data the data.
218-
* @param ex the exception.
219-
* @param isForKeyArg true if this is a key deserialization problem, otherwise value.
220-
* @since 2.8
221-
*/
222-
public static void deserializationException(Headers headers, byte[] data, Exception ex, boolean isForKeyArg) {
223-
ByteArrayOutputStream stream = new ByteArrayOutputStream();
224-
DeserializationException exception =
225-
new DeserializationException("failed to deserialize", data, isForKeyArg, ex);
226-
try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
227-
oos.writeObject(exception);
228-
}
229-
catch (IOException ioex) {
230-
stream = new ByteArrayOutputStream();
231-
try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
232-
exception = new DeserializationException("failed to deserialize",
233-
data, isForKeyArg, new RuntimeException("Could not deserialize type "
234-
+ ioex.getClass().getName() + " with message " + ioex.getMessage()
235-
+ " failure: " + ioex.getMessage()));
236-
oos.writeObject(exception);
237-
}
238-
catch (IOException ex2) {
239-
throw new IllegalStateException("Could not serialize a DeserializationException", ex2); // NOSONAR
240-
}
241-
}
242-
headers.add(
243-
new RecordHeader(isForKeyArg
244-
? KEY_DESERIALIZER_EXCEPTION_HEADER
245-
: VALUE_DESERIALIZER_EXCEPTION_HEADER,
246-
stream.toByteArray()));
247-
}
248-
249217
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333

3434
import org.springframework.core.ResolvableType;
3535
import org.springframework.kafka.support.JacksonUtils;
36-
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
37-
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
38-
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
39-
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
36+
import org.springframework.kafka.support.mapping.AbstractJavaTypeMapper;
37+
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
38+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
39+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper.TypePrecedence;
4040
import org.springframework.lang.Nullable;
4141
import org.springframework.util.Assert;
4242
import org.springframework.util.ClassUtils;

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerde.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@
2424

2525
import org.springframework.core.ResolvableType;
2626
import org.springframework.kafka.support.JacksonUtils;
27-
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
27+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
2828
import org.springframework.lang.Nullable;
2929
import org.springframework.util.Assert;
3030

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.kafka.common.serialization.Serializer;
2626

2727
import org.springframework.kafka.support.JacksonUtils;
28-
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
29-
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
30-
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
28+
import org.springframework.kafka.support.mapping.AbstractJavaTypeMapper;
29+
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
30+
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
3131
import org.springframework.lang.Nullable;
3232
import org.springframework.util.Assert;
3333
import org.springframework.util.ClassUtils;

0 commit comments

Comments
 (0)