Skip to content

Commit f901326

Browse files
committed
Merge branch 'main' into cleanup-cleanup
2 parents f7c77e4 + 6182027 commit f901326

File tree

7 files changed

+977
-8
lines changed

7 files changed

+977
-8
lines changed
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.jbellis.jvector.disk;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
import java.nio.ByteOrder;
21+
import java.nio.channels.FileChannel;
22+
import java.nio.file.Path;
23+
import java.nio.file.StandardOpenOption;
24+
25+
/**
26+
* {@code MappedChunkReader} provides random access reading of large files using memory-mapped I/O,
27+
* supporting files larger than 2GB by mapping them in manageable chunks.
28+
* <p>
29+
* This class implements {@link RandomAccessReader} and allows reading primitive types and arrays
30+
* from a file channel, handling chunk remapping transparently.
31+
* <p>
32+
* This class is intended as a replacement for {@link SimpleMappedReader} to provide the capability
33+
* to handle files larger than 2GB in size regardless of the OS or JDK in use.
34+
* </p>
35+
*/
36+
public class MappedChunkReader implements RandomAccessReader {
37+
private static final long CHUNK_SIZE = Integer.MAX_VALUE; // ~2GB
38+
private final FileChannel channel;
39+
private final long fileSize;
40+
private final ByteOrder byteOrder;
41+
private long position;
42+
43+
private ByteBuffer currentBuffer;
44+
private long currentChunkStart;
45+
46+
/**
47+
* Constructs a new {@code MappedChunkReader} for the given file channel and byte order.
48+
*
49+
* @param channel the file channel to read from
50+
* @param byteOrder the byte order to use for reading
51+
* @throws IOException if an I/O error occurs
52+
*/
53+
public MappedChunkReader(FileChannel channel, ByteOrder byteOrder) throws IOException {
54+
this.channel = channel;
55+
this.byteOrder = byteOrder;
56+
this.fileSize = channel.size();
57+
this.position = 0;
58+
mapChunk(0);
59+
}
60+
61+
/**
62+
* {@code Supplier} is a factory for creating {@link MappedChunkReader} instances
63+
* from a given file path.
64+
*/
65+
public static class Supplier implements ReaderSupplier {
66+
private final FileChannel channel;
67+
68+
/**
69+
* Opens a file channel for the specified path in read-only mode.
70+
*
71+
* @param path the path to the file
72+
* @throws IOException if an I/O error occurs
73+
*/
74+
public Supplier(Path path) throws IOException {
75+
this.channel = FileChannel.open(path, StandardOpenOption.READ);
76+
}
77+
78+
/**
79+
* Returns a new {@link MappedChunkReader} using the opened file channel.
80+
*
81+
* @return a new {@code MappedChunkReader}
82+
*/
83+
@Override
84+
public RandomAccessReader get() {
85+
try {
86+
return new MappedChunkReader(channel, ByteOrder.BIG_ENDIAN);
87+
} catch (IOException e) {
88+
throw new RuntimeException(e);
89+
}
90+
}
91+
92+
/**
93+
* Closes the underlying file channel.
94+
*
95+
* @throws IOException if an I/O error occurs
96+
*/
97+
@Override
98+
public void close() throws IOException {
99+
channel.close();
100+
}
101+
}
102+
103+
/**
104+
* Maps a chunk of the file into memory starting at the specified offset.
105+
*
106+
* @param chunkStart the start offset of the chunk
107+
* @throws IOException if an I/O error occurs
108+
*/
109+
private void mapChunk(long chunkStart) throws IOException {
110+
long size = Math.min(CHUNK_SIZE, fileSize - chunkStart);
111+
currentBuffer = channel.map(FileChannel.MapMode.READ_ONLY, chunkStart, size).order(byteOrder);
112+
currentChunkStart = chunkStart;
113+
}
114+
115+
/**
116+
* Ensures that the specified number of bytes are available in the current buffer,
117+
* remapping a new chunk if necessary.
118+
*
119+
* @param size the number of bytes required
120+
* @throws IOException if an I/O error occurs
121+
*/
122+
private void ensureAvailable(int size) throws IOException {
123+
if (position < currentChunkStart || position + size > currentChunkStart + currentBuffer.capacity()) {
124+
mapChunk((position / CHUNK_SIZE) * CHUNK_SIZE);
125+
}
126+
currentBuffer.position((int)(position - currentChunkStart));
127+
}
128+
129+
/**
130+
* Sets the current read position in the file.
131+
*
132+
* @param offset the new position
133+
*/
134+
@Override
135+
public void seek(long offset) {
136+
this.position = offset;
137+
}
138+
139+
/**
140+
* Returns the current read position in the file.
141+
*
142+
* @return the current position
143+
*/
144+
@Override
145+
public long getPosition() {
146+
return position;
147+
}
148+
149+
/**
150+
* Reads a 4-byte integer from the current position.
151+
*
152+
* @return the integer value read
153+
* @throws RuntimeException if an I/O error occurs
154+
*/
155+
@Override
156+
public int readInt() {
157+
try {
158+
ensureAvailable(4);
159+
int v = currentBuffer.getInt();
160+
position += 4;
161+
return v;
162+
} catch (IOException e) {
163+
throw new RuntimeException(e);
164+
}
165+
}
166+
167+
/**
168+
* Reads an 8-byte long from the current position.
169+
*
170+
* @return the long value read
171+
* @throws RuntimeException if an I/O error occurs
172+
*/
173+
@Override
174+
public long readLong() {
175+
try {
176+
ensureAvailable(8);
177+
long v = currentBuffer.getLong();
178+
position += 8;
179+
return v;
180+
} catch (IOException e) {
181+
throw new RuntimeException(e);
182+
}
183+
}
184+
185+
/**
186+
* Reads a 4-byte float from the current position.
187+
*
188+
* @return the float value read
189+
* @throws RuntimeException if an I/O error occurs
190+
*/
191+
@Override
192+
public float readFloat() {
193+
try {
194+
ensureAvailable(4);
195+
float v = currentBuffer.getFloat();
196+
position += 4;
197+
return v;
198+
} catch (IOException e) {
199+
throw new RuntimeException(e);
200+
}
201+
}
202+
203+
/**
204+
* Reads bytes into the provided array, filling it completely.
205+
*
206+
* @param b the byte array to fill
207+
* @throws RuntimeException if an I/O error occurs
208+
*/
209+
@Override
210+
public void readFully(byte[] b) {
211+
try {
212+
int offset = 0;
213+
while (offset < b.length) {
214+
ensureAvailable(1);
215+
int toRead = Math.min(b.length - offset, currentBuffer.remaining());
216+
currentBuffer.get(b, offset, toRead);
217+
offset += toRead;
218+
position += toRead;
219+
}
220+
} catch (IOException e) {
221+
throw new RuntimeException(e);
222+
}
223+
}
224+
225+
/**
226+
* Reads bytes into the provided {@link ByteBuffer}, filling it completely.
227+
*
228+
* @param buffer the buffer to fill
229+
* @throws RuntimeException if an I/O error occurs
230+
*/
231+
@Override
232+
public void readFully(ByteBuffer buffer) {
233+
try {
234+
while (buffer.hasRemaining()) {
235+
ensureAvailable(1);
236+
int toRead = Math.min(buffer.remaining(), currentBuffer.remaining());
237+
ByteBuffer slice = currentBuffer.slice();
238+
slice.limit(toRead);
239+
buffer.put(slice);
240+
currentBuffer.position(currentBuffer.position() + toRead);
241+
position += toRead;
242+
}
243+
} catch (IOException e) {
244+
throw new RuntimeException(e);
245+
}
246+
}
247+
248+
/**
249+
* Reads long values into the provided array, filling it completely.
250+
*
251+
* @param vector the array to fill with long values
252+
* @throws RuntimeException if an I/O error occurs
253+
*/
254+
@Override
255+
public void readFully(long[] vector) {
256+
ByteBuffer tmp = ByteBuffer.allocate(vector.length * Long.BYTES).order(byteOrder);
257+
readFully(tmp);
258+
tmp.flip().asLongBuffer().get(vector);
259+
}
260+
261+
/**
262+
* Reads integer values into the provided array at the specified offset.
263+
*
264+
* @param ints the array to fill with integer values
265+
* @param offset the starting offset in the array
266+
* @param count the number of integers to read
267+
* @throws RuntimeException if an I/O error occurs
268+
*/
269+
@Override
270+
public void read(int[] ints, int offset, int count) {
271+
ByteBuffer tmp = ByteBuffer.allocate(count * Integer.BYTES).order(byteOrder);
272+
readFully(tmp);
273+
tmp.flip().asIntBuffer().get(ints, offset, count);
274+
}
275+
276+
/**
277+
* Reads float values into the provided array at the specified offset.
278+
*
279+
* @param floats the array to fill with float values
280+
* @param offset the starting offset in the array
281+
* @param count the number of floats to read
282+
* @throws RuntimeException if an I/O error occurs
283+
*/
284+
@Override
285+
public void read(float[] floats, int offset, int count) {
286+
ByteBuffer tmp = ByteBuffer.allocate(count * Float.BYTES).order(byteOrder);
287+
readFully(tmp);
288+
tmp.flip().asFloatBuffer().get(floats, offset, count);
289+
}
290+
291+
/**
292+
* Returns the total length of the file.
293+
*
294+
* @return the file size in bytes
295+
*/
296+
@Override
297+
public long length() {
298+
return fileSize;
299+
}
300+
301+
/**
302+
* Closes this reader. The underlying channel is managed by {@link Supplier} and is not closed here.
303+
*/
304+
@Override
305+
public void close() {
306+
// Channel is managed by Supplier
307+
}
308+
}

