Skip to content

Commit 23b1f7b

Browse files
3pacccccclhotari
authored andcommitted
[fix][client] Prevent NPE when seeking with null topic in TopicMessageId (apache#24404)
(cherry picked from commit 9337405)
1 parent 2d9ac9b commit 23b1f7b

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2223
import static org.testng.Assert.assertEquals;
2324
import static org.testng.Assert.assertFalse;
2425
import static org.testng.Assert.assertNotNull;
@@ -64,6 +65,7 @@
6465
import org.apache.pulsar.client.impl.ClientBuilderImpl;
6566
import org.apache.pulsar.client.impl.ClientCnx;
6667
import org.apache.pulsar.client.impl.MessageIdImpl;
68+
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
6769
import org.apache.pulsar.common.api.proto.CommandError;
6870
import org.apache.pulsar.common.naming.TopicName;
6971
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -464,6 +466,22 @@ public void testSeekOnPartitionedTopic() throws Exception {
464466
}
465467
}
466468

469+
@Test
470+
public void testSeekWithNonOwnerTopicMessage() throws Exception {
471+
final String topicName = "persistent://prop/use/ns-abc/testNonOwnerTopicMessage";
472+
473+
admin.topics().createPartitionedTopic(topicName, 2);
474+
@Cleanup
475+
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
476+
.subscriptionName("my-subscription").subscribe();
477+
assertThatThrownBy(
478+
// seek with a TopicMessageIdImpl that has a null topic.
479+
() -> consumer.seek(new TopicMessageIdImpl(null, new BatchMessageIdImpl(123L, 345L, 566, 789)))
480+
)
481+
.isInstanceOf(PulsarClientException.class)
482+
.hasMessage("The owner topic is null");
483+
}
484+
467485
@Test
468486
public void testSeekTime() throws Exception {
469487
final String topicName = "persistent://prop/use/ns-abc/testSeekTime";

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,10 +786,15 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
786786
final ConsumerImpl<T> internalConsumer;
787787
if (messageId instanceof TopicMessageId) {
788788
TopicMessageId topicMessageId = (TopicMessageId) messageId;
789-
internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
789+
String ownerTopic = topicMessageId.getOwnerTopic();
790+
if (ownerTopic == null) {
791+
return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
792+
"The owner topic is null"));
793+
}
794+
internalConsumer = consumers.get(ownerTopic);
790795
if (internalConsumer == null) {
791796
return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
792-
"The owner topic " + topicMessageId.getOwnerTopic() + " is not subscribed"));
797+
"The owner topic " + ownerTopic + " is not subscribed"));
793798
}
794799
} else {
795800
internalConsumer = null;

0 commit comments

Comments
 (0)