Skip to content

Commit b013512

Browse files
committed
Revert PIP-322 changes in DispatchRateLimiter
1 parent 0719aaa commit b013512

File tree

3 files changed

+371
-38
lines changed

3 files changed

+371
-38
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
import java.util.Optional;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.function.Supplier;
2425
import org.apache.pulsar.broker.ServiceConfiguration;
25-
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
26-
import org.apache.pulsar.broker.qos.AsyncTokenBucketBuilder;
2726
import org.apache.pulsar.broker.service.BrokerService;
2827
import org.apache.pulsar.common.naming.NamespaceName;
2928
import org.apache.pulsar.common.naming.TopicName;
3029
import org.apache.pulsar.common.policies.data.DispatchRate;
3130
import org.apache.pulsar.common.policies.data.Policies;
31+
import org.apache.pulsar.common.util.RateLimiter;
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

@@ -46,8 +46,8 @@ public enum Type {
4646
private final Type type;
4747

4848
private final BrokerService brokerService;
49-
private volatile AsyncTokenBucket dispatchRateLimiterOnMessage;
50-
private volatile AsyncTokenBucket dispatchRateLimiterOnByte;
49+
private volatile RateLimiter dispatchRateLimiterOnMessage;
50+
private volatile RateLimiter dispatchRateLimiterOnByte;
5151

5252
public DispatchRateLimiter(PersistentTopic topic, Type type) {
5353
this(topic, null, type);
@@ -77,9 +77,9 @@ public DispatchRateLimiter(BrokerService brokerService) {
7777
* @return
7878
*/
7979
public long getAvailableDispatchRateLimitOnMsg() {
80-
AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
80+
RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
8181
return localDispatchRateLimiterOnMessage == null ? -1 :
82-
Math.max(localDispatchRateLimiterOnMessage.getTokens(), 0);
82+
Math.max(localDispatchRateLimiterOnMessage.getAvailablePermits(), 0);
8383
}
8484

8585
/**
@@ -88,8 +88,9 @@ public long getAvailableDispatchRateLimitOnMsg() {
8888
* @return
8989
*/
9090
public long getAvailableDispatchRateLimitOnByte() {
91-
AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
92-
return localDispatchRateLimiterOnByte == null ? -1 : Math.max(localDispatchRateLimiterOnByte.getTokens(), 0);
91+
RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
92+
return localDispatchRateLimiterOnByte == null ? -1 :
93+
Math.max(localDispatchRateLimiterOnByte.getAvailablePermits(), 0);
9394
}
9495

9596
/**
@@ -99,13 +100,13 @@ public long getAvailableDispatchRateLimitOnByte() {
99100
* @param byteSize
100101
*/
101102
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
102-
AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
103+
RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
103104
if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
104-
localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
105+
localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
105106
}
106-
AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
107+
RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
107108
if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
108-
localDispatchRateLimiterOnByte.consumeTokens(byteSize);
109+
localDispatchRateLimiterOnByte.tryAcquire(byteSize);
109110
}
110111
}
111112

@@ -221,50 +222,63 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
221222

222223
long msgRate = dispatchRate.getDispatchThrottlingRateInMsg();
223224
long byteRate = dispatchRate.getDispatchThrottlingRateInByte();
224-
long ratePeriodNanos = TimeUnit.SECONDS.toNanos(Math.max(dispatchRate.getRatePeriodInSecond(), 1));
225+
long ratePeriod = dispatchRate.getRatePeriodInSecond();
225226

