Skip to content

Commit 454ee91

Browse files
jpountz and uschindler
authored and committed
Add IndexInput#prefetch. (apache#13337)
This adds `IndexInput#prefetch`, which is an optional operation that instructs the `IndexInput` to start fetching bytes from storage in the background. These bytes will be picked up by follow-up calls to the `IndexInput#readXXX` methods. In the future, this will help Lucene move from a maximum of one I/O operation per search thread to one I/O operation per search thread per `IndexInput`. Typically, when running a query on two terms, the I/O into the terms dictionary is sequential today. In the future, we would ideally do these I/Os in parallel using this new API. Note that this will require API changes to some classes including `TermsEnum`. I settled on this API because it's simple and wouldn't require making all Lucene APIs asynchronous to take advantage of extra I/O concurrency, which I worry would make the query evaluation logic too complicated. This change will require follow-ups to start using this new API when working with terms dictionaries, postings, etc. Relates apache#13179 Co-authored-by: Uwe Schindler <[email protected]>
1 parent 8800670 commit 454ee91

File tree

7 files changed

+171
-11
lines changed

7 files changed

+171
-11
lines changed

lucene/core/src/java/org/apache/lucene/store/IndexInput.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,15 @@ public String toString() {
185185
};
186186
}
187187
}
188+
189+
/**
190+
* Optional method: Give a hint to this input that some bytes will be read in the near future.
191+
* IndexInput implementations may take advantage of this hint to start fetching pages of data
192+
* immediately from storage.
193+
*
194+
* <p>The default implementation is a no-op.
195+
*
196+
* @param length the number of bytes to prefetch
197+
*/
198+
public void prefetch(long length) throws IOException {}
188199
}

lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.nio.ByteOrder;
2525
import java.util.Arrays;
2626
import java.util.Objects;
27+
import java.util.Optional;
2728
import org.apache.lucene.util.ArrayUtil;
2829
import org.apache.lucene.util.GroupVIntUtil;
2930

@@ -45,6 +46,7 @@ abstract class MemorySegmentIndexInput extends IndexInput
4546
ValueLayout.JAVA_LONG_UNALIGNED.withOrder(ByteOrder.LITTLE_ENDIAN);
4647
static final ValueLayout.OfFloat LAYOUT_LE_FLOAT =
4748
ValueLayout.JAVA_FLOAT_UNALIGNED.withOrder(ByteOrder.LITTLE_ENDIAN);
49+
private static final Optional<NativeAccess> NATIVE_ACCESS = NativeAccess.getImplementation();
4850

4951
final long length;
5052
final long chunkSizeMask;
@@ -323,6 +325,49 @@ public void seek(long pos) throws IOException {
323325
}
324326
}
325327

