From 63104bd70fe6260f3af759ccf642d60dd7be4739 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 31 Jan 2023 15:39:50 +0100 Subject: [PATCH 01/15] Add IndexInput#prefetch. This adds `IndexInput#prefetch`, which is an optional operation that instructs the `IndexInput` to start fetching bytes from storage in the background. These bytes will be picked up by follow-up calls to the `IndexInput#readXXX` methods. In the future, this will help Lucene move from a maximum of one I/O operation per search thread to one I/O operation per search thread per `IndexInput`. Typically, when running a query on two terms, the I/O into the terms dictionary is sequential today. In the future, we would ideally do these I/Os in parallel using this new API. Note that this will require API changes to some classes including `TermsEnum`. I settled on this API because it's simple and wouldn't require making all Lucene APIs asynchronous to take advantage of extra I/O concurrency, which I worry would make the query evaluation logic too complicated. Currently, only `NIOFSDirectory` implements this new API. I played with `MMapDirectory` as well and found an approach that worked better in the benchmark I've been playing with, but I'm not sure it makes sense to implement this API on this directory as it either requires adding an explicit buffer on `MMapDirectory`, or forcing data to be loaded into the page cache even though the OS may have decided that it's not a good idea due to too few cache hits. This change will require follow-ups to start using this new API when working with terms dictionaries, postings, etc. 
Relates #13179 --- .../lucene/store/BufferedIndexInput.java | 134 +++++++++++++++++- .../org/apache/lucene/store/IndexInput.java | 9 ++ .../apache/lucene/store/NIOFSDirectory.java | 56 +++++++- .../tests/store/BaseDirectoryTestCase.java | 61 ++++++++ 4 files changed, 248 insertions(+), 12 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java index 13151692bc06..774571f161cb 100644 --- a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java +++ b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java @@ -20,7 +20,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.lucene.util.GroupVIntUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.ThreadInterruptedException; /** Base implementation class for buffered {@link IndexInput}. */ public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput { @@ -45,7 +50,15 @@ public abstract class BufferedIndexInput extends IndexInput implements RandomAcc private final int bufferSize; - private ByteBuffer buffer = EMPTY_BYTEBUFFER; + // Despite the two buffer references below, BufferedIndexInput only tracks a single buffer. Either + // prefetch() has been called last and `buffer` is set to EMPTY_BYTEBUFFER while `prefetchBuffer` + // tracks the actual buffer, or prefetchBuffer is set to EMPTY_BYTEBUFFER and `buffer` tracks the + // actual buffer. This approach helps only check if `buffer.hasRemaining()` to know whether to + // trigger a refill(), and refill() will check if there is a pending prefetch() before actually + // reading bytes. 
+ private ByteBuffer buffer = EMPTY_BYTEBUFFER; // initialized lazily + private ByteBuffer prefetchBuffer = EMPTY_BYTEBUFFER; + private Future pendingPrefetch; // only non-null if there is a pending prefetch() private long bufferStart = 0; // position in file of buffer @@ -90,6 +103,13 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException { @Override public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException { + // We need to finish pending prefetch operations to use data from the prefetch() instead of + // reading directly bytes into the user's buffer. + // Other readXXX methods don't need to do this since they always call refill() when they don't + // have enough data, which in-turn calls finishPendingPrefetch(). But readBytes() may read bytes + // into the user's buffer without refilling the internal buffer. + finishPendingPrefetch(); + int available = buffer.remaining(); if (len <= available) { // the buffer contains enough data to satisfy this request @@ -297,7 +317,24 @@ public final long readLong(long pos) throws IOException { return buffer.getLong((int) index); } + private void maybeInitBuffer() throws IOException { + assert pendingPrefetch == null; + assert prefetchBuffer == EMPTY_BYTEBUFFER; + + if (buffer == EMPTY_BYTEBUFFER) { + buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN).limit(0); + seekInternal(bufferStart); + } + } + private void refill() throws IOException { + assert buffer.hasRemaining() == false; + + // Wait for pending prefetching to finish. 
+ if (finishPendingPrefetch()) { + return; + } + long start = bufferStart + buffer.position(); long end = start + bufferSize; if (end > length()) // don't read past EOF @@ -305,11 +342,8 @@ private void refill() throws IOException { int newLength = (int) (end - start); if (newLength <= 0) throw new EOFException("read past EOF: " + this); - if (buffer == EMPTY_BYTEBUFFER) { - buffer = - ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); // allocate buffer lazily - seekInternal(bufferStart); - } + // allocate buffer lazily + maybeInitBuffer(); buffer.position(0); buffer.limit(newLength); bufferStart = start; @@ -321,6 +355,77 @@ private void refill() throws IOException { buffer.flip(); } + private boolean finishPendingPrefetch() throws IOException { + if (pendingPrefetch != null) { + final int i; + try { + i = pendingPrefetch.get(); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } catch (ExecutionException e) { + throw IOUtils.rethrowAlways(e.getCause()); + } finally { + // Always clear pendingPrefetch and swap buffers, regardless of success/failure so that + // future read() operations work on the correct buffer. + pendingPrefetch = null; + prefetchBuffer.flip(); + buffer = prefetchBuffer; + prefetchBuffer = EMPTY_BYTEBUFFER; + } + + if (i < 0) { + // be defensive here, even though we checked before hand, something could have changed + throw new EOFException( + "read past EOF: " + this + " buffer: " + prefetchBuffer + " length: " + length()); + } + + return buffer.hasRemaining(); + } + return false; + } + + @Override + public void prefetch() throws IOException { + final long pos = getFilePointer(); + final long length = length(); + if (pos >= length) { + throw new EOFException("read past EOF: " + this); + } + + // Make sure to never have two concurrent prefetch() calls trying to push bytes to the same + // buffer. + if (pendingPrefetch != null) { + // prefetch() got called twice without reading bytes in-between? 
+ // nocommit should we fail instead? + return; + } + + if (buffer.hasRemaining()) { + // the seek() that preceded prefetch() moved within the buffer, so we still have valid bytes + // TODO: should we still prefetch more bytes in this case if there are very few bytes left? + return; + } else { + // The buffer may not have been initialized yet, e.g. if prefetch() was called immediately + // after calling clone() then seek(). + maybeInitBuffer(); + } + + assert buffer.capacity() > 0; + assert prefetchBuffer == EMPTY_BYTEBUFFER; + + bufferStart = pos; + final ByteBuffer prefetchBuffer = buffer; + prefetchBuffer.position(0); + final int limit = (int) Math.min(length - bufferStart, prefetchBuffer.capacity()); + assert limit > 0; + prefetchBuffer.limit(limit); + // Note: The read operation may read fewer bytes than requested. This is ok. + pendingPrefetch = readInternalAsync(prefetchBuffer); + // Only swap buffers if we successfully scheduled an async read. + this.prefetchBuffer = prefetchBuffer; + this.buffer = EMPTY_BYTEBUFFER; // trigger refill on next read() + } + /** * Expert: implements buffer refill. Reads bytes from the current position in the input. * @@ -328,6 +433,16 @@ private void refill() throws IOException { */ protected abstract void readInternal(ByteBuffer b) throws IOException; + /** + * Expert: implements asynchronous buffer refill. Unlike {@link #readInternal}, this may read less + * than {@link ByteBuffer#remaining()} bytes. + */ + protected Future readInternalAsync(ByteBuffer b) throws IOException { + CompletableFuture res = new CompletableFuture<>(); + res.complete(0); + return res; + } + @Override public final long getFilePointer() { return bufferStart + buffer.position(); @@ -335,11 +450,16 @@ public final long getFilePointer() { @Override public final void seek(long pos) throws IOException { + // If there is a pending prefetch(), wait for it to finish before moving the file pointer. 
+ finishPendingPrefetch(); + assert prefetchBuffer == EMPTY_BYTEBUFFER; + if (pos >= bufferStart && pos < (bufferStart + buffer.limit())) buffer.position((int) (pos - bufferStart)); // seek within buffer else { bufferStart = pos; buffer.limit(0); // trigger refill() on read + prefetchBuffer.limit(0); seekInternal(pos); } } @@ -357,6 +477,8 @@ public BufferedIndexInput clone() { BufferedIndexInput clone = (BufferedIndexInput) super.clone(); clone.buffer = EMPTY_BYTEBUFFER; + clone.prefetchBuffer = EMPTY_BYTEBUFFER; + clone.pendingPrefetch = null; clone.bufferStart = getFilePointer(); return clone; diff --git a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java index 3f703bc54b26..1c0780a2127e 100644 --- a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java +++ b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java @@ -191,4 +191,13 @@ public String toString() { }; } } + + /** + * Optional method: Give a hint to this input that some bytes will be read in the near future. + * IndexInput implementations may take advantage of this hint to start fetching a page of data + * immediately from storage. + * + *

