Skip to content

Commit 34cfd91

Browse files
thetumbledlhotari
authored andcommitted
[improve][client] PIP-393: Support configuring NegativeAckPrecisionBitCnt while building consumer. (#23804)
(cherry picked from commit f199e88)
1 parent 23ec604 commit 34cfd91

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,4 +545,51 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti
545545
consumer.close();
546546
admin.topics().deletePartitionedTopic("persistent://public/default/" + topic);
547547
}
548+
549+
@DataProvider(name = "negativeAckPrecisionBitCnt")
550+
public Object[][] negativeAckPrecisionBitCnt() {
551+
return new Object[][]{
552+
{1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12}
553+
};
554+
}
555+
556+
/**
557+
* When negativeAckPrecisionBitCnt is greater than 0, the lower bits of the redelivery time will be truncated
558+
* to reduce the memory occupation. If set to k, the redelivery time will be bucketed by 2^k ms, resulting in
559+
* the redelivery time could be earlier(no later) than the expected time no more than 2^k ms.
560+
* @throws Exception if an error occurs
561+
*/
562+
@Test(dataProvider = "negativeAckPrecisionBitCnt")
563+
public void testConfigureNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt) throws Exception {
564+
String topic = BrokerTestUtil.newUniqueName("testConfigureNegativeAckPrecisionBitCnt");
565+
long timeDeviation = 1L << negativeAckPrecisionBitCnt;
566+
long delayInMs = 2000;
567+
568+
@Cleanup
569+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
570+
.topic(topic)
571+
.subscriptionName("sub1")
572+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
573+
.subscriptionType(SubscriptionType.Shared)
574+
.negativeAckRedeliveryDelay(delayInMs, TimeUnit.MILLISECONDS)
575+
.negativeAckRedeliveryDelayPrecision(negativeAckPrecisionBitCnt)
576+
.subscribe();
577+
578+
@Cleanup
579+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
580+
.topic(topic)
581+
.create();
582+
producer.sendAsync("test-0");
583+
producer.flush();
584+
585+
// receive the message and negative ack
586+
consumer.negativeAcknowledge(consumer.receive());
587+
long expectedTime = System.currentTimeMillis() + delayInMs;
588+
589+
// receive the redelivered message and calculate the time deviation
590+
// assert that the redelivery time is no earlier than the `expected time - timeDeviation`
591+
Message<String> msg1 = consumer.receive();
592+
assertTrue(System.currentTimeMillis() >= expectedTime - timeDeviation);
593+
assertNotNull(msg1);
594+
}
548595
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,19 @@ public interface ConsumerBuilder<T> extends Cloneable {
243243
*/
244244
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);
245245

246+
/**
247+
* Sets the redelivery time precision bit count. The lower bits of the redelivery time will be
248+
* trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time
249+
* will be bucketed by 256ms, the redelivery time could be earlier(no later) than the expected time,
250+
* but no more than 256ms. If set to k, the redelivery time will be bucketed by 2^k ms.
251+
* If the value is 0, the redelivery time will be accurate to ms.
252+
*
253+
* @param negativeAckPrecisionBitCnt
254+
* The redelivery time precision bit count.
255+
* @return the consumer builder instance
256+
*/
257+
ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int negativeAckPrecisionBitCount);
258+
246259
/**
247260
* Select the subscription type to be used when subscribing to a topic.
248261
*

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,13 @@ public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeU
281281
return this;
282282
}
283283

284+
@Override
285+
public ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int negativeAckPrecisionBitCount) {
286+
checkArgument(negativeAckPrecisionBitCount >= 0, "negativeAckPrecisionBitCount needs to be >= 0");
287+
conf.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCount);
288+
return this;
289+
}
290+
284291
@Override
285292
public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType subscriptionType) {
286293
conf.setSubscriptionType(subscriptionType);

0 commit comments

Comments
 (0)