Skip to content

Commit 2ed5430

Browse files
lhotarihanmz
authored andcommitted
[fix][broker Fix bug in RangeCache where different instance of the key wouldn't ever match (apache#23903)
1 parent 7c5df39 commit 2ed5430

File tree

7 files changed

+543
-32
lines changed

7 files changed

+543
-32
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java

Lines changed: 84 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,39 @@ K getKey() {
106106
return localKey;
107107
}
108108

109+
/**
110+
* Get the value associated with the key. Returns null if the key does not match the key.
111+
*
112+
* @param key the key to match
113+
* @return the value associated with the key, or null if the value has already been recycled or the key does not
114+
* match
115+
*/
109116
V getValue(K key) {
117+
return getValueInternal(key, false);
118+
}
119+
120+
/**
121+
* Get the value associated with the Map.Entry's key and value. Exact instance of the key is required to match.
122+
* @param entry the entry which contains the key and {@link EntryWrapper} value to get the value from
123+
* @return the value associated with the key, or null if the value has already been recycled or the key does not
124+
* exactly match the same instance
125+
*/
126+
static <K, V> V getValueMatchingMapEntry(Map.Entry<K, EntryWrapper<K, V>> entry) {
127+
return entry.getValue().getValueInternal(entry.getKey(), true);
128+
}
129+
130+
/**
131+
* Get the value associated with the key. Returns null if the key does not match the key associated with the
132+
* value.
133+
*
134+
* @param key the key to match
135+
* @param requireSameKeyInstance when true, the matching will be restricted to exactly the same instance of the
136+
* key as the one stored in the wrapper. This is used to avoid any races
137+
* when retrieving or removing the entries from the cache when the key and value
138+
* instances are available.
139+
* @return the value associated with the key, or null if the key does not match
140+
*/
141+
private V getValueInternal(K key, boolean requireSameKeyInstance) {
110142
long stamp = lock.tryOptimisticRead();
111143
K localKey = this.key;
112144
V localValue = this.value;
@@ -116,7 +148,11 @@ V getValue(K key) {
116148
localValue = this.value;
117149
lock.unlockRead(stamp);
118150
}
119-
if (localKey != key) {
151+
// check that the given key matches the key associated with the value in the entry
152+
// this is used to detect if the entry has already been recycled and contains another key
153+
// when requireSameKeyInstance is true, the key must be exactly the same instance as the one stored in the
154+
// entry to match
155+
if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) {
120156
return null;
121157
}
122158
return localValue;
@@ -236,34 +272,45 @@ public boolean exists(Key key) {
236272
* The caller is responsible for releasing the reference.
237273
*/
238274
public Value get(Key key) {
239-
return getValue(key, entries.get(key));
275+
return getValueFromWrapper(key, entries.get(key));
240276
}
241277

242-
private Value getValue(Key key, EntryWrapper<Key, Value> valueWrapper) {
278+
private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value> valueWrapper) {
243279
if (valueWrapper == null) {
244280
return null;
245281
} else {
246282
Value value = valueWrapper.getValue(key);
247-
if (value == null) {
248-
// the wrapper has been recycled and contains another key
249-
return null;
250-
}
251-
try {
252-
value.retain();
253-
} catch (IllegalReferenceCountException e) {
254-
// Value was already deallocated
255-
return null;
256-
}
257-
// check that the value matches the key and that there's at least 2 references to it since
258-
// the cache should be holding one reference and a new reference was just added in this method
259-
if (value.refCnt() > 1 && value.matchesKey(key)) {
260-
return value;
261-
} else {
262-
// Value or IdentityWrapper was recycled and already contains another value
263-
// release the reference added in this method
264-
value.release();
265-
return null;
266-
}
283+
return getRetainedValueMatchingKey(key, value);
284+
}
285+
}
286+
287+
private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry) {
288+
Value valueMatchingEntry = EntryWrapper.getValueMatchingMapEntry(entry);
289+
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
290+
}
291+
292+
// validates that the value matches the key and that the value has not been recycled
293+
// which are possible due to the lack of exclusive locks in the cache and the use of reference counted objects
294+
private Value getRetainedValueMatchingKey(Key key, Value value) {
295+
if (value == null) {
296+
// the wrapper has been recycled and contains another key
297+
return null;
298+
}
299+
try {
300+
value.retain();
301+
} catch (IllegalReferenceCountException e) {
302+
// Value was already deallocated
303+
return null;
304+
}
305+
// check that the value matches the key and that there's at least 2 references to it since
306+
// the cache should be holding one reference and a new reference was just added in this method
307+
if (value.refCnt() > 1 && value.matchesKey(key)) {
308+
return value;
309+
} else {
310+
// Value or IdentityWrapper was recycled and already contains another value
311+
// release the reference added in this method
312+
value.release();
313+
return null;
267314
}
268315
}
269316

@@ -280,7 +327,7 @@ public Collection<Value> getRange(Key first, Key last) {
280327

281328
// Return the values of the entries found in cache
282329
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : entries.subMap(first, true, last, true).entrySet()) {
283-
Value value = getValue(entry.getKey(), entry.getValue());
330+
Value value = getValueMatchingEntry(entry);
284331
if (value != null) {
285332
values.add(value);
286333
}
@@ -297,6 +344,9 @@ public Collection<Value> getRange(Key first, Key last) {
297344
* @return an pair of ints, containing the number of removed entries and the total size
298345
*/
299346
public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusive) {
347+
if (log.isDebugEnabled()) {
348+
log.debug("Removing entries in range [{}, {}], lastInclusive: {}", first, last, lastInclusive);
349+
}
300350
RemovalCounters counters = RemovalCounters.create();
301351
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first, true, last, lastInclusive);
302352
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : subMap.entrySet()) {
@@ -320,7 +370,7 @@ private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> e
320370
boolean skipInvalid, Predicate<Value> removeCondition) {
321371
Key key = entry.getKey();
322372
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
323-
Value value = entryWrapper.getValue(key);
373+
Value value = getValueMatchingEntry(entry);
324374
if (value == null) {
325375
// the wrapper has already been recycled and contains another key
326376
if (!skipInvalid) {
@@ -404,6 +454,9 @@ private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
404454
* @return a pair containing the number of entries evicted and their total size
405455
*/
406456
public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
457+
if (log.isDebugEnabled()) {
458+
log.debug("Evicting entries to reach a minimum size of {}", minSize);
459+
}
407460
checkArgument(minSize > 0);
408461
RemovalCounters counters = RemovalCounters.create();
409462
while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) {
@@ -422,6 +475,9 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
422475
* @return the tota
423476
*/
424477
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
478+
if (log.isDebugEnabled()) {
479+
log.debug("Evicting entries with timestamp <= {}", maxTimestamp);
480+
}
425481
RemovalCounters counters = RemovalCounters.create();
426482
while (!Thread.currentThread().isInterrupted()) {
427483
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();
@@ -453,6 +509,9 @@ public long getSize() {
453509
* @return size of removed entries
454510
*/
455511
public Pair<Integer, Long> clear() {
512+
if (log.isDebugEnabled()) {
513+
log.debug("Clearing the cache with {} entries and size {}", entries.size(), size.get());
514+
}
456515
RemovalCounters counters = RemovalCounters.create();
457516
while (!Thread.currentThread().isInterrupted()) {
458517
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,28 @@
2424
import static org.testng.Assert.assertNotNull;
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
27-
2827
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import io.netty.buffer.ByteBuf;
2929
import java.nio.charset.StandardCharsets;
3030
import java.util.ArrayList;
3131
import java.util.List;
3232
import java.util.UUID;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.CopyOnWriteArrayList;
3335
import java.util.concurrent.CountDownLatch;
3436
import java.util.concurrent.CyclicBarrier;
3537
import java.util.concurrent.Future;
38+
import java.util.concurrent.ThreadLocalRandom;
3639
import java.util.concurrent.TimeUnit;
3740
import java.util.concurrent.atomic.AtomicBoolean;
3841
import java.util.concurrent.atomic.AtomicReference;
39-
42+
import lombok.Cleanup;
4043
import lombok.extern.slf4j.Slf4j;
4144
import org.apache.bookkeeper.client.BookKeeper;
4245
import org.apache.bookkeeper.client.BookKeeperTestClient;
4346
import org.apache.bookkeeper.client.LedgerEntry;
4447
import org.apache.bookkeeper.client.api.DigestType;
48+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
4549
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
4650
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
4751
import org.apache.bookkeeper.mledger.Entry;
@@ -53,18 +57,17 @@
5357
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
5458
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
5559
import org.apache.bookkeeper.mledger.Position;
60+
import org.apache.bookkeeper.mledger.PositionFactory;
5661
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
5762
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
5863
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
5964
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
6065
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
66+
import org.apache.pulsar.common.util.FutureUtil;
6167
import org.awaitility.Awaitility;
6268
import org.testng.annotations.DataProvider;
6369
import org.testng.annotations.Test;
6470

65-
import io.netty.buffer.ByteBuf;
66-
import lombok.Cleanup;
67-
6871
@Slf4j
6972
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
7073

@@ -241,6 +244,108 @@ public void verifyConcurrentUsage() throws Exception {
241244
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
242245
}
243246

247+
@Test
248+
public void verifyAsyncReadEntryUsingCache() throws Exception {
249+
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
250+
251+
config.setMaxCacheSize(100 * 1024 * 1024);
252+
config.setCacheEvictionTimeThresholdMillis(10000);
253+
config.setCacheEvictionIntervalMs(10000);
254+
255+
@Cleanup("shutdown")
256+
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
257+
258+
ManagedLedgerConfig conf = new ManagedLedgerConfig();
259+
conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2)
260+
.setRetentionSizeInMB(-1).setRetentionTime(-1, TimeUnit.MILLISECONDS);
261+
final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger" + testName, conf);
262+
263+
int NumProducers = 5;
264+
int NumConsumers = 10;
265+
266+
final AtomicBoolean done = new AtomicBoolean();
267+
final CyclicBarrier barrier = new CyclicBarrier(NumProducers + NumConsumers + 1);
268+
269+
List<Future<?>> futures = new ArrayList();
270+
List<Position> positions = new CopyOnWriteArrayList<>();
271+
272+
for (int i = 0; i < NumProducers; i++) {
273+
futures.add(executor.submit(() -> {
274+
try {
275+
// wait for all threads to be ready to start at once
276+
barrier.await();
277+
while (!done.get()) {
278+
Position position = ledger.addEntry("entry".getBytes());
279+
positions.add(position);
280+
Thread.sleep(1);
281+
}
282+
} catch (Exception e) {
283+
e.printStackTrace();
284+
throw FutureUtil.wrapToCompletionException(e);
285+
}
286+
}));
287+
}
288+
289+
// create a dummy cursor since caching happens only when there are active consumers
290+
ManagedCursor cursor = ledger.openCursor("dummy");
291+
292+
for (int i = 0; i < NumConsumers; i++) {
293+
futures.add(executor.submit(() -> {
294+
try {
295+
// wait for all threads to be ready to start at once
296+
barrier.await();
297+
while (!done.get()) {
298+
if (positions.isEmpty()) {
299+
Thread.sleep(1);
300+
continue;
301+
}
302+
// Simulate a replay queue read pattern where individual entries are read
303+
Position randomPosition = positions.get(ThreadLocalRandom.current().nextInt(positions.size()));
304+
// Clone the original instance so that another instance is used in the asyncReadEntry call
305+
// This is to test that keys are compared by .equals and not by reference under the covers
306+
randomPosition = PositionFactory.create(randomPosition);
307+
CompletableFuture<Void> future = new CompletableFuture<>();
308+
ledger.asyncReadEntry(randomPosition, new AsyncCallbacks.ReadEntryCallback() {
309+
@Override
310+
public void readEntryComplete(Entry entry, Object ctx) {
311+
entry.release();
312+
future.complete(null);
313+
}
314+
315+
@Override
316+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
317+
future.completeExceptionally(exception);
318+
}
319+
}, null);
320+
future.get();
321+
Thread.sleep(2);
322+
}
323+
} catch (Exception e) {
324+
e.printStackTrace();
325+
throw FutureUtil.wrapToCompletionException(e);
326+
}
327+
}));
328+
}
329+
330+
// trigger all worker threads at once to continue from the barrier
331+
barrier.await();
332+
333+
int testDurationSeconds = 3;
334+
Thread.sleep(testDurationSeconds * 1000);
335+
336+
done.set(true);
337+
for (Future<?> future : futures) {
338+
future.get();
339+
}
340+
341+
factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS);
342+
343+
assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
344+
assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
345+
assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
346+
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
347+
}
348+
244349
@Test
245350
public void testSimple() throws Exception {
246351
@Cleanup("shutdown")

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertNotSame;
2324
import static org.testng.Assert.assertNull;
2425
import static org.testng.Assert.assertTrue;
2526
import static org.testng.Assert.fail;
@@ -338,4 +339,20 @@ public void testRemoveEntryWithInvalidMatchingKey() {
338339
cache.clear();
339340
assertEquals(cache.getNumberOfEntries(), 0);
340341
}
341-
}
342+
343+
@Test
344+
public void testGetKeyWithDifferentInstance() {
345+
RangeCache<Integer, RefString> cache = new RangeCache<>();
346+
Integer key = 129;
347+
cache.put(key, new RefString("129"));
348+
// create a different instance of the key
349+
Integer key2 = Integer.valueOf(129);
350+
// key and key2 are different instances but they are equal
351+
assertNotSame(key, key2);
352+
assertEquals(key, key2);
353+
// get the value using key2
354+
RefString s = cache.get(key2);
355+
// the value should be found
356+
assertEquals(s.s, "129");
357+
}
358+
}

0 commit comments

Comments
 (0)