Skip to content

Commit 088ca04

Browse files
committed
Add similar test case for PublishRateLimiter
1 parent e819c04 commit 088ca04

File tree

1 file changed

+264
-0
lines changed

1 file changed

+264
-0
lines changed
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.SoftAssertions.assertSoftly;
23+
import static org.testng.Assert.assertEquals;
24+
import static org.testng.Assert.assertNull;
25+
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.ScheduledFuture;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.concurrent.atomic.AtomicLong;
35+
import java.util.stream.IntStream;
36+
import lombok.Cleanup;
37+
import lombok.extern.slf4j.Slf4j;
38+
import org.apache.pulsar.client.api.Consumer;
39+
import org.apache.pulsar.client.api.Message;
40+
import org.apache.pulsar.client.api.MessageListener;
41+
import org.apache.pulsar.client.api.Producer;
42+
import org.apache.pulsar.client.api.PulsarClient;
43+
import org.apache.pulsar.client.api.PulsarClientException;
44+
import org.apache.pulsar.client.api.Schema;
45+
import org.apache.pulsar.client.api.SubscriptionType;
46+
import org.assertj.core.data.Percentage;
47+
import org.awaitility.Awaitility;
48+
import org.testng.annotations.AfterMethod;
49+
import org.testng.annotations.BeforeMethod;
50+
import org.testng.annotations.Test;
51+
52+
@Slf4j
53+
@Test(groups = "broker")
54+
public class PublishRateLimiterOverconsumingTest extends BrokerTestBase {
55+
@BeforeMethod
56+
@Override
57+
protected void setup() throws Exception {
58+
super.baseSetup();
59+
}
60+
61+
@Override
62+
protected void doInitConf() throws Exception {
63+
super.doInitConf();
64+
}
65+
66+
@AfterMethod(alwaysRun = true)
67+
@Override
68+
protected void cleanup() throws Exception {
69+
super.internalCleanup();
70+
}
71+
72+
/**
73+
* This test verifies the broker publish rate limiting behavior with multiple concurrent publishers.
74+
* This reproduces the issue https://github.com/apache/pulsar/issues/23920 and prevents future regressions.
75+
*/
76+
@Test
77+
public void testOverconsumingTokensWithBrokerPublishRateLimiter() throws Exception {
78+
int rateInMsg = 500;
79+
int durationSeconds = 5;
80+
int numberOfConsumers = 5;
81+
int numberOfProducersWithIndependentClients = 5;
82+
int numberOfMessagesForEachProducer = (rateInMsg * (durationSeconds + 1)) / 5;
83+
84+
// configure dispatch throttling rate
85+
BrokerService brokerService = pulsar.getBrokerService();
86+
admin.brokers().updateDynamicConfiguration("brokerPublisherThrottlingMaxMessageRate", String.valueOf(rateInMsg));
87+
Awaitility.await().untilAsserted(() -> {
88+
PublishRateLimiterImpl publishRateLimiter =
89+
(PublishRateLimiterImpl) brokerService.getBrokerPublishRateLimiter();
90+
assertEquals(publishRateLimiter.getTokenBucketOnMessage().getRate(), rateInMsg);
91+
assertNull(publishRateLimiter.getTokenBucketOnByte());
92+
});
93+
94+
final String topicName = "persistent://" + newTopicName();
95+
96+
// state for calculating message rate
97+
AtomicLong startTimeNanos = new AtomicLong();
98+
AtomicLong lastReceivedMessageTimeNanos = new AtomicLong();
99+
AtomicInteger totalMessagesReceived = new AtomicInteger();
100+
AtomicInteger currentSecondMessagesCount = new AtomicInteger();
101+
AtomicInteger lastCalculatedSecond = new AtomicInteger(0);
102+
List<Integer> collectedRatesForEachSecond = Collections.synchronizedList(new ArrayList<>());
103+
104+
// track actual consuming rate of messages per second
105+
Runnable rateTracker = () -> {
106+
long startTime = startTimeNanos.get();
107+
if (startTime == 0) {
108+
startTimeNanos.compareAndSet(0, System.nanoTime());
109+
startTime = startTimeNanos.get();
110+
}
111+
long durationNanos = System.nanoTime() - startTime;
112+
int elapsedFullSeconds = (int) (durationNanos / 1e9);
113+
if (elapsedFullSeconds > 0 && lastCalculatedSecond.compareAndSet(elapsedFullSeconds - 1,
114+
elapsedFullSeconds)) {
115+
int messagesCountForPreviousSecond = currentSecondMessagesCount.getAndSet(0);
116+
log.info("Rate for second {}: {} msg/s {}", elapsedFullSeconds, messagesCountForPreviousSecond, TimeUnit.NANOSECONDS.toMillis(durationNanos));
117+
collectedRatesForEachSecond.add(messagesCountForPreviousSecond);
118+
}
119+
};
120+
@Cleanup("shutdownNow")
121+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
122+
ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(rateTracker, 0, 500, TimeUnit.MILLISECONDS);
123+
124+
// message listener implementation used for all consumers
125+
MessageListener<Integer> messageListener = new MessageListener<>() {
126+
@Override
127+
public void received(Consumer<Integer> consumer, Message<Integer> msg) {
128+
lastReceivedMessageTimeNanos.set(System.nanoTime());
129+
currentSecondMessagesCount.incrementAndGet();
130+
totalMessagesReceived.incrementAndGet();
131+
consumer.acknowledgeAsync(msg);
132+
}
133+
};
134+
135+
// create consumers using a shared subscription called "sub"
136+
List<Consumer<Integer>> consumerList = IntStream.range(0, numberOfConsumers)
137+
.mapToObj(i -> {
138+
try {
139+
return pulsarClient.newConsumer(Schema.INT32)
140+
.topic(topicName)
141+
.consumerName("consumer-" + (i + 1))
142+
.subscriptionType(SubscriptionType.Shared)
143+
.subscriptionName("sub")
144+
.messageListener(messageListener)
145+
.subscribe();
146+
} catch (Exception e) {
147+
throw new RuntimeException(e);
148+
}
149+
}).toList();
150+
// handle consumer cleanup when the test completes
151+
@Cleanup
152+
AutoCloseable consumerCloser = () -> {
153+
consumerList.forEach(c -> {
154+
try {
155+
c.close();
156+
} catch (Exception e) {
157+
// ignore
158+
}
159+
});
160+
};
161+
162+
// create independent clients for producers so that they don't get blocked by throttling
163+
List<PulsarClient> producerClients = IntStream.range(0, numberOfProducersWithIndependentClients)
164+
.mapToObj(i -> {
165+
try {
166+
return PulsarClient.builder()
167+
.serviceUrl(pulsar.getBrokerServiceUrl())
168+
.ioThreads(1)
169+
.statsInterval(0, TimeUnit.SECONDS)
170+
.connectionsPerBroker(1)
171+
.build();
172+
} catch (Exception e) {
173+
throw new RuntimeException(e);
174+
}
175+
}).toList();
176+
@Cleanup
177+
AutoCloseable producerClientsCloser = () -> {
178+
producerClients.forEach(c -> {
179+
try {
180+
c.close();
181+
} catch (Exception e) {
182+
// ignore
183+
}
184+
});
185+
};
186+
187+
List<Producer<Integer>> producers = IntStream.range(0, numberOfProducersWithIndependentClients)
188+
.mapToObj(i -> {
189+
try {
190+
return producerClients.get(i)
191+
.newProducer(Schema.INT32).enableBatching(true)
192+
.producerName("producer-" + (i + 1))
193+
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
194+
.batchingMaxMessages(numberOfMessagesForEachProducer / 100)
195+
.topic(topicName).create();
196+
} catch (PulsarClientException e) {
197+
throw new RuntimeException(e);
198+
}
199+
}).toList();
200+
201+
// send messages
202+
producers.forEach(producer -> {
203+
IntStream.range(0, numberOfMessagesForEachProducer).forEach(i -> {
204+
try {
205+
int messageNumber = i + 1;
206+
producer.sendAsync(messageNumber).exceptionally(e -> {
207+
log.error("Failed to send message #{}", messageNumber, e);
208+
return null;
209+
});
210+
} catch (Exception e) {
211+
throw new RuntimeException(e);
212+
}
213+
});
214+
producer.flushAsync().whenComplete((r, e) -> {
215+
if (e != null) {
216+
log.error("Failed to flush producer", e);
217+
} else {
218+
log.info("Producer {} flushed", producer.getProducerName());
219+
}
220+
});
221+
});
222+
223+
@Cleanup
224+
AutoCloseable producersClose = () -> {
225+
producers.forEach(p -> {
226+
try {
227+
p.close();
228+
} catch (Exception e) {
229+
// ignore
230+
}
231+
});
232+
};
233+
234+
// wait for results
235+
Awaitility.await()
236+
.atMost(Duration.ofSeconds(durationSeconds * 2))
237+
.pollInterval(Duration.ofMillis(100))
238+
.untilAsserted(
239+
() -> assertThat(collectedRatesForEachSecond).hasSizeGreaterThanOrEqualTo(durationSeconds));
240+
List<Integer> collectedRatesSnapshot = new ArrayList<>(collectedRatesForEachSecond);
241+
log.info("Collected rates for each second: {}", collectedRatesSnapshot);
242+
long avgMsgRate =
243+
totalMessagesReceived.get() / TimeUnit.NANOSECONDS.toSeconds(
244+
lastReceivedMessageTimeNanos.get() - startTimeNanos.get());
245+
log.info("Average rate during the test run: {} msg/s", avgMsgRate);
246+
247+
assertSoftly(softly -> {
248+
// check the rate during the test run
249+
softly.assertThat(avgMsgRate).describedAs("average rate during the test run")
250+
// allow rate in 40% range
251+
.isCloseTo(rateInMsg, Percentage.withPercentage(40));
252+
253+
// check that rates were collected
254+
softly.assertThat(collectedRatesSnapshot).describedAs("actual rates for each second")
255+
.allSatisfy(rates -> {
256+
assertThat(rates).describedAs("actual rate for each second")
257+
.isCloseTo(rateInMsg, Percentage.withPercentage(50));
258+
});
259+
});
260+
scheduledFuture.cancel(true);
261+
producersClose.close();
262+
consumerCloser.close();
263+
}
264+
}

0 commit comments

Comments
 (0)