Skip to content

Commit 73a4ae4

Browse files
authored
[fix][ml]Still got BK ledger, even though it has been deleted after offloaded (apache#24432)
1 parent 9810594 commit 73a4ae4

File tree

3 files changed

+155
-9
lines changed

3 files changed

+155
-9
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2980,7 +2980,15 @@ public void operationComplete(Void result, Stat stat) {
29802980
for (LedgerInfo ls : offloadedLedgersToDelete) {
29812981
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
29822982
ls.getSize());
2983-
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
2983+
invalidateReadHandle(ls.getLedgerId());
2984+
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()).thenAccept(__ -> {
2985+
log.info("[{}] Deleted and invalidated offloaded ledger {} from bookkeeper - size: {}",
2986+
name, ls.getLedgerId(), ls.getSize());
2987+
}).exceptionally(ex -> {
2988+
log.error("[{}] Failed to delete offloaded ledger {} from bookkeeper - size: {}",
2989+
name, ls.getLedgerId(), ls.getSize(), ex);
2990+
return null;
2991+
});
29842992
}
29852993
promise.complete(null);
29862994
}
@@ -3201,8 +3209,8 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
32013209
}
32023210
}
32033211

3204-
private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
3205-
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
3212+
private CompletableFuture<Void> asyncDeleteLedgerFromBookKeeper(long ledgerId) {
3213+
return asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
32063214
}
32073215

32083216
private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
@@ -3219,22 +3227,32 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
32193227
}
32203228
}
32213229

3222-
private void asyncDeleteLedger(long ledgerId, long retry) {
3223-
if (retry <= 0) {
3224-
log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
3225-
return;
3226-
}
3230+
private CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long retry) {
3231+
CompletableFuture<Void> future = new CompletableFuture<>();
3232+
asyncDeleteLedgerWithRetry(future, ledgerId, retry);
3233+
return future;
3234+
}
3235+
3236+
private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, long ledgerId, long retry) {
32273237
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
32283238
if (isNoSuchLedgerExistsException(rc)) {
32293239
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
3240+
future.complete(null);
32303241
} else if (rc != BKException.Code.OK) {
32313242
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
3243+
if (retry <= 1) {
3244+
// The latest once of retry has failed
3245+
log.warn("[{}] Failed to delete ledger after retries {}, code: {}", name, ledgerId, rc);
3246+
future.completeExceptionally(BKException.create(rc));
3247+
return;
3248+
}
32323249
scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1),
32333250
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
32343251
} else {
32353252
if (log.isDebugEnabled()) {
32363253
log.debug("[{}] Deleted ledger {}", name, ledgerId);
32373254
}
3255+
future.complete(null);
32383256
}
32393257
}, null);
32403258
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,27 @@
2626
import java.util.UUID;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.CountDownLatch;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.stream.Collectors;
3132

33+
import org.apache.bookkeeper.client.BKException;
3234
import org.apache.bookkeeper.client.api.ReadHandle;
35+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
36+
import org.apache.bookkeeper.mledger.Entry;
3337
import org.apache.bookkeeper.mledger.LedgerOffloader;
3438
import org.apache.bookkeeper.mledger.ManagedCursor;
3539
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
40+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
41+
import org.apache.bookkeeper.mledger.Position;
42+
import org.apache.bookkeeper.mledger.PositionFactory;
3643
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
3744
import org.apache.bookkeeper.mledger.util.MockClock;
3845
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
3946

4047
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
48+
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
49+
import org.awaitility.Awaitility;
4150
import org.mockito.Mockito;
4251
import org.slf4j.Logger;
4352
import org.slf4j.LoggerFactory;
@@ -207,6 +216,125 @@ public void testLaggedDelete() throws Exception {
207216
assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
208217
}
209218

