Skip to content

Commit 5ab0e93

Browse files
committed
[fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
(cherry picked from commit c616b35)
1 parent b51b748 commit 5ab0e93

File tree

2 files changed

+38
-10
lines changed

2 files changed

+38
-10
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import static org.testng.Assert.assertFalse;
2727
import static org.testng.Assert.assertEquals;
2828
import static org.testng.Assert.assertTrue;
29+
import static org.testng.Assert.fail;
2930

3031
import com.google.common.collect.Lists;
3132
import java.util.ArrayList;
33+
import java.util.Arrays;
3234
import java.util.Collections;
3335
import java.util.HashMap;
3436
import java.util.List;
@@ -42,6 +44,7 @@
4244
import java.util.stream.Collectors;
4345
import java.util.stream.IntStream;
4446
import lombok.Cleanup;
47+
import org.apache.pulsar.broker.BrokerTestUtil;
4548
import org.apache.pulsar.client.admin.PulsarAdminException;
4649
import org.apache.pulsar.client.impl.ClientBuilderImpl;
4750
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -50,6 +53,7 @@
5053
import org.apache.pulsar.client.impl.PulsarClientImpl;
5154
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
5255
import org.apache.pulsar.common.naming.TopicName;
56+
import org.apache.pulsar.common.util.FutureUtil;
5357
import org.awaitility.Awaitility;
5458
import org.mockito.AdditionalAnswers;
5559
import org.mockito.Mockito;
@@ -372,6 +376,29 @@ public void testMultipleIOThreads() throws PulsarAdminException, PulsarClientExc
372376
assertTrue(consumer.isConnected());
373377
}
374378

379+
@Test
380+
public void testSameTopics() throws Exception {
381+
final String topic1 = BrokerTestUtil.newUniqueName("public/default/tp");
382+
final String topic2 = "persistent://" + topic1;
383+
admin.topics().createNonPartitionedTopic(topic2);
384+
// Create consumer with two same topics.
385+
try {
386+
pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2))
387+
.subscriptionName("s1").subscribe();
388+
fail("Do not allow use two same topics.");
389+
} catch (Exception e) {
390+
if (e instanceof PulsarClientException && e.getCause() != null) {
391+
e = (Exception) e.getCause();
392+
}
393+
Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
394+
assertTrue(unwrapEx instanceof IllegalArgumentException);
395+
assertTrue(e.getMessage().contains( "Subscription topics include duplicate items"
396+
+ " or invalid names"));
397+
}
398+
// cleanup.
399+
admin.topics().delete(topic2);
400+
}
401+
375402
@Test(timeOut = 30000)
376403
public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException {
377404
final var topic1 = newTopicName();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
165165
return;
166166
}
167167

168-
checkArgument(topicNamesValid(conf.getTopicNames()), "Topics is invalid.");
168+
checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription topics include duplicate items"
169+
+ " or invalid names.");
169170

170171
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream()
171172
.map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
@@ -202,21 +203,21 @@ private static boolean topicNamesValid(Collection<String> topics) {
202203
checkState(topics != null && topics.size() >= 1,
203204
"topics should contain more than 1 topic");
204205

205-
Optional<String> result = topics.stream()
206-
.filter(topic -> !TopicName.isValid(topic))
207-
.findFirst();
206+
Set<TopicName> topicNames = new HashSet<>();
208207

209-
if (result.isPresent()) {
210-
log.warn("Received invalid topic name: {}", result.get());
211-
return false;
208+
for (String topic : topics) {
209+
if (!TopicName.isValid(topic)) {
210+
log.warn("Received invalid topic name: {}", topic);
211+
return false;
212+
}
213+
topicNames.add(TopicName.get(topic));
212214
}
213215

214216
// check topic names are unique
215-
HashSet<String> set = new HashSet<>(topics);
216-
if (set.size() == topics.size()) {
217+
if (topicNames.size() == topics.size()) {
217218
return true;
218219
} else {
219-
log.warn("Topic names not unique. unique/all : {}/{}", set.size(), topics.size());
220+
log.warn("Topic names not unique. unique/all : {}/{}", topicNames.size(), topics.size());
220221
return false;
221222
}
222223
}

0 commit comments

Comments
 (0)