Skip to content

Commit 21406b5

Browse files
authored
[improve][ml]Release idle offloaded read handle only the ref count is 0 (apache#24381)
1 parent 37e160f commit 21406b5

File tree

3 files changed

+31
-9
lines changed

3 files changed

+31
-9
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ public interface OffloadedLedgerHandle {
2626
default long lastAccessTimestamp() {
2727
return -1;
2828
}
29+
30+
default int getPendingRead() {
31+
return 0;
32+
}
2933
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2751,10 +2751,10 @@ synchronized List<Long> internalEvictOffloadedLedgers() {
27512751
ledgerCache.forEach((ledgerId, ledger) -> {
27522752
if (ledger.isDone() && !ledger.isCompletedExceptionally()) {
27532753
ReadHandle readHandle = ledger.join();
2754-
if (readHandle instanceof OffloadedLedgerHandle) {
2755-
long lastAccessTimestamp = ((OffloadedLedgerHandle) readHandle).lastAccessTimestamp();
2756-
if (lastAccessTimestamp >= 0) {
2757-
long delta = now - lastAccessTimestamp;
2754+
if (readHandle instanceof OffloadedLedgerHandle offloadedLedgerHandle) {
2755+
int pendingRead = offloadedLedgerHandle.getPendingRead();
2756+
if (pendingRead == 0) {
2757+
long delta = now - offloadedLedgerHandle.lastAccessTimestamp();
27582758
if (delta >= inactiveOffloadedLedgerEvictionTimeMs) {
27592759
log.info("[{}] Offloaded ledger {} can be released ({} ms elapsed since last access)",
27602760
name, ledgerId, delta);
@@ -2764,6 +2764,10 @@ synchronized List<Long> internalEvictOffloadedLedgers() {
27642764
"[{}] Offloaded ledger {} cannot be released ({} ms elapsed since last access)",
27652765
name, ledgerId, delta);
27662766
}
2767+
} else if (pendingRead < 0) {
2768+
log.error("[{}] Offloaded ledger {} went to a wrong state because its pending read is a"
2769+
+ " negative value {}. Please raise an issue to https://github.com/apache/pulsar", name,
2770+
ledgerId, pendingRead);
27672771
}
27682772
}
27692773
}

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ExecutorService;
3030
import java.util.concurrent.ScheduledExecutorService;
3131
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3233
import java.util.concurrent.atomic.AtomicReference;
3334
import org.apache.bookkeeper.client.BKException;
3435
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -56,6 +57,9 @@
5657
public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedgerHandle {
5758
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
5859

60+
protected static final AtomicIntegerFieldUpdater<BlobStoreBackedReadHandleImpl> PENDING_READ_UPDATER =
61+
AtomicIntegerFieldUpdater.newUpdater(BlobStoreBackedReadHandleImpl.class, "pendingRead");
62+
5963
private final long ledgerId;
6064
private final OffloadIndexBlock index;
6165
private final BackedInputStream inputStream;
@@ -71,6 +75,8 @@ enum State {
7175

7276
private volatile State state = null;
7377

78+
private volatile int pendingRead;
79+
7480
private volatile long lastAccessTimestamp = System.currentTimeMillis();
7581

7682
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
@@ -122,9 +128,17 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
122128
getId(), firstEntry, lastEntry, (1 + lastEntry - firstEntry));
123129
}
124130
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
125-
touch();
131+
132+
// Ledger handles will be only marked idle when "pendingRead" is "0", it is not needed to update
133+
// "lastAccessTimestamp" if "pendingRead" is larger than "0".
134+
// Rather than update "lastAccessTimestamp" when starts a reading, updating it when a reading task is finished
135+
// is better.
136+
PENDING_READ_UPDATER.incrementAndGet(this);
137+
promise.whenComplete((__, ex) -> {
138+
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
139+
lastAccessTimestamp = System.currentTimeMillis();
140+
});
126141
executor.execute(() -> {
127-
touch();
128142
if (state == State.Closed) {
129143
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
130144
ledgerId, firstEntry, lastEntry);
@@ -213,7 +227,6 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
213227
}
214228

215229
private void seekToEntry(long nextExpectedId) throws IOException {
216-
touch();
217230
Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId);
218231
if (knownOffset != null) {
219232
inputStream.seek(knownOffset);
@@ -324,7 +337,8 @@ public long lastAccessTimestamp() {
324337
return lastAccessTimestamp;
325338
}
326339

327-
private void touch() {
328-
lastAccessTimestamp = System.currentTimeMillis();
340+
@Override
341+
public int getPendingRead() {
342+
return PENDING_READ_UPDATER.get(this);
329343
}
330344
}

0 commit comments

Comments
 (0)