diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 2df16374c4..54761850a8 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -238,9 +238,6 @@ type Consumer interface { // Seek resets the subscription associated with this consumer to a specific message id. // The message id can either be a specific message or represent the first or last messages in the topic. - // - // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the - // seek() on the individual partitions. Seek(MessageID) error // SeekByTime resets the subscription associated with this consumer to a specific message publish time. diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2328ca882b..9663a97a86 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -575,15 +575,21 @@ func (c *consumer) Seek(msgID MessageID) error { c.Lock() defer c.Unlock() - if len(c.consumers) > 1 { - return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions") - } - mid, ok := c.messageID(msgID) if !ok { return nil } + if mid.partitionIdx < 0 { + return newError(SeekFailed, "partitionIdx is negative") + } + if mid.partitionIdx > int32(len(c.consumers)) { + return newError( + SeekFailed, + fmt.Sprintf("partitionIdx is %d, but there are %d partitions", mid.partitionIdx, len(c.consumers)), + ) + } + return c.consumers[mid.partitionIdx].Seek(mid) } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 20d4290726..9d62dca6c4 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -3053,6 +3053,151 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) { consumer.Ack(msg) } +// TestConsumerSeekOnPartitionedTopic test seekin on a partitioned topic. +// It is based on existing test case [TestConsumerSeek] but for a partitioned topic. +func TestConsumerSeekOnPartitionedTopic(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + // Create topic with 5 partitions + topicAdminURL := "admin/v2/persistent/public/default/TestSeekOnPartitionedTopic/partitions" + err = httpPut(topicAdminURL, 5) + defer httpDelete(topicAdminURL) + assert.Nil(t, err) + + topicName := "persistent://public/default/TestSeekOnPartitionedTopic" + + partitions, err := client.TopicPartitions(topicName) + assert.Nil(t, err) + assert.Equal(t, len(partitions), 5) + for i := 0; i < 5; i++ { + assert.Equal(t, partitions[i], + fmt.Sprintf("%s-partition-%d", topicName, i)) + } + + ctx := context.Background() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "my-sub", + }) + assert.Nil(t, err) + defer consumer.Close() + + // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 + const N = 1100 + var seekID MessageID + for i := 0; i < N; i++ { + id, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "key", // Ensure all messages go to the same partition. + }) + assert.Nil(t, err) + + if i == N-50 { + seekID = id + } + } + + // Don't consume all messages so some stay in queues + for i := 0; i < N-20; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) + consumer.Ack(msg) + } + + err = consumer.Seek(seekID) + assert.Nil(t, err) + + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload())) +} + +// TestConsumerSeekOnPartitionedTopicKeyShared test seekin on a partitioned topic with a KeyShared subscription. +// It is based on existing test case [TestConsumerSeekOnPartitionedTopicKeyShared] but for a KeyShared subscription. +func TestConsumerSeekOnPartitionedTopicKeyShared(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + // Create topic with 5 partitions + topicAdminURL := "admin/v2/persistent/public/default/TestSeekOnPartitionedTopicKeyShared/partitions" + err = httpPut(topicAdminURL, 5) + defer httpDelete(topicAdminURL) + assert.Nil(t, err) + + topicName := "persistent://public/default/TestSeekOnPartitionedTopicKeyShared" + + partitions, err := client.TopicPartitions(topicName) + assert.Nil(t, err) + assert.Equal(t, len(partitions), 5) + for i := 0; i < 5; i++ { + assert.Equal(t, partitions[i], + fmt.Sprintf("%s-partition-%d", topicName, i)) + } + + ctx := context.Background() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "my-sub", + Type: KeyShared, + }) + assert.Nil(t, err) + defer consumer.Close() + + // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 + const N = 1100 + var seekID MessageID + for i := 0; i < N; i++ { + id, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "key", // Ensure all messages go to the same partition. + }) + assert.Nil(t, err) + + if i == N-50 { + seekID = id + } + } + + // Don't consume all messages so some stay in queues + for i := 0; i < N-20; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) + consumer.Ack(msg) + } + + err = consumer.Seek(seekID) + assert.Nil(t, err) + + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload())) +} + // TestConsumerSeekByTimeOnPartitionedTopic test seek by time on partitioned topic. // It is based on existing test case [TestConsumerSeekByTime] but for partitioned topic. func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {