Skip to content

Commit 56233ea

Browse files
lhotarimerlimat
andcommitted
[fix][broker] PIP-322 Fix issue with rate limiters where rates can exceed limits initially and consumption pauses until token balance is positive (apache#24012)
Co-authored-by: Matteo Merli <[email protected]> (cherry picked from commit e547bea)
1 parent d152559 commit 56233ea

File tree

52 files changed

+2025
-1121
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2025
-1121
lines changed

conf/broker.conf

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ messageExpiryCheckIntervalInMinutes=5
243243
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
244244
activeConsumerFailoverDelayTimeMillis=1000
245245

246-
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
246+
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
247247
# For non-partitioned topics, consistent hashing is used by default.
248248
activeConsumerFailoverConsistentHashing=false
249249

@@ -419,6 +419,11 @@ subscribeThrottlingRatePerConsumer=0
419419
# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
420420
subscribeRatePeriodPerConsumerInSecond=30
421421

422+
# The class name of the factory that creates DispatchRateLimiter implementations. Current options are
423+
# org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket (default, PIP-322 implementation)
424+
# and org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic (legacy implementation)
425+
dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket
426+
422427
# Default messages per second dispatch throttling-limit for whole broker. A value of 0 disables the default
423428
# message dispatch-throttling
424429
dispatchThrottlingRateInMsg=0

conf/standalone.conf

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ maxMessageSizeCheckIntervalInSeconds=60
152152
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
153153
activeConsumerFailoverDelayTimeMillis=1000
154154

155-
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
155+
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
156156
# For non-partitioned topics, consistent hashing is used by default.
157157
activeConsumerFailoverConsistentHashing=false
158158

@@ -276,6 +276,11 @@ brokerPublisherThrottlingMaxMessageRate=0
276276
# (Disable byte rate limit with value 0)
277277
brokerPublisherThrottlingMaxByteRate=0
278278

279+
# The class name of the factory that creates DispatchRateLimiter implementations. Current options are
280+
# org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket (default, PIP-322 implementation)
281+
# and org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic (legacy implementation)
282+
dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket
283+
279284
# Default messages per second dispatch throttling-limit for every topic. A value of 0 disables the default
280285
# message dispatch-throttling
281286
dispatchThrottlingRatePerTopicInMsg=0

microbench/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,12 @@ Checking what benchmarks match the pattern:
6767
java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp
6868
```
6969

70+
Profiling benchmarks with [async-profiler](https://github.com/async-profiler/async-profiler):
71+
72+
```shell
73+
# example of profiling with async-profiler
74+
# download async-profiler from https://github.com/async-profiler/async-profiler/releases
# (the path below is for macOS; on Linux the library is lib/libasyncProfiler.so)
75+
LIBASYNCPROFILER_PATH=$HOME/async-profiler/lib/libasyncProfiler.dylib
76+
java -jar microbench/target/microbenchmarks.jar -prof async:libPath=$LIBASYNCPROFILER_PATH\;output=flamegraph\;dir=profile-results ".*BenchmarkName.*"
77+
```
78+

microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,22 @@
3030
import org.openjdk.jmh.annotations.Scope;
3131
import org.openjdk.jmh.annotations.Setup;
3232
import org.openjdk.jmh.annotations.State;
33-
import org.openjdk.jmh.annotations.TearDown;
3433
import org.openjdk.jmh.annotations.Threads;
3534
import org.openjdk.jmh.annotations.Warmup;
3635
import org.openjdk.jmh.infra.Blackhole;
3736

37+
/**
38+
* On MacOS, the performance of System.nanoTime() is not great. Running benchmarks on Linux is recommended due
39+
* to the bottleneck of System.nanoTime() implementation on MacOS.
40+
*/
3841
@Fork(3)
3942
@BenchmarkMode(Mode.Throughput)
4043
@OutputTimeUnit(TimeUnit.SECONDS)
4144
@State(Scope.Thread)
4245
public class AsyncTokenBucketBenchmark {
4346
private AsyncTokenBucket asyncTokenBucket;
44-
private DefaultMonotonicSnapshotClock monotonicSnapshotClock =
45-
new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(8), System::nanoTime);
47+
private DefaultMonotonicClock monotonicSnapshotClock =
48+
new DefaultMonotonicClock();
4649

4750
@Setup(Level.Iteration)
4851
public void setup() {
@@ -51,11 +54,6 @@ public void setup() {
5154
.initialTokens(2 * ratePerSecond).capacity(2 * ratePerSecond).build();
5255
}
5356

54-
@TearDown(Level.Iteration)
55-
public void teardown() {
56-
monotonicSnapshotClock.close();
57-
}
58-
5957
@Threads(1)
6058
@Benchmark
6159
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -23,80 +23,48 @@
2323
import org.openjdk.jmh.annotations.Benchmark;
2424
import org.openjdk.jmh.annotations.BenchmarkMode;
2525
import org.openjdk.jmh.annotations.Fork;
26-
import org.openjdk.jmh.annotations.Level;
2726
import org.openjdk.jmh.annotations.Measurement;
2827
import org.openjdk.jmh.annotations.Mode;
2928
import org.openjdk.jmh.annotations.OutputTimeUnit;
3029
import org.openjdk.jmh.annotations.Scope;
3130
import org.openjdk.jmh.annotations.State;
32-
import org.openjdk.jmh.annotations.TearDown;
3331
import org.openjdk.jmh.annotations.Threads;
3432
import org.openjdk.jmh.annotations.Warmup;
3533
import org.openjdk.jmh.infra.Blackhole;
3634

35+
/**
36+
* On MacOS, the performance of System.nanoTime() is not great. Running benchmarks on Linux is recommended due
37+
* to the bottleneck of System.nanoTime() implementation on MacOS.
38+
*/
3739
@Fork(3)
3840
@BenchmarkMode(Mode.Throughput)
3941
@OutputTimeUnit(TimeUnit.SECONDS)
4042
@State(Scope.Thread)
41-
public class DefaultMonotonicSnapshotClockBenchmark {
42-
private DefaultMonotonicSnapshotClock monotonicSnapshotClock =
43-
new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), System::nanoTime);
44-
45-
@TearDown(Level.Iteration)
46-
public void teardown() {
47-
monotonicSnapshotClock.close();
48-
}
43+
public class DefaultMonotonicClockBenchmark {
44+
private DefaultMonotonicClock monotonicSnapshotClock =
45+
new DefaultMonotonicClock();
4946

5047
@Threads(1)
5148
@Benchmark
5249
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
5350
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
5451
public void getTickNanos001Threads(Blackhole blackhole) {
55-
consumeTokenAndGetTokens(blackhole, false);
52+
blackhole.consume(monotonicSnapshotClock.getTickNanos());
5653
}
5754

5855
@Threads(10)
5956
@Benchmark
6057
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
6158
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
6259
public void getTickNanos010Threads(Blackhole blackhole) {
63-
consumeTokenAndGetTokens(blackhole, false);
60+
blackhole.consume(monotonicSnapshotClock.getTickNanos());
6461
}
6562

6663
@Threads(100)
6764
@Benchmark
6865
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
6966
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
7067
public void getTickNanos100Threads(Blackhole blackhole) {
71-
consumeTokenAndGetTokens(blackhole, false);
72-
}
73-
74-
@Threads(1)
75-
@Benchmark
76-
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
77-
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
78-
public void getTickNanosRequestSnapshot001Threads(Blackhole blackhole) {
79-
consumeTokenAndGetTokens(blackhole, true);
80-
}
81-
82-
@Threads(10)
83-
@Benchmark
84-
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
85-
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
86-
public void getTickNanosRequestSnapshot010Threads(Blackhole blackhole) {
87-
consumeTokenAndGetTokens(blackhole, true);
88-
}
89-
90-
@Threads(100)
91-
@Benchmark
92-
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
93-
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
94-
public void getTickNanosRequestSnapshot100Threads(Blackhole blackhole) {
95-
consumeTokenAndGetTokens(blackhole, true);
96-
}
97-
98-
private void consumeTokenAndGetTokens(Blackhole blackhole, boolean requestSnapshot) {
99-
// blackhole is used to ensure that the compiler doesn't do dead code elimination
100-
blackhole.consume(monotonicSnapshotClock.getTickNanos(requestSnapshot));
68+
blackhole.consume(monotonicSnapshotClock.getTickNanos());
10169
}
10270
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
10421042
doc = "Enable precise rate limit for topic publish"
10431043
)
10441044
private boolean preciseTopicPublishRateLimiterEnable = false;
1045+
10451046
@FieldContext(
10461047
category = CATEGORY_SERVER,
10471048
dynamic = true,
@@ -1064,6 +1065,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
10641065
+ "when broker publish rate limiting enabled. (Disable byte rate limit with value 0)"
10651066
)
10661067
private long brokerPublisherThrottlingMaxByteRate = 0;
1068+
1069+
@FieldContext(category = CATEGORY_SERVER, doc =
1070+
"The class name of the factory that creates DispatchRateLimiter implementations. Current options are "
1071+
+ "org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket "
1072+
+ "(default, PIP-322 implementation) "
1073+
+ "org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic (legacy "
1074+
+ "implementation)")
1075+
private String dispatchRateLimiterFactoryClassName =
1076+
"org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket";
1077+
10671078
@FieldContext(
10681079
category = CATEGORY_SERVER,
10691080
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@
9696
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
9797
import org.apache.pulsar.broker.namespace.NamespaceService;
9898
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
99-
import org.apache.pulsar.broker.qos.DefaultMonotonicSnapshotClock;
100-
import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
99+
import org.apache.pulsar.broker.qos.DefaultMonotonicClock;
100+
import org.apache.pulsar.broker.qos.MonotonicClock;
101101
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
102102
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
103103
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
@@ -300,7 +300,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
300300

301301
private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
302302
private final ExecutorProvider transactionExecutorProvider;
303-
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
303+
private final MonotonicClock monotonicClock;
304304
private String brokerId;
305305
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
306306
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();
@@ -395,8 +395,11 @@ public PulsarService(ServiceConfiguration config,
395395
// here in the constructor we don't have the offloader scheduler yet
396396
this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
397397

398-
this.monotonicSnapshotClock = new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(
399-
DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS), System::nanoTime);
398+
this.monotonicClock = createMonotonicClock(config);
399+
}
400+
401+
protected MonotonicClock createMonotonicClock(ServiceConfiguration config) {
402+
return new DefaultMonotonicClock();
400403
}
401404

402405
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
@@ -701,7 +704,9 @@ public CompletableFuture<Void> closeAsync() {
701704
brokerClientSharedScheduledExecutorProvider.shutdownNow();
702705
brokerClientSharedLookupExecutorProvider.shutdownNow();
703706
brokerClientSharedTimer.stop();
704-
monotonicSnapshotClock.close();
707+
if (monotonicClock instanceof AutoCloseable c) {
708+
c.close();
709+
}
705710

706711
if (openTelemetryTransactionPendingAckStoreStats != null) {
707712
openTelemetryTransactionPendingAckStoreStats.close();
@@ -1982,8 +1987,8 @@ public Optional<Integer> getBrokerListenPortTls() {
19821987
return brokerService.getListenPortTls();
19831988
}
19841989

1985-
public MonotonicSnapshotClock getMonotonicSnapshotClock() {
1986-
return monotonicSnapshotClock;
1990+
public MonotonicClock getMonotonicClock() {
1991+
return monotonicClock;
19871992
}
19881993

19891994
public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig,

0 commit comments

Comments
 (0)