Skip to content

Commit 0377e80

Browse files
authored
MINOR: Use Producer interface and ClusterInstance producer factory (#18197)
Reviewers: David Arthur <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 08ef22d commit 0377e80

File tree

2 files changed

+10
-26
lines changed

2 files changed

+10
-26
lines changed

core/src/test/java/kafka/admin/AdminFenceProducersTest.java

+6-14
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020
import org.apache.kafka.clients.admin.Admin;
2121
import org.apache.kafka.clients.admin.AdminClientConfig;
2222
import org.apache.kafka.clients.admin.FenceProducersOptions;
23-
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.Producer;
2424
import org.apache.kafka.clients.producer.ProducerConfig;
2525
import org.apache.kafka.clients.producer.ProducerRecord;
2626
import org.apache.kafka.common.errors.ApiException;
2727
import org.apache.kafka.common.errors.InvalidProducerEpochException;
2828
import org.apache.kafka.common.errors.ProducerFencedException;
2929
import org.apache.kafka.common.errors.TimeoutException;
30-
import org.apache.kafka.common.serialization.ByteArraySerializer;
3130
import org.apache.kafka.common.test.api.ClusterConfigProperty;
3231
import org.apache.kafka.common.test.api.ClusterInstance;
3332
import org.apache.kafka.common.test.api.ClusterTest;
@@ -42,7 +41,6 @@
4241
import java.util.Collections;
4342
import java.util.HashMap;
4443
import java.util.Map;
45-
import java.util.Properties;
4644
import java.util.concurrent.ExecutionException;
4745

4846
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -68,22 +66,16 @@ public class AdminFenceProducersTest {
6866
this.clusterInstance = clusterInstance;
6967
}
7068

71-
private KafkaProducer<byte[], byte[]> createProducer() {
72-
Properties config = new Properties();
73-
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
74-
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TXN_ID);
75-
config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000");
76-
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
77-
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
78-
79-
return new KafkaProducer<>(config);
69+
private Producer<byte[], byte[]> createProducer() {
70+
return clusterInstance.producer(Map.of(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TXN_ID,
71+
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000"));
8072
}
8173

8274
@ClusterTest
8375
void testFenceAfterProducerCommit() throws Exception {
8476
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
8577

86-
try (KafkaProducer<byte[], byte[]> producer = createProducer();
78+
try (Producer<byte[], byte[]> producer = createProducer();
8779
Admin adminClient = clusterInstance.admin()) {
8880
producer.initTransactions();
8981
producer.beginTransaction();
@@ -125,7 +117,7 @@ void testFenceProducerTimeoutMs() {
125117
void testFenceBeforeProducerCommit() throws Exception {
126118
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
127119

128-
try (KafkaProducer<byte[], byte[]> producer = createProducer();
120+
try (Producer<byte[], byte[]> producer = createProducer();
129121
Admin adminClient = clusterInstance.admin()) {
130122

131123
producer.initTransactions();

tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java

+4-12
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@
2525
import org.apache.kafka.clients.consumer.ConsumerRecords;
2626
import org.apache.kafka.clients.consumer.GroupProtocol;
2727
import org.apache.kafka.clients.consumer.KafkaConsumer;
28-
import org.apache.kafka.clients.producer.KafkaProducer;
28+
import org.apache.kafka.clients.producer.Producer;
2929
import org.apache.kafka.clients.producer.ProducerConfig;
3030
import org.apache.kafka.clients.producer.ProducerRecord;
3131
import org.apache.kafka.common.TopicPartition;
3232
import org.apache.kafka.common.protocol.Errors;
3333
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
34-
import org.apache.kafka.common.serialization.ByteArraySerializer;
3534
import org.apache.kafka.common.test.api.ClusterConfig;
3635
import org.apache.kafka.common.test.api.ClusterInstance;
3736
import org.apache.kafka.common.test.api.ClusterTemplate;
@@ -47,7 +46,6 @@
4746
import java.util.List;
4847
import java.util.Map;
4948
import java.util.Map.Entry;
50-
import java.util.Properties;
5149

5250
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
5351
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -226,19 +224,13 @@ private void testWithConsumerGroup(String inputTopic,
226224
}
227225

228226
private void produceRecord(String topic) {
229-
try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
227+
try (Producer<byte[], byte[]> producer = createProducer()) {
230228
assertDoesNotThrow(() -> producer.send(new ProducerRecord<>(topic, 0, null, null)).get());
231229
}
232230
}
233231

234-
private KafkaProducer<byte[], byte[]> createProducer() {
235-
Properties config = new Properties();
236-
config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
237-
config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1");
238-
config.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
239-
config.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
240-
241-
return new KafkaProducer<>(config);
232+
private Producer<byte[], byte[]> createProducer() {
233+
return clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"));
242234
}
243235

244236
private Consumer<byte[], byte[]> createConsumer(String group, GroupProtocol groupProtocol) {

0 commit comments

Comments
 (0)