<p>The default implementation is a no-op. + */ + public void prefetch() throws IOException {} } diff --git a/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java index 246f48082cfe..76efbc7b112f 100644 --- a/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java +++ b/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java @@ -19,10 +19,16 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.ClosedChannelException; // javadoc @link import java.nio.channels.FileChannel; +import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; // javadoc import org.apache.lucene.util.IOUtils; @@ -47,6 +53,8 @@ */ public class NIOFSDirectory extends FSDirectory { + private final ExecutorService executorService; + /** * Create a new NIOFSDirectory for the named location. The directory is created at the named * location if it does not yet exist. 
@@ -57,6 +65,7 @@ public class NIOFSDirectory extends FSDirectory { */ public NIOFSDirectory(Path path, LockFactory lockFactory) throws IOException { super(path, lockFactory); + this.executorService = Executors.newVirtualThreadPerTaskExecutor(); } /** @@ -70,21 +79,38 @@ public NIOFSDirectory(Path path) throws IOException { this(path, FSLockFactory.getDefault()); } + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + executorService.shutdown(); + } + } + @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); ensureCanRead(name); Path path = getDirectory().resolve(name); - FileChannel fc = FileChannel.open(path, StandardOpenOption.READ); + Set openOptions = Collections.singleton(StandardOpenOption.READ); + // nocommit: does it really make sense to open both a sync and an async channel on the same + // file? or should we do sync reads via the async channel (but this seems to come with + // noticeable overhead when data fits in the cache)? or do the async I/O naively using the sync + // interface? 
+ FileChannel fc = null; + AsynchronousFileChannel afc = null; boolean success = false; try { + fc = FileChannel.open(path, openOptions); + afc = AsynchronousFileChannel.open(path, openOptions, executorService); final NIOFSIndexInput indexInput = - new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context); + new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, afc, context); success = true; return indexInput; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(fc); + IOUtils.closeWhileHandlingException(fc, afc); } } } @@ -97,6 +123,9 @@ static final class NIOFSIndexInput extends BufferedIndexInput { /** the file channel we will read from */ protected final FileChannel channel; + /** the asynchronous channel to use for prefetching */ + protected final AsynchronousFileChannel asynchronousChannel; + /** is this instance a clone and hence does not own the file to close it */ boolean isClone = false; @@ -106,18 +135,26 @@ static final class NIOFSIndexInput extends BufferedIndexInput { /** end offset (start+length) */ protected final long end; - public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context) + public NIOFSIndexInput( + String resourceDesc, FileChannel fc, AsynchronousFileChannel afc, IOContext context) throws IOException { super(resourceDesc, context); this.channel = fc; + this.asynchronousChannel = afc; this.off = 0L; this.end = fc.size(); } public NIOFSIndexInput( - String resourceDesc, FileChannel fc, long off, long length, int bufferSize) { + String resourceDesc, + FileChannel fc, + AsynchronousFileChannel afc, + long off, + long length, + int bufferSize) { super(resourceDesc, bufferSize); this.channel = fc; + this.asynchronousChannel = afc; this.off = off; this.end = off + length; this.isClone = true; @@ -126,7 +163,7 @@ public NIOFSIndexInput( @Override public void close() throws IOException { if (!isClone) { - channel.close(); + IOUtils.close(channel, asynchronousChannel); } } @@ 
-155,6 +192,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw return new NIOFSIndexInput( getFullSliceDescription(sliceDescription), channel, + asynchronousChannel, off + offset, length, getBufferSize()); @@ -204,6 +242,12 @@ protected void readInternal(ByteBuffer b) throws IOException { } } + @Override + protected Future readInternalAsync(ByteBuffer b) throws IOException { + long pos = getFilePointer() + off; + return asynchronousChannel.read(b, pos); + } + @Override protected void seekInternal(long pos) throws IOException { if (pos > length()) { diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java index 24d8db0b02f6..f8b65281a0e0 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java @@ -58,6 +58,7 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BitUtil; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.packed.PackedInts; import org.junit.Assert; @@ -1512,4 +1513,64 @@ protected void doTestGroupVInt( dir.deleteFile("group-varint"); dir.deleteFile("vint"); } + + public void testPrefetch() throws IOException { + doTestPrefetch(0); + } + + public void testPrefetchOnSlice() throws IOException { + doTestPrefetch(TestUtil.nextInt(random(), 1, 1024)); + } + + private void doTestPrefetch(int startOffset) throws IOException { + try (Directory dir = getDirectory(createTempDir())) { + final int totalLength = startOffset + TestUtil.nextInt(random(), 16384, 65536); + byte[] arr = new byte[totalLength]; + random().nextBytes(arr); + try (IndexOutput out = dir.createOutput("temp.bin", IOContext.DEFAULT)) { + out.writeBytes(arr, 
arr.length); + } + byte[] temp = new byte[2048]; + + try (IndexInput orig = dir.openInput("temp.bin", IOContext.DEFAULT)) { + IndexInput in; + if (startOffset == 0) { + in = orig.clone(); + } else { + in = orig.slice("slice", startOffset, totalLength - startOffset); + } + for (int i = 0; i < 10_000; ++i) { + final int startPointer = (int) in.getFilePointer(); + assertTrue(startPointer < in.length()); + if (random().nextBoolean()) { + in.prefetch(); + } + assertEquals(startPointer, in.getFilePointer()); + switch (random().nextInt(100)) { + case 0: + assertEquals(arr[startOffset + startPointer], in.readByte()); + break; + case 1: + if (in.length() - startPointer >= Long.BYTES) { + assertEquals( + (long) BitUtil.VH_LE_LONG.get(arr, startOffset + startPointer), in.readLong()); + } + break; + default: + final int readLength = + TestUtil.nextInt( + random(), 1, (int) Math.min(temp.length, in.length() - startPointer)); + in.readBytes(temp, 0, readLength); + assertArrayEquals( + ArrayUtil.copyOfSubArray( + arr, startOffset + startPointer, startOffset + startPointer + readLength), + ArrayUtil.copyOfSubArray(temp, 0, readLength)); + } + if (in.getFilePointer() == in.length() || random().nextBoolean()) { + in.seek(TestUtil.nextInt(random(), 0, (int) in.length() - 1)); + } + } + } + } + } } From 3c60086d7254a7868774c59dd696b4b6822c5e8a Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Mon, 6 May 2024 10:14:22 +0200 Subject: [PATCH 02/15] Add support for prefetch() on `MMapDirectory` via `ReadAdvice#WILL_NEED`. 
--- .../org/apache/lucene/store/IOContext.java | 26 +++++-- .../org/apache/lucene/store/ReadAdvice.java | 7 +- .../lucene/store/MemorySegmentIndexInput.java | 72 ++++++++++++++++--- .../MemorySegmentIndexInputProvider.java | 9 ++- .../lucene/store/PosixNativeAccess.java | 1 + 5 files changed, 97 insertions(+), 18 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/IOContext.java b/lucene/core/src/java/org/apache/lucene/store/IOContext.java index b2d82af20f80..f618f41afca4 100644 --- a/lucene/core/src/java/org/apache/lucene/store/IOContext.java +++ b/lucene/core/src/java/org/apache/lucene/store/IOContext.java @@ -16,7 +16,8 @@ */ package org.apache.lucene.store; -import java.util.Arrays; +import java.util.EnumMap; +import java.util.Map; import java.util.Objects; import org.apache.lucene.util.Constants; @@ -73,6 +74,10 @@ public enum Context { throw new IllegalArgumentException( "The FLUSH and MERGE contexts must use the SEQUENTIAL read access advice"); } + if (readAdvice == ReadAdvice.WILL_NEED) { + throw new IllegalArgumentException( + "WILL_NEED is not a valid ReadAdvice for IOContext creation"); + } } /** Creates a default {@link IOContext} for reading/writing with the given {@link ReadAdvice} */ @@ -91,8 +96,17 @@ public IOContext(MergeInfo mergeInfo) { this(Context.MERGE, mergeInfo, null, ReadAdvice.SEQUENTIAL); } - private static final IOContext[] READADVICE_TO_IOCONTEXT = - Arrays.stream(ReadAdvice.values()).map(IOContext::new).toArray(IOContext[]::new); + private static final Map READADVICE_TO_IOCONTEXT = + new EnumMap<>(ReadAdvice.class); + + static { + for (ReadAdvice advice : ReadAdvice.values()) { + if (advice == ReadAdvice.WILL_NEED) { + continue; + } + READADVICE_TO_IOCONTEXT.put(advice, new IOContext(advice)); + } + } /** * Return an updated {@link IOContext} that has the provided {@link ReadAdvice} if the {@link @@ -102,8 +116,12 @@ public IOContext(MergeInfo mergeInfo) { * ReadAdvice}s. 
*/ public IOContext withReadAdvice(ReadAdvice advice) { + if (advice == ReadAdvice.WILL_NEED) { + throw new IllegalArgumentException( + "WILL_NEED is not a valid ReadAdvice for IOContext#withReadAdvice"); + } if (context == Context.DEFAULT) { - return READADVICE_TO_IOCONTEXT[advice.ordinal()]; + return Objects.requireNonNull(READADVICE_TO_IOCONTEXT.get(advice)); } else { return this; } diff --git a/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java b/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java index 5c706e2abf50..4dda64988dfc 100644 --- a/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java +++ b/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java @@ -40,5 +40,10 @@ public enum ReadAdvice { * loads the content of the file into the page cache at open time. This should only be used on * very small files that can be expected to fit in RAM with very high confidence. */ - RANDOM_PRELOAD + RANDOM_PRELOAD, + /** + * Data will be needed soon. {@link Directory} implementations may start fetching bytes from + * storage in the background. 
+ */ + WILL_NEED; } diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 7d1e2572fdbc..60949820cf26 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -24,6 +24,7 @@ import java.nio.ByteOrder; import java.util.Arrays; import java.util.Objects; +import java.util.Optional; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.GroupVIntUtil; @@ -50,6 +51,7 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces final int chunkSizePower; final Arena arena; final MemorySegment[] segments; + final Optional nativeAccess; int curSegmentIndex = -1; MemorySegment @@ -61,12 +63,15 @@ public static MemorySegmentIndexInput newInstance( Arena arena, MemorySegment[] segments, long length, - int chunkSizePower) { + int chunkSizePower, + Optional nativeAccess) { assert Arrays.stream(segments).map(MemorySegment::scope).allMatch(arena.scope()::equals); if (segments.length == 1) { - return new SingleSegmentImpl(resourceDescription, arena, segments[0], length, chunkSizePower); + return new SingleSegmentImpl( + resourceDescription, arena, segments[0], length, chunkSizePower, nativeAccess); } else { - return new MultiSegmentImpl(resourceDescription, arena, segments, 0, length, chunkSizePower); + return new MultiSegmentImpl( + resourceDescription, arena, segments, 0, length, chunkSizePower, nativeAccess); } } @@ -75,7 +80,8 @@ private MemorySegmentIndexInput( Arena arena, MemorySegment[] segments, long length, - int chunkSizePower) { + int chunkSizePower, + Optional nativeAccess) { super(resourceDescription); this.arena = arena; this.segments = segments; @@ -83,6 +89,7 @@ private MemorySegmentIndexInput( this.chunkSizePower = chunkSizePower; this.chunkSizeMask = (1L << chunkSizePower) - 1L; this.curSegment 
= segments[0]; + this.nativeAccess = nativeAccess; } void ensureOpen() { @@ -310,6 +317,41 @@ public void seek(long pos) throws IOException { } } + @Override + public void prefetch() throws IOException { + ensureOpen(); + + if (nativeAccess.isEmpty()) { + return; + } + final NativeAccess nativeAccess = this.nativeAccess.get(); + + // If at the boundary between two slices, move to the next one. + seek(getFilePointer()); + try { + // nocommit: how to retrieve the page size? and disable prefetching if the page size is not + // 4kB? + final long offsetInPage = (curSegment.address() + curPosition) & 0x0FFF; + if (offsetInPage > curPosition) { + // The start of the page is outside of this segment. + return; + } + MemorySegment currentPageSlice = + curSegment.asSlice(curPosition - offsetInPage, offsetInPage + 1); + // Tell the OS we'll need this page. nocommit: do we need to restore the original read advice? + // Source code for madvise.c suggests we don't since WILL_NEED only triggers read-ahead + // without updating the state of the virtual mapping? 
+ // https://github.com/torvalds/linux/blob/master/mm/madvise.c + nativeAccess.madvise(currentPageSlice, ReadAdvice.WILL_NEED); + } catch ( + @SuppressWarnings("unused") + IndexOutOfBoundsException e) { + throw new EOFException("Read past EOF: " + this); + } catch (NullPointerException | IllegalStateException e) { + throw alreadyClosed(e); + } + } + @Override public byte readByte(long pos) throws IOException { try { @@ -491,7 +533,8 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long le null, // clones don't have an Arena, as they can't close) slices[0].asSlice(offset, length), length, - chunkSizePower); + chunkSizePower, + nativeAccess); } else { return new MultiSegmentImpl( newResourceDescription, @@ -499,7 +542,8 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long le slices, offset, length, - chunkSizePower); + chunkSizePower, + nativeAccess); } } @@ -539,8 +583,15 @@ static final class SingleSegmentImpl extends MemorySegmentIndexInput { Arena arena, MemorySegment segment, long length, - int chunkSizePower) { - super(resourceDescription, arena, new MemorySegment[] {segment}, length, chunkSizePower); + int chunkSizePower, + Optional nativeAccess) { + super( + resourceDescription, + arena, + new MemorySegment[] {segment}, + length, + chunkSizePower, + nativeAccess); this.curSegmentIndex = 0; } @@ -626,8 +677,9 @@ static final class MultiSegmentImpl extends MemorySegmentIndexInput { MemorySegment[] segments, long offset, long length, - int chunkSizePower) { - super(resourceDescription, arena, segments, length, chunkSizePower); + int chunkSizePower, + Optional nativeAccess) { + super(resourceDescription, arena, segments, length, chunkSizePower, nativeAccess); this.offset = offset; try { seek(0L); diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java index 887956f306c2..61e47497eaa8 
100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java @@ -59,9 +59,11 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo context.readAdvice(), chunkSizePower, preload, - fileSize), + fileSize, + nativeAccess), fileSize, - chunkSizePower); + chunkSizePower, + nativeAccess); success = true; return in; } finally { @@ -88,7 +90,8 @@ private final MemorySegment[] map( ReadAdvice readAdvice, int chunkSizePower, boolean preload, - long length) + long length, + Optional nativeAccess) throws IOException { if ((length >>> chunkSizePower) >= Integer.MAX_VALUE) throw new IllegalArgumentException("File too big for chunk size: " + resourceDescription); diff --git a/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java b/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java index b74bd7fe3652..760a5c7aea0f 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java +++ b/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java @@ -141,6 +141,7 @@ private Integer mapReadAdvice(ReadAdvice readAdvice) { case RANDOM -> POSIX_MADV_RANDOM; case SEQUENTIAL -> POSIX_MADV_SEQUENTIAL; case RANDOM_PRELOAD -> null; + case WILL_NEED -> POSIX_MADV_WILLNEED; }; } } From b5665e049979d8db7cfbc7413a77755da24e1369 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Mon, 6 May 2024 22:52:50 +0200 Subject: [PATCH 03/15] Implement background read as blocking read in a virtual thread. 
--- .../lucene/store/BufferedIndexInput.java | 38 +++++-------- .../apache/lucene/store/NIOFSDirectory.java | 56 ++----------------- 2 files changed, 20 insertions(+), 74 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java index 774571f161cb..87bcb9657ea7 100644 --- a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java +++ b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java @@ -20,9 +20,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.apache.lucene.util.GroupVIntUtil; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; @@ -58,7 +57,7 @@ public abstract class BufferedIndexInput extends IndexInput implements RandomAcc // reading bytes. 
private ByteBuffer buffer = EMPTY_BYTEBUFFER; // initialized lazily private ByteBuffer prefetchBuffer = EMPTY_BYTEBUFFER; - private Future pendingPrefetch; // only non-null if there is a pending prefetch() + private FutureTask pendingPrefetch; // only non-null if there is a pending prefetch() private long bufferStart = 0; // position in file of buffer @@ -357,9 +356,8 @@ private void refill() throws IOException { private boolean finishPendingPrefetch() throws IOException { if (pendingPrefetch != null) { - final int i; try { - i = pendingPrefetch.get(); + pendingPrefetch.get(); } catch (InterruptedException e) { throw new ThreadInterruptedException(e); } catch (ExecutionException e) { @@ -373,12 +371,6 @@ private boolean finishPendingPrefetch() throws IOException { prefetchBuffer = EMPTY_BYTEBUFFER; } - if (i < 0) { - // be defensive here, even though we checked before hand, something could have changed - throw new EOFException( - "read past EOF: " + this + " buffer: " + prefetchBuffer + " length: " + length()); - } - return buffer.hasRemaining(); } return false; @@ -419,9 +411,17 @@ public void prefetch() throws IOException { final int limit = (int) Math.min(length - bufferStart, prefetchBuffer.capacity()); assert limit > 0; prefetchBuffer.limit(limit); - // Note: The read operation may read fewer bytes than requested. This is ok. - pendingPrefetch = readInternalAsync(prefetchBuffer); - // Only swap buffers if we successfully scheduled an async read. 
+ + FutureTask pendingPrefetch = + new FutureTask<>( + () -> { + readInternal(prefetchBuffer); + return null; + }); + Thread.startVirtualThread(pendingPrefetch::run); + + // We could schedule a background read successfully, now update state + this.pendingPrefetch = pendingPrefetch; this.prefetchBuffer = prefetchBuffer; this.buffer = EMPTY_BYTEBUFFER; // trigger refill on next read() } @@ -433,16 +433,6 @@ public void prefetch() throws IOException { */ protected abstract void readInternal(ByteBuffer b) throws IOException; - /** - * Expert: implements asynchronous buffer refill. Unlike {@link #readInternal}, this may read less - * than {@link ByteBuffer#remaining()} bytes. - */ - protected Future readInternalAsync(ByteBuffer b) throws IOException { - CompletableFuture res = new CompletableFuture<>(); - res.complete(0); - return res; - } - @Override public final long getFilePointer() { return bufferStart + buffer.position(); diff --git a/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java index 76efbc7b112f..246f48082cfe 100644 --- a/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java +++ b/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java @@ -19,16 +19,10 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.ClosedChannelException; // javadoc @link import java.nio.channels.FileChannel; -import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; // javadoc import org.apache.lucene.util.IOUtils; @@ -53,8 +47,6 @@ */ public class NIOFSDirectory extends FSDirectory { - private final ExecutorService executorService; - /** * Create a new 
NIOFSDirectory for the named location. The directory is created at the named * location if it does not yet exist. @@ -65,7 +57,6 @@ public class NIOFSDirectory extends FSDirectory { */ public NIOFSDirectory(Path path, LockFactory lockFactory) throws IOException { super(path, lockFactory); - this.executorService = Executors.newVirtualThreadPerTaskExecutor(); } /** @@ -79,38 +70,21 @@ public NIOFSDirectory(Path path) throws IOException { this(path, FSLockFactory.getDefault()); } - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - executorService.shutdown(); - } - } - @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); ensureCanRead(name); Path path = getDirectory().resolve(name); - Set openOptions = Collections.singleton(StandardOpenOption.READ); - // nocommit: does it really make sense to open both a sync and an async channel on the same - // file? or should we do sync reads via the async channel (but this seems to come with - // noticeable overhead when data fits in the cache)? or do the async I/O naively using the sync - // interface? 
- FileChannel fc = null; - AsynchronousFileChannel afc = null; + FileChannel fc = FileChannel.open(path, StandardOpenOption.READ); boolean success = false; try { - fc = FileChannel.open(path, openOptions); - afc = AsynchronousFileChannel.open(path, openOptions, executorService); final NIOFSIndexInput indexInput = - new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, afc, context); + new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context); success = true; return indexInput; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(fc, afc); + IOUtils.closeWhileHandlingException(fc); } } } @@ -123,9 +97,6 @@ static final class NIOFSIndexInput extends BufferedIndexInput { /** the file channel we will read from */ protected final FileChannel channel; - /** the asynchronous channel to use for prefetching */ - protected final AsynchronousFileChannel asynchronousChannel; - /** is this instance a clone and hence does not own the file to close it */ boolean isClone = false; @@ -135,26 +106,18 @@ static final class NIOFSIndexInput extends BufferedIndexInput { /** end offset (start+length) */ protected final long end; - public NIOFSIndexInput( - String resourceDesc, FileChannel fc, AsynchronousFileChannel afc, IOContext context) + public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context) throws IOException { super(resourceDesc, context); this.channel = fc; - this.asynchronousChannel = afc; this.off = 0L; this.end = fc.size(); } public NIOFSIndexInput( - String resourceDesc, - FileChannel fc, - AsynchronousFileChannel afc, - long off, - long length, - int bufferSize) { + String resourceDesc, FileChannel fc, long off, long length, int bufferSize) { super(resourceDesc, bufferSize); this.channel = fc; - this.asynchronousChannel = afc; this.off = off; this.end = off + length; this.isClone = true; @@ -163,7 +126,7 @@ public NIOFSIndexInput( @Override public void close() throws IOException { if (!isClone) { - 
IOUtils.close(channel, asynchronousChannel); + channel.close(); } } @@ -192,7 +155,6 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw return new NIOFSIndexInput( getFullSliceDescription(sliceDescription), channel, - asynchronousChannel, off + offset, length, getBufferSize()); @@ -242,12 +204,6 @@ protected void readInternal(ByteBuffer b) throws IOException { } } - @Override - protected Future readInternalAsync(ByteBuffer b) throws IOException { - long pos = getFilePointer() + off; - return asynchronousChannel.read(b, pos); - } - @Override protected void seekInternal(long pos) throws IOException { if (pos > length()) { From 34cdb22f008006a83215311408963334516ff4a6 Mon Sep 17 00:00:00 2001 From: Uwe Schindler Date: Mon, 6 May 2024 23:04:27 +0200 Subject: [PATCH 04/15] fetch native page size from libc (should work on linux and macos); not tested! --- .../lucene/store/MemorySegmentIndexInput.java | 6 ++--- .../org/apache/lucene/store/NativeAccess.java | 3 +++ .../lucene/store/PosixNativeAccess.java | 25 ++++++++++++++++--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 60949820cf26..89fdceb95f25 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -326,12 +326,10 @@ public void prefetch() throws IOException { } final NativeAccess nativeAccess = this.nativeAccess.get(); - // If at the boundary between two slices, move to the next one. + // If at the boundary between two chunks, move to the next one. seek(getFilePointer()); try { - // nocommit: how to retrieve the page size? and disable prefetching if the page size is not - // 4kB? 
- final long offsetInPage = (curSegment.address() + curPosition) & 0x0FFF; + final long offsetInPage = (curSegment.address() + curPosition) % nativeAccess.getPageSize(); if (offsetInPage > curPosition) { // The start of the page is outside of this segment. return; diff --git a/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java b/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java index f4bc4f89d58f..d961883a418c 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java +++ b/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java @@ -27,6 +27,9 @@ abstract class NativeAccess { /** Invoke the {@code madvise} call for the given {@link MemorySegment}. */ public abstract void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException; + /** Returns native page size. */ + public abstract int getPageSize(); + /** * Return the NativeAccess instance for this platform. At moment we only support Linux and MacOS */ diff --git a/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java b/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java index 760a5c7aea0f..fa119a2984f6 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java +++ b/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java @@ -50,6 +50,7 @@ final class PosixNativeAccess extends NativeAccess { public static final int POSIX_MADV_DONTNEED = 4; private static final MethodHandle MH$posix_madvise; + private static final int PAGE_SIZE; private static final Optional INSTANCE; @@ -60,10 +61,14 @@ static Optional getInstance() { } static { + final Linker linker = Linker.nativeLinker(); + final SymbolLookup stdlib = linker.defaultLookup(); MethodHandle adviseHandle = null; + int pagesize = -1; PosixNativeAccess instance = null; try { - adviseHandle = lookupMadvise(); + adviseHandle = lookupMadvise(linker, stdlib); + pagesize = (int) lookupGetPageSize(linker, stdlib).invokeExact(); 
instance = new PosixNativeAccess(); } catch (UnsupportedOperationException uoe) { LOG.warning(uoe.getMessage()); @@ -77,14 +82,17 @@ static Optional getInstance() { + "pass the following on command line: --enable-native-access=%s", Optional.ofNullable(PosixNativeAccess.class.getModule().getName()) .orElse("ALL-UNNAMED"))); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new AssertionError(e); } MH$posix_madvise = adviseHandle; + PAGE_SIZE = pagesize; INSTANCE = Optional.ofNullable(instance); } - private static MethodHandle lookupMadvise() { - final Linker linker = Linker.nativeLinker(); - final SymbolLookup stdlib = linker.defaultLookup(); + private static MethodHandle lookupMadvise(Linker linker, SymbolLookup stdlib) { return findFunction( linker, stdlib, @@ -96,6 +104,10 @@ private static MethodHandle lookupMadvise() { ValueLayout.JAVA_INT)); } + private static MethodHandle lookupGetPageSize(Linker linker, SymbolLookup stdlib) { + return findFunction(linker, stdlib, "getpagesize", FunctionDescriptor.of(ValueLayout.JAVA_INT)); + } + private static MethodHandle findFunction( Linker linker, SymbolLookup lookup, String name, FunctionDescriptor desc) { final MemorySegment symbol = @@ -144,4 +156,9 @@ private Integer mapReadAdvice(ReadAdvice readAdvice) { case WILL_NEED -> POSIX_MADV_WILLNEED; }; } + + @Override + public int getPageSize() { + return PAGE_SIZE; + } } From cb053a13e85c169fcc283349c202b0769e07b47d Mon Sep 17 00:00:00 2001 From: Uwe Schindler Date: Mon, 6 May 2024 23:16:09 +0200 Subject: [PATCH 05/15] fix code to pass future directly --- .../src/java/org/apache/lucene/store/BufferedIndexInput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java index 87bcb9657ea7..1327384032d6 100644 --- a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java +++ 
b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java @@ -418,7 +418,7 @@ public void prefetch() throws IOException { readInternal(prefetchBuffer); return null; }); - Thread.startVirtualThread(pendingPrefetch::run); + Thread.startVirtualThread(pendingPrefetch); // We could schedule a background read successfully, now update state this.pendingPrefetch = pendingPrefetch; From ad853e362d907b5350ffa543ecf186102dd4719f Mon Sep 17 00:00:00 2001 From: Uwe Schindler Date: Mon, 6 May 2024 23:37:53 +0200 Subject: [PATCH 06/15] use page size also for the check of segment alignment for small chunks (TestMultiMMap) --- .../apache/lucene/store/MemorySegmentIndexInputProvider.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java index 61e47497eaa8..fe1b168f9d8c 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java @@ -114,10 +114,11 @@ private final MemorySegment[] map( throw convertMapFailedIOException(ioe, resourceDescription, segSize); } // if preload apply it without madvise. 
- // if chunk size is too small (2 MiB), disable madvise support (incorrect alignment) + // skip madvise if the address of our segment is not page-aligned (small segments due to + // internal FileChannel logic) if (preload) { segment.load(); - } else if (nativeAccess.isPresent() && chunkSizePower >= 21) { + } else if (nativeAccess.filter(na -> segment.address() % na.getPageSize() == 0).isPresent()) { nativeAccess.get().madvise(segment, readAdvice); } segments[segNr] = segment; From 9b817d536eda2317255658b58d26c0e097d4880c Mon Sep 17 00:00:00 2001 From: Uwe Schindler Date: Tue, 7 May 2024 00:59:30 +0200 Subject: [PATCH 07/15] Restore the old READADVICE_TO_IOCONTEXT array and centralize error handling of allowed ReadAdvice enum constants --- .../org/apache/lucene/store/IOContext.java | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/IOContext.java b/lucene/core/src/java/org/apache/lucene/store/IOContext.java index f618f41afca4..d1f110c92055 100644 --- a/lucene/core/src/java/org/apache/lucene/store/IOContext.java +++ b/lucene/core/src/java/org/apache/lucene/store/IOContext.java @@ -16,8 +16,7 @@ */ package org.apache.lucene.store; -import java.util.EnumMap; -import java.util.Map; +import java.util.Arrays; import java.util.Objects; import org.apache.lucene.util.Constants; @@ -74,10 +73,7 @@ public enum Context { throw new IllegalArgumentException( "The FLUSH and MERGE contexts must use the SEQUENTIAL read access advice"); } - if (readAdvice == ReadAdvice.WILL_NEED) { - throw new IllegalArgumentException( - "WILL_NEED is not a valid ReadAdvice for IOContext creation"); - } + checkReadAdvice(readAdvice); } /** Creates a default {@link IOContext} for reading/writing with the given {@link ReadAdvice} */ @@ -96,18 +92,21 @@ public IOContext(MergeInfo mergeInfo) { this(Context.MERGE, mergeInfo, null, ReadAdvice.SEQUENTIAL); } - private static final Map READADVICE_TO_IOCONTEXT = - new 
EnumMap<>(ReadAdvice.class); + private static boolean isAllowedReadAdvice(ReadAdvice advice) { + return advice != ReadAdvice.WILL_NEED; + } - static { - for (ReadAdvice advice : ReadAdvice.values()) { - if (advice == ReadAdvice.WILL_NEED) { - continue; - } - READADVICE_TO_IOCONTEXT.put(advice, new IOContext(advice)); + private static void checkReadAdvice(ReadAdvice advice) { + if (false == isAllowedReadAdvice(advice)) { + throw new IllegalArgumentException(advice + " is not a valid ReadAdvice for IOContext usage"); } } + private static final IOContext[] READADVICE_TO_IOCONTEXT = + Arrays.stream(ReadAdvice.values()) + .map(a -> isAllowedReadAdvice(a) ? new IOContext(a) : null) + .toArray(IOContext[]::new); + /** * Return an updated {@link IOContext} that has the provided {@link ReadAdvice} if the {@link * Context} is a {@link Context#DEFAULT} context, otherwise return this existing instance. This @@ -116,12 +115,9 @@ public IOContext(MergeInfo mergeInfo) { * ReadAdvice}s. */ public IOContext withReadAdvice(ReadAdvice advice) { - if (readAdvice == ReadAdvice.WILL_NEED) { - throw new IllegalArgumentException( - "WILL_NEED is not a valid ReadAdvice for IOContext#withReadAdvice"); - } + checkReadAdvice(readAdvice); if (context == Context.DEFAULT) { - return Objects.requireNonNull(READADVICE_TO_IOCONTEXT.get(advice)); + return READADVICE_TO_IOCONTEXT[advice.ordinal()]; } else { return this; } From 60aef05828eda50876cd4929bf6339d450a1327b Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 7 May 2024 09:08:00 +0200 Subject: [PATCH 08/15] Undo prefetch() on NIOFSDirectory. 
--- .../lucene/store/BufferedIndexInput.java | 124 +----------------- 1 file changed, 6 insertions(+), 118 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java index 1327384032d6..13151692bc06 100644 --- a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java +++ b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java @@ -20,11 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; import org.apache.lucene.util.GroupVIntUtil; -import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.ThreadInterruptedException; /** Base implementation class for buffered {@link IndexInput}. */ public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput { @@ -49,15 +45,7 @@ public abstract class BufferedIndexInput extends IndexInput implements RandomAcc private final int bufferSize; - // Despite the two buffer references below, BufferedIndexInput only tracks a single buffer. Either - // prefetch() has been called last and `buffer` is set to EMPTY_BYTEBUFFER while `prefetchBuffer` - // tracks the actual buffer, or prefetchBuffer is set to EMPTY_BYTEBUFFER and `buffer` tracks the - // actual buffer. This approach helps only check if `buffer.hasRemaining()` to know whether to - // trigger a refill(), and refill() will check if there is a pending prefetch() before actually - // reading bytes. 
- private ByteBuffer buffer = EMPTY_BYTEBUFFER; // initialized lazily - private ByteBuffer prefetchBuffer = EMPTY_BYTEBUFFER; - private FutureTask pendingPrefetch; // only non-null if there is a pending prefetch() + private ByteBuffer buffer = EMPTY_BYTEBUFFER; private long bufferStart = 0; // position in file of buffer @@ -102,13 +90,6 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException { @Override public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException { - // We need to finish pending prefetch operations to use data from the prefetch() instead of - // reading directly bytes into the user's buffer. - // Other readXXX methods don't need to do this since they always call refill() when they don't - // have enough data, which in-turn calls finishPendingPrefetch(). But readBytes() may read bytes - // into the user's buffer without refilling the internal buffer. - finishPendingPrefetch(); - int available = buffer.remaining(); if (len <= available) { // the buffer contains enough data to satisfy this request @@ -316,24 +297,7 @@ public final long readLong(long pos) throws IOException { return buffer.getLong((int) index); } - private void maybeInitBuffer() throws IOException { - assert pendingPrefetch == null; - assert prefetchBuffer == EMPTY_BYTEBUFFER; - - if (buffer == EMPTY_BYTEBUFFER) { - buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN).limit(0); - seekInternal(bufferStart); - } - } - private void refill() throws IOException { - assert buffer.hasRemaining() == false; - - // Wait for pending prefetching to finish. 
- if (finishPendingPrefetch()) { - return; - } - long start = bufferStart + buffer.position(); long end = start + bufferSize; if (end > length()) // don't read past EOF @@ -341,8 +305,11 @@ private void refill() throws IOException { int newLength = (int) (end - start); if (newLength <= 0) throw new EOFException("read past EOF: " + this); - // allocate buffer lazily - maybeInitBuffer(); + if (buffer == EMPTY_BYTEBUFFER) { + buffer = + ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); // allocate buffer lazily + seekInternal(bufferStart); + } buffer.position(0); buffer.limit(newLength); bufferStart = start; @@ -354,78 +321,6 @@ private void refill() throws IOException { buffer.flip(); } - private boolean finishPendingPrefetch() throws IOException { - if (pendingPrefetch != null) { - try { - pendingPrefetch.get(); - } catch (InterruptedException e) { - throw new ThreadInterruptedException(e); - } catch (ExecutionException e) { - throw IOUtils.rethrowAlways(e.getCause()); - } finally { - // Always clear pendingPrefetch and swap buffers, regardless of success/failure so that - // future read() operations work on the correct buffer. - pendingPrefetch = null; - prefetchBuffer.flip(); - buffer = prefetchBuffer; - prefetchBuffer = EMPTY_BYTEBUFFER; - } - - return buffer.hasRemaining(); - } - return false; - } - - @Override - public void prefetch() throws IOException { - final long pos = getFilePointer(); - final long length = length(); - if (pos >= length) { - throw new EOFException("read past EOF: " + this); - } - - // Make sure to never have two concurrent prefetch() calls trying to push bytes to the same - // buffer. - if (pendingPrefetch != null) { - // prefetch() got called twice without reading bytes in-between? - // nocommit should we fail instead? 
- return; - } - - if (buffer.hasRemaining()) { - // the seek() that preceded prefetch() moved within the buffer, so we still have valid bytes - // TODO: should we still prefetch more bytes in this case if there are very few bytes left? - return; - } else { - // The buffer may not have been initialized yet, e.g. if prefetch() was called immediately - // after calling clone() then seek(). - maybeInitBuffer(); - } - - assert buffer.capacity() > 0; - assert prefetchBuffer == EMPTY_BYTEBUFFER; - - bufferStart = pos; - final ByteBuffer prefetchBuffer = buffer; - prefetchBuffer.position(0); - final int limit = (int) Math.min(length - bufferStart, prefetchBuffer.capacity()); - assert limit > 0; - prefetchBuffer.limit(limit); - - FutureTask pendingPrefetch = - new FutureTask<>( - () -> { - readInternal(prefetchBuffer); - return null; - }); - Thread.startVirtualThread(pendingPrefetch); - - // We could schedule a background read successfully, now update state - this.pendingPrefetch = pendingPrefetch; - this.prefetchBuffer = prefetchBuffer; - this.buffer = EMPTY_BYTEBUFFER; // trigger refill on next read() - } - /** * Expert: implements buffer refill. Reads bytes from the current position in the input. * @@ -440,16 +335,11 @@ public final long getFilePointer() { @Override public final void seek(long pos) throws IOException { - // If there is a pending prefetch(), wait for it to finish before moving the file pointer. 
- finishPendingPrefetch(); - assert prefetchBuffer == EMPTY_BYTEBUFFER; - if (pos >= bufferStart && pos < (bufferStart + buffer.limit())) buffer.position((int) (pos - bufferStart)); // seek within buffer else { bufferStart = pos; buffer.limit(0); // trigger refill() on read - prefetchBuffer.limit(0); seekInternal(pos); } } @@ -467,8 +357,6 @@ public BufferedIndexInput clone() { BufferedIndexInput clone = (BufferedIndexInput) super.clone(); clone.buffer = EMPTY_BYTEBUFFER; - clone.prefetchBuffer = EMPTY_BYTEBUFFER; - clone.pendingPrefetch = null; clone.bufferStart = getFilePointer(); return clone; From b95a9b8404f910a0ade74e057c0d66be2562f668 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 8 May 2024 22:52:48 +0200 Subject: [PATCH 09/15] Remove ReadAdvice#WILL_NEED. --- .../org/apache/lucene/store/IOContext.java | 16 +-------------- .../org/apache/lucene/store/ReadAdvice.java | 7 +------ .../lucene/store/MemorySegmentIndexInput.java | 2 +- .../org/apache/lucene/store/NativeAccess.java | 5 +++++ .../lucene/store/PosixNativeAccess.java | 20 +++++++++++++------ 5 files changed, 22 insertions(+), 28 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/IOContext.java b/lucene/core/src/java/org/apache/lucene/store/IOContext.java index d1f110c92055..b2d82af20f80 100644 --- a/lucene/core/src/java/org/apache/lucene/store/IOContext.java +++ b/lucene/core/src/java/org/apache/lucene/store/IOContext.java @@ -73,7 +73,6 @@ public enum Context { throw new IllegalArgumentException( "The FLUSH and MERGE contexts must use the SEQUENTIAL read access advice"); } - checkReadAdvice(readAdvice); } /** Creates a default {@link IOContext} for reading/writing with the given {@link ReadAdvice} */ @@ -92,20 +91,8 @@ public IOContext(MergeInfo mergeInfo) { this(Context.MERGE, mergeInfo, null, ReadAdvice.SEQUENTIAL); } - private static boolean isAllowedReadAdvice(ReadAdvice advice) { - return advice != ReadAdvice.WILL_NEED; - } - - private static void 
checkReadAdvice(ReadAdvice advice) { - if (false == isAllowedReadAdvice(advice)) { - throw new IllegalArgumentException(advice + " is not a valid ReadAdvice for IOContext usage"); - } - } - private static final IOContext[] READADVICE_TO_IOCONTEXT = - Arrays.stream(ReadAdvice.values()) - .map(a -> isAllowedReadAdvice(a) ? new IOContext(a) : null) - .toArray(IOContext[]::new); + Arrays.stream(ReadAdvice.values()).map(IOContext::new).toArray(IOContext[]::new); /** * Return an updated {@link IOContext} that has the provided {@link ReadAdvice} if the {@link @@ -115,7 +102,6 @@ private static void checkReadAdvice(ReadAdvice advice) { * ReadAdvice}s. */ public IOContext withReadAdvice(ReadAdvice advice) { - checkReadAdvice(readAdvice); if (context == Context.DEFAULT) { return READADVICE_TO_IOCONTEXT[advice.ordinal()]; } else { diff --git a/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java b/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java index 4dda64988dfc..5c706e2abf50 100644 --- a/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java +++ b/lucene/core/src/java/org/apache/lucene/store/ReadAdvice.java @@ -40,10 +40,5 @@ public enum ReadAdvice { * loads the content of the file into the page cache at open time. This should only be used on * very small files that can be expected to fit in RAM with very high confidence. */ - RANDOM_PRELOAD, - /** - * Data will be needed soon. {@link Directory} implementations may start fetching bytes from - * storage in the background. 
- */ - WILL_NEED; + RANDOM_PRELOAD } diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 89fdceb95f25..17308f3ea5bb 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -340,7 +340,7 @@ public void prefetch() throws IOException { // Source code for madvise.c suggests we don't since WILL_NEED only triggers read-ahead // without updating the state of the virtual mapping? // https://github.com/torvalds/linux/blob/master/mm/madvise.c - nativeAccess.madvise(currentPageSlice, ReadAdvice.WILL_NEED); + nativeAccess.madviseWillNeed(currentPageSlice); } catch ( @SuppressWarnings("unused") IndexOutOfBoundsException e) { diff --git a/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java b/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java index d961883a418c..affc0e2ac719 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java +++ b/lucene/core/src/java21/org/apache/lucene/store/NativeAccess.java @@ -27,6 +27,11 @@ abstract class NativeAccess { /** Invoke the {@code madvise} call for the given {@link MemorySegment}. */ public abstract void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException; + /** + * Invoke the {@code madvise} call for the given {@link MemorySegment} with {@code MADV_WILLNEED}. + */ + public abstract void madviseWillNeed(MemorySegment segment) throws IOException; + /** Returns native page size. 
*/ public abstract int getPageSize(); diff --git a/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java b/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java index fa119a2984f6..93caca788b1b 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java +++ b/lucene/core/src/java21/org/apache/lucene/store/PosixNativeAccess.java @@ -122,17 +122,26 @@ private static MethodHandle findFunction( @Override public void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException { - // Note: madvise is bypassed if the segment should be preloaded via MemorySegment#load. - if (segment.byteSize() == 0L) { - return; // empty segments should be excluded, because they may have no address at all - } final Integer advice = mapReadAdvice(readAdvice); if (advice == null) { return; // do nothing } + madvise(segment, advice); + } + + @Override + public void madviseWillNeed(MemorySegment segment) throws IOException { + madvise(segment, POSIX_MADV_WILLNEED); + } + + private void madvise(MemorySegment segment, int advice) throws IOException { + // Note: madvise is bypassed if the segment should be preloaded via MemorySegment#load. + if (segment.byteSize() == 0L) { + return; // empty segments should be excluded, because they may have no address at all + } final int ret; try { - ret = (int) MH$posix_madvise.invokeExact(segment, segment.byteSize(), advice.intValue()); + ret = (int) MH$posix_madvise.invokeExact(segment, segment.byteSize(), advice); } catch (Throwable th) { throw new AssertionError(th); } @@ -153,7 +162,6 @@ private Integer mapReadAdvice(ReadAdvice readAdvice) { case RANDOM -> POSIX_MADV_RANDOM; case SEQUENTIAL -> POSIX_MADV_SEQUENTIAL; case RANDOM_PRELOAD -> null; - case WILL_NEED -> POSIX_MADV_WILLNEED; }; } From 21275e412c04c973b05bad39c7afe521f8f507bb Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 8 May 2024 23:17:16 +0200 Subject: [PATCH 10/15] Add `length` parameter to `prefetch()`. 
--- .../org/apache/lucene/store/IndexInput.java | 6 ++-- .../lucene/store/MemorySegmentIndexInput.java | 28 ++++++++++++++----- .../tests/store/BaseDirectoryTestCase.java | 3 +- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java index 1c0780a2127e..ec7a1294d407 100644 --- a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java +++ b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java @@ -194,10 +194,12 @@ public String toString() { /** * Optional method: Give a hint to this input that some bytes will be read in the near future. - * IndexInput implementations may take advantage of this hint to start fetching a page of data + * IndexInput implementations may take advantage of this hint to start fetching pages of data * immediately from storage. * *

The default implementation is a no-op. + * + * @param length the number of bytes to prefetch */ - public void prefetch() throws IOException {} + public void prefetch(long length) throws IOException {} } diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 17308f3ea5bb..0e8576b46661 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -318,9 +318,11 @@ public void seek(long pos) throws IOException { } @Override - public void prefetch() throws IOException { + public void prefetch(long length) throws IOException { ensureOpen(); + Objects.checkFromIndexSize(getFilePointer(), length, length()); + if (nativeAccess.isEmpty()) { return; } @@ -329,18 +331,30 @@ public void prefetch() throws IOException { // If at the boundary between two chunks, move to the next one. seek(getFilePointer()); try { - final long offsetInPage = (curSegment.address() + curPosition) % nativeAccess.getPageSize(); - if (offsetInPage > curPosition) { - // The start of the page is outside of this segment. + // Compute the intersection of the current segment and the region that should be prefetched. + long offset = curPosition; + if (offset + length > curSegment.byteSize()) { + // Only prefetch bytes that are stored in the current segment. There may be bytes on the + // next segment but this case is rare enough that we don't try to optimize it and keep + // things simple instead. + length = curSegment.byteSize() - curPosition; + } + // Now align offset with the page size, this is required for madvise. + // Compute the offset of the current position in the OS's page. 
+ final long offsetInPage = (curSegment.address() + offset) % nativeAccess.getPageSize(); + offset -= offsetInPage; + length += offsetInPage; + if (offset < 0) { + // The start of the page is outside of this segment, ignore. return; } - MemorySegment currentPageSlice = - curSegment.asSlice(curPosition - offsetInPage, offsetInPage + 1); + + MemorySegment prefetchSlice = curSegment.asSlice(offset, length); // Tell the OS we'll need this page. nocommit: do we need to restore the original read advice? // Source code for madvise.c suggests we don't since WILL_NEED only triggers read-ahead // without updating the state of the virtual mapping? // https://github.com/torvalds/linux/blob/master/mm/madvise.c - nativeAccess.madviseWillNeed(currentPageSlice); + nativeAccess.madviseWillNeed(prefetchSlice); } catch ( @SuppressWarnings("unused") IndexOutOfBoundsException e) { diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java index f8b65281a0e0..775c6d5d3186 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java @@ -1543,7 +1543,8 @@ private void doTestPrefetch(int startOffset) throws IOException { final int startPointer = (int) in.getFilePointer(); assertTrue(startPointer < in.length()); if (random().nextBoolean()) { - in.prefetch(); + final long prefetchLength = TestUtil.nextLong(random(), 1, in.length() - startPointer); + in.prefetch(prefetchLength); } assertEquals(startPointer, in.getFilePointer()); switch (random().nextInt(100)) { From a08298c28723e6ec9ba95782f36af2af102d3ae6 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 9 May 2024 22:48:27 +0200 Subject: [PATCH 11/15] Remove nocommit. 
--- .../org/apache/lucene/store/MemorySegmentIndexInput.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index 0e8576b46661..c68e8b87e637 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -349,11 +349,7 @@ public void prefetch(long length) throws IOException { return; } - MemorySegment prefetchSlice = curSegment.asSlice(offset, length); - // Tell the OS we'll need this page. nocommit: do we need to restore the original read advice? - // Source code for madvise.c suggests we don't since WILL_NEED only triggers read-ahead - // without updating the state of the virtual mapping? - // https://github.com/torvalds/linux/blob/master/mm/madvise.c + final MemorySegment prefetchSlice = curSegment.asSlice(offset, length); nativeAccess.madviseWillNeed(prefetchSlice); } catch ( @SuppressWarnings("unused") From b841f1bcb42ac8eb1a7c934e1651773340e33ba5 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 9 May 2024 23:08:13 +0200 Subject: [PATCH 12/15] Create constant for NativeAccess instead of passing it through the constructor. 
--- .../lucene/store/MemorySegmentIndexInput.java | 41 ++++++------------- .../MemorySegmentIndexInputProvider.java | 3 +- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java index c68e8b87e637..a8b8d6da3cd9 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java @@ -45,13 +45,13 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces ValueLayout.JAVA_LONG_UNALIGNED.withOrder(ByteOrder.LITTLE_ENDIAN); static final ValueLayout.OfFloat LAYOUT_LE_FLOAT = ValueLayout.JAVA_FLOAT_UNALIGNED.withOrder(ByteOrder.LITTLE_ENDIAN); + private static final Optional<NativeAccess> NATIVE_ACCESS = NativeAccess.getImplementation(); final long length; final long chunkSizeMask; final int chunkSizePower; final Arena arena; final MemorySegment[] segments; - final Optional<NativeAccess> nativeAccess; int curSegmentIndex = -1; MemorySegment @@ -63,15 +63,12 @@ public static MemorySegmentIndexInput newInstance( Arena arena, MemorySegment[] segments, long length, - int chunkSizePower, - Optional<NativeAccess> nativeAccess) { + int chunkSizePower) { assert Arrays.stream(segments).map(MemorySegment::scope).allMatch(arena.scope()::equals); if (segments.length == 1) { - return new SingleSegmentImpl( - resourceDescription, arena, segments[0], length, chunkSizePower, nativeAccess); + return new SingleSegmentImpl(resourceDescription, arena, segments[0], length, chunkSizePower); } else { - return new MultiSegmentImpl( - resourceDescription, arena, segments, 0, length, chunkSizePower, nativeAccess); + return new MultiSegmentImpl(resourceDescription, arena, segments, 0, length, chunkSizePower); } } @@ -80,8 +77,7 @@ private MemorySegmentIndexInput( Arena arena, MemorySegment[] segments, long length, - int chunkSizePower, - Optional<NativeAccess>
nativeAccess) { + int chunkSizePower) { super(resourceDescription); this.arena = arena; this.segments = segments; @@ -89,7 +85,6 @@ private MemorySegmentIndexInput( this.chunkSizePower = chunkSizePower; this.chunkSizeMask = (1L << chunkSizePower) - 1L; this.curSegment = segments[0]; - this.nativeAccess = nativeAccess; } void ensureOpen() { @@ -323,10 +318,10 @@ public void prefetch(long length) throws IOException { Objects.checkFromIndexSize(getFilePointer(), length, length()); - if (nativeAccess.isEmpty()) { + if (NATIVE_ACCESS.isEmpty()) { return; } - final NativeAccess nativeAccess = this.nativeAccess.get(); + final NativeAccess nativeAccess = NATIVE_ACCESS.get(); // If at the boundary between two chunks, move to the next one. seek(getFilePointer()); @@ -541,8 +536,7 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long le null, // clones don't have an Arena, as they can't close) slices[0].asSlice(offset, length), length, - chunkSizePower, - nativeAccess); + chunkSizePower); } else { return new MultiSegmentImpl( newResourceDescription, @@ -550,8 +544,7 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long le slices, offset, length, - chunkSizePower, - nativeAccess); + chunkSizePower); } } @@ -591,15 +584,8 @@ static final class SingleSegmentImpl extends MemorySegmentIndexInput { Arena arena, MemorySegment segment, long length, - int chunkSizePower, - Optional<NativeAccess> nativeAccess) { - super( - resourceDescription, - arena, - new MemorySegment[] {segment}, - length, - chunkSizePower, - nativeAccess); + int chunkSizePower) { + super(resourceDescription, arena, new MemorySegment[] {segment}, length, chunkSizePower); this.curSegmentIndex = 0; } @@ -685,9 +671,8 @@ static final class MultiSegmentImpl extends MemorySegmentIndexInput { MemorySegment[] segments, long offset, long length, - int chunkSizePower, - Optional<NativeAccess> nativeAccess) { - super(resourceDescription, arena, segments, length, chunkSizePower, nativeAccess); + int
chunkSizePower) { + super(resourceDescription, arena, segments, length, chunkSizePower); this.offset = offset; try { seek(0L); diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java index fe1b168f9d8c..080b24ced516 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java @@ -62,8 +62,7 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo fileSize, nativeAccess), fileSize, - chunkSizePower, - nativeAccess); + chunkSizePower); success = true; return in; } finally { From 0404fc69f8ff1b41a0e59d8457f5f345d4be0ed5 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 9 May 2024 23:10:45 +0200 Subject: [PATCH 13/15] CHANGES --- lucene/CHANGES.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 51cb9d8a8dff..e75e59927ccd 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -102,6 +102,10 @@ API Changes Additionally, deprecated methods have been removed from ByteBuffersIndexInput, BooleanQuery and others. Please refer to MIGRATE.md for further details. (Sanjay Dutt) +* GITHUB#13337: Introduce new `IndexInput#prefetch(long)` API to give a hint to + the directory about bytes that are about to be read. 
(Adrien Grand, Uwe + Schindler) + New Features --------------------- From fd69129127ea207b709c60ee87c381782e809fa9 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 9 May 2024 23:13:59 +0200 Subject: [PATCH 14/15] iter --- .../lucene/store/MemorySegmentIndexInputProvider.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java index 080b24ced516..e1655101d75f 100644 --- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java +++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInputProvider.java @@ -59,8 +59,7 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo context.readAdvice(), chunkSizePower, preload, - fileSize, - nativeAccess), + fileSize), fileSize, chunkSizePower); success = true; @@ -89,8 +88,7 @@ private final MemorySegment[] map( ReadAdvice readAdvice, int chunkSizePower, boolean preload, - long length, - Optional<NativeAccess> nativeAccess) + long length) throws IOException { if ((length >>> chunkSizePower) >= Integer.MAX_VALUE) throw new IllegalArgumentException("File too big for chunk size: " + resourceDescription); From 10c38d0ee5b9a3dc4f5f19cd9a0687d7713ce2b6 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 9 May 2024 23:16:16 +0200 Subject: [PATCH 15/15] Add prefetch() to MDW.
--- .../apache/lucene/tests/store/MockIndexInputWrapper.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java index 39c41d468255..7bf3bf56055b 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockIndexInputWrapper.java @@ -130,6 +130,12 @@ public void seek(long pos) throws IOException { in.seek(pos); } + @Override + public void prefetch(long length) throws IOException { + ensureOpen(); + in.prefetch(length); + } + @Override public long length() { ensureOpen();