Skip to content

Commit 43c5671

Browse files
garyrussellartembilan
authored andcommitted
GH-1951: Add BytesToNumberConverter
Resolves #1951 Converts `byte[]` (particularly headers) to `long`, `int`, `short` or `byte` (or wrappers).
1 parent f045844 commit 43c5671

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

+62
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.StringReader;
2121
import java.lang.reflect.AnnotatedElement;
2222
import java.lang.reflect.Method;
23+
import java.nio.ByteBuffer;
2324
import java.nio.charset.Charset;
2425
import java.nio.charset.StandardCharsets;
2526
import java.util.ArrayList;
@@ -68,6 +69,8 @@
6869
import org.springframework.core.Ordered;
6970
import org.springframework.core.annotation.AnnotatedElementUtils;
7071
import org.springframework.core.annotation.AnnotationUtils;
72+
import org.springframework.core.convert.TypeDescriptor;
73+
import org.springframework.core.convert.converter.ConditionalGenericConverter;
7174
import org.springframework.core.convert.converter.Converter;
7275
import org.springframework.core.convert.converter.GenericConverter;
7376
import org.springframework.core.log.LogAccessor;
@@ -1081,6 +1084,7 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
10811084
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
10821085
this.defaultFormattingConversionService.addConverter(
10831086
new BytesToStringConverter(KafkaListenerAnnotationBeanPostProcessor.this.charset));
1087+
this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());
10841088
defaultFactory.setConversionService(this.defaultFormattingConversionService);
10851089
GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
10861090
defaultFactory.setMessageConverter(messageConverter);
@@ -1165,4 +1169,62 @@ public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, Anno
11651169

11661170
}
11671171

1172+
private final class BytesToNumberConverter implements ConditionalGenericConverter {
1173+
1174+
BytesToNumberConverter() {
1175+
}
1176+
1177+
@Override
1178+
@Nullable
1179+
public Set<ConvertiblePair> getConvertibleTypes() {
1180+
HashSet<ConvertiblePair> pairs = new HashSet<>();
1181+
pairs.add(new ConvertiblePair(byte[].class, long.class));
1182+
pairs.add(new ConvertiblePair(byte[].class, int.class));
1183+
pairs.add(new ConvertiblePair(byte[].class, short.class));
1184+
pairs.add(new ConvertiblePair(byte[].class, byte.class));
1185+
pairs.add(new ConvertiblePair(byte[].class, Long.class));
1186+
pairs.add(new ConvertiblePair(byte[].class, Integer.class));
1187+
pairs.add(new ConvertiblePair(byte[].class, Short.class));
1188+
pairs.add(new ConvertiblePair(byte[].class, Byte.class));
1189+
return pairs;
1190+
}
1191+
1192+
@Override
1193+
@Nullable
1194+
public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
1195+
byte[] bytes = (byte[]) source;
1196+
if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
1197+
Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long");
1198+
return ByteBuffer.wrap(bytes).getLong();
1199+
}
1200+
else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {
1201+
Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer");
1202+
return ByteBuffer.wrap(bytes).getInt();
1203+
}
1204+
else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {
1205+
Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short");
1206+
return ByteBuffer.wrap(bytes).getShort();
1207+
}
1208+
else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {
1209+
Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte");
1210+
return ByteBuffer.wrap(bytes).get();
1211+
}
1212+
return null;
1213+
}
1214+
1215+
@Override
1216+
public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
1217+
if (sourceType.getType().equals(byte[].class)) {
1218+
Class<?> target = targetType.getType();
1219+
return target.equals(long.class) || target.equals(int.class) || target.equals(short.class)
1220+
|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)
1221+
|| target.equals(Short.class) || target.equals(Byte.class);
1222+
}
1223+
else {
1224+
return false;
1225+
}
1226+
}
1227+
1228+
}
1229+
11681230
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.Map;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
27+
import org.springframework.messaging.handler.annotation.Header;
28+
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
29+
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
30+
import org.springframework.messaging.support.GenericMessage;
31+
32+
/**
33+
* @author Gary Russell
34+
* @since 2.8
35+
*
36+
*/
37+
public class HeaderMethodArgumentResolverTests {
38+
39+
@SuppressWarnings("rawtypes")
40+
@Test
41+
void bytesToNumbers() throws Exception {
42+
KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>();
43+
MessageHandlerMethodFactory factory = bpp.getMessageHandlerMethodFactory();
44+
InvocableHandlerMethod method = factory.createInvocableHandlerMethod(this, getClass().getDeclaredMethod(
45+
"numbers", String.class,
46+
long.class, int.class, short.class, byte.class, Long.class, Integer.class, Short.class, Byte.class));
47+
method.invoke(new GenericMessage<>("foo", Map.of(
48+
"l1", ByteBuffer.allocate(8).putLong(1L).array(),
49+
"i1", ByteBuffer.allocate(4).putInt(2).array(),
50+
"s1", ByteBuffer.allocate(2).putShort((short) 3).array(),
51+
"b1", ByteBuffer.allocate(1).put((byte) 4).array(),
52+
"l2", ByteBuffer.allocate(8).putLong(5L).array(),
53+
"i2", ByteBuffer.allocate(4).putInt(6).array(),
54+
"s2", ByteBuffer.allocate(2).putShort((short) 7).array(),
55+
"b2", ByteBuffer.allocate(1).put((byte) 8).array())));
56+
}
57+
58+
public void numbers(String payload,
59+
@Header("l1") long l1, @Header("i1") int i1, @Header("s1") short s1, @Header("b1") byte b1,
60+
@Header("l2") Long l2, @Header("i2") Integer i2, @Header("s2") Short s2, @Header("b2") Byte b2) {
61+
62+
assertThat(l1).isEqualTo(1L);
63+
assertThat(i1).isEqualTo(2);
64+
assertThat(s1).isEqualTo((short) 3);
65+
assertThat(b1).isEqualTo((byte) 4);
66+
assertThat(l2).isEqualTo(5L);
67+
assertThat(i2).isEqualTo(6);
68+
assertThat(s2).isEqualTo(Short.valueOf((short) 7));
69+
assertThat(b2).isEqualTo(Byte.valueOf((byte) 8));
70+
}
71+
72+
}

0 commit comments

Comments
 (0)