Skip to content

Commit dd4e289

Browse files
dao-junlhotari
authored andcommitted
[improve][broker] Optimize subscription seek (cursor reset) by timestamp (apache#22792)
Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 2eb4eab)
1 parent 63fe42a commit dd4e289

File tree

10 files changed

+708
-29
lines changed

10 files changed

+708
-29
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,31 @@ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
658658
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
659659
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
660660

661+
662+
/**
663+
* Find the newest entry that matches the given predicate.
664+
*
665+
* @param constraint
666+
* search only active entries or all entries
667+
* @param condition
668+
* predicate that reads an entry an applies a condition
669+
* @param callback
670+
* callback object returning the resultant position
671+
* @param startPosition
672+
* start position to search from.
673+
* @param endPosition
674+
* end position to search to.
675+
* @param ctx
676+
* opaque context
677+
* @param isFindFromLedger
678+
* find the newest entry from ledger
679+
*/
680+
default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
681+
PositionImpl startPosition, PositionImpl endPosition, FindEntryCallback callback,
682+
Object ctx, boolean isFindFromLedger) {
683+
asyncFindNewestMatching(constraint, condition, callback, ctx, isFindFromLedger);
684+
}
685+
661686
/**
662687
* reset the cursor to specified position to enable replay of messages.
663688
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222
import java.util.Map;
23+
import java.util.NavigableMap;
2324
import java.util.Optional;
2425
import java.util.concurrent.CompletableFuture;
2526
import java.util.function.Predicate;
@@ -691,4 +692,9 @@ default void skipNonRecoverableLedger(long ledgerId){}
691692
* Check if managed ledger should cache backlog reads.
692693
*/
693694
void checkCursorsToCacheEntries();
695+
696+
/**
697+
* Get all the managed ledgers.
698+
*/
699+
NavigableMap<Long, LedgerInfo> getLedgersInfo();
694700
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,28 +1280,56 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
12801280

12811281
@Override
12821282
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1283-
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
1284-
OpFindNewest op;
1285-
PositionImpl startPosition = null;
1286-
long max = 0;
1283+
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
1284+
asyncFindNewestMatching(constraint, condition, null, null, callback, ctx,
1285+
isFindFromLedger);
1286+
}
1287+
1288+
1289+
@Override
1290+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1291+
PositionImpl start, PositionImpl end, FindEntryCallback callback,
1292+
Object ctx, boolean isFindFromLedger) {
1293+
PositionImpl startPosition;
12871294
switch (constraint) {
1288-
case SearchAllAvailableEntries:
1289-
startPosition = (PositionImpl) getFirstPosition();
1290-
max = ledger.getNumberOfEntries() - 1;
1291-
break;
1292-
case SearchActiveEntries:
1293-
startPosition = ledger.getNextValidPosition(markDeletePosition);
1294-
max = getNumberOfEntriesInStorage();
1295-
break;
1296-
default:
1297-
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
1298-
return;
1295+
case SearchAllAvailableEntries ->
1296+
startPosition = start == null ? (PositionImpl) getFirstPosition() : start;
1297+
case SearchActiveEntries -> {
1298+
if (start == null) {
1299+
startPosition = ledger.getNextValidPosition(markDeletePosition);
1300+
} else {
1301+
startPosition = start;
1302+
startPosition = startPosition.compareTo(markDeletePosition) <= 0
1303+
? ledger.getNextValidPosition(startPosition) : startPosition;
1304+
}
1305+
}
1306+
default -> {
1307+
callback.findEntryFailed(
1308+
new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
1309+
return;
1310+
}
12991311
}
1312+
// startPosition can't be null, should never go here.
13001313
if (startPosition == null) {
13011314
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
13021315
Optional.empty(), ctx);
13031316
return;
13041317
}
1318+
// Calculate the end position
1319+
PositionImpl endPosition = end == null ? ledger.lastConfirmedEntry : end;
1320+
endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? ledger.lastConfirmedEntry : endPosition;
1321+
// Calculate the number of entries between the startPosition and endPosition
1322+
long max = 0;
1323+
if (startPosition.compareTo(endPosition) <= 0) {
1324+
max = ledger.getNumberOfEntries(Range.closed(startPosition, endPosition));
1325+
}
1326+
1327+
if (max <= 0) {
1328+
callback.findEntryComplete(null, ctx);
1329+
return;
1330+
}
1331+
1332+
OpFindNewest op;
13051333
if (isFindFromLedger) {
13061334
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
13071335
} else {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3883,6 +3883,7 @@ public List<LedgerInfo> getLedgersInfoAsList() {
38833883
return Lists.newArrayList(ledgers.values());
38843884
}
38853885

3886+
@Override
38863887
public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
38873888
return ledgers;
38883889
}

0 commit comments

Comments
 (0)