219+
@Test
220+
public void testGetReadLedgerHandleAfterTrimOffloadedLedgers() throws Exception {
221+
// Create managed ledger.
222+
final long offloadThresholdSeconds = 5;
223+
final long offloadDeletionLagInSeconds = 1;
224+
OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
225+
ManagedLedgerConfig config = new ManagedLedgerConfig();
226+
config.setMaxEntriesPerLedger(10);
227+
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
228+
config.setRetentionTime(10, TimeUnit.MINUTES);
229+
config.setRetentionSizeInMB(10);
230+
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInSeconds * 1000);
231+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(offloadThresholdSeconds);
232+
offloader.getOffloadPolicies().setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
233+
config.setLedgerOffloader(offloader);
234+
ManagedLedgerImpl ml =
235+
(ManagedLedgerImpl)factory.open("testGetReadLedgerHandleAfterTrimOffloadedLedgers", config);
236+
ml.openCursor("c1");
237+
238+
// Write entries.
239+
int i = 0;
240+
for (; i < 35; i++) {
241+
String content = "entry-" + i;
242+
ml.addEntry(content.getBytes());
243+
}
244+
Assert.assertEquals(ml.getLedgersInfoAsList().size(), 4);
245+
long ledger1 = ml.getLedgersInfoAsList().get(0).getLedgerId();
246+
long ledger2 = ml.getLedgersInfoAsList().get(1).getLedgerId();
247+
long ledger3 = ml.getLedgersInfoAsList().get(2).getLedgerId();
248+
long ledger4 = ml.getLedgersInfoAsList().get(3).getLedgerId();
249+
250+
// Offload ledgers.
251+
Thread.sleep(offloadThresholdSeconds * 2 * 1000);
252+
CompletableFuture<Position> offloadFuture = new CompletableFuture<Position>();
253+
ml.maybeOffloadInBackground(offloadFuture);
254+
offloadFuture.join();
255+
256+
// Cache ledger handle.
257+
CountDownLatch readCountDownLatch = new CountDownLatch(4);
258+
AsyncCallbacks.ReadEntryCallback readCb = new AsyncCallbacks.ReadEntryCallback(){
259+
260+
@Override
261+
public void readEntryComplete(Entry entry, Object ctx) {
262+
readCountDownLatch.countDown();
263+
}
264+
265+
@Override
266+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
267+
readCountDownLatch.countDown();
268+
}
269+
};
270+
ml.asyncReadEntry(PositionFactory.create(ledger1, 0), readCb, null);
271+
ml.asyncReadEntry(PositionFactory.create(ledger2, 0), readCb, null);
272+
ml.asyncReadEntry(PositionFactory.create(ledger3, 0), readCb, null);
273+
ml.asyncReadEntry(PositionFactory.create(ledger4, 0), readCb, null);
274+
readCountDownLatch.await();
275+
ReadHandle originalReadHandle4 = ml.getLedgerHandle(ledger4).join();
276+
277+
// Trim offloaded BK ledger handles.
278+
Thread.sleep(offloadDeletionLagInSeconds * 2 * 1000);
279+
CompletableFuture<Position> trimLedgerFuture = new CompletableFuture<Position>();
280+
ml.internalTrimLedgers(false, trimLedgerFuture);
281+
trimLedgerFuture.join();
282+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgerInfo(ledger1).get();
283+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgerInfo(ledger2).get();
284+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo3 = ml.getLedgerInfo(ledger3).get();
285+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo4 = ml.getLedgerInfo(ledger4).get();
286+
Assert.assertTrue(ledgerInfo1.hasOffloadContext() && ledgerInfo1.getOffloadContext().getBookkeeperDeleted());
287+
Assert.assertTrue(ledgerInfo2.hasOffloadContext() && ledgerInfo2.getOffloadContext().getBookkeeperDeleted());
288+
Assert.assertTrue(ledgerInfo3.hasOffloadContext() && ledgerInfo3.getOffloadContext().getBookkeeperDeleted());
289+
Assert.assertFalse(ledgerInfo4.hasOffloadContext() || ledgerInfo4.getOffloadContext().getBookkeeperDeleted());
290+
291+
Awaitility.await().untilAsserted(() -> {
292+
try {
293+
factory.getBookKeeper().get().openLedger(ledger3, ml.digestType, ml.config.getPassword());
294+
Assert.fail("Should fail: the ledger has been deleted");
295+
} catch (BKException.BKNoSuchLedgerExistsException ex) {
296+
// Expected.
297+
}
298+
try {
299+
factory.getBookKeeper().get().openLedger(ledger2, ml.digestType, ml.config.getPassword());
300+
Assert.fail("Should fail: the ledger has been deleted");
301+
} catch (BKException.BKNoSuchLedgerExistsException ex) {
302+
// Expected.
303+
}
304+
try {
305+
factory.getBookKeeper().get().openLedger(ledger1, ml.digestType, ml.config.getPassword());
306+
Assert.fail("Should fail: the ledger has been deleted");
307+
} catch (BKException.BKNoSuchLedgerExistsException ex) {
308+
// Expected.
309+
}
310+
});
311+
312+
// Verify: "ml.getLedgerHandle" returns a correct ledger handle.
313+
ReadHandle currentReadHandle4 = ml.getLedgerHandle(ledger4).join();
314+
Assert.assertEquals(currentReadHandle4, originalReadHandle4);
315+
try {
316+
ml.getLedgerHandle(ledger3).join();
317+
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
318+
} catch (Exception ex) {
319+
Assert.assertTrue(ex.getCause().getCause().getMessage()
320+
.contains("MockLedgerOffloader does not support read"));
321+
}
322+
try {
323+
ml.getLedgerHandle(ledger2).join();
324+
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
325+
} catch (Exception ex) {
326+
Assert.assertTrue(ex.getCause().getCause().getMessage()
327+
.contains("MockLedgerOffloader does not support read"));
328+
}
329+
try {
330+
ml.getLedgerHandle(ledger1).join();
331+
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
332+
} catch (Exception ex) {
333+
Assert.assertTrue(ex.getCause().getCause().getMessage()
334+
.contains("MockLedgerOffloader does not support read"));
335+
}
336+
}
337+
210338
@Test(timeOut = 5000)
211339
public void testFileSystemOffloadDeletePath() throws Exception {
212340
MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader();

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1293,7 +1293,7 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
12931293
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
12941294
Map<String, String> offloadDriverMetadata) {
// Mock behavior: the returned future is always completed exceptionally with an
// UnsupportedOperationException; none of the arguments are inspected.
12951295
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
1296-
promise.completeExceptionally(new UnsupportedOperationException());
1296+
// The "+" line replaces the "-" line above: the exception now carries a message, which
// testGetReadLedgerHandleAfterTrimOffloadedLedgers matches with contains(...).
promise.completeExceptionally(new UnsupportedOperationException("MockLedgerOffloader does not support read"));
12971297
return promise;
12981298
}
12991299

0 commit comments

Comments
 (0)