Skip to content

Commit f7d35e5

Browse files
lhotarimerlimat
andauthored
[improve][meta] Fix invalid use of drain API and race condition in closing metadata store (#22585)
Co-authored-by: Matteo Merli <[email protected]>
1 parent c2702e9 commit f7d35e5

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,13 @@ public void close() throws Exception {
8686
// Fail all the pending items
8787
MetadataStoreException ex =
8888
new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed");
89-
readOps.drain(op -> op.getFuture().completeExceptionally(ex));
90-
writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
91-
89+
MetadataOp op;
90+
while ((op = readOps.poll()) != null) {
91+
op.getFuture().completeExceptionally(ex);
92+
}
93+
while ((op = writeOps.poll()) != null) {
94+
op.getFuture().completeExceptionally(ex);
95+
}
9296
scheduledTask.cancel(true);
9397
}
9498
super.close();
@@ -98,7 +102,13 @@ public void close() throws Exception {
98102
private void flush() {
99103
while (!readOps.isEmpty()) {
100104
List<MetadataOp> ops = new ArrayList<>();
101-
readOps.drain(ops::add, maxOperations);
105+
for (int i = 0; i < maxOperations; i++) {
106+
MetadataOp op = readOps.poll();
107+
if (op == null) {
108+
break;
109+
}
110+
ops.add(op);
111+
}
102112
internalBatchOperation(ops);
103113
}
104114

@@ -167,6 +177,11 @@ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchroniz
167177
}
168178

169179
private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
180+
if (isClosed()) {
181+
MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException();
182+
op.getFuture().completeExceptionally(ex);
183+
return;
184+
}
170185
if (enabled) {
171186
if (!queue.offer(op)) {
172187
// Execute individually if we're failing to enqueue
@@ -182,6 +197,12 @@ private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
182197
}
183198

184199
private void internalBatchOperation(List<MetadataOp> ops) {
200+
if (isClosed()) {
201+
MetadataStoreException ex =
202+
new MetadataStoreException.AlreadyClosedException();
203+
ops.forEach(op -> op.getFuture().completeExceptionally(ex));
204+
return;
205+
}
185206
long now = System.currentTimeMillis();
186207
for (MetadataOp op : ops) {
187208
this.batchMetadataStoreStats.recordOpWaiting(now - op.created());

0 commit comments

Comments
 (0)