Skip to content

Commit 2d9ac9b

Browse files
poorbarcodelhotari
authored andcommitted
[fix][ml]Received more than once callback when calling cursor.delete (apache#24405)
(cherry picked from commit 72953bf)
1 parent 9883d92 commit 2d9ac9b

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,7 +2349,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
23492349
PositionImpl newMarkDeletePosition = null;
23502350

23512351
lock.writeLock().lock();
2352-
2352+
boolean skipMarkDeleteBecauseAckedNothing = false;
23532353
try {
23542354
if (log.isDebugEnabled()) {
23552355
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
@@ -2418,6 +2418,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
24182418

24192419
if (individualDeletedMessages.isEmpty()) {
24202420
// No changes to individually deleted messages, so nothing to do at this point
2421+
skipMarkDeleteBecauseAckedNothing = true;
24212422
return;
24222423
}
24232424

@@ -2435,6 +2436,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
24352436

24362437
if (range == null) {
24372438
// The set was completely cleaned up now
2439+
skipMarkDeleteBecauseAckedNothing = true;
24382440
return;
24392441
}
24402442

@@ -2461,9 +2463,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
24612463
callback.deleteFailed(getManagedLedgerException(e), ctx);
24622464
return;
24632465
} finally {
2464-
boolean empty = individualDeletedMessages.isEmpty();
24652466
lock.writeLock().unlock();
2466-
if (empty) {
2467+
if (skipMarkDeleteBecauseAckedNothing) {
24672468
callback.deleteComplete(ctx);
24682469
}
24692470
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5233,5 +5233,33 @@ public void testDeleteBatchedMessageWithEmptyAckSet() throws Exception {
52335233
ml.delete();
52345234
}
52355235

5236+
@Test
5237+
public void testCallbackTimes() throws Exception {
5238+
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testCallbackTimes");
5239+
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
5240+
Position position1 = ml.addEntry(new byte[1]);
5241+
Position position2 = ml.addEntry(new byte[2]);
5242+
AtomicInteger executedCallbackTimes = new AtomicInteger();
5243+
cursor.asyncDelete(Arrays.asList(position1, position2), new DeleteCallback() {
5244+
@Override
5245+
public void deleteComplete(Object ctx) {
5246+
executedCallbackTimes.incrementAndGet();
5247+
}
5248+
5249+
@Override
5250+
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
5251+
executedCallbackTimes.incrementAndGet();
5252+
}
5253+
}, new Object());
5254+
// Verify that the executed count of callback is "1".
5255+
Awaitility.await().untilAsserted(() -> {
5256+
assertTrue(executedCallbackTimes.get() > 0);
5257+
});
5258+
Thread.sleep(2000);
5259+
assertEquals(executedCallbackTimes.get(), 1);
5260+
// cleanup.
5261+
ml.delete();
5262+
}
5263+
52365264
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
52375265
}

0 commit comments

Comments
 (0)