Skip to content

Commit d8e2dab

Browse files
thetumbledlhotari
authored andcommitted
[fix][client] fix retry topic with exclusive mode. (#23859)
(cherry picked from commit 5a59ab7)
1 parent bcbbbd3 commit d8e2dab

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,46 @@ public void testRetryTopic() throws Exception {
136136
checkConsumer.close();
137137
}
138138

139+
/**
140+
* Retry topic feature relies on the delay queue feature when consumer produce a delayed message
141+
* to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type.
142+
* As a result, the subscription type of the retry topic should be shared or key-shared.
143+
* @throws Exception
144+
*/
145+
@Test
146+
public void testRetryTopicWithExclusiveMode() throws Exception {
147+
final String topic = "persistent://my-property/my-ns/retry-topic-exclusive";
148+
final int maxRedeliveryCount = 2;
149+
150+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
151+
.topic(topic)
152+
.subscriptionName("my-subscription")
153+
.subscriptionType(SubscriptionType.Exclusive)
154+
.enableRetry(true)
155+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
156+
.receiverQueueSize(100)
157+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
158+
.subscribe();
159+
160+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
161+
.topic(topic)
162+
.create();
163+
164+
producer.send("Hello Pulsar".getBytes());
165+
producer.close();
166+
167+
// receive message and set delay to 5 seconds
168+
Message<byte[]> message = consumer.receive();
169+
long timestamp = System.currentTimeMillis();
170+
consumer.reconsumeLater(message, 4, TimeUnit.SECONDS);
171+
172+
// receive message and check the delay is at least 4 seconds
173+
consumer.receive();
174+
long delay = System.currentTimeMillis() - timestamp;
175+
assertTrue(delay >= 2000);
176+
consumer.close();
177+
}
178+
139179
@Data
140180
public static class Foo {
141181
@Nullable

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.pulsar.client.api.Consumer;
4848
import org.apache.pulsar.client.api.ConsumerBuilder;
4949
import org.apache.pulsar.client.api.ConsumerEventListener;
50+
import org.apache.pulsar.client.api.DeadLetterPolicy;
5051
import org.apache.pulsar.client.api.Message;
5152
import org.apache.pulsar.client.api.MessageId;
5253
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -787,6 +788,13 @@ private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
787788
}
788789

789790
protected SubType getSubType() {
791+
// For retry topic, we always use Shared subscription
792+
// Because we will produce delayed messages to retry topic.
793+
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
794+
if (deadLetterPolicy != null && topic.equals(deadLetterPolicy.getRetryLetterTopic())) {
795+
return SubType.Shared;
796+
}
797+
790798
SubscriptionType type = conf.getSubscriptionType();
791799
switch (type) {
792800
case Exclusive:

0 commit comments

Comments
 (0)