Skip to content

Commit c7bc7f9

Browse files
poorbarcodeganesh-ctds
authored andcommitted
[improve][ml]Release idle offloaded read handle only the ref count is 0 (apache#24381)
(cherry picked from commit 21406b5) (cherry picked from commit 639521b)
1 parent e1cfe3f commit c7bc7f9

File tree

3 files changed

+31
-8
lines changed

3 files changed

+31
-8
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ public interface OffloadedLedgerHandle {
2626
default long lastAccessTimestamp() {
2727
return -1;
2828
}
29+
30+
default int getPendingRead() {
31+
return 0;
32+
}
2933
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2647,10 +2647,10 @@ List<Long> internalEvictOffloadedLedgers() {
26472647
ledgerCache.forEach((ledgerId, ledger) -> {
26482648
if (ledger.isDone() && !ledger.isCompletedExceptionally()) {
26492649
ReadHandle readHandle = ledger.join();
2650-
if (readHandle instanceof OffloadedLedgerHandle) {
2651-
long lastAccessTimestamp = ((OffloadedLedgerHandle) readHandle).lastAccessTimestamp();
2652-
if (lastAccessTimestamp >= 0) {
2653-
long delta = now - lastAccessTimestamp;
2650+
if (readHandle instanceof OffloadedLedgerHandle offloadedLedgerHandle) {
2651+
int pendingRead = offloadedLedgerHandle.getPendingRead();
2652+
if (pendingRead == 0) {
2653+
long delta = now - offloadedLedgerHandle.lastAccessTimestamp();
26542654
if (delta >= inactiveOffloadedLedgerEvictionTimeMs) {
26552655
log.info("[{}] Offloaded ledger {} can be released ({} ms elapsed since last access)",
26562656
name, ledgerId, delta);
@@ -2659,6 +2659,10 @@ List<Long> internalEvictOffloadedLedgers() {
26592659
log.debug("[{}] Offloaded ledger {} cannot be released ({} ms elapsed since last access)",
26602660
name, ledgerId, delta);
26612661
}
2662+
} else if (pendingRead < 0) {
2663+
log.error("[{}] Offloaded ledger {} went to a wrong state because its pending read is a"
2664+
+ " negative value {}. Please raise an issue to https://github.com/apache/pulsar", name,
2665+
ledgerId, pendingRead);
26622666
}
26632667
}
26642668
}

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ExecutorService;
3030
import java.util.concurrent.ScheduledExecutorService;
3131
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3233
import java.util.concurrent.atomic.AtomicReference;
3334
import org.apache.bookkeeper.client.BKException;
3435
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -56,6 +57,9 @@
5657
public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedgerHandle {
5758
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
5859

60+
protected static final AtomicIntegerFieldUpdater<BlobStoreBackedReadHandleImpl> PENDING_READ_UPDATER =
61+
AtomicIntegerFieldUpdater.newUpdater(BlobStoreBackedReadHandleImpl.class, "pendingRead");
62+
5963
private final long ledgerId;
6064
private final OffloadIndexBlock index;
6165
private final BackedInputStream inputStream;
@@ -71,6 +75,8 @@ enum State {
7175

7276
private volatile State state = null;
7377

78+
private volatile int pendingRead;
79+
7480
private long lastAccessTimestamp = System.currentTimeMillis();
7581

7682
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
@@ -122,8 +128,17 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
122128
getId(), firstEntry, lastEntry, (1 + lastEntry - firstEntry));
123129
}
124130
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
131+
132+
// Ledger handles will be only marked idle when "pendingRead" is "0", it is not needed to update
133+
// "lastAccessTimestamp" if "pendingRead" is larger than "0".
134+
// Rather than update "lastAccessTimestamp" when starts a reading, updating it when a reading task is finished
135+
// is better.
136+
PENDING_READ_UPDATER.incrementAndGet(this);
137+
promise.whenComplete((__, ex) -> {
138+
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
139+
lastAccessTimestamp = System.currentTimeMillis();
140+
});
125141
executor.execute(() -> {
126-
touch();
127142
if (state == State.Closed) {
128143
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
129144
ledgerId, firstEntry, lastEntry);
@@ -212,7 +227,6 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
212227
}
213228

214229
private void seekToEntry(long nextExpectedId) throws IOException {
215-
touch();
216230
Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId);
217231
if (knownOffset != null) {
218232
inputStream.seek(knownOffset);
@@ -323,7 +337,8 @@ public long lastAccessTimestamp() {
323337
return lastAccessTimestamp;
324338
}
325339

326-
private void touch() {
327-
lastAccessTimestamp = System.currentTimeMillis();
340+
@Override
341+
public int getPendingRead() {
342+
return PENDING_READ_UPDATER.get(this);
328343
}
329344
}

0 commit comments

Comments
 (0)