17
17
package org .springframework .kafka .annotation ;
18
18
19
19
import static org .assertj .core .api .Assertions .assertThat ;
20
+ import static org .mockito .Mockito .never ;
21
+ import static org .mockito .Mockito .spy ;
22
+ import static org .mockito .Mockito .verify ;
20
23
21
24
import java .util .ArrayList ;
22
25
import java .util .Collection ;
28
31
import java .util .concurrent .TimeUnit ;
29
32
import java .util .stream .Collectors ;
30
33
34
+ import org .apache .kafka .clients .admin .AdminClientConfig ;
31
35
import org .apache .kafka .clients .consumer .ConsumerConfig ;
32
36
import org .apache .kafka .common .serialization .ByteArraySerializer ;
33
37
import org .apache .kafka .common .serialization .BytesDeserializer ;
44
48
import org .springframework .kafka .config .KafkaListenerContainerFactory ;
45
49
import org .springframework .kafka .core .DefaultKafkaConsumerFactory ;
46
50
import org .springframework .kafka .core .DefaultKafkaProducerFactory ;
51
+ import org .springframework .kafka .core .KafkaAdmin ;
47
52
import org .springframework .kafka .core .KafkaTemplate ;
48
53
import org .springframework .kafka .core .ProducerFactory ;
49
54
import org .springframework .kafka .listener .BatchListenerFailedException ;
@@ -110,7 +115,7 @@ private void doTest(Listener listener, String topic) throws InterruptedException
110
115
}
111
116
112
117
@ Test
113
- public void testBatchOfPojoMessages () throws Exception {
118
+ public void testBatchOfPojoMessages (@ Autowired KafkaAdmin admin ) throws Exception {
114
119
String topic = "blc3" ;
115
120
this .template .send (new GenericMessage <>(
116
121
new Foo ("bar" ), Collections .singletonMap (KafkaHeaders .TOPIC , topic )));
@@ -119,6 +124,7 @@ public void testBatchOfPojoMessages() throws Exception {
119
124
assertThat (listener .received .size ()).isGreaterThan (0 );
120
125
assertThat (listener .received .get (0 ).getPayload ()).isInstanceOf (Foo .class );
121
126
assertThat (listener .received .get (0 ).getPayload ().getBar ()).isEqualTo ("bar" );
127
+ verify (admin , never ()).clusterId ();
122
128
}
123
129
124
130
@ Test
@@ -152,6 +158,11 @@ void conversionError() throws InterruptedException {
152
158
@ EnableKafka
153
159
public static class Config {
154
160
161
+ @ Bean
162
+ KafkaAdmin admin (EmbeddedKafkaBroker broker ) {
163
+ return spy (new KafkaAdmin (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , broker .getBrokersAsString ())));
164
+ }
165
+
155
166
@ Bean
156
167
public KafkaListenerContainerFactory <?> kafkaListenerContainerFactory (EmbeddedKafkaBroker embeddedKafka ,
157
168
KafkaTemplate <Integer , Object > template ) {
0 commit comments