Skip to content

Commit 44d2741

Browse files
authored
MINOR: fix bug in TimeWindowedDeserializerTest (#19570)
Test throws `NumberFormatException` and thus still passed as this exception extends `IllegalArgumentException`. However, the test does not verify what it is supposed to verify. Piggybacking some code cleanup to all related files. Reviewers: PoAn Yang <[email protected]>, Ken Huang <[email protected]>, Bill Bejeck <[email protected]>
1 parent 1059af4 commit 44d2741

File tree

4 files changed

+93
-54
lines changed

4 files changed

+93
-54
lines changed

streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,44 @@ public void testSessionWindowedDeserializerConstructor() {
4444
assertInstanceOf(StringDeserializer.class, inner, "Inner deserializer type should be StringDeserializer");
4545
}
4646

47+
@Deprecated
4748
@Test
4849
public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
4950
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
50-
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
51-
deserializer.configure(props, false);
52-
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
51+
try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) {
52+
deserializer.configure(props, false);
53+
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
54+
}
5355
}
5456

5557
@Test
5658
public void shouldSetSerializerThroughWindowedInnerDeserializerClassConfig() {
5759
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
58-
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
59-
deserializer.configure(props, false);
60-
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
60+
try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) {
61+
deserializer.configure(props, false);
62+
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
63+
}
6164
}
6265

66+
@Deprecated
6367
@Test
6468
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet() {
6569
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
6670
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
67-
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
68-
deserializer.configure(props, false);
69-
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
71+
try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) {
72+
deserializer.configure(props, false);
73+
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
74+
}
7075
}
7176

7277
@Test
7378
public void shouldThrowErrorIfWindowedInnerClassSerdeAndSessionWindowedDeserializerClassAreNotSet() {
74-
final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>();
75-
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
79+
try (final SessionWindowedDeserializer<?> deserializer = new SessionWindowedDeserializer<>()) {
80+
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
81+
}
7682
}
7783

84+
@Deprecated
7885
@Test
7986
public void shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerClassSerdeConfig() {
8087
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
@@ -87,6 +94,7 @@ public void shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInner
8794
assertThrows(IllegalArgumentException.class, () -> sessionWindowedDeserializer.configure(props, false));
8895
}
8996

97+
@Deprecated
9098
@Test
9199
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
92100
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");

streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,44 @@ public void testSessionWindowedSerializerConstructor() {
4444
assertInstanceOf(StringSerializer.class, inner, "Inner serializer type should be StringSerializer");
4545
}
4646

47+
@Deprecated
4748
@Test
4849
public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
4950
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
50-
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
51-
serializer.configure(props, false);
52-
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
51+
try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) {
52+
serializer.configure(props, false);
53+
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
54+
}
5355
}
5456

5557
@Test
5658
public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() {
5759
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
58-
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
59-
serializer.configure(props, false);
60-
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
60+
try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) {
61+
serializer.configure(props, false);
62+
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
63+
}
6164
}
6265

66+
@Deprecated
6367
@Test
6468
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet() {
6569
props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
6670
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
67-
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
68-
serializer.configure(props, false);
69-
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
71+
try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) {
72+
serializer.configure(props, false);
73+
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
74+
}
7075
}
7176

7277
@Test
7378
public void shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet() {
74-
final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>();
75-
assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false));
79+
try (final SessionWindowedSerializer<?> serializer = new SessionWindowedSerializer<>()) {
80+
assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false));
81+
}
7682
}
7783

84+
@Deprecated
7885
@Test
7986
public void shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerClassSerdeConfig() {
8087
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
@@ -87,6 +94,7 @@ public void shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerSe
8794
assertThrows(IllegalArgumentException.class, () -> sessionWindowedSerializer.configure(props, false));
8895
}
8996

97+
@Deprecated
9098
@Test
9199
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
92100
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");

streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java

+36-21
Original file line numberDiff line numberDiff line change
@@ -49,42 +49,49 @@ public void testTimeWindowedDeserializerConstructor() {
4949
assertThat(timeWindowedDeserializer.getWindowSize(), is(5000000L));
5050
}
5151

52+
@Deprecated
5253
@Test
5354
public void shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerClassSerdeConfigs() {
5455
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
5556
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
56-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
57-
deserializer.configure(props, false);
58-
assertThat(deserializer.getWindowSize(), is(500L));
59-
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
57+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
58+
deserializer.configure(props, false);
59+
assertThat(deserializer.getWindowSize(), is(500L));
60+
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
61+
}
6062
}
6163

