Skip to content
This repository was archived by the owner on Apr 1, 2024. It is now read-only.

Commit 5e21470

Browse files
wangjialing218codelipenghui
authored andcommitted
fix the closed ledger did not delete after expired (apache#9136)
Fixes apache#9057 When current ledger closed, if there is no incoming traffic, the read position of the cursor is still point to the last entry of the closed ledger, that casue the `slowestReaderLedgerId` point to the closed ledger in `internalTrimConsumedLedgers()` and fail to delete the closed ledger. When close current ledger, if cursor's mark delete position point to the last entry of current ledger, move the read position to the new created ledger. add test case: testDeletionAfterLedgerClosedAndRetention() (cherry picked from commit 0e5c536)
1 parent 801ba85 commit 5e21470

File tree

3 files changed

+43
-15
lines changed

3 files changed

+43
-15
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,8 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
12591259
} else {
12601260
log.info("[{}] Created new ledger {}", name, lh.getId());
12611261
ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
1262+
final long previousEntries = currentLedgerEntries;
1263+
final long previousLedgerId = currentLedger.getId();
12621264
currentLedger = lh;
12631265
currentLedgerEntries = 0;
12641266
currentLedgerSize = 0;
@@ -1276,6 +1278,14 @@ public void operationComplete(Void v, Stat stat) {
12761278
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
12771279
TimeUnit.MILLISECONDS);
12781280
}
1281+
// Move cursor read point to new ledger
1282+
for (ManagedCursor cursor : cursors) {
1283+
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
1284+
if (markDeletedPosition.getLedgerId() == previousLedgerId && markDeletedPosition.getEntryId() + 1 >= previousEntries) {
1285+
// All entries in last ledger are marked delete, move read point to the new ledger
1286+
updateCursor((ManagedCursorImpl) cursor, PositionImpl.get(currentLedger.getId(), -1));
1287+
}
1288+
}
12791289
}
12801290

12811291
@Override

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@
3333
import static org.testng.Assert.assertSame;
3434
import static org.testng.Assert.assertTrue;
3535
import static org.testng.Assert.fail;
36-
3736
import com.google.common.base.Charsets;
3837
import com.google.common.collect.Sets;
39-
4038
import io.netty.buffer.ByteBufAllocator;
41-
39+
import io.netty.util.concurrent.DefaultThreadFactory;
4240
import java.lang.reflect.Field;
4341
import java.nio.charset.Charset;
4442
import java.security.GeneralSecurityException;
@@ -63,8 +61,6 @@
6361
import java.util.concurrent.atomic.AtomicInteger;
6462
import java.util.concurrent.atomic.AtomicReference;
6563
import java.util.function.Predicate;
66-
67-
import io.netty.util.concurrent.DefaultThreadFactory;
6864
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
6965
import org.apache.bookkeeper.client.BKException;
7066
import org.apache.bookkeeper.client.BookKeeper;
@@ -1828,6 +1824,34 @@ public void testDeletionAfterRetention() throws Exception {
18281824
ml.close();
18291825
}
18301826

1827+
@Test
1828+
public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
1829+
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
1830+
ManagedLedgerConfig config = new ManagedLedgerConfig();
1831+
config.setRetentionSizeInMB(0);
1832+
config.setMaxEntriesPerLedger(1);
1833+
config.setRetentionTime(1, TimeUnit.SECONDS);
1834+
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
1835+
1836+
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
1837+
ManagedCursor c1 = ml.openCursor("testCursor1");
1838+
ManagedCursor c2 = ml.openCursor("testCursor2");
1839+
ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
1840+
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
1841+
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
1842+
// let current ledger close
1843+
ml.rollCurrentLedgerIfFull();
1844+
// let retention expire
1845+
Thread.sleep(1500);
1846+
// delete the expired ledger
1847+
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
1848+
1849+
// the closed and expired ledger should be deleted
1850+
assertTrue(ml.getLedgersInfoAsList().size() <= 1);
1851+
assertEquals(ml.getTotalSize(), 0);
1852+
ml.close();
1853+
}
1854+
18311855
/**
18321856
* Set retention time = 0 and create a empty ledger,
18331857
* first position can't higher than last after trim ledgers.

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,17 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import java.util.concurrent.TimeUnit;
2122
import lombok.Cleanup;
2223
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
2324
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
24-
import org.apache.bookkeeper.mledger.util.Futures;
2525
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2626
import org.apache.pulsar.client.api.Consumer;
2727
import org.apache.pulsar.client.api.Message;
2828
import org.apache.pulsar.client.api.Producer;
2929
import org.junit.Test;
3030
import org.testng.Assert;
3131

32-
import java.util.concurrent.TimeUnit;
33-
3432
public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
3533
@Override
3634
protected void setup() throws Exception {
@@ -90,14 +88,10 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
9088
Assert.assertNotEquals(managedLedger.getCurrentLedgerSize(), 0);
9189

9290
// trigger a ledger rollover
93-
// and now we have two ledgers, one with expired data and one for empty
91+
// the last ledger will be closed and removed and we have one ledger for empty
9492
managedLedger.rollCurrentLedgerIfFull();
9593
Thread.sleep(1000);
96-
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
97-
98-
// trigger a ledger trimming
99-
// and now we only have the empty ledger
100-
managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
101-
Assert.assertEquals(managedLedger.getCurrentLedgerSize(), 0);
94+
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
95+
Assert.assertEquals(managedLedger.getTotalSize(), 0);
10296
}
10397
}

0 commit comments

Comments
 (0)