Skip to content

Commit b6f1d31

Browse files
committed
Remove clone holder and add Cleaner logic to clean up clones in FullFileCachedIndexInput
Signed-off-by: Shreyansh Ray <[email protected]>
1 parent aec3fe9 commit b6f1d31

File tree

2 files changed

+72
-30
lines changed

2 files changed

+72
-30
lines changed

server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,32 @@
1313
import org.apache.lucene.store.AlreadyClosedException;
1414
import org.apache.lucene.store.IndexInput;
1515
import org.opensearch.common.annotation.ExperimentalApi;
16+
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
1617

1718
import java.io.IOException;
19+
import java.lang.ref.Cleaner;
1820
import java.nio.file.Path;
19-
import java.util.HashSet;
20-
import java.util.Set;
2121

2222
/**
2323
* Extension of {@link FileCachedIndexInput} for full files for handling clones and slices
24-
* We maintain a clone map so that we can close them when the parent IndexInput is closed so that ref count is properly maintained in file cache
25-
* Closing of clones explicitly is needed as Lucene does not guarantee that it will close the clones
24+
* Since Lucene does not guarantee that it will close the clones/slices, we have created a Cleaner which handles closing of the clones/slices when they become phantom reachable
2625
* https://github.com/apache/lucene/blob/8340b01c3cc229f33584ce2178b07b8984daa6a9/lucene/core/src/java/org/apache/lucene/store/IndexInput.java#L32-L33
2726
* @opensearch.experimental
2827
*/
2928
@ExperimentalApi
3029
public class FullFileCachedIndexInput extends FileCachedIndexInput {
3130
private static final Logger logger = LogManager.getLogger(FullFileCachedIndexInput.class);
32-
private final Set<FullFileCachedIndexInput> clones;
31+
private final IndexInputHolder indexInputHolder;
32+
private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory("index-input-cleaner"));
3333

3434
public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) {
3535
this(cache, filePath, underlyingIndexInput, false);
3636
}
3737

3838
public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) {
3939
super(cache, filePath, underlyingIndexInput, isClone);
40-
clones = new HashSet<>();
40+
indexInputHolder = new IndexInputHolder(underlyingIndexInput, isClone, cache, filePath);
41+
CLEANER.register(this, indexInputHolder);
4142
}
4243

