Skip to content

Commit cb0fe5b

Browse files
authored
[improve][broker] Added synchronized for sendMessages in Non-Persistent message dispatchers (apache#24386)
1 parent 21406b5 commit cb0fe5b

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int
5656
}
5757

5858
@Override
59-
public void sendMessages(List<Entry> entries) {
59+
public synchronized void sendMessages(List<Entry> entries) {
6060
Consumer currentConsumer = getActiveConsumer();
6161
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
6262
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ protected Map<Consumer, List<Integer>> initialValue() throws Exception {
135135
};
136136

137137
@Override
138-
public void sendMessages(List<Entry> entries) {
138+
public synchronized void sendMessages(List<Entry> entries) {
139139
if (entries.isEmpty()) {
140140
return;
141141
}

0 commit comments

Comments
 (0)