Skip to content

Commit 138da83

Browse files
srkukarnisijie
authored andcommitted
Simplify DefaultSerde and make users be able to explicitly set it (apache#178)
1 parent d498fa9 commit 138da83

File tree

4 files changed

+65
-118
lines changed

4 files changed

+65
-118
lines changed

pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/DefaultSerDe.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,14 @@ public class DefaultSerDe implements SerDe<Object> {
4141
Float.class
4242
));
4343
private Class type;
44-
private boolean ser;
4544

46-
public DefaultSerDe(Class type, boolean ser) {
45+
public DefaultSerDe(Class type) {
4746
this.type = type;
48-
this.ser = ser;
49-
verifySupportedType(ser);
47+
verifySupportedType();
5048
}
5149

5250
@Override
5351
public Object deserialize(byte[] input) {
54-
if (ser) {
55-
throw new RuntimeException("Serializer function cannot deserialize");
56-
}
5752
String data = new String(input, StandardCharsets.UTF_8);
5853
if (type.equals(Integer.class)) {
5954
return Integer.valueOf(data);
@@ -76,9 +71,6 @@ public Object deserialize(byte[] input) {
7671

7772
@Override
7873
public byte[] serialize(Object input) {
79-
if (!ser) {
80-
throw new RuntimeException("DeSerializer function cannot serialize");
81-
}
8274
if (type.equals(Integer.class)) {
8375
return ((Integer) input).toString().getBytes(StandardCharsets.UTF_8);
8476
} else if (type.equals(Double.class)) {
@@ -98,10 +90,8 @@ public byte[] serialize(Object input) {
9890
}
9991
}
10092

101-
public void verifySupportedType(boolean allowVoid) {
102-
if (!allowVoid && !supportedInputTypes.contains(type)) {
103-
throw new RuntimeException("Non Basic types not yet supported: " + type);
104-
} else if (!(supportedInputTypes.contains(type) || type.equals(Void.class))) {
93+
public void verifySupportedType() {
94+
if (!supportedInputTypes.contains(type)) {
10595
throw new RuntimeException("Non Basic types not yet supported: " + type);
10696
}
10797
}

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceRunnable.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,12 @@ private static void addSystemMetrics(String metricName, double value, InstanceCo
464464
bldr.putMetrics(metricName, digest);
465465
}
466466

467-
private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader) {
467+
private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoader,
468+
PulsarFunction pulsarFunction, boolean inputArgs) {
468469
if (null == serdeClassName || serdeClassName.isEmpty()) {
469470
return null;
471+
} else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
472+
return initializeDefaultSerDe(pulsarFunction, inputArgs);
470473
} else {
471474
return Reflections.createInstance(
472475
serdeClassName,
@@ -475,20 +478,24 @@ private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoade
475478
}
476479
}
477480

478-
private static SerDe initializeDefaultSerDe(PulsarFunction pulsarFunction) {
481+
private static SerDe initializeDefaultSerDe(PulsarFunction pulsarFunction, boolean inputArgs) {
479482
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
480-
return new DefaultSerDe(typeArgs[0], false);
483+
if (inputArgs) {
484+
return new DefaultSerDe(typeArgs[0]);
485+
} else {
486+
return new DefaultSerDe(typeArgs[1]);
487+
}
481488
}
482489

483490
private void setupSerDe(PulsarFunction pulsarFunction, ClassLoader clsLoader) {
484491
this.inputSerDe = new HashMap<>();
485-
instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().forEach((k, v) -> this.inputSerDe.put(k, initializeSerDe(v, clsLoader)));
492+
instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().forEach((k, v) -> this.inputSerDe.put(k, initializeSerDe(v, clsLoader, pulsarFunction, true)));
486493
for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
487-
this.inputSerDe.put(topicName, initializeDefaultSerDe(pulsarFunction));
494+
this.inputSerDe.put(topicName, initializeDefaultSerDe(pulsarFunction, true));
488495
}
489496

490497
if (instanceConfig.getFunctionConfig().getOutputSerdeClassName() != null) {
491-
this.outputSerDe = initializeSerDe(instanceConfig.getFunctionConfig().getOutputSerdeClassName(), clsLoader);
498+
this.outputSerDe = initializeSerDe(instanceConfig.getFunctionConfig().getOutputSerdeClassName(), clsLoader, pulsarFunction, false);
492499
}
493500

494501
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
@@ -507,7 +514,7 @@ private void setupSerDe(PulsarFunction pulsarFunction, ClassLoader clsLoader) {
507514

508515
if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class`
509516
if (outputSerDe == null) {
510-
outputSerDe = initializeDefaultSerDe(pulsarFunction);
517+
outputSerDe = initializeDefaultSerDe(pulsarFunction, false);
511518
}
512519
Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
513520
if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArgs[1])) {

pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/instance/JavaInstanceRunnableTest.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,19 @@
2020

2121
import lombok.Getter;
2222
import lombok.Setter;
23-
import org.apache.pulsar.client.api.MessageId;
2423
import org.apache.pulsar.functions.api.Context;
2524
import org.apache.pulsar.functions.api.PulsarFunction;
2625
import org.apache.pulsar.functions.api.SerDe;
26+
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
2727
import org.apache.pulsar.functions.api.utils.Utf8StringSerDe;
2828
import org.apache.pulsar.functions.fs.LimitsConfig;
2929
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
3030
import org.apache.pulsar.functions.runtime.container.InstanceConfig;
31-
import org.apache.pulsar.functions.utils.Reflections;
3231
import org.testng.annotations.Test;
3332

3433
import java.lang.reflect.InvocationTargetException;
3534
import java.lang.reflect.Method;
36-
import java.util.Arrays;
37-
import java.util.HashMap;
3835

39-
import static org.testng.Assert.assertNull;
4036
import static org.testng.AssertJUnit.assertFalse;
4137
import static org.testng.AssertJUnit.assertTrue;
4238
import static org.testng.AssertJUnit.fail;
@@ -63,7 +59,9 @@ private static InstanceConfig createInstanceConfig(boolean addCustom, String out
6359
} else {
6460
functionConfigBuilder.putCustomSerdeInputs("TEST", IntegerSerDe.class.getName());
6561
}
66-
functionConfigBuilder.setOutputSerdeClassName(outputSerde);
62+
if (outputSerde != null) {
63+
functionConfigBuilder.setOutputSerdeClassName(outputSerde);
64+
}
6765
InstanceConfig instanceConfig = new InstanceConfig();
6866
instanceConfig.setFunctionConfig(functionConfigBuilder.build());
6967
instanceConfig.setLimitsConfig(limitsConfig);
@@ -177,6 +175,38 @@ public void testInconsistentInputType() {
177175
}
178176
}
179177

178+
/**
179+
* Verify that Default Serializer works fine.
180+
*/
181+
@Test
182+
public void testDefaultSerDe() {
183+
try {
184+
JavaInstanceRunnable runnable = createRunnable(false, null);
185+
Method method = makeAccessible(runnable);
186+
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
187+
PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
188+
method.invoke(runnable, pulsarFunction, clsLoader);
189+
} catch (Exception ex) {
190+
assertTrue(false);
191+
}
192+
}
193+
194+
/**
195+
* Verify that Explicit setting of Default Serializer works fine.
196+
*/
197+
@Test
198+
public void testExplicitDefaultSerDe() {
199+
try {
200+
JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName());
201+
Method method = makeAccessible(runnable);
202+
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
203+
PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
204+
method.invoke(runnable, pulsarFunction, clsLoader);
205+
} catch (Exception ex) {
206+
assertTrue(false);
207+
}
208+
}
209+
180210
/**
181211
* Verify that function output type should be consistent with output serde type.
182212
*/

pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/utils/DefaultSerDeTest.java

Lines changed: 11 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -35,132 +35,52 @@
3535
public class DefaultSerDeTest {
3636
@Test
3737
public void testStringSerDe() {
38-
DefaultSerDe serializer = new DefaultSerDe(String.class, true);
39-
DefaultSerDe deserializer = new DefaultSerDe(String.class, false);
38+
DefaultSerDe serializer = new DefaultSerDe(String.class);
39+
DefaultSerDe deserializer = new DefaultSerDe(String.class);
4040
String input = new String("input");
4141
byte[] output = serializer.serialize(input);
4242
String result = (String) deserializer.deserialize(output);
4343
assertEquals(result, input);
44-
45-
try {
46-
DefaultSerDe serDe = new DefaultSerDe(String.class, false);
47-
serDe.serialize(new String("input"));
48-
assertFalse(true);
49-
} catch (Exception ex) {
50-
// This is good
51-
}
52-
53-
try {
54-
DefaultSerDe serDe = new DefaultSerDe(String.class, true);
55-
serDe.deserialize(new byte[10]);
56-
assertFalse(true);
57-
} catch (Exception ex) {
58-
// This is good
59-
}
6044
}
6145

6246
@Test
6347
public void testLongSerDe() {
64-
DefaultSerDe serializer = new DefaultSerDe(Long.class, true);
65-
DefaultSerDe deserializer = new DefaultSerDe(Long.class, false);
48+
DefaultSerDe serializer = new DefaultSerDe(Long.class);
49+
DefaultSerDe deserializer = new DefaultSerDe(Long.class);
6650
Long input = new Long(648292);
6751
byte[] output = serializer.serialize(input);
6852
Long result = (Long) deserializer.deserialize(output);
6953
assertEquals(result, input);
70-
71-
try {
72-
DefaultSerDe serDe = new DefaultSerDe(Long.class, false);
73-
serDe.serialize(new Long(34242));
74-
assertFalse(true);
75-
} catch (Exception ex) {
76-
// This is good
77-
}
78-
79-
try {
80-
DefaultSerDe serDe = new DefaultSerDe(Long.class, true);
81-
serDe.deserialize(new byte[10]);
82-
assertFalse(true);
83-
} catch (Exception ex) {
84-
// This is good
85-
}
8654
}
8755

8856
@Test
8957
public void testDoubleSerDe() {
90-
DefaultSerDe serializer = new DefaultSerDe(Double.class, true);
91-
DefaultSerDe deserializer = new DefaultSerDe(Double.class, false);
58+
DefaultSerDe serializer = new DefaultSerDe(Double.class);
59+
DefaultSerDe deserializer = new DefaultSerDe(Double.class);
9260
Double input = new Double(648292.32432);
9361
byte[] output = serializer.serialize(input);
9462
Double result = (Double) deserializer.deserialize(output);
9563
assertEquals(result, input);
96-
97-
try {
98-
DefaultSerDe serDe = new DefaultSerDe(Double.class, false);
99-
serDe.serialize(new Double(34242));
100-
assertFalse(true);
101-
} catch (Exception ex) {
102-
// This is good
103-
}
104-
105-
try {
106-
DefaultSerDe serDe = new DefaultSerDe(Double.class, true);
107-
serDe.deserialize(new byte[10]);
108-
assertFalse(true);
109-
} catch (Exception ex) {
110-
// This is good
111-
}
11264
}
11365

11466
@Test
11567
public void testFloatSerDe() {
116-
DefaultSerDe serializer = new DefaultSerDe(Float.class, true);
117-
DefaultSerDe deserializer = new DefaultSerDe(Float.class, false);
68+
DefaultSerDe serializer = new DefaultSerDe(Float.class);
69+
DefaultSerDe deserializer = new DefaultSerDe(Float.class);
11870
Float input = new Float(354353.54654);
11971
byte[] output = serializer.serialize(input);
12072
Float result = (Float) deserializer.deserialize(output);
12173
assertEquals(result, input);
122-
123-
try {
124-
DefaultSerDe serDe = new DefaultSerDe(Float.class, false);
125-
serDe.serialize(new Float(34242));
126-
assertFalse(true);
127-
} catch (Exception ex) {
128-
// This is good
129-
}
130-
131-
try {
132-
DefaultSerDe serDe = new DefaultSerDe(Float.class, true);
133-
serDe.deserialize(new byte[10]);
134-
assertFalse(true);
135-
} catch (Exception ex) {
136-
// This is good
137-
}
13874
}
13975

14076
@Test
14177
public void testIntegerSerDe() {
142-
DefaultSerDe serializer = new DefaultSerDe(Integer.class, true);
143-
DefaultSerDe deserializer = new DefaultSerDe(Integer.class, false);
78+
DefaultSerDe serializer = new DefaultSerDe(Integer.class);
79+
DefaultSerDe deserializer = new DefaultSerDe(Integer.class);
14480
Integer input = new Integer(2542352);
14581
byte[] output = serializer.serialize(input);
14682
Integer result = (Integer) deserializer.deserialize(output);
14783
assertEquals(result, input);
148-
149-
try {
150-
DefaultSerDe serDe = new DefaultSerDe(Integer.class, false);
151-
serDe.serialize(new Integer(34242));
152-
assertFalse(true);
153-
} catch (Exception ex) {
154-
// This is good
155-
}
156-
157-
try {
158-
DefaultSerDe serDe = new DefaultSerDe(Integer.class, true);
159-
serDe.deserialize(new byte[10]);
160-
assertFalse(true);
161-
} catch (Exception ex) {
162-
// This is good
163-
}
16484
}
16585

16686
private class SimplePulsarFunction implements PulsarFunction<String, String> {
@@ -174,7 +94,7 @@ public String process(String input, Context context) {
17494
public void testPulsarFunction() {
17595
SimplePulsarFunction pulsarFunction = new SimplePulsarFunction();
17696
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
177-
SerDe serDe = new DefaultSerDe(String.class, false);
97+
SerDe serDe = new DefaultSerDe(String.class);
17898
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
17999
assertTrue(inputSerdeTypeArgs[0].isAssignableFrom(typeArgs[0]));
180100
}

0 commit comments

Comments
 (0)