227+
Supplier<Long> permitUpdaterMsg = dispatchRate.isRelativeToPublishRate()
228+
? () -> getRelativeDispatchRateInMsg(dispatchRate)
229+
: null;
226230
// update msg-rateLimiter
227231
if (msgRate > 0) {
228-
if (dispatchRate.isRelativeToPublishRate()) {
232+
if (this.dispatchRateLimiterOnMessage == null) {
229233
this.dispatchRateLimiterOnMessage =
230-
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
231-
.rateFunction(() -> getRelativeDispatchRateInMsg(dispatchRate))
232-
.ratePeriodNanosFunction(() -> ratePeriodNanos)
234+
RateLimiter.builder()
235+
.scheduledExecutorService(brokerService.pulsar().getExecutor())
236+
.permits(msgRate)
237+
.rateTime(ratePeriod)
238+
.timeUnit(TimeUnit.SECONDS)
239+
.permitUpdater(permitUpdaterMsg)
240+
.isDispatchOrPrecisePublishRateLimiter(true)
233241
.build();
234242
} else {
235-
this.dispatchRateLimiterOnMessage =
236-
configureAsyncTokenBucket(AsyncTokenBucket.builder())
237-
.rate(msgRate).ratePeriodNanos(ratePeriodNanos)
238-
.build();
243+
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(),
244+
TimeUnit.SECONDS, permitUpdaterMsg);
239245
}
240246
} else {
241-
this.dispatchRateLimiterOnMessage = null;
247+
// message-rate should be disable and close
248+
if (this.dispatchRateLimiterOnMessage != null) {
249+
this.dispatchRateLimiterOnMessage.close();
250+
this.dispatchRateLimiterOnMessage = null;
251+
}
242252
}
243253

254+
Supplier<Long> permitUpdaterByte = dispatchRate.isRelativeToPublishRate()
255+
? () -> getRelativeDispatchRateInByte(dispatchRate)
256+
: null;
244257
// update byte-rateLimiter
245258
if (byteRate > 0) {
246-
if (dispatchRate.isRelativeToPublishRate()) {
259+
if (this.dispatchRateLimiterOnByte == null) {
247260
this.dispatchRateLimiterOnByte =
248-
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
249-
.rateFunction(() -> getRelativeDispatchRateInByte(dispatchRate))
250-
.ratePeriodNanosFunction(() -> ratePeriodNanos)
261+
RateLimiter.builder()
262+
.scheduledExecutorService(brokerService.pulsar().getExecutor())
263+
.permits(byteRate)
264+
.rateTime(ratePeriod)
265+
.timeUnit(TimeUnit.SECONDS)
266+
.permitUpdater(permitUpdaterByte)
267+
.isDispatchOrPrecisePublishRateLimiter(true)
251268
.build();
252269
} else {
253-
this.dispatchRateLimiterOnByte =
254-
configureAsyncTokenBucket(AsyncTokenBucket.builder())
255-
.rate(byteRate).ratePeriodNanos(ratePeriodNanos)
256-
.build();
270+
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(),
271+
TimeUnit.SECONDS, permitUpdaterByte);
257272
}
258273
} else {
259-
this.dispatchRateLimiterOnByte = null;
274+
// message-rate should be disable and close
275+
if (this.dispatchRateLimiterOnByte != null) {
276+
this.dispatchRateLimiterOnByte.close();
277+
this.dispatchRateLimiterOnByte = null;
278+
}
260279
}
261280
}
262281

263-
private <T extends AsyncTokenBucketBuilder<T>> T configureAsyncTokenBucket(T builder) {
264-
builder.clock(brokerService.getPulsar().getMonotonicSnapshotClock());
265-
return builder;
266-
}
267-
268282
private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
269283
return (topic != null && dispatchRate != null)
270284
? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.getDispatchThrottlingRateInMsg()
@@ -283,7 +297,7 @@ private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
283297
* @return
284298
*/
285299
public long getDispatchRateOnMsg() {
286-
AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
300+
RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
287301
return localDispatchRateLimiterOnMessage != null ? localDispatchRateLimiterOnMessage.getRate() : -1;
288302
}
289303

@@ -293,7 +307,7 @@ public long getDispatchRateOnMsg() {
293307
* @return
294308
*/
295309
public long getDispatchRateOnByte() {
296-
AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
310+
RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
297311
return localDispatchRateLimiterOnByte != null ? localDispatchRateLimiterOnByte.getRate() : -1;
298312
}
299313

@@ -306,9 +320,11 @@ public static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
306320
public void close() {
307321
// close rate-limiter
308322
if (dispatchRateLimiterOnMessage != null) {
323+
dispatchRateLimiterOnMessage.close();
309324
dispatchRateLimiterOnMessage = null;
310325
}
311326
if (dispatchRateLimiterOnByte != null) {
327+
dispatchRateLimiterOnByte.close();
312328
dispatchRateLimiterOnByte = null;
313329
}
314330
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.util;
20+
21+
/**
22+
* Function use when rate limiter renew permit.
23+
* */
24+
public interface RateLimitFunction {
25+
void apply();
26+
}

0 commit comments

Comments
 (0)