Skip to content

Commit 00ee67d

Browse files
dao-junlhotari
authored andcommitted
[improve][broker] Optimize subscription seek (cursor reset) by timestamp (#22792)
Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 2eb4eab)
1 parent 7f977d7 commit 00ee67d

File tree

10 files changed

+716
-39
lines changed

10 files changed

+716
-39
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,31 @@ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
658658
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
659659
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
660660

661+
662+
/**
663+
* Find the newest entry that matches the given predicate.
664+
*
665+
* @param constraint
666+
* search only active entries or all entries
667+
* @param condition
668+
* predicate that reads an entry an applies a condition
669+
* @param callback
670+
* callback object returning the resultant position
671+
* @param startPosition
672+
* start position to search from.
673+
* @param endPosition
674+
* end position to search to.
675+
* @param ctx
676+
* opaque context
677+
* @param isFindFromLedger
678+
* find the newest entry from ledger
679+
*/
680+
default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
681+
PositionImpl startPosition, PositionImpl endPosition, FindEntryCallback callback,
682+
Object ctx, boolean isFindFromLedger) {
683+
asyncFindNewestMatching(constraint, condition, callback, ctx, isFindFromLedger);
684+
}
685+
661686
/**
662687
* reset the cursor to specified position to enable replay of messages.
663688
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222
import java.util.Map;
23+
import java.util.NavigableMap;
2324
import java.util.Optional;
2425
import java.util.concurrent.CompletableFuture;
2526
import java.util.function.Predicate;
@@ -696,4 +697,9 @@ default void skipNonRecoverableLedger(long ledgerId){}
696697
* Check if managed ledger should cache backlog reads.
697698
*/
698699
void checkCursorsToCacheEntries();
700+
701+
/**
702+
* Get all the managed ledgers.
703+
*/
704+
NavigableMap<Long, LedgerInfo> getLedgersInfo();
699705
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,28 +1288,56 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
12881288

12891289
@Override
12901290
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1291-
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
1292-
OpFindNewest op;
1293-
PositionImpl startPosition = null;
1294-
long max = 0;
1291+
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
1292+
asyncFindNewestMatching(constraint, condition, null, null, callback, ctx,
1293+
isFindFromLedger);
1294+
}
1295+
1296+
1297+
@Override
1298+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1299+
PositionImpl start, PositionImpl end, FindEntryCallback callback,
1300+
Object ctx, boolean isFindFromLedger) {
1301+
PositionImpl startPosition;
12951302
switch (constraint) {
1296-
case SearchAllAvailableEntries:
1297-
startPosition = (PositionImpl) getFirstPosition();
1298-
max = ledger.getNumberOfEntries() - 1;
1299-
break;
1300-
case SearchActiveEntries:
1301-
startPosition = ledger.getNextValidPosition(markDeletePosition);
1302-
max = getNumberOfEntriesInStorage();
1303-
break;
1304-
default:
1305-
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
1306-
return;
1303+
case SearchAllAvailableEntries ->
1304+
startPosition = start == null ? (PositionImpl) getFirstPosition() : start;
1305+
case SearchActiveEntries -> {
1306+
if (start == null) {
1307+
startPosition = ledger.getNextValidPosition(markDeletePosition);
1308+
} else {
1309+
startPosition = start;
1310+
startPosition = startPosition.compareTo(markDeletePosition) <= 0
1311+
? ledger.getNextValidPosition(startPosition) : startPosition;
1312+
}
1313+
}
1314+
default -> {
1315+
callback.findEntryFailed(
1316+
new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
1317+
return;
1318+
}
13071319
}
1320+
// startPosition can't be null, should never go here.
13081321
if (startPosition == null) {
13091322
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
13101323
Optional.empty(), ctx);
13111324
return;
13121325
}
1326+
// Calculate the end position
1327+
PositionImpl endPosition = end == null ? ledger.lastConfirmedEntry : end;
1328+
endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? ledger.lastConfirmedEntry : endPosition;
1329+
// Calculate the number of entries between the startPosition and endPosition
1330+
long max = 0;
1331+
if (startPosition.compareTo(endPosition) <= 0) {
1332+
max = ledger.getNumberOfEntries(Range.closed(startPosition, endPosition));
1333+
}
1334+
1335+
if (max <= 0) {
1336+
callback.findEntryComplete(null, ctx);
1337+
return;
1338+
}
1339+
1340+
OpFindNewest op;
13131341
if (isFindFromLedger) {
13141342
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
13151343
} else {

0 commit comments

Comments
 (0)