Skip to content

Commit a262cc4

Browse files
Masahiro Sakamotowolfstudy
authored andcommitted
Split message ranges by ledger ID and store them in individualDeletedMessages (#7861)
Fixes #7554 ### Motivation As mentioned in #7554, the class of `individualDeletedMessages` is different between Pulsar 2.3.2 (and earlier) and 2.4.0 (and later). This causes some of ranges contained in `individualDeletedMessages` to be lost when the version of Pulsar is upgraded, and a large number of messages that have already been acked can be redelivered to consumers. Also, even if the Pulsar version is 2.4.0 or later, the same phenomenon occurs when the value of `managedLedgerUnackedRangesOpenCacheSetEnabled` is switched from false to true. ### Modifications If the list of individually deleted message ranges loaded from ZK contains ranges that span different ledgers, split the ranges by ledger ID and store them in `individualDeletedMessages`. As a result, information about deleted message ranges is never lost and messages that have already been acked will not be redelivered. (cherry picked from commit 0b7f034)
1 parent c29455d commit a262cc4

File tree

2 files changed

+156
-4
lines changed

2 files changed

+156
-4
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
9292
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
9393
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
94+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
9495
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
9596
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
9697
import org.apache.commons.lang3.tuple.Pair;
@@ -414,10 +415,34 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i
414415
lock.writeLock().lock();
415416
try {
416417
individualDeletedMessages.clear();
417-
individualDeletedMessagesList.forEach(messageRange -> individualDeletedMessages
418-
.addOpenClosed(messageRange.getLowerEndpoint().getLedgerId(),
419-
messageRange.getLowerEndpoint().getEntryId(), messageRange.getUpperEndpoint().getLedgerId(),
420-
messageRange.getUpperEndpoint().getEntryId()));
418+
individualDeletedMessagesList.forEach(messageRange -> {
419+
MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
420+
MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
421+
422+
if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) {
423+
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
424+
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
425+
} else {
426+
// Store message ranges after splitting them by ledger ID
427+
LedgerInfo lowerEndpointLedgerInfo = ledger.getLedgersInfo().get(lowerEndpoint.getLedgerId());
428+
if (lowerEndpointLedgerInfo != null) {
429+
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
430+
lowerEndpoint.getLedgerId(), lowerEndpointLedgerInfo.getEntries() - 1);
431+
} else {
432+
log.warn("[{}][{}] No ledger info of lower endpoint {}:{}", ledger.getName(), name,
433+
lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId());
434+
}
435+
436+
for (LedgerInfo li : ledger.getLedgersInfo()
437+
.subMap(lowerEndpoint.getLedgerId(), false, upperEndpoint.getLedgerId(), false).values()) {
438+
individualDeletedMessages.addOpenClosed(li.getLedgerId(), -1, li.getLedgerId(),
439+
li.getEntries() - 1);
440+
}
441+
442+
individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1,
443+
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
444+
}
445+
});
421446
} finally {
422447
lock.writeLock().unlock();
423448
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.mockito.Mockito.doReturn;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.spy;
24+
import static org.testng.Assert.assertEquals;
25+
26+
import com.google.common.collect.Lists;
27+
import com.google.common.collect.Range;
28+
29+
import java.lang.reflect.Method;
30+
import java.util.List;
31+
import java.util.NavigableMap;
32+
import java.util.concurrent.ConcurrentSkipListMap;
33+
34+
import org.apache.bookkeeper.client.BookKeeper;
35+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
36+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
37+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
38+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
39+
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
40+
import org.testng.annotations.Test;
41+
42+
public class ManagedCursorIndividualDeletedMessagesTest {
43+
@Test(timeOut = 10000)
44+
void testRecoverIndividualDeletedMessages() throws Exception {
45+
BookKeeper bookkeeper = mock(BookKeeper.class);
46+
47+
ManagedLedgerConfig config = new ManagedLedgerConfig();
48+
config.setUnackedRangesOpenCacheSetEnabled(true);
49+
50+
NavigableMap<Long, LedgerInfo> ledgersInfo = new ConcurrentSkipListMap<>();
51+
ledgersInfo.put(1L, createLedgerInfo(1, 100, 1024));
52+
ledgersInfo.put(3L, createLedgerInfo(3, 50, 512));
53+
ledgersInfo.put(5L, createLedgerInfo(5, 200, 2048));
54+
ledgersInfo.put(10L, createLedgerInfo(10, 2, 32));
55+
ledgersInfo.put(20L, createLedgerInfo(20, 10, 256));
56+
57+
ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class);
58+
doReturn(ledgersInfo).when(ledger).getLedgersInfo();
59+
60+
ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, config, ledger, "test-cursor"));
61+
LongPairRangeSet<PositionImpl> deletedMessages = cursor.getIndividuallyDeletedMessagesSet();
62+
63+
Method recoverMethod = ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages",
64+
List.class);
65+
recoverMethod.setAccessible(true);
66+
67+
// (1) [(1:5..1:10]]
68+
List<MessageRange> messageRangeList = Lists.newArrayList();
69+
messageRangeList.add(createMessageRange(1, 5, 1, 10));
70+
List<Range<PositionImpl>> expectedRangeList = Lists.newArrayList();
71+
expectedRangeList.add(createPositionRange(1, 5, 1, 10));
72+
recoverMethod.invoke(cursor, messageRangeList);
73+
assertEquals(deletedMessages.size(), 1);
74+
assertEquals(deletedMessages.asRanges(), expectedRangeList);
75+
76+
// (2) [(1:10..3:0]]
77+
messageRangeList.clear();
78+
messageRangeList.add(createMessageRange(1, 10, 3, 0));
79+
expectedRangeList.clear();
80+
expectedRangeList.add(createPositionRange(1, 10, 1, 99));
81+
expectedRangeList.add(createPositionRange(3, -1, 3, 0));
82+
recoverMethod.invoke(cursor, messageRangeList);
83+
assertEquals(deletedMessages.size(), 2);
84+
assertEquals(deletedMessages.asRanges(), expectedRangeList);
85+
86+
// (3) [(1:20..10:1],(20:2..20:9]]
87+
messageRangeList.clear();
88+
messageRangeList.add(createMessageRange(1, 20, 10, 1));
89+
messageRangeList.add(createMessageRange(20, 2, 20, 9));
90+
expectedRangeList.clear();
91+
expectedRangeList.add(createPositionRange(1, 20, 1, 99));
92+
expectedRangeList.add(createPositionRange(3, -1, 3, 49));
93+
expectedRangeList.add(createPositionRange(5, -1, 5, 199));
94+
expectedRangeList.add(createPositionRange(10, -1, 10, 1));
95+
expectedRangeList.add(createPositionRange(20, 2, 20, 9));
96+
recoverMethod.invoke(cursor, messageRangeList);
97+
assertEquals(deletedMessages.size(), 5);
98+
assertEquals(deletedMessages.asRanges(), expectedRangeList);
99+
}
100+
101+
private static LedgerInfo createLedgerInfo(long ledgerId, long entries, long size) {
102+
return LedgerInfo.newBuilder().setLedgerId(ledgerId).setEntries(entries).setSize(size)
103+
.setTimestamp(System.currentTimeMillis()).build();
104+
}
105+
106+
private static MessageRange createMessageRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId,
107+
long upperEntryId) {
108+
NestedPositionInfo.Builder nestedPositionBuilder = NestedPositionInfo.newBuilder();
109+
MessageRange.Builder messageRangeBuilder = MessageRange.newBuilder();
110+
111+
nestedPositionBuilder.setLedgerId(lowerLedgerId);
112+
nestedPositionBuilder.setEntryId(lowerEntryId);
113+
messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());
114+
115+
nestedPositionBuilder.setLedgerId(upperLedgerId);
116+
nestedPositionBuilder.setEntryId(upperEntryId);
117+
messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
118+
119+
return messageRangeBuilder.build();
120+
}
121+
122+
private static Range<PositionImpl> createPositionRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId,
123+
long upperEntryId) {
124+
return Range.openClosed(new PositionImpl(lowerLedgerId, lowerEntryId),
125+
new PositionImpl(upperLedgerId, upperEntryId));
126+
}
127+
}

0 commit comments

Comments
 (0)