Skip to content

Commit 5a59ab7

Browse files
authored
[fix][client] fix retry topic with exclusive mode. (apache#23859)
1 parent 4bfdcd8 commit 5a59ab7

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,46 @@ public void testRetryTopic() throws Exception {
136136
checkConsumer.close();
137137
}
138138

139+
/**
140+
* Retry topic feature relies on the delay queue feature when consumer produce a delayed message
141+
* to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type.
142+
* As a result, the subscription type of the retry topic should be shared or key-shared.
143+
* @throws Exception
144+
*/
145+
@Test
146+
public void testRetryTopicWithExclusiveMode() throws Exception {
147+
final String topic = "persistent://my-property/my-ns/retry-topic-exclusive";
148+
final int maxRedeliveryCount = 2;
149+
150+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
151+
.topic(topic)
152+
.subscriptionName("my-subscription")
153+
.subscriptionType(SubscriptionType.Exclusive)
154+
.enableRetry(true)
155+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
156+
.receiverQueueSize(100)
157+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
158+
.subscribe();
159+
160+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
161+
.topic(topic)
162+
.create();
163+
164+
producer.send("Hello Pulsar".getBytes());
165+
producer.close();
166+
167+
// receive message and set delay to 5 seconds
168+
Message<byte[]> message = consumer.receive();
169+
long timestamp = System.currentTimeMillis();
170+
consumer.reconsumeLater(message, 4, TimeUnit.SECONDS);
171+
172+
// receive message and check the delay is at least 4 seconds
173+
consumer.receive();
174+
long delay = System.currentTimeMillis() - timestamp;
175+
assertTrue(delay >= 2000);
176+
consumer.close();
177+
}
178+
139179
@Data
140180
public static class Foo {
141181
@Nullable

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.pulsar.client.api.Consumer;
4949
import org.apache.pulsar.client.api.ConsumerBuilder;
5050
import org.apache.pulsar.client.api.ConsumerEventListener;
51+
import org.apache.pulsar.client.api.DeadLetterPolicy;
5152
import org.apache.pulsar.client.api.Message;
5253
import org.apache.pulsar.client.api.MessageId;
5354
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -792,6 +793,13 @@ private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
792793
}
793794

794795
protected SubType getSubType() {
796+
// For retry topic, we always use Shared subscription
797+
// Because we will produce delayed messages to retry topic.
798+
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
799+
if (deadLetterPolicy != null && topic.equals(deadLetterPolicy.getRetryLetterTopic())) {
800+
return SubType.Shared;
801+
}
802+
795803
SubscriptionType type = conf.getSubscriptionType();
796804
switch (type) {
797805
case Exclusive:

0 commit comments

Comments
 (0)