Skip to content

Commit ce43873

Browse files
sandeep-mstsrinath-ctds
authored andcommitted
[improve][broker] Added synchronized for sendMessages in Non-Persistent message dispatchers (apache#24386)
(cherry picked from commit cb0fe5b) (cherry picked from commit 31bae69)
1 parent b1cf896 commit ce43873

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int
5252
}
5353

5454
@Override
55-
public void sendMessages(List<Entry> entries) {
55+
public synchronized void sendMessages(List<Entry> entries) {
5656
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
5757
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
5858
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
127127
};
128128

129129
@Override
130-
public void sendMessages(List<Entry> entries) {
130+
public synchronized void sendMessages(List<Entry> entries) {
131131
if (entries.isEmpty()) {
132132
return;
133133
}

0 commit comments

Comments
 (0)