jvector-base/src/main/java/io/github/jbellis/jvector/disk/ReaderSupplierFactory.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,11 @@ public static ReaderSupplier open(Path path) throws IOException {
4545
Constructor<?> ctor = supplierClass.getConstructor(Path.class);
4646
return (ReaderSupplier) ctor.newInstance(path);
4747
} catch (Exception e) {
48-
LOG.log(Level.WARNING, "MMapReaderSupplier not available, falling back to SimpleMappedReaderSupplier. More details available at level FINE.");
48+
LOG.log(Level.WARNING, "MMapReaderSupplier not available, falling back to MappedChunkReader. More details available at level FINE.");
4949
LOG.log(Level.FINE, "MMapReaderSupplier instantiation exception:", e);
50-
if (Files.size(path) > Integer.MAX_VALUE) {
51-
throw new RuntimeException("File sizes greater than 2GB are not supported on older Windows JDKs");
52-
}
5350

54-
// finally, fall back to SimpleMappedReader (available everywhere, but doesn't support files > 2GB)
55-
return new SimpleMappedReader.Supplier(path);
51+
// finally, fall back to MappedChunkReader
52+
return new MappedChunkReader.Supplier(path);
5653
}
5754
}
5855
}

jvector-examples/src/main/java/io/github/jbellis/jvector/example/Grid.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.github.jbellis.jvector.example.util.AccuracyMetrics;
2222
import io.github.jbellis.jvector.example.util.CompressorParameters;
2323
import io.github.jbellis.jvector.example.util.DataSet;
24+
import io.github.jbellis.jvector.example.util.FilteredForkJoinPool;
2425
import io.github.jbellis.jvector.graph.GraphIndex;
2526
import io.github.jbellis.jvector.graph.GraphIndexBuilder;
2627
import io.github.jbellis.jvector.graph.GraphSearcher;
@@ -334,7 +335,7 @@ private static Map<Set<FeatureId>, GraphIndex> buildInMemory(List<? extends Set<
334335
addHierarchy,
335336
refineFinalGraph,
336337
PhysicalCoreExecutor.pool(),
337-
ForkJoinPool.commonPool());
338+
FilteredForkJoinPool.createFilteredPool());
338339
start = System.nanoTime();
339340
var onHeapGraph = builder.build(floatVectors);
340341
System.out.format("Build (%s) M=%d overflow=%.2f ef=%d in %.2fs%n",

0 commit comments

Comments
 (0)