Skip to content

Commit 9d2c746

Browse files
sandeep-mstnodece
authored andcommitted
[improve][broker] Added synchronized for sendMessages in Non-Persistent message dispatchers (apache#24386)
(cherry picked from commit cb0fe5b)
1 parent 53cffca commit 9d2c746

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int
5252
}
5353

5454
@Override
55-
public void sendMessages(List<Entry> entries) {
55+
public synchronized void sendMessages(List<Entry> entries) {
5656
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
5757
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
5858
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
127127
};
128128

129129
@Override
130-
public void sendMessages(List<Entry> entries) {
130+
public synchronized void sendMessages(List<Entry> entries) {
131131
if (entries.isEmpty()) {
132132
return;
133133
}

0 commit comments

Comments
 (0)