Skip to content

Commit 9c0e4cf

Browse files
committed
Add tokens once in every rate period so that PIP-322 dispatch rate limiter behaves in a similar way as previous implementation
1 parent b5b690d commit 9c0e4cf

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiterAsyncTokenBucketImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
112112
if (msgRate > 0) {
113113
if (dispatchRate.isRelativeToPublishRate()) {
114114
this.dispatchRateLimiterOnMessage =
115-
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
115+
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate(), ratePeriodNanos)
116116
.rateFunction(() -> getRelativeDispatchRateInMsg(dispatchRate))
117117
.ratePeriodNanosFunction(() -> ratePeriodNanos)
118118
.build();
119119
} else {
120120
this.dispatchRateLimiterOnMessage =
121-
configureAsyncTokenBucket(AsyncTokenBucket.builder())
121+
configureAsyncTokenBucket(AsyncTokenBucket.builder(), ratePeriodNanos)
122122
.rate(msgRate).ratePeriodNanos(ratePeriodNanos)
123123
.build();
124124
}
@@ -130,13 +130,13 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
130130
if (byteRate > 0) {
131131
if (dispatchRate.isRelativeToPublishRate()) {
132132
this.dispatchRateLimiterOnByte =
133-
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
133+
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate(), ratePeriodNanos)
134134
.rateFunction(() -> getRelativeDispatchRateInByte(dispatchRate))
135135
.ratePeriodNanosFunction(() -> ratePeriodNanos)
136136
.build();
137137
} else {
138138
this.dispatchRateLimiterOnByte =
139-
configureAsyncTokenBucket(AsyncTokenBucket.builder())
139+
configureAsyncTokenBucket(AsyncTokenBucket.builder(), ratePeriodNanos)
140140
.rate(byteRate).ratePeriodNanos(ratePeriodNanos)
141141
.build();
142142
}
@@ -145,8 +145,13 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
145145
}
146146
}
147147

148-
private <T extends AsyncTokenBucketBuilder<T>> T configureAsyncTokenBucket(T builder) {
148+
private <T extends AsyncTokenBucketBuilder<T>> T configureAsyncTokenBucket(T builder,
149+
long addTokensResolutionNanos) {
149150
builder.clock(brokerService.getPulsar().getMonotonicClock());
151+
// configures tokens to be added once in every addTokensResolutionNanos
152+
// this makes AsyncTokenBucket behave in the similar way as the "classic" dispatch rate limiter implementation
153+
// which uses a scheduled task to add tokens to the rate limiter once in every ratePeriod
154+
builder.addTokensResolutionNanos(addTokensResolutionNanos);
150155
return builder;
151156
}
152157

0 commit comments

Comments
 (0)