Skip to content

Commit 92f292c

Browse files
poorbarcodeganesh-ctds
authored andcommitted
[fix][ml]Still got BK ledger, even though it has been deleted after offloaded (apache#24432)
(cherry picked from commit 73a4ae4) (cherry picked from commit e046212)
1 parent 2f7f47f commit 92f292c

File tree

3 files changed

+154
-13
lines changed

3 files changed

+154
-13
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2869,7 +2869,15 @@ public void operationComplete(Void result, Stat stat) {
28692869
for (LedgerInfo ls : offloadedLedgersToDelete) {
28702870
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
28712871
ls.getSize());
2872-
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
2872+
invalidateReadHandle(ls.getLedgerId());
2873+
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()).thenAccept(__ -> {
2874+
log.info("[{}] Deleted and invalidated offloaded ledger {} from bookkeeper - size: {}",
2875+
name, ls.getLedgerId(), ls.getSize());
2876+
}).exceptionally(ex -> {
2877+
log.error("[{}] Failed to delete offloaded ledger {} from bookkeeper - size: {}",
2878+
name, ls.getLedgerId(), ls.getSize(), ex);
2879+
return null;
2880+
});
28732881
}
28742882
promise.complete(null);
28752883
}
@@ -3090,8 +3098,8 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
30903098
}
30913099
}
30923100

3093-
private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
3094-
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
3101+
private CompletableFuture<Void> asyncDeleteLedgerFromBookKeeper(long ledgerId) {
3102+
return asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
30953103
}
30963104

30973105
private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
@@ -3108,22 +3116,32 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
31083116
}
31093117
}
31103118

3111-
private void asyncDeleteLedger(long ledgerId, long retry) {
3112-
if (retry <= 0) {
3113-
log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
3114-
return;
3115-
}
3119+
private CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long retry) {
3120+
CompletableFuture<Void> future = new CompletableFuture<>();
3121+
asyncDeleteLedgerWithRetry(future, ledgerId, retry);
3122+
return future;
3123+
}
3124+
3125+
private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, long ledgerId, long retry) {
31163126
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
31173127
if (isNoSuchLedgerExistsException(rc)) {
31183128
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
3129+
future.complete(null);
31193130
} else if (rc != BKException.Code.OK) {
31203131
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
3132+
if (retry <= 1) {
3133+
// The latest once of retry has failed
3134+
log.warn("[{}] Failed to delete ledger after retries {}, code: {}", name, ledgerId, rc);
3135+
future.completeExceptionally(BKException.create(rc));
3136+
return;
3137+
}
31213138
scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1),
31223139
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
31233140
} else {
31243141
if (log.isDebugEnabled()) {
31253142
log.debug("[{}] Deleted ledger {}", name, ledgerId);
31263143
}
3144+
future.complete(null);
31273145
}
31283146
}, null);
31293147
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,33 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
22-
2322
import java.util.Map;
2423
import java.util.Optional;
2524
import java.util.Set;
2625
import java.util.UUID;
2726
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.CountDownLatch;
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.stream.Collectors;
31-
31+
import org.apache.bookkeeper.client.BKException;
3232
import org.apache.bookkeeper.client.api.ReadHandle;
33+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
34+
import org.apache.bookkeeper.mledger.Entry;
3335
import org.apache.bookkeeper.mledger.LedgerOffloader;
3436
import org.apache.bookkeeper.mledger.ManagedCursor;
3537
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
38+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
39+
import org.apache.bookkeeper.mledger.Position;
3640
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
3741
import org.apache.bookkeeper.mledger.util.MockClock;
3842
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
39-
4043
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
44+
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
45+
import org.awaitility.Awaitility;
4146
import org.mockito.Mockito;
4247
import org.slf4j.Logger;
4348
import org.slf4j.LoggerFactory;
44-
4549
import org.testng.Assert;
4650
import org.testng.annotations.Test;
4751

@@ -207,6 +211,125 @@ public void testLaggedDelete() throws Exception {
207211
assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
208212
}
209213