6264
@Test
6365
public void shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerDeserializerClassConfigs() {
6466
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
6567
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
66-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
67-
deserializer.configure(props, false);
68-
assertThat(deserializer.getWindowSize(), is(500L));
69-
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
68+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
69+
deserializer.configure(props, false);
70+
assertThat(deserializer.getWindowSize(), is(500L));
71+
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
72+
}
7073
}
7174

75+
@Deprecated
7276
@Test
7377
public void shouldHaveSameConfigNameForWindowSizeMs() {
7478
assertEquals(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, StreamsConfig.WINDOW_SIZE_MS_CONFIG);
7579
}
7680

81+
@Deprecated
7782
@Test
7883
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet() {
7984
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
8085
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
8186
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
82-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
83-
deserializer.configure(props, false);
84-
assertThat(deserializer.getWindowSize(), is(500L));
85-
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
87+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
88+
deserializer.configure(props, false);
89+
assertThat(deserializer.getWindowSize(), is(500L));
90+
assertInstanceOf(ByteArrayDeserializer.class, deserializer.innerDeserializer());
91+
}
8692
}
8793

94+
@Deprecated
8895
@Test
8996
public void shouldThrowErrorIfWindowSizeSetInStreamsConfigAndConstructor() {
9097
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
@@ -97,34 +104,41 @@ public void shouldThrowErrorIfWindowSizeSetInConstructorConfigAndConstructor() {
97104
assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false));
98105
}
99106

107+
@Deprecated
100108
@Test
101109
public void shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerClassSerdeIsSet() {
102110
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
103-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
104-
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
111+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
112+
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
113+
}
105114
}
106115

107116
@Test
108117
public void shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerDeserializerClassIsSet() {
109-
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, Serdes.ByteArraySerde.class.getName());
110-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
111-
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
118+
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
119+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
120+
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
121+
}
112122
}
113123

124+
@Deprecated
114125
@Test
115126
public void shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInStreamsConfigIsSet() {
116127
props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
117-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
118-
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
128+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
129+
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
130+
}
119131
}
120132

121133
@Test
122134
public void shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInConstructorConfigIsSet() {
123135
props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
124-
final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>();
125-
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
136+
try (final TimeWindowedDeserializer<?> deserializer = new TimeWindowedDeserializer<>()) {
137+
assertThrows(IllegalArgumentException.class, () -> deserializer.configure(props, false));
138+
}
126139
}
127140

141+
@Deprecated
128142
@Test
129143
public void shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerClassSerdeConfig() {
130144
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
@@ -137,6 +151,7 @@ public void shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerD
137151
assertThrows(IllegalArgumentException.class, () -> timeWindowedDeserializer.configure(props, false));
138152
}
139153

154+
@Deprecated
140155
@Test
141156
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
142157
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");

streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,44 @@ public void testTimeWindowedSerializerConstructor() {
4444
assertInstanceOf(StringSerializer.class, inner, "Inner serializer type should be StringSerializer");
4545
}
4646

47+
@Deprecated
4748
@Test
4849
public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
4950
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
50-
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
51-
serializer.configure(props, false);
52-
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
51+
try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) {
52+
serializer.configure(props, false);
53+
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
54+
}
5355
}
5456

5557
@Test
5658
public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() {
5759
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
58-
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
59-
serializer.configure(props, false);
60-
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
60+
try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) {
61+
serializer.configure(props, false);
62+
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
63+
}
6164
}
6265

66+
@Deprecated
6367
@Test
6468
public void shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet() {
6569
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, Serdes.ByteArraySerde.class.getName());
6670
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");
67-
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
68-
serializer.configure(props, false);
69-
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
71+
try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) {
72+
serializer.configure(props, false);
73+
assertInstanceOf(ByteArraySerializer.class, serializer.innerSerializer());
74+
}
7075
}
7176

7277
@Test
7378
public void shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet() {
74-
final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>();
75-
assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false));
79+
try (final TimeWindowedSerializer<?> serializer = new TimeWindowedSerializer<>()) {
80+
assertThrows(IllegalArgumentException.class, () -> serializer.configure(props, false));
81+
}
7682
}
7783

84+
@Deprecated
7885
@Test
7986
public void shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerClassSerdeConfig() {
8087
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, Serdes.ByteArraySerde.class.getName());
@@ -87,6 +94,7 @@ public void shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerSer
8794
assertThrows(IllegalArgumentException.class, () -> timeWindowedSerializer.configure(props, false));
8895
}
8996

97+
@Deprecated
9098
@Test
9199
public void shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
92100
props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, "some.non.existent.class");

0 commit comments

Comments
 (0)