Skip to content

Commit 72953bf

Browse files
authored
[fix][ml]Received more than once callback when calling cursor.delete (apache#24405)
1 parent 7702bde commit 72953bf

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2361,7 +2361,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
23612361
Position newMarkDeletePosition = null;
23622362

23632363
lock.writeLock().lock();
2364-
2364+
boolean skipMarkDeleteBecauseAckedNothing = false;
23652365
try {
23662366
if (log.isDebugEnabled()) {
23672367
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
@@ -2431,6 +2431,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
24312431

24322432
if (individualDeletedMessages.isEmpty()) {
24332433
// No changes to individually deleted messages, so nothing to do at this point
2434+
skipMarkDeleteBecauseAckedNothing = true;
24342435
return;
24352436
}
24362437

@@ -2448,6 +2449,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
24482449

24492450
if (range == null) {
24502451
// The set was completely cleaned up now
2452+
skipMarkDeleteBecauseAckedNothing = true;
24512453
return;
24522454
}
24532455

@@ -2474,9 +2476,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
24742476
callback.deleteFailed(getManagedLedgerException(e), ctx);
24752477
return;
24762478
} finally {
2477-
boolean empty = individualDeletedMessages.isEmpty();
24782479
lock.writeLock().unlock();
2479-
if (empty) {
2480+
if (skipMarkDeleteBecauseAckedNothing) {
24802481
callback.deleteComplete(ctx);
24812482
}
24822483
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5304,6 +5304,34 @@ public void testApplyMaxSizeCap() throws Exception {
53045304
ml.delete();
53055305
}
53065306

5307+
@Test
5308+
public void testCallbackTimes() throws Exception {
5309+
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testCallbackTimes");
5310+
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
5311+
Position position1 = ml.addEntry(new byte[1]);
5312+
Position position2 = ml.addEntry(new byte[2]);
5313+
AtomicInteger executedCallbackTimes = new AtomicInteger();
5314+
cursor.asyncDelete(Arrays.asList(position1, position2), new DeleteCallback() {
5315+
@Override
5316+
public void deleteComplete(Object ctx) {
5317+
executedCallbackTimes.incrementAndGet();
5318+
}
5319+
5320+
@Override
5321+
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
5322+
executedCallbackTimes.incrementAndGet();
5323+
}
5324+
}, new Object());
5325+
// Verify that the executed count of callback is "1".
5326+
Awaitility.await().untilAsserted(() -> {
5327+
assertTrue(executedCallbackTimes.get() > 0);
5328+
});
5329+
Thread.sleep(2000);
5330+
assertEquals(executedCallbackTimes.get(), 1);
5331+
// cleanup.
5332+
ml.delete();
5333+
}
5334+
53075335
@Test
53085336
void testForceCursorRecovery() throws Exception {
53095337
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);

0 commit comments

Comments
 (0)