Skip to content

Commit a602428

Browse files
committed
Backport DispatchRateLimiterOverconsumingTest to branch-3.0
1 parent 49ca5d6 commit a602428

File tree

1 file changed

+219
-0
lines changed

1 file changed

+219
-0
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.SoftAssertions.assertSoftly;
23+
import static org.testng.Assert.assertEquals;
24+
import java.time.Duration;
25+
import java.util.ArrayList;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.concurrent.atomic.AtomicLong;
33+
import java.util.stream.IntStream;
34+
import lombok.Cleanup;
35+
import lombok.extern.slf4j.Slf4j;
36+
import org.apache.pulsar.client.api.Consumer;
37+
import org.apache.pulsar.client.api.Message;
38+
import org.apache.pulsar.client.api.MessageListener;
39+
import org.apache.pulsar.client.api.Producer;
40+
import org.apache.pulsar.client.api.Schema;
41+
import org.apache.pulsar.client.api.SubscriptionType;
42+
import org.assertj.core.data.Percentage;
43+
import org.awaitility.Awaitility;
44+
import org.testng.annotations.AfterMethod;
45+
import org.testng.annotations.BeforeMethod;
46+
import org.testng.annotations.Test;
47+
48+
@Slf4j
49+
@Test(groups = "broker")
50+
public class DispatchRateLimiterOverconsumingTest extends BrokerTestBase {
51+
@BeforeMethod
52+
@Override
53+
protected void setup() throws Exception {
54+
super.baseSetup();
55+
}
56+
57+
@Override
58+
protected void doInitConf() throws Exception {
59+
super.doInitConf();
60+
// simplify testing by enabling throttling on non-backlog consumers
61+
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
62+
// set the dispatch max read batch size to 1 to stress the rate limiting behavior more effectively
63+
conf.setDispatcherMaxReadBatchSize(1);
64+
// avoid dispatching messages in a separate thread to simplify testing and reduce variance
65+
conf.setDispatcherDispatchMessagesInSubscriptionThread(false);
66+
}
67+
68+
@AfterMethod(alwaysRun = true)
69+
@Override
70+
protected void cleanup() throws Exception {
71+
super.internalCleanup();
72+
}
73+
74+
/**
75+
* This test verifies the broker dispatch rate limiting behavior with multiple concurrent consumers.
76+
* Reproduces issue "with a huge spike in a traffic consume is stuck for a long time" mentioned in
77+
* issue https://github.com/apache/pulsar/issues/24001 and prevents future regressions.
78+
*/
79+
@Test
80+
public void testOverconsumingTokensWithBrokerDispatchRateLimiter() throws Exception {
81+
int rateInMsg = 50;
82+
int durationSeconds = 5;
83+
int numberOfConsumers = 20;
84+
int numberOfMessages = rateInMsg * durationSeconds;
85+
86+
// configure dispatch throttling rate
87+
BrokerService brokerService = pulsar.getBrokerService();
88+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInMsg", String.valueOf(rateInMsg));
89+
Awaitility.await().untilAsserted(() ->
90+
assertEquals(brokerService.getBrokerDispatchRateLimiter()
91+
.getAvailableDispatchRateLimitOnMsg(), rateInMsg));
92+
assertEquals(brokerService.getBrokerDispatchRateLimiter().getDispatchRateOnByte(), -1L);
93+
94+
final String topicName = "persistent://" + newTopicName();
95+
96+
// state for calculating message rate
97+
AtomicLong startTimeNanos = new AtomicLong();
98+
AtomicLong lastReceivedMessageTimeNanos = new AtomicLong();
99+
AtomicInteger totalMessagesReceived = new AtomicInteger();
100+
AtomicInteger currentSecondMessagesCount = new AtomicInteger();
101+
AtomicInteger lastCalculatedSecond = new AtomicInteger(0);
102+
List<Integer> collectedRatesForEachSecond = Collections.synchronizedList(new ArrayList<>());
103+
104+
// track actual consuming rate of messages per second
105+
Runnable rateTracker = () -> {
106+
long startTime = startTimeNanos.get();
107+
if (startTime == 0) {
108+
return;
109+
}
110+
long durationNanos = System.nanoTime() - startTime;
111+
int elapsedFullSeconds = (int) (durationNanos / 1e9);
112+
if (elapsedFullSeconds > 0 && lastCalculatedSecond.compareAndSet(elapsedFullSeconds - 1,
113+
elapsedFullSeconds)) {
114+
int messagesCountForPreviousSecond = currentSecondMessagesCount.getAndSet(0);
115+
log.info("Rate for second {}: {} msg/s {}", elapsedFullSeconds, messagesCountForPreviousSecond, TimeUnit.NANOSECONDS.toMillis(durationNanos));
116+
collectedRatesForEachSecond.add(messagesCountForPreviousSecond);
117+
}
118+
};
119+
@Cleanup("shutdownNow")
120+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
121+
executor.scheduleAtFixedRate(rateTracker, 0, 500, TimeUnit.MILLISECONDS);
122+
123+
// message listener implementation used for all consumers
124+
MessageListener<Integer> messageListener = new MessageListener<>() {
125+
@Override
126+
public void received(Consumer<Integer> consumer, Message<Integer> msg) {
127+
lastReceivedMessageTimeNanos.set(System.nanoTime());
128+
currentSecondMessagesCount.incrementAndGet();
129+
totalMessagesReceived.incrementAndGet();
130+
consumer.acknowledgeAsync(msg);
131+
}
132+
};
133+
134+
// create consumers using a shared subscription called "sub"
135+
List<Consumer<Integer>> consumerList = IntStream.range(0, numberOfConsumers)
136+
.mapToObj(i -> {
137+
try {
138+
return pulsarClient.newConsumer(Schema.INT32)
139+
.topic(topicName)
140+
.consumerName("consumer-" + (i + 1))
141+
.subscriptionType(SubscriptionType.Shared)
142+
.subscriptionName("sub")
143+
.receiverQueueSize(10)
144+
.messageListener(messageListener)
145+
// start paused so that there's a backlog when consumers are resumed
146+
.startPaused(true)
147+
.subscribe();
148+
} catch (Exception e) {
149+
throw new RuntimeException(e);
150+
}
151+
}).toList();
152+
// handle consumer cleanup when the test completes
153+
@Cleanup
154+
AutoCloseable consumerCloser = () -> {
155+
consumerList.forEach(c -> {
156+
try {
157+
c.close();
158+
} catch (Exception e) {
159+
// ignore
160+
}
161+
});
162+
};
163+
164+
@Cleanup
165+
Producer<Integer> producer =
166+
pulsarClient.newProducer(Schema.INT32).enableBatching(false).topic(topicName).create();
167+
// send messages
168+
IntStream.range(0, numberOfMessages).forEach(i -> {
169+
try {
170+
int messageNumber = i + 1;
171+
producer.sendAsync(messageNumber).exceptionally(e -> {
172+
log.error("Failed to send message #{}", messageNumber, e);
173+
return null;
174+
});
175+
} catch (Exception e) {
176+
throw new RuntimeException(e);
177+
}
178+
});
179+
log.info("Waiting for messages to be sent");
180+
// wait until the messages are sent
181+
producer.flush();
182+
183+
// resume the consumers
184+
log.info("Resuming consumers");
185+
startTimeNanos.set(System.nanoTime());
186+
consumerList.forEach(Consumer::resume);
187+
188+
// wait for results
189+
Awaitility.await()
190+
.atMost(Duration.ofSeconds(durationSeconds * 2))
191+
.pollInterval(Duration.ofMillis(100))
192+
.untilAsserted(() -> assertThat(totalMessagesReceived).hasValue(numberOfMessages));
193+
List<Integer> collectedRatesSnapshot = new ArrayList<>(collectedRatesForEachSecond);
194+
int messagesCountForPreviousSecond = currentSecondMessagesCount.getAndSet(0);
195+
if (messagesCountForPreviousSecond > 0) {
196+
collectedRatesSnapshot.add(messagesCountForPreviousSecond);
197+
}
198+
log.info("Collected rates for each second: {}", collectedRatesSnapshot);
199+
long avgMsgRate =
200+
totalMessagesReceived.get() / TimeUnit.NANOSECONDS.toSeconds(
201+
lastReceivedMessageTimeNanos.get() - startTimeNanos.get());
202+
log.info("Average rate during the test run: {} msg/s", avgMsgRate);
203+
204+
assertSoftly(softly -> {
205+
// check the rate during the test run
206+
softly.assertThat(avgMsgRate)
207+
.describedAs("average rate during the test run")
208+
// allow rate in 40% range
209+
.isCloseTo(rateInMsg, Percentage.withPercentage(40));
210+
211+
// check that rates were collected
212+
softly.assertThat(collectedRatesSnapshot)
213+
.describedAs("actual rates for each second")
214+
.size().isGreaterThanOrEqualTo(durationSeconds).returnToIterable()
215+
// allow rate in 10% range
216+
.allSatisfy(rate -> assertThat(rate).isCloseTo(rateInMsg, Percentage.withPercentage(10)));
217+
});
218+
}
219+
}

0 commit comments

Comments
 (0)