Skip to content

Commit 26771a6

Browse files
authored
KAFKA-18326: fix merge iterator with cache tombstones (#18287)
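See https://issues.apache.org/jira/browse/KAFKA-18326 for more information. The main bug here is that the old implementation always skipped a deleted cache entry (tombstone) and only advanced the store iterator when the tombstone's key equaled the next store key. A tombstone whose key was greater than the next store key was therefore consumed too early, so it no longer shadowed the matching store key later in the iteration, which resulted in potentially skipping tombstones for future keys in the store and returning keys that had been deleted. Reviewers: Guozhang Wang <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>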
1 parent 639bb51 commit 26771a6

File tree

2 files changed

+61
-8
lines changed

2 files changed

+61
-8
lines changed

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,28 @@ private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFro
5959
public boolean hasNext() {
6060
// skip over items deleted from cache, and corresponding store items if they have the same key
6161
while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
62-
if (storeIterator.hasNext()) {
63-
final KS nextStoreKey = storeIterator.peekNextKey();
64-
// advance the store iterator if the key is the same as the deleted cache key
65-
if (compare(cacheIterator.peekNextKey(), nextStoreKey) == 0) {
66-
storeIterator.next();
67-
}
62+
if (!storeIterator.hasNext()) {
63+
// if storeIterator is exhausted, we can just skip over every tombstone
64+
// in the cache since they don't shadow any valid key
65+
cacheIterator.next();
66+
continue;
67+
}
68+
69+
final KS nextStoreKey = storeIterator.peekNextKey();
70+
final int compare = compare(cacheIterator.peekNextKey(), nextStoreKey);
71+
72+
if (compare == 0) {
73+
// next cache entry is a valid tombstone for the next store key
74+
storeIterator.next();
75+
cacheIterator.next();
76+
} else if (compare < 0) {
77+
// cache has a tombstone for an entry that doesn't exist in the store
78+
cacheIterator.next();
79+
} else {
80+
// store iterator has a valid entry, but we should not advance the cache
81+
// iterator because it may still shadow a future store key
82+
return true;
6883
}
69-
cacheIterator.next();
7084
}
7185

7286
return cacheIterator.hasNext() || storeIterator.hasNext();

streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java

+40-1
Original file line numberDiff line numberDiff line change
@@ -148,19 +148,51 @@ public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() {
148148
assertFalse(createIterator().hasNext());
149149
}
150150

151+
@Test
152+
public void shouldIterateCacheOnly() {
153+
final byte[][] bytes = {{0}, {1}, {2}};
154+
for (final byte[] aByte : bytes) {
155+
cache.put(namespace, Bytes.wrap(aByte), new LRUCacheEntry(aByte));
156+
}
157+
158+
try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
159+
assertArrayEquals(bytes[0], iterator.next().key.get());
160+
assertArrayEquals(bytes[1], iterator.next().key.get());
161+
assertArrayEquals(bytes[2], iterator.next().key.get());
162+
assertFalse(iterator.hasNext());
163+
}
164+
}
165+
166+
@Test
167+
public void shouldIterateStoreOnly() {
168+
final byte[][] bytes = {{0}, {1}, {2}};
169+
for (final byte[] aByte : bytes) {
170+
store.put(Bytes.wrap(aByte), aByte);
171+
}
172+
173+
try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
174+
assertArrayEquals(bytes[0], iterator.next().key.get());
175+
assertArrayEquals(bytes[1], iterator.next().key.get());
176+
assertArrayEquals(bytes[2], iterator.next().key.get());
177+
assertFalse(iterator.hasNext());
178+
}
179+
}
180+
151181
@Test
152182
public void shouldSkipAllDeletedFromCache() {
153183
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
154184
for (final byte[] aByte : bytes) {
155185
final Bytes aBytes = Bytes.wrap(aByte);
156186
store.put(aBytes, aByte);
157-
cache.put(namespace, aBytes, new LRUCacheEntry(aByte));
158187
}
188+
189+
cache.put(namespace, Bytes.wrap(new byte[]{-1}), new LRUCacheEntry(null));
159190
cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
160191
cache.put(namespace, Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
161192
cache.put(namespace, Bytes.wrap(bytes[3]), new LRUCacheEntry(null));
162193
cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
163194
cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
195+
cache.put(namespace, Bytes.wrap(new byte[]{14}), new LRUCacheEntry(null));
164196

165197
try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
166198
assertArrayEquals(bytes[0], iterator.next().key.get());
@@ -174,6 +206,13 @@ public void shouldSkipAllDeletedFromCache() {
174206
}
175207
}
176208

209+
@Test
210+
public void shouldNotHaveNextIfBothIteratorsInitializedEmpty() {
211+
try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
212+
assertFalse(iterator.hasNext());
213+
}
214+
}
215+
177216
@Test
178217
public void shouldPeekNextKey() {
179218
final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore("one");

0 commit comments

Comments
 (0)