328+
@Override
329+
public void prefetch(long length) throws IOException {
330+
ensureOpen();
331+
332+
Objects.checkFromIndexSize(getFilePointer(), length, length());
333+
334+
if (NATIVE_ACCESS.isEmpty()) {
335+
return;
336+
}
337+
final NativeAccess nativeAccess = NATIVE_ACCESS.get();
338+
339+
// If at the boundary between two chunks, move to the next one.
340+
seek(getFilePointer());
341+
try {
342+
// Compute the intersection of the current segment and the region that should be prefetched.
343+
long offset = curPosition;
344+
if (offset + length > curSegment.byteSize()) {
345+
// Only prefetch bytes that are stored in the current segment. There may be bytes on the
346+
// next segment but this case is rare enough that we don't try to optimize it and keep
347+
// things simple instead.
348+
length = curSegment.byteSize() - curPosition;
349+
}
350+
// Now align offset with the page size, this is required for madvise.
351+
// Compute the offset of the current position in the OS's page.
352+
final long offsetInPage = (curSegment.address() + offset) % nativeAccess.getPageSize();
353+
offset -= offsetInPage;
354+
length += offsetInPage;
355+
if (offset < 0) {
356+
// The start of the page is outside of this segment, ignore.
357+
return;
358+
}
359+
360+
final MemorySegment prefetchSlice = curSegment.asSlice(offset, length);
361+
nativeAccess.madviseWillNeed(prefetchSlice);
362+
} catch (
363+
@SuppressWarnings("unused")
364+
IndexOutOfBoundsException e) {
365+
throw new EOFException("Read past EOF: " + this);
366+
} catch (NullPointerException | IllegalStateException e) {
367+
throw alreadyClosed(e);
368+
}
369+
}
370+
326371
@Override
327372
public byte readByte(long pos) throws IOException {
328373
try {

lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,11 @@ private final MemorySegment[] map(
143143
throw convertMapFailedIOException(ioe, resourceDescription, segSize);
144144
}
145145
// if preload apply it without madvise.
146-
// if chunk size is too small (2 MiB), disable madvise support (incorrect alignment)
146+
// skip madvise if the address of our segment is not page-aligned (small segments due to
147+
// internal FileChannel logic)
147148
if (preload) {
148149
segment.load();
149-
} else if (nativeAccess.isPresent() && chunkSizePower >= 21) {
150+
} else if (nativeAccess.filter(na -> segment.address() % na.getPageSize() == 0).isPresent()) {
150151
nativeAccess.get().madvise(segment, readAdvice);
151152
}
152153
segments[segNr] = segment;

lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ abstract class NativeAccess {
2727
/** Invoke the {@code madvise} call for the given {@link MemorySegment}. */
2828
public abstract void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException;
2929

30+
/**
31+
* Invoke the {@code madvise} call for the given {@link MemorySegment} with {@code MADV_WILLNEED}.
32+
*/
33+
public abstract void madviseWillNeed(MemorySegment segment) throws IOException;
34+
35+
/** Returns native page size. */
36+
public abstract int getPageSize();
37+
3038
/**
3139
* Return the NativeAccess instance for this platform. At the moment we only support Linux and macOS
3240
*/

lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ final class PosixNativeAccess extends NativeAccess {
5353
public static final int POSIX_MADV_DONTNEED = 4;
5454

5555
private static final MethodHandle MH$posix_madvise;
56+
private static final int PAGE_SIZE;
5657

5758
private static final Optional<NativeAccess> INSTANCE;
5859

@@ -63,10 +64,14 @@ static Optional<NativeAccess> getInstance() {
6364
}
6465

6566
static {
67+
final Linker linker = Linker.nativeLinker();
68+
final SymbolLookup stdlib = linker.defaultLookup();
6669
MethodHandle adviseHandle = null;
70+
int pagesize = -1;
6771
PosixNativeAccess instance = null;
6872
try {
69-
adviseHandle = lookupMadvise();
73+
adviseHandle = lookupMadvise(linker, stdlib);
74+
pagesize = (int) lookupGetPageSize(linker, stdlib).invokeExact();
7075
instance = new PosixNativeAccess();
7176
} catch (UnsupportedOperationException uoe) {
7277
LOG.warning(uoe.getMessage());
@@ -80,14 +85,17 @@ static Optional<NativeAccess> getInstance() {
8085
+ "pass the following on command line: --enable-native-access=%s",
8186
Optional.ofNullable(PosixNativeAccess.class.getModule().getName())
8287
.orElse("ALL-UNNAMED")));
88+
} catch (RuntimeException | Error e) {
89+
throw e;
90+
} catch (Throwable e) {
91+
throw new AssertionError(e);
8392
}
8493
MH$posix_madvise = adviseHandle;
94+
PAGE_SIZE = pagesize;
8595
INSTANCE = Optional.ofNullable(instance);
8696
}
8797

88-
private static MethodHandle lookupMadvise() {
89-
final Linker linker = Linker.nativeLinker();
90-
final SymbolLookup stdlib = linker.defaultLookup();
98+
private static MethodHandle lookupMadvise(Linker linker, SymbolLookup stdlib) {
9199
return findFunction(
92100
linker,
93101
stdlib,
@@ -99,6 +107,10 @@ private static MethodHandle lookupMadvise() {
99107
ValueLayout.JAVA_INT));
100108
}
101109

110+
private static MethodHandle lookupGetPageSize(Linker linker, SymbolLookup stdlib) {
111+
return findFunction(linker, stdlib, "getpagesize", FunctionDescriptor.of(ValueLayout.JAVA_INT));
112+
}
113+
102114
private static MethodHandle findFunction(
103115
Linker linker, SymbolLookup lookup, String name, FunctionDescriptor desc) {
104116
final MemorySegment symbol =
@@ -113,17 +125,26 @@ private static MethodHandle findFunction(
113125

114126
@Override
115127
public void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException {
116-
// Note: madvise is bypassed if the segment should be preloaded via MemorySegment#load.
117-
if (segment.byteSize() == 0L) {
118-
return; // empty segments should be excluded, because they may have no address at all
119-
}
120128
final Integer advice = mapReadAdvice(readAdvice);
121129
if (advice == null) {
122130
return; // do nothing
123131
}
132+
madvise(segment, advice);
133+
}
134+
135+
@Override
136+
public void madviseWillNeed(MemorySegment segment) throws IOException {
137+
madvise(segment, POSIX_MADV_WILLNEED);
138+
}
139+
140+
private void madvise(MemorySegment segment, int advice) throws IOException {
141+
// Note: madvise is bypassed if the segment should be preloaded via MemorySegment#load.
142+
if (segment.byteSize() == 0L) {
143+
return; // empty segments should be excluded, because they may have no address at all
144+
}
124145
final int ret;
125146
try {
126-
ret = (int) MH$posix_madvise.invokeExact(segment, segment.byteSize(), advice.intValue());
147+
ret = (int) MH$posix_madvise.invokeExact(segment, segment.byteSize(), advice);
127148
} catch (Throwable th) {
128149
throw new AssertionError(th);
129150
}
@@ -148,4 +169,9 @@ private Integer mapReadAdvice(ReadAdvice readAdvice) {
148169
}
149170
return null;
150171
}
172+
173+
@Override
174+
public int getPageSize() {
175+
return PAGE_SIZE;
176+
}
151177
}

lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.lucene.tests.mockfile.ExtrasFS;
5858
import org.apache.lucene.tests.util.LuceneTestCase;
5959
import org.apache.lucene.tests.util.TestUtil;
60+
import org.apache.lucene.util.ArrayUtil;
61+
import org.apache.lucene.util.BitUtil;
6062
import org.apache.lucene.util.IOUtils;
6163
import org.apache.lucene.util.packed.PackedInts;
6264
import org.junit.Assert;
@@ -1527,4 +1529,65 @@ protected void doTestGroupVInt(
15271529
dir.deleteFile("group-varint");
15281530
dir.deleteFile("vint");
15291531
}
1532+
1533+
public void testPrefetch() throws IOException {
1534+
doTestPrefetch(0);
1535+
}
1536+
1537+
public void testPrefetchOnSlice() throws IOException {
1538+
doTestPrefetch(TestUtil.nextInt(random(), 1, 1024));
1539+
}
1540+
1541+
private void doTestPrefetch(int startOffset) throws IOException {
1542+
try (Directory dir = getDirectory(createTempDir())) {
1543+
final int totalLength = startOffset + TestUtil.nextInt(random(), 16384, 65536);
1544+
byte[] arr = new byte[totalLength];
1545+
random().nextBytes(arr);
1546+
try (IndexOutput out = dir.createOutput("temp.bin", IOContext.DEFAULT)) {
1547+
out.writeBytes(arr, arr.length);
1548+
}
1549+
byte[] temp = new byte[2048];
1550+
1551+
try (IndexInput orig = dir.openInput("temp.bin", IOContext.DEFAULT)) {
1552+
IndexInput in;
1553+
if (startOffset == 0) {
1554+
in = orig.clone();
1555+
} else {
1556+
in = orig.slice("slice", startOffset, totalLength - startOffset);
1557+
}
1558+
for (int i = 0; i < 10_000; ++i) {
1559+
final int startPointer = (int) in.getFilePointer();
1560+
assertTrue(startPointer < in.length());
1561+
if (random().nextBoolean()) {
1562+
final long prefetchLength = TestUtil.nextLong(random(), 1, in.length() - startPointer);
1563+
in.prefetch(prefetchLength);
1564+
}
1565+
assertEquals(startPointer, in.getFilePointer());
1566+
switch (random().nextInt(100)) {
1567+
case 0:
1568+
assertEquals(arr[startOffset + startPointer], in.readByte());
1569+
break;
1570+
case 1:
1571+
if (in.length() - startPointer >= Long.BYTES) {
1572+
assertEquals(
1573+
(long) BitUtil.VH_LE_LONG.get(arr, startOffset + startPointer), in.readLong());
1574+
}
1575+
break;
1576+
default:
1577+
final int readLength =
1578+
TestUtil.nextInt(
1579+
random(), 1, (int) Math.min(temp.length, in.length() - startPointer));
1580+
in.readBytes(temp, 0, readLength);
1581+
assertArrayEquals(
1582+
ArrayUtil.copyOfSubArray(
1583+
arr, startOffset + startPointer, startOffset + startPointer + readLength),
1584+
ArrayUtil.copyOfSubArray(temp, 0, readLength));
1585+
}
1586+
if (in.getFilePointer() == in.length() || random().nextBoolean()) {
1587+
in.seek(TestUtil.nextInt(random(), 0, (int) in.length() - 1));
1588+
}
1589+
}
1590+
}
1591+
}
1592+
}
15301593
}

lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ public void seek(long pos) throws IOException {
153153
in.seek(pos);
154154
}
155155

156+
@Override
157+
public void prefetch(long length) throws IOException {
158+
ensureOpen();
159+
in.prefetch(length);
160+
}
161+
156162
@Override
157163
public long length() {
158164
ensureOpen();

0 commit comments

Comments
 (0)