Skip to content

Commit 2eb4eab

Browse files
dao-junlhotari
andauthored
[improve][broker] Optimize subscription seek (cursor reset) by timestamp (apache#22792)
Co-authored-by: Lari Hotari <[email protected]>
1 parent 9149720 commit 2eb4eab

File tree

7 files changed

+690
-30
lines changed

7 files changed

+690
-30
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,31 @@ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
660660
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
661661
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
662662

663+
664+
/**
665+
* Find the newest entry that matches the given predicate.
666+
*
667+
* @param constraint
668+
* search only active entries or all entries
669+
* @param condition
670+
* predicate that reads an entry an applies a condition
671+
* @param callback
672+
* callback object returning the resultant position
673+
* @param startPosition
674+
* start position to search from.
675+
* @param endPosition
676+
* end position to search to.
677+
* @param ctx
678+
* opaque context
679+
* @param isFindFromLedger
680+
* find the newest entry from ledger
681+
*/
682+
default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
683+
Position startPosition, Position endPosition, FindEntryCallback callback,
684+
Object ctx, boolean isFindFromLedger) {
685+
asyncFindNewestMatching(constraint, condition, callback, ctx, isFindFromLedger);
686+
}
687+
663688
/**
664689
* reset the cursor to specified position to enable replay of messages.
665690
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,27 +1272,55 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
12721272
@Override
12731273
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
12741274
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
1275-
OpFindNewest op;
1276-
Position startPosition = null;
1277-
long max = 0;
1275+
asyncFindNewestMatching(constraint, condition, null, null, callback, ctx,
1276+
isFindFromLedger);
1277+
}
1278+
1279+
1280+
@Override
1281+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1282+
Position start, Position end, FindEntryCallback callback,
1283+
Object ctx, boolean isFindFromLedger) {
1284+
Position startPosition;
12781285
switch (constraint) {
1279-
case SearchAllAvailableEntries:
1280-
startPosition = getFirstPosition();
1281-
max = ledger.getNumberOfEntries() - 1;
1282-
break;
1283-
case SearchActiveEntries:
1284-
startPosition = ledger.getNextValidPosition(markDeletePosition);
1285-
max = getNumberOfEntriesInStorage();
1286-
break;
1287-
default:
1288-
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
1289-
return;
1286+
case SearchAllAvailableEntries ->
1287+
startPosition = start == null ? getFirstPosition() : start;
1288+
case SearchActiveEntries -> {
1289+
if (start == null) {
1290+
startPosition = ledger.getNextValidPosition(markDeletePosition);
1291+
} else {
1292+
startPosition = start;
1293+
startPosition = startPosition.compareTo(markDeletePosition) <= 0
1294+
? ledger.getNextValidPosition(startPosition) : startPosition;
1295+
}
1296+
}
1297+
default -> {
1298+
callback.findEntryFailed(
1299+
new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
1300+
return;
1301+
}
12901302
}
1303+
// startPosition can't be null, should never go here.
12911304
if (startPosition == null) {
12921305
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
12931306
Optional.empty(), ctx);
12941307
return;
12951308
}
1309+
// Calculate the end position
1310+
Position endPosition = end == null ? ledger.lastConfirmedEntry : end;
1311+
endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? ledger.lastConfirmedEntry : endPosition;
1312+
// Calculate the number of entries between the startPosition and endPosition
1313+
long max = 0;
1314+
if (startPosition.compareTo(endPosition) <= 0) {
1315+
max = ledger.getNumberOfEntries(Range.closed(startPosition, endPosition));
1316+
}
1317+
1318+
if (max <= 0) {
1319+
callback.findEntryComplete(null, ctx);
1320+
return;
1321+
}
1322+
1323+
OpFindNewest op;
12961324
if (isFindFromLedger) {
12971325
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
12981326
} else {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4873,6 +4873,297 @@ public void operationFailed(ManagedLedgerException exception) {
48734873
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
48744874
}
48754875

4876+
@Test
4877+
public void testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws Exception {
4878+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
4879+
managedLedgerConfig.setMaxEntriesPerLedger(2);
4880+
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
4881+
@Cleanup
4882+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd", managedLedgerConfig);
4883+
@Cleanup
4884+
ManagedCursor managedCursor = ledger.openCursor("test");
4885+
4886+
Position position = ledger.addEntry("test".getBytes(Encoding));
4887+
Position position1 = ledger.addEntry("test1".getBytes(Encoding));
4888+
Position position2 = ledger.addEntry("test2".getBytes(Encoding));
4889+
Position position3 = ledger.addEntry("test3".getBytes(Encoding));
4890+
4891+
Predicate<Entry> condition = entry -> {
4892+
try {
4893+
Position p = entry.getPosition();
4894+
return p.compareTo(position1) <= 0;
4895+
} finally {
4896+
entry.release();
4897+
}
4898+
};
4899+
4900+
// find the newest entry with start and end position
4901+
AtomicBoolean failed = new AtomicBoolean(false);
4902+
CountDownLatch latch = new CountDownLatch(1);
4903+
AtomicReference<Position> positionRef = new AtomicReference<>();
4904+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, position2, new AsyncCallbacks.FindEntryCallback() {
4905+
@Override
4906+
public void findEntryComplete(Position position, Object ctx) {
4907+
positionRef.set(position);
4908+
latch.countDown();
4909+
}
4910+
4911+
@Override
4912+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
4913+
failed.set(true);
4914+
latch.countDown();
4915+
}
4916+
}, null, true);
4917+
4918+
latch.await();
4919+
assertFalse(failed.get());
4920+
assertNotNull(positionRef.get());
4921+
assertEquals(positionRef.get(), position1);
4922+
4923+
// find the newest entry with start
4924+
AtomicBoolean failed1 = new AtomicBoolean(false);
4925+
CountDownLatch latch1 = new CountDownLatch(1);
4926+
AtomicReference<Position> positionRef1 = new AtomicReference<>();
4927+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, null, new AsyncCallbacks.FindEntryCallback() {
4928+
@Override
4929+
public void findEntryComplete(Position position, Object ctx) {
4930+
positionRef1.set(position);
4931+
latch1.countDown();
4932+
}
4933+
4934+
@Override
4935+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
4936+
failed1.set(true);
4937+
latch1.countDown();
4938+
}
4939+
}, null, true);
4940+
latch1.await();
4941+
assertFalse(failed1.get());
4942+
assertNotNull(positionRef1.get());
4943+
assertEquals(positionRef1.get(), position1);
4944+
4945+
// find the newest entry with end
4946+
AtomicBoolean failed2 = new AtomicBoolean(false);
4947+
CountDownLatch latch2 = new CountDownLatch(1);
4948+
AtomicReference<Position> positionRef2 = new AtomicReference<>();
4949+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, position2, new AsyncCallbacks.FindEntryCallback() {
4950+
@Override
4951+
public void findEntryComplete(Position position, Object ctx) {
4952+
positionRef2.set(position);
4953+
latch2.countDown();
4954+
}
4955+
4956+
@Override
4957+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
4958+
failed2.set(true);
4959+
latch2.countDown();
4960+
}
4961+
}, null, true);
4962+
latch2.await();
4963+
assertFalse(failed2.get());
4964+
assertNotNull(positionRef2.get());
4965+
assertEquals(positionRef2.get(), position1);
4966+
4967+
// find the newest entry without start and end position
4968+
AtomicBoolean failed3 = new AtomicBoolean(false);
4969+
CountDownLatch latch3 = new CountDownLatch(1);
4970+
AtomicReference<Position> positionRef3 = new AtomicReference<>();
4971+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() {
4972+
@Override
4973+
public void findEntryComplete(Position position, Object ctx) {
4974+
positionRef3.set(position);
4975+
latch3.countDown();
4976+
}
4977+
4978+
@Override
4979+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
4980+
failed3.set(true);
4981+
latch3.countDown();
4982+
}
4983+
}, null, true);
4984+
latch3.await();
4985+
assertFalse(failed3.get());
4986+
assertNotNull(positionRef3.get());
4987+
assertEquals(positionRef3.get(), position1);
4988+
4989+
// find position3
4990+
AtomicBoolean failed4 = new AtomicBoolean(false);
4991+
CountDownLatch latch4 = new CountDownLatch(1);
4992+
AtomicReference<Position> positionRef4 = new AtomicReference<>();
4993+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
4994+
try {
4995+
Position p = entry.getPosition();
4996+
return p.compareTo(position3) <= 0;
4997+
} finally {
4998+
entry.release();
4999+
}
5000+
}, position3, position3, new AsyncCallbacks.FindEntryCallback() {
5001+
@Override
5002+
public void findEntryComplete(Position position, Object ctx) {
5003+
positionRef4.set(position);
5004+
latch4.countDown();
5005+
}
5006+
5007+
@Override
5008+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
5009+
failed4.set(true);
5010+
latch4.countDown();
5011+
}
5012+
}, null, true);
5013+
latch4.await();
5014+
assertFalse(failed4.get());
5015+
assertNotNull(positionRef4.get());
5016+
assertEquals(positionRef4.get(), position3);
5017+
}
5018+
5019+
5020+
@Test
5021+
public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd() throws Exception {
5022+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
5023+
managedLedgerConfig.setMaxEntriesPerLedger(2);
5024+
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
5025+
@Cleanup
5026+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd", managedLedgerConfig);
5027+
@Cleanup
5028+
ManagedCursorImpl managedCursor = (ManagedCursorImpl) ledger.openCursor("test");
5029+
5030+
Position position = ledger.addEntry("test".getBytes(Encoding));
5031+
Position position1 = ledger.addEntry("test1".getBytes(Encoding));
5032+
Position position2 = ledger.addEntry("test2".getBytes(Encoding));
5033+
Position position3 = ledger.addEntry("test3".getBytes(Encoding));
5034+
Position position4 = ledger.addEntry("test4".getBytes(Encoding));
5035+
managedCursor.markDelete(position1);
5036+
assertEquals(managedCursor.getNumberOfEntries(), 3);
5037+
5038+
Predicate<Entry> condition = entry -> {
5039+
try {
5040+
Position p = entry.getPosition();
5041+
return p.compareTo(position3) <= 0;
5042+
} finally {
5043+
entry.release();
5044+
}
5045+
};
5046+
5047+
// find the newest entry with start and end position
5048+
AtomicBoolean failed = new AtomicBoolean(false);
5049+
CountDownLatch latch = new CountDownLatch(1);
5050+
AtomicReference<Position> positionRef = new AtomicReference<>();
5051+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, position4, new AsyncCallbacks.FindEntryCallback() {
5052+
@Override
5053+
public void findEntryComplete(Position position, Object ctx) {
5054+
positionRef.set(position);
5055+
latch.countDown();
5056+
}
5057+
5058+
@Override
5059+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
5060+
failed.set(true);
5061+
latch.countDown();
5062+
}
5063+
}, null, true);
5064+
latch.await();
5065+
assertFalse(failed.get());
5066+
assertNotNull(positionRef.get());
5067+
assertEquals(positionRef.get(), position3);
5068+
5069+
// find the newest entry with start
5070+
AtomicBoolean failed1 = new AtomicBoolean(false);
5071+
CountDownLatch latch1 = new CountDownLatch(1);
5072+
AtomicReference<Position> positionRef1 = new AtomicReference<>();
5073+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, null, new AsyncCallbacks.FindEntryCallback() {
5074+
@Override
5075+
public void findEntryComplete(Position position, Object ctx) {
5076+
positionRef1.set(position);
5077+
latch1.countDown();
5078+
}
5079+
5080+
@Override
5081+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
5082+
failed1.set(true);
5083+
latch1.countDown();
5084+
}
5085+
}, null, true);
5086+
5087+
latch1.await();
5088+
assertFalse(failed1.get());
5089+
assertNotNull(positionRef1.get());
5090+
assertEquals(positionRef1.get(), position3);
5091+
5092+
// find the newest entry with end
5093+
AtomicBoolean failed2 = new AtomicBoolean(false);
5094+
CountDownLatch latch2 = new CountDownLatch(1);
5095+
AtomicReference<Position> positionRef2 = new AtomicReference<>();
5096+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, position4, new AsyncCallbacks.FindEntryCallback() {
5097+
@Override
5098+
public void findEntryComplete(Position position, Object ctx) {
5099+
positionRef2.set(position);
5100+
latch2.countDown();
5101+
}
5102+
5103+
@Override
5104+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
5105+
failed2.set(true);
5106+
latch2.countDown();
5107+
}
5108+
}, null, true);
5109+
5110+
latch2.await();
5111+
assertFalse(failed2.get());
5112+
assertNotNull(positionRef2.get());
5113+
assertEquals(positionRef2.get(), position3);
5114+
5115+
// find the newest entry without start and end position
5116+
AtomicBoolean failed3 = new AtomicBoolean(false);
5117+
CountDownLatch latch3 = new CountDownLatch(1);
5118+
AtomicReference<Position> positionRef3 = new AtomicReference<>();
5119+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() {
5120+
@Override
5121+
public void findEntryComplete(Position position, Object ctx) {
5122+
positionRef3.set(position);
5123+
latch3.countDown();
5124+
}
5125+
5126+
@Override
5127+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
5128+
failed3.set(true);
5129+
latch3.countDown();
5130+
}
5131+
}, null, true);
5132+
latch3.await();
5133+
assertFalse(failed3.get());
5134+
assertNotNull(positionRef3.get());
5135+
assertEquals(positionRef3.get(), position3);
5136+
5137+
// find position4
5138+
AtomicBoolean failed4 = new AtomicBoolean(false);
5139+
CountDownLatch latch4 = new CountDownLatch(1);
5140+
AtomicReference<Position> positionRef4 = new AtomicReference<>();
5141+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
5142+
try {
5143+
Position p = entry.getPosition();
5144+
return p.compareTo(position4) <= 0;
5145+
} finally {
5146+
entry.release();
5147+
}
5148+
}, position4, position4, new AsyncCallbacks.FindEntryCallback() {
5149+
@Override
5150+
public void findEntryComplete(Position position, Object ctx) {
5151+
positionRef4.set(position);
5152+
latch4.countDown();
5153+
}
5154+
5155+
@Override
5156+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
5157+
failed4.set(true);
5158+
latch4.countDown();
5159+
}
5160+
}, null, true);
5161+
latch4.await();
5162+
assertFalse(failed4.get());
5163+
assertNotNull(positionRef4.get());
5164+
assertEquals(positionRef4.get(), position4);
5165+
}
5166+
48765167
@Test
48775168
void testForceCursorRecovery() throws Exception {
48785169
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);

0 commit comments

Comments
 (0)