Skip to content

Commit f5ae533

Browse files
author
Masahiro Sakamoto
committed
Split message ranges by ledger ID and store them in individualDeletedMessages
1 parent b410274 commit f5ae533

File tree

2 files changed

+158
-4
lines changed

2 files changed

+158
-4
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
9292
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
9393
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
94+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
9495
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
9596
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
9697
import org.apache.commons.lang3.tuple.Pair;
@@ -414,10 +415,36 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i
414415
lock.writeLock().lock();
415416
try {
416417
individualDeletedMessages.clear();
417-
individualDeletedMessagesList.forEach(messageRange -> individualDeletedMessages
418-
.addOpenClosed(messageRange.getLowerEndpoint().getLedgerId(),
419-
messageRange.getLowerEndpoint().getEntryId(), messageRange.getUpperEndpoint().getLedgerId(),
420-
messageRange.getUpperEndpoint().getEntryId()));
418+
individualDeletedMessagesList.forEach(messageRange -> {
419+
MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
420+
MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
421+
422+
if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) {
423+
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
424+
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
425+
} else {
426+
// Store message ranges after splitting them by ledger ID
427+
LedgerInfo lowerEndpointLedgerInfo = ledger.getLedgersInfo().get(lowerEndpoint.getLedgerId());
428+
if (lowerEndpointLedgerInfo != null) {
429+
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
430+
lowerEndpoint.getLedgerId(), lowerEndpointLedgerInfo.getEntries() - 1);
431+
} else {
432+
log.warn("[{}][{}] No info for ledger {} of lower endpoint", ledger.getName(), name,
433+
lowerEndpoint.getLedgerId());
434+
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
435+
lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId());
436+
}
437+
438+
for (LedgerInfo li : ledger.getLedgersInfo()
439+
.subMap(lowerEndpoint.getLedgerId(), false, upperEndpoint.getLedgerId(), false).values()) {
440+
individualDeletedMessages.addOpenClosed(li.getLedgerId(), -1, li.getLedgerId(),
441+
li.getEntries() - 1);
442+
}
443+
444+
individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1,
445+
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
446+
}
447+
});
421448
} finally {
422449
lock.writeLock().unlock();
423450
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.mockito.Mockito.doReturn;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.spy;
24+
import static org.testng.Assert.assertEquals;
25+
26+
import com.google.common.collect.Lists;
27+
import com.google.common.collect.Range;
28+
29+
import java.lang.reflect.Method;
30+
import java.util.List;
31+
import java.util.NavigableMap;
32+
import java.util.concurrent.ConcurrentSkipListMap;
33+
34+
import org.apache.bookkeeper.client.BookKeeper;
35+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
36+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
37+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
38+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
39+
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
40+
import org.testng.annotations.Test;
41+
42+
public class ManagedCursorIndividualDeletedMessagesTest {
43+
@Test(timeOut = 10000)
44+
void testRecoverIndividualDeletedMessages() throws Exception {
45+
BookKeeper bookkeeper = mock(BookKeeper.class);
46+
47+
ManagedLedgerConfig config = new ManagedLedgerConfig();
48+
config.setUnackedRangesOpenCacheSetEnabled(true);
49+
50+
NavigableMap<Long, LedgerInfo> ledgersInfo = new ConcurrentSkipListMap<>();
51+
ledgersInfo.put(1L, createLedgerInfo(1, 100, 1024));
52+
ledgersInfo.put(3L, createLedgerInfo(3, 50, 512));
53+
ledgersInfo.put(5L, createLedgerInfo(5, 200, 2048));
54+
ledgersInfo.put(10L, createLedgerInfo(10, 2, 32));
55+
ledgersInfo.put(20L, createLedgerInfo(20, 10, 256));
56+
57+
ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class);
58+
doReturn(ledgersInfo).when(ledger).getLedgersInfo();
59+
60+
ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, config, ledger, "test-cursor"));
61+
LongPairRangeSet<PositionImpl> deletedMessages = cursor.getIndividuallyDeletedMessagesSet();
62+
63+
Method recoverMethod = ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages",
64+
List.class);
65+
recoverMethod.setAccessible(true);
66+
67+
// (1) [(1:5..1:10]]
68+
List<MessageRange> messageRangeList = Lists.newArrayList();
69+
messageRangeList.add(createMessageRange(1, 5, 1, 10));
70+
List<Range<PositionImpl>> expectedRangeList = Lists.newArrayList();
71+
expectedRangeList.add(createPositionRange(1, 5, 1, 10));
72+
recoverMethod.invoke(cursor, messageRangeList);
73+
assertEquals(deletedMessages.size(), 1);
74+
assertEquals(deletedMessages.asRanges(), expectedRangeList);
75+
76+
// (2) [(1:10..3:0]]
77+
messageRangeList.clear();
78+
messageRangeList.add(createMessageRange(1, 10, 3, 0));
79+
expectedRangeList.clear();
80+
expectedRangeList.add(createPositionRange(1, 10, 1, 99));
81+
expectedRangeList.add(createPositionRange(3, -1, 3, 0));
82+
recoverMethod.invoke(cursor, messageRangeList);
83+
assertEquals(deletedMessages.size(), 2);
84+
assertEquals(deletedMessages.asRanges(), expectedRangeList);
85+
86+
// (3) [(1:20..10:1],(20:2..20:9]]
87+
messageRangeList.clear();
88+
messageRangeList.add(createMessageRange(1, 20, 10, 1));
89+
messageRangeList.add(createMessageRange(20, 2, 20, 9));
90+
expectedRangeList.clear();
91+
expectedRangeList.add(createPositionRange(1, 20, 1, 99));
92+
expectedRangeList.add(createPositionRange(3, -1, 3, 49));
93+
expectedRangeList.add(createPositionRange(5, -1, 5, 199));
94+
expectedRangeList.add(createPositionRange(10, -1, 10, 1));
95+
expectedRangeList.add(createPositionRange(20, 2, 20, 9));
96+
recoverMethod.invoke(cursor, messageRangeList);
97+
assertEquals(deletedMessages.size(), 5);
98+
assertEquals(deletedMessages.asRanges(), expectedRangeList);
99+
}
100+
101+
private static LedgerInfo createLedgerInfo(long ledgerId, long entries, long size) {
102+
return LedgerInfo.newBuilder().setLedgerId(ledgerId).setEntries(entries).setSize(size)
103+
.setTimestamp(System.currentTimeMillis()).build();
104+
}
105+
106+
private static MessageRange createMessageRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId,
107+
long upperEntryId) {
108+
NestedPositionInfo.Builder nestedPositionBuilder = NestedPositionInfo.newBuilder();
109+
MessageRange.Builder messageRangeBuilder = MessageRange.newBuilder();
110+
111+
nestedPositionBuilder.setLedgerId(lowerLedgerId);
112+
nestedPositionBuilder.setEntryId(lowerEntryId);
113+
messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());
114+
115+
nestedPositionBuilder.setLedgerId(upperLedgerId);
116+
nestedPositionBuilder.setEntryId(upperEntryId);
117+
messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
118+
119+
return messageRangeBuilder.build();
120+
}
121+
122+
private static Range<PositionImpl> createPositionRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId,
123+
long upperEntryId) {
124+
return Range.openClosed(new PositionImpl(lowerLedgerId, lowerEntryId),
125+
new PositionImpl(upperLedgerId, upperEntryId));
126+
}
127+
}

0 commit comments

Comments
 (0)