Skip to content

Commit 9ab7114

Browse files
authored
add support for explicitly configured broker ports (#39)
* add support for explicitly configured broker ports * update readme
1 parent 7f8ca7d commit 9ab7114

18 files changed

+350
-28
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## 3.2.0 (11/13/2019)
6+
- [ISSUE-38](https://github.com/salesforce/kafka-junit/issues/38) Optionally allow for explicitly defining which ports kakfa brokers listen on.
7+
58
## 3.1.2 (11/08/2019)
69
- [ISSUE-36](https://github.com/salesforce/kafka-junit/issues/36) Temporary directories should now be cleaned up properly on JVM shutdown.
710

kafka-junit-core/pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
<parent>
66
<artifactId>kafka-junit</artifactId>
77
<groupId>com.salesforce.kafka.test</groupId>
8-
<version>3.1.2</version>
8+
<version>3.2.0</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

1212
<artifactId>kafka-junit-core</artifactId>
13-
<version>3.1.2</version>
13+
<version>3.2.0</version>
1414

1515
<!-- defined properties -->
1616
<properties>

kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestCluster.java

+41
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import java.util.ArrayList;
3636
import java.util.Collection;
3737
import java.util.Collections;
38+
import java.util.HashSet;
3839
import java.util.List;
3940
import java.util.Optional;
4041
import java.util.Properties;
42+
import java.util.Set;
4143
import java.util.concurrent.TimeoutException;
4244
import java.util.stream.Collectors;
4345

@@ -140,6 +142,9 @@ public void start() throws Exception, TimeoutException {
140142
// Ensure zookeeper instance has been started.
141143
zkTestServer.start();
142144

145+
// Validate listeners
146+
validateListenerPorts();
147+
143148
// If we have no brokers defined yet...
144149
if (brokers.isEmpty()) {
145150
// Loop over brokers, starting with brokerId 1.
@@ -169,6 +174,42 @@ public void start() throws Exception, TimeoutException {
169174
waitUntilClusterReady(10_000L);
170175
}
171176

177+
private void validateListenerPorts() {
178+
final Set<Integer> registeredPorts = new HashSet<>();
179+
180+
// Validate listeners that have explicitly defined ports.
181+
for (final BrokerListener listener : registeredListeners) {
182+
// Listeners with no explicitly defined ports will use a randomly assigned port.
183+
if (listener.getPorts().length == 0) {
184+
// Skip to next listener.
185+
continue;
186+
}
187+
188+
// Otherwise ensure we have at least 1 port per broker
189+
else if (listener.getPorts().length < numberOfBrokers) {
190+
// TODO Log warning that a random port will be used
191+
logger.warn(
192+
"{} will use at least one randomly generated port. "
193+
+ "To avoid this warning assign the same number of ports via the onPorts() method to this listener "
194+
+ "as brokers you have in your test cluster.",
195+
listener.getClass().getSimpleName()
196+
);
197+
}
198+
199+
// Ensure ports are not duplicated.
200+
for (final int port : listener.getPorts()) {
201+
if (registeredPorts.contains(port)) {
202+
throw new RuntimeException(
203+
"Error configuring listener " + listener.getClass().getSimpleName() + " as port " + port + " "
204+
+ "is already registered by a listener. Ensure that all explicitly defined ports passed to "
205+
+ "BrokerListener.onPorts() is unique across all listeners."
206+
);
207+
}
208+
registeredPorts.add(port);
209+
}
210+
}
211+
}
212+
172213
/**
173214
* Returns an immutable list of broker hosts for the kafka cluster.
174215
* @return immutable list of hosts for brokers within the cluster.

kafka-junit-core/src/main/java/com/salesforce/kafka/test/KafkaTestServer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.salesforce.kafka.test.listeners.PlainListener;
3030
import kafka.server.KafkaConfig;
3131
import kafka.server.KafkaServerStartable;
32-
import org.apache.curator.test.InstanceSpec;
3332

3433
import java.util.ArrayList;
3534
import java.util.Collection;
@@ -262,8 +261,8 @@ public void start() throws Exception {
262261

263262
// Loop over registered listeners and add each
264263
for (final BrokerListener listener : registeredListeners) {
265-
// Generate port to listen on.
266-
final int port = InstanceSpec.getRandomPort();
264+
// Get port to listen on.
265+
final int port = listener.getNextPort();
267266
final String listenerDefinition = listener.getProtocol() + "://" + getConfiguredHostname() + ":" + port;
268267
listenerProperties.add(
269268
new ListenerProperties(listener.getProtocol(), listenerDefinition, listener.getClientProperties())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Copyright (c) 2017-2018, Salesforce.com, Inc.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
6+
* following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
9+
* disclaimer.
10+
*
11+
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
12+
* disclaimer in the documentation and/or other materials provided with the distribution.
13+
*
14+
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
15+
* derived from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
18+
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
22+
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
23+
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24+
*/
25+
26+
package com.salesforce.kafka.test.listeners;
27+
28+
import org.apache.curator.test.InstanceSpec;
29+
30+
/**
31+
* Shared Listener class.
32+
*
33+
* @param <Self> reference to parent class.
34+
*/
35+
public abstract class AbstractListener<Self> implements BrokerListener {
36+
/**
37+
* Defines which port(s) to listen on.
38+
*/
39+
private int[] ports = {};
40+
private int portIndex = 0;
41+
42+
/**
43+
* Optionally allow for explicitly defining which ports this listener will bind to.
44+
* Pass a unique port per broker running.
45+
*
46+
* If not explicitly called, random ports will be assigned to each listener and broker.
47+
*
48+
* @param ports the ports to bind to.
49+
* @return self for method chaining.
50+
*/
51+
public Self onPorts(final int ... ports) {
52+
this.ports = ports;
53+
return (Self) this;
54+
}
55+
56+
/**
57+
* The ports configured.
58+
* @return Configured ports.
59+
*/
60+
public int[] getPorts() {
61+
return ports;
62+
}
63+
64+
/**
65+
* Internal method to get the next assigned port. If called more times than configured ports,
66+
* this method will generate a random port to be used.
67+
*
68+
* @return next configured port to use.
69+
*/
70+
public int getNextPort() {
71+
if (ports == null || ports.length == 0 || portIndex >= ports.length) {
72+
// Return random Port
73+
return InstanceSpec.getRandomPort();
74+
}
75+
return ports[portIndex++];
76+
}
77+
}

kafka-junit-core/src/main/java/com/salesforce/kafka/test/listeners/BrokerListener.java

+14
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,18 @@ public interface BrokerListener {
5454
* @return Properties to be registered on connecting client.
5555
*/
5656
Properties getClientProperties();
57+
58+
/**
59+
* The ports configured.
60+
* @return Configured ports.
61+
*/
62+
int[] getPorts();
63+
64+
/**
65+
* Internal method to get the next assigned port. If called more times than configured ports,
66+
* this method will generate a random port to be used.
67+
*
68+
* @return next configured port to use.
69+
*/
70+
int getNextPort();
5771
}

kafka-junit-core/src/main/java/com/salesforce/kafka/test/listeners/PlainListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
/**
3131
* Default implementation. Defines a PLAINTEXT listener.
3232
*/
33-
public class PlainListener implements BrokerListener {
33+
public class PlainListener extends AbstractListener<PlainListener> {
3434

3535
@Override
3636
public String getProtocol() {

kafka-junit-core/src/main/java/com/salesforce/kafka/test/listeners/SaslPlainListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
* In order to make use of this Listener, you **must** start the JVM with the following:
4040
* -Djava.security.auth.login.config=/path/to/your/jaas.conf
4141
*/
42-
public class SaslPlainListener implements BrokerListener {
42+
public class SaslPlainListener extends AbstractListener<SaslPlainListener> {
4343
private static final Logger logger = LoggerFactory.getLogger(SaslPlainListener.class);
4444

4545
private String username = "";

kafka-junit-core/src/main/java/com/salesforce/kafka/test/listeners/SaslSslListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
* In order to make use of this Listener, you **must** start the JVM with the following:
4040
* -Djava.security.auth.login.config=/path/to/your/jaas.conf
4141
*/
42-
public class SaslSslListener implements BrokerListener {
42+
public class SaslSslListener extends AbstractListener<SaslSslListener> {
4343
private static final Logger logger = LoggerFactory.getLogger(SaslSslListener.class);
4444

4545
// SASL Settings.

kafka-junit-core/src/main/java/com/salesforce/kafka/test/listeners/SslListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
/**
3131
* Define and register an SSL listener on a Kafka broker.
3232
*/
33-
public class SslListener implements BrokerListener {
33+
public class SslListener extends AbstractListener<SslListener> {
3434

3535
private String trustStoreFile = "";
3636
private String trustStorePassword = "";

kafka-junit-core/src/test/java/com/salesforce/kafka/test/KafkaTestClusterTest.java

+84
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.salesforce.kafka.test.listeners.SaslPlainListener;
3131
import com.salesforce.kafka.test.listeners.SaslSslListener;
3232
import com.salesforce.kafka.test.listeners.SslListener;
33+
import org.apache.curator.test.InstanceSpec;
3334
import org.apache.kafka.clients.admin.TopicDescription;
3435
import org.apache.kafka.clients.consumer.ConsumerRecord;
3536
import org.apache.kafka.common.Node;
@@ -40,6 +41,7 @@
4041
import org.junit.jupiter.params.provider.Arguments;
4142
import org.junit.jupiter.params.provider.MethodSource;
4243

44+
import java.util.ArrayList;
4345
import java.util.Arrays;
4446
import java.util.Collection;
4547
import java.util.Collections;
@@ -459,6 +461,88 @@ private static Stream<Arguments> provideListeners() {
459461
);
460462
}
461463

464+
/**
465+
* Test a cluster instance with listeners on specified ports.
466+
*/
467+
@Test
468+
void testListenerWithSpecificPort() throws Exception {
469+
// Explicitly define our port
470+
final int exportedPort1 = InstanceSpec.getRandomPort();
471+
final int exportedPort2 = InstanceSpec.getRandomPort();
472+
473+
// Create default plain listener
474+
final BrokerListener plainListener = new PlainListener()
475+
.onPorts(exportedPort1, exportedPort2);
476+
final List<BrokerListener> listeners = Collections.singletonList(plainListener);
477+
478+
final String topicName = "TestTopic-" + System.currentTimeMillis();
479+
final int expectedMsgCount = 2;
480+
final int numberOfBrokers = 2;
481+
482+
// Speed up shutdown in our tests
483+
final Properties overrideProperties = getDefaultBrokerOverrideProperties();
484+
485+
// Create our test server instance
486+
try (final KafkaTestCluster kafkaTestCluster = new KafkaTestCluster(numberOfBrokers, overrideProperties, listeners)) {
487+
// Start broker
488+
kafkaTestCluster.start();
489+
490+
// Validate connect string is as expected.
491+
final String connectString = kafkaTestCluster.getKafkaConnectString();
492+
final String expectedConnectString = "PLAINTEXT://localhost:" + exportedPort1 + ",PLAINTEXT://localhost:" + exportedPort2;
493+
Assertions.assertEquals(expectedConnectString, connectString, "Should be using our specified ports");
494+
495+
// Create KafkaTestUtils
496+
final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(kafkaTestCluster);
497+
498+
// Create topic
499+
kafkaTestUtils.createTopic(topicName, 1, (short) numberOfBrokers);
500+
501+
// Publish 2 messages into topic
502+
kafkaTestUtils.produceRecords(expectedMsgCount, topicName, 0);
503+
504+
// Sanity test - Consume the messages back out before shutting down broker.
505+
final List<ConsumerRecord<byte[], byte[]>> records = kafkaTestUtils.consumeAllRecordsFromTopic(topicName);
506+
Assertions.assertNotNull(records);
507+
Assertions.assertEquals(expectedMsgCount, records.size(), "Should have found 2 records.");
508+
}
509+
}
510+
511+
/**
512+
* Test a cluster instance with listeners on specified ports, where a port is duplicated.
513+
*/
514+
@Test
515+
void testListenerWithSpecificPortRepeated() throws Exception {
516+
// Explicitly define our port
517+
final int port1 = InstanceSpec.getRandomPort();
518+
final int port2 = InstanceSpec.getRandomPort();
519+
final int port3 = InstanceSpec.getRandomPort();
520+
521+
// Create plain listeners using the same port.
522+
final BrokerListener plainListener1 = new PlainListener()
523+
.onPorts(port1, port2);
524+
525+
final BrokerListener plainListener2 = new PlainListener()
526+
.onPorts(port3, port1);
527+
528+
final List<BrokerListener> listeners = new ArrayList<>();
529+
listeners.add(plainListener1);
530+
listeners.add(plainListener2);
531+
532+
// Define how many brokers.
533+
final int numberOfBrokers = 2;
534+
535+
// Speed up shutdown in our tests
536+
final Properties overrideProperties = getDefaultBrokerOverrideProperties();
537+
538+
// Create our test server instance
539+
try (final KafkaTestCluster kafkaTestCluster = new KafkaTestCluster(numberOfBrokers, overrideProperties, listeners)) {
540+
541+
// Start broker, this should throw an exception
542+
Assertions.assertThrows(RuntimeException.class, kafkaTestCluster::start);
543+
}
544+
}
545+
462546
private Properties getDefaultBrokerOverrideProperties() {
463547
// Speed up shutdown in our tests
464548
final Properties overrideProperties = new Properties();

0 commit comments

Comments
 (0)