4344
/**
@@ -48,7 +49,6 @@ public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput under
4849
public FullFileCachedIndexInput clone() {
4950
FullFileCachedIndexInput clonedIndexInput = new FullFileCachedIndexInput(cache, filePath, luceneIndexInput.clone(), true);
5051
cache.incRef(filePath);
51-
clones.add(clonedIndexInput);
5252
return clonedIndexInput;
5353
}
5454

@@ -74,7 +74,6 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
7474
}
7575
IndexInput slicedLuceneIndexInput = luceneIndexInput.slice(sliceDescription, offset, length);
7676
FullFileCachedIndexInput slicedIndexInput = new FullFileCachedIndexInput(cache, filePath, slicedLuceneIndexInput, true);
77-
clones.add(slicedIndexInput);
7877
cache.incRef(filePath);
7978
return slicedIndexInput;
8079
}
@@ -88,21 +87,37 @@ public void close() throws IOException {
8887
if (isClone) {
8988
cache.decRef(filePath);
9089
}
91-
clones.forEach(indexInput -> {
92-
try {
93-
indexInput.close();
94-
} catch (Exception e) {
95-
logger.trace("Exception while closing clone - {}", e.getMessage());
96-
}
97-
});
9890
try {
9991
luceneIndexInput.close();
10092
} catch (AlreadyClosedException e) {
10193
logger.trace("FullFileCachedIndexInput already closed");
10294
}
10395
luceneIndexInput = null;
104-
clones.clear();
10596
closed = true;
10697
}
10798
}
99+
100+
private static class IndexInputHolder implements Runnable {
101+
private final IndexInput indexInput;
102+
private final FileCache cache;
103+
private final boolean isClone;
104+
private final Path path;
105+
106+
IndexInputHolder(IndexInput indexInput, boolean isClone, FileCache cache, Path path) {
107+
this.indexInput = indexInput;
108+
this.isClone = isClone;
109+
this.cache = cache;
110+
this.path = path;
111+
}
112+
113+
@Override
114+
public void run() {
115+
try {
116+
indexInput.close();
117+
if (isClone) cache.decRef(path);
118+
} catch (IOException e) {
119+
logger.error("Failed to close IndexInput while clearing phantom reachable object");
120+
}
121+
}
122+
}
108123
}

server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,22 @@
88

99
package org.opensearch.index.store.remote.filecache;
1010

11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
1113
import org.apache.lucene.store.AlreadyClosedException;
12-
import org.apache.lucene.store.IndexInput;
14+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
1315

1416
import java.io.IOException;
17+
import java.util.concurrent.TimeUnit;
1518

19+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
1620
public class FullFileCachedIndexInputTests extends FileCachedIndexInputTests {
1721
private FullFileCachedIndexInput fullFileCachedIndexInput;
1822

1923
@Override
2024
protected void setupIndexInputAndAddToFileCache() {
2125
fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, filePath, underlyingIndexInput);
26+
// Putting in the file cache would increase refCount to 1
2227
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fullFileCachedIndexInput));
2328
}
2429

@@ -37,15 +42,11 @@ public void testClone() throws IOException {
3742
fileCache.decRef(filePath);
3843
assertFalse(isActiveAndTotalUsageSame());
3944

40-
// After cloning the refCount will increase again and activeUsage and totalUsage will be same again
41-
FileCachedIndexInput clonedFileCachedIndexInput1 = fullFileCachedIndexInput.clone();
42-
FileCachedIndexInput clonedFileCachedIndexInput2 = clonedFileCachedIndexInput1.clone();
43-
FileCachedIndexInput clonedFileCachedIndexInput3 = clonedFileCachedIndexInput2.clone();
44-
assertTrue(isActiveAndTotalUsageSame());
45+
// Since no clones have been done, refCount should be zero
46+
assertEquals((int) fileCache.getRef(filePath), 0);
4547

46-
// closing the first level clone will close all subsequent level clones and reduce ref count to 0
47-
clonedFileCachedIndexInput1.close();
48-
assertFalse(isActiveAndTotalUsageSame());
48+
createUnclosedClonesSlices(false);
49+
triggerGarbageCollectionAndAssertClonesClosed();
4950

5051
fileCache.prune();
5152

@@ -68,12 +69,38 @@ public void testSlice() throws IOException {
6869
fileCache.decRef(filePath);
6970
assertFalse(isActiveAndTotalUsageSame());
7071

71-
// Creating a slice will increase the refCount
72-
IndexInput slicedFileCachedIndexInput = fullFileCachedIndexInput.slice(SLICE_DESC, 1, 2);
73-
assertTrue(isActiveAndTotalUsageSame());
72+
// Since no clones have been done, refCount should be zero
73+
assertEquals((int) fileCache.getRef(filePath), 0);
74+
75+
createUnclosedClonesSlices(true);
76+
triggerGarbageCollectionAndAssertClonesClosed();
7477

75-
// Closing the parent will close all the slices as well decreasing the refCount to 0
76-
fullFileCachedIndexInput.close();
7778
assertFalse(isActiveAndTotalUsageSame());
7879
}
80+
81+
private void triggerGarbageCollectionAndAssertClonesClosed() {
82+
try {
83+
// Clones/Slices will be phantom reachable now, triggering gc should call close on them
84+
assertBusy(() -> {
85+
System.gc(); // Do not rely on GC to be deterministic, hence the polling
86+
assertEquals(
87+
"Expected refCount to drop to zero as all clones/slices should have closed",
88+
(int) fileCache.getRef(filePath),
89+
0
90+
);
91+
}, 5, TimeUnit.SECONDS);
92+
} catch (Exception e) {
93+
logger.error("Exception thrown while triggering gc", e);
94+
fail();
95+
}
96+
}
97+
98+
private void createUnclosedClonesSlices(boolean createSlice) throws IOException {
99+
int NUM_OF_CLONES = 3;
100+
for (int i = 0; i < NUM_OF_CLONES; i++) {
101+
if (createSlice) fullFileCachedIndexInput.slice("slice", 1, 2);
102+
else fullFileCachedIndexInput.clone();
103+
}
104+
assertEquals((int) fileCache.getRef(filePath), NUM_OF_CLONES);
105+
}
79106
}

0 commit comments

Comments
 (0)