|
19 | 19 | package org.apache.pulsar.broker.service;
|
20 | 20 |
|
21 | 21 | import static org.assertj.core.api.Assertions.assertThat;
|
| 22 | +import static org.assertj.core.api.Assertions.assertThatThrownBy; |
22 | 23 | import static org.testng.Assert.assertEquals;
|
23 | 24 | import static org.testng.Assert.assertFalse;
|
24 | 25 | import static org.testng.Assert.assertNotNull;
|
|
64 | 65 | import org.apache.pulsar.client.impl.ClientBuilderImpl;
|
65 | 66 | import org.apache.pulsar.client.impl.ClientCnx;
|
66 | 67 | import org.apache.pulsar.client.impl.MessageIdImpl;
|
| 68 | +import org.apache.pulsar.client.impl.TopicMessageIdImpl; |
67 | 69 | import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
|
68 | 70 | import org.apache.pulsar.common.api.proto.CommandError;
|
69 | 71 | import org.apache.pulsar.common.naming.TopicName;
|
@@ -464,6 +466,22 @@ public void testSeekOnPartitionedTopic() throws Exception {
|
464 | 466 | }
|
465 | 467 | }
|
466 | 468 |
|
| 469 | + @Test |
| 470 | + public void testSeekWithNonOwnerTopicMessage() throws Exception { |
| 471 | + final String topicName = "persistent://prop/use/ns-abc/testNonOwnerTopicMessage"; |
| 472 | + |
| 473 | + admin.topics().createPartitionedTopic(topicName, 2); |
| 474 | + @Cleanup |
| 475 | + org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) |
| 476 | + .subscriptionName("my-subscription").subscribe(); |
| 477 | + assertThatThrownBy( |
| 478 | + // seek with a TopicMessageIdImpl that has a null topic. |
| 479 | + () -> consumer.seek(new TopicMessageIdImpl(null, new BatchMessageIdImpl(123L, 345L, 566, 789))) |
| 480 | + ) |
| 481 | + .isInstanceOf(PulsarClientException.class) |
| 482 | + .hasMessage("The owner topic is null"); |
| 483 | + } |
| 484 | + |
467 | 485 | @Test
|
468 | 486 | public void testSeekTime() throws Exception {
|
469 | 487 | final String topicName = "persistent://prop/use/ns-abc/testSeekTime";
|
|
0 commit comments