214+
@Test
215+
public void testGetReadLedgerHandleAfterTrimOffloadedLedgers() throws Exception {
216+
// Create managed ledger.
217+
final long offloadThresholdSeconds = 5;
218+
final long offloadDeletionLagInSeconds = 1;
219+
OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
220+
ManagedLedgerConfig config = new ManagedLedgerConfig();
221+
config.setMaxEntriesPerLedger(10);
222+
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
223+
config.setRetentionTime(10, TimeUnit.MINUTES);
224+
config.setRetentionSizeInMB(10);
225+
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInSeconds * 1000);
226+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(offloadThresholdSeconds);
227+
offloader.getOffloadPolicies().setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
228+
config.setLedgerOffloader(offloader);
229+
ManagedLedgerImpl ml =
230+
(ManagedLedgerImpl)factory.open("testGetReadLedgerHandleAfterTrimOffloadedLedgers", config);
231+
ml.openCursor("c1");
232+
233+
// Write entries.
234+
int i = 0;
235+
for (; i < 35; i++) {
236+
String content = "entry-" + i;
237+
ml.addEntry(content.getBytes());
238+
}
239+
Assert.assertEquals(ml.getLedgersInfoAsList().size(), 4);
240+
long ledger1 = ml.getLedgersInfoAsList().get(0).getLedgerId();
241+
long ledger2 = ml.getLedgersInfoAsList().get(1).getLedgerId();
242+
long ledger3 = ml.getLedgersInfoAsList().get(2).getLedgerId();
243+
long ledger4 = ml.getLedgersInfoAsList().get(3).getLedgerId();
244+
245+
// Offload ledgers.
246+
Thread.sleep(offloadThresholdSeconds * 2 * 1000);
247+
CompletableFuture<PositionImpl> offloadFuture = new CompletableFuture<PositionImpl>();
248+
ml.maybeOffloadInBackground(offloadFuture);
249+
offloadFuture.join();
250+
251+
// Cache ledger handle.
252+
CountDownLatch readCountDownLatch = new CountDownLatch(4);
253+
AsyncCallbacks.ReadEntryCallback readCb = new AsyncCallbacks.ReadEntryCallback(){
254+
255+
@Override
256+
public void readEntryComplete(Entry entry, Object ctx) {
257+
readCountDownLatch.countDown();
258+
}
259+
260+
@Override
261+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
262+
readCountDownLatch.countDown();
263+
}
264+
};
265+
ml.asyncReadEntry(PositionImpl.get(ledger1, 0), readCb, null);
266+
ml.asyncReadEntry(PositionImpl.get(ledger2, 0), readCb, null);
267+
ml.asyncReadEntry(PositionImpl.get(ledger3, 0), readCb, null);
268+
ml.asyncReadEntry(PositionImpl.get(ledger4, 0), readCb, null);
269+
readCountDownLatch.await();
270+
ReadHandle originalReadHandle4 = ml.getLedgerHandle(ledger4).join();
271+
272+
// Trim offloaded BK ledger handles.
273+
Thread.sleep(offloadDeletionLagInSeconds * 2 * 1000);
274+
CompletableFuture<Position> trimLedgerFuture = new CompletableFuture<Position>();
275+
ml.internalTrimLedgers(false, trimLedgerFuture);
276+
trimLedgerFuture.join();
277+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgerInfo(ledger1).get();
278+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgerInfo(ledger2).get();
279+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo3 = ml.getLedgerInfo(ledger3).get();
280+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo4 = ml.getLedgerInfo(ledger4).get();
281+
Assert.assertTrue(ledgerInfo1.hasOffloadContext() && ledgerInfo1.getOffloadContext().getBookkeeperDeleted());
282+
Assert.assertTrue(ledgerInfo2.hasOffloadContext() && ledgerInfo2.getOffloadContext().getBookkeeperDeleted());
283+
Assert.assertTrue(ledgerInfo3.hasOffloadContext() && ledgerInfo3.getOffloadContext().getBookkeeperDeleted());
284+
Assert.assertFalse(ledgerInfo4.hasOffloadContext() || ledgerInfo4.getOffloadContext().getBookkeeperDeleted());
285+
286+
Awaitility.await().untilAsserted(() -> {
287+
try {
288+
factory.getBookKeeper().get().openLedger(ledger3, ml.digestType, ml.config.getPassword());
289+
Assert.fail("Should fail: the ledger has been deleted");
290+
} catch (BKException.BKNoSuchLedgerExistsException ex) {
291+
// Expected.
292+
}
293+
try {
294+
factory.getBookKeeper().get().openLedger(ledger2, ml.digestType, ml.config.getPassword());
295+
Assert.fail("Should fail: the ledger has been deleted");
296+
} catch (BKException.BKNoSuchLedgerExistsException ex) {
297+
// Expected.
298+
}
299+
try {
300+
factory.getBookKeeper().get().openLedger(ledger1, ml.digestType, ml.config.getPassword());
301+
Assert.fail("Should fail: the ledger has been deleted");
302+
} catch (BKException.BKNoSuchLedgerExistsException ex) {
303+
// Expected.
304+
}
305+
});
306+
307+
// Verify: "ml.getLedgerHandle" returns a correct ledger handle.
308+
ReadHandle currentReadHandle4 = ml.getLedgerHandle(ledger4).join();
309+
Assert.assertEquals(currentReadHandle4, originalReadHandle4);
310+
try {
311+
ml.getLedgerHandle(ledger3).join();
312+
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
313+
} catch (Exception ex) {
314+
Assert.assertTrue(ex.getCause().getCause().getMessage()
315+
.contains("MockLedgerOffloader does not support read"));
316+
}
317+
try {
318+
ml.getLedgerHandle(ledger2).join();
319+
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
320+
} catch (Exception ex) {
321+
Assert.assertTrue(ex.getCause().getCause().getMessage()
322+
.contains("MockLedgerOffloader does not support read"));
323+
}
324+
try {
325+
ml.getLedgerHandle(ledger1).join();
326+
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
327+
} catch (Exception ex) {
328+
Assert.assertTrue(ex.getCause().getCause().getMessage()
329+
.contains("MockLedgerOffloader does not support read"));
330+
}
331+
}
332+
210333
@Test(timeOut = 5000)
211334
public void testFileSystemOffloadDeletePath() throws Exception {
212335
MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader();

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1292,7 +1292,7 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
12921292
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
12931293
Map<String, String> offloadDriverMetadata) {
12941294
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
1295-
promise.completeExceptionally(new UnsupportedOperationException());
1295+
promise.completeExceptionally(new UnsupportedOperationException("MockLedgerOffloader does not support read"));
12961296
return promise;
12971297
}
12981298

0 commit comments

Comments
 (0)