Skip to content

Commit 5965637

Browse files
authored
feat(filestore): add mmap reader option (#665)
1 parent 6c7f2b7 commit 5965637

File tree

5 files changed

+161
-68
lines changed

5 files changed

+161
-68
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ The following emojis are used to highlight certain changes:
6666
- `routing/http/server`: added configurable routing timeout (`DefaultRoutingTimeout` being 30s) to prevent indefinite hangs during content/peer routing. Set custom duration via `WithRoutingTimeout`. [#720](https://github.com/ipfs/boxo/pull/720)
6767
- `routing/http/server`: exposes Prometheus metrics on `prometheus.DefaultRegisterer` and a custom one can be provided via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
6868
- `gateway`: `NewCacheBlockStore` and `NewCarBackend` will use `prometheus.DefaultRegisterer` when a custom one is not specified via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
69+
- `filestore`: added opt-in `WithMMapReader` option to `FileManager` to enable memory-mapped file reads [#665](https://github.com/ipfs/boxo/pull/665)
6970

7071
### Changed
7172

filestore/filereader.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package filestore
2+
3+
import (
4+
"io"
5+
"os"
6+
7+
"golang.org/x/exp/mmap"
8+
)
9+
10+
// FileReader is the read handle the filestore uses to fetch block data from
// a backing file: positional reads via io.ReaderAt, plus Close to release
// the handle once the read is done.
type FileReader interface {
	io.ReaderAt
	io.Closer
}

var _ FileReader = (*stdReader)(nil)

// stdReader is the default FileReader; it reads through a regular *os.File.
type stdReader struct {
	f *os.File
}

// ReadAt implements the FileReader interface by delegating to os.File.ReadAt.
func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) {
	return r.f.ReadAt(p, off)
}

// Close implements the FileReader interface by closing the underlying file.
func (r *stdReader) Close() error {
	return r.f.Close()
}

// newStdReader opens path for reading and wraps it in a FileReader.
//
// The error from os.Open is returned unwrapped so that callers can still
// classify it with helpers such as os.IsNotExist.
func newStdReader(path string) (FileReader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	return &stdReader{f: f}, nil
}
38+
39+
var _ FileReader = (*mmapReader)(nil)
40+
41+
type mmapReader struct {
42+
m *mmap.ReaderAt
43+
}
44+
45+
// ReadAt implements the FileReader interface.
46+
func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) {
47+
return r.m.ReadAt(p, off)
48+
}
49+
50+
// Close implements the FileReader interface.
51+
func (r *mmapReader) Close() error {
52+
return r.m.Close()
53+
}
54+
55+
func newMmapReader(path string) (FileReader, error) {
56+
m, err := mmap.Open(path)
57+
if err != nil {
58+
return nil, err
59+
}
60+
return &mmapReader{m: m}, nil
61+
}

filestore/filestore_test.go

Lines changed: 70 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ import (
1818

1919
var bg = context.Background()
2020

21-
func newTestFilestore(t *testing.T) (string, *Filestore) {
21+
func newTestFilestore(t *testing.T, option ...Option) (string, *Filestore) {
2222
mds := ds.NewMapDatastore()
2323

2424
testdir, err := os.MkdirTemp("", "filestore-test")
2525
if err != nil {
2626
t.Fatal(err)
2727
}
28-
fm := NewFileManager(mds, testdir)
28+
fm := NewFileManager(mds, testdir, option...)
2929
fm.AllowFiles = true
3030

3131
bs := blockstore.NewBlockstore(mds)
@@ -48,62 +48,74 @@ func makeFile(dir string, data []byte) (string, error) {
4848
}
4949

5050
func TestBasicFilestore(t *testing.T) {
51-
dir, fs := newTestFilestore(t)
52-
53-
buf := make([]byte, 1000)
54-
rand.Read(buf)
55-
56-
fname, err := makeFile(dir, buf)
57-
if err != nil {
58-
t.Fatal(err)
59-
}
60-
61-
var cids []cid.Cid
62-
for i := 0; i < 100; i++ {
63-
n := &posinfo.FilestoreNode{
64-
PosInfo: &posinfo.PosInfo{
65-
FullPath: fname,
66-
Offset: uint64(i * 10),
67-
},
68-
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
69-
}
70-
71-
err := fs.Put(bg, n)
72-
if err != nil {
73-
t.Fatal(err)
74-
}
75-
cids = append(cids, n.Node.Cid())
76-
}
77-
78-
for i, c := range cids {
79-
blk, err := fs.Get(bg, c)
80-
if err != nil {
81-
t.Fatal(err)
82-
}
83-
84-
if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
85-
t.Fatal("data didnt match on the way out")
86-
}
87-
}
88-
89-
kch, err := fs.AllKeysChan(context.Background())
90-
if err != nil {
91-
t.Fatal(err)
92-
}
93-
94-
out := make(map[string]struct{})
95-
for c := range kch {
96-
out[c.KeyString()] = struct{}{}
97-
}
98-
99-
if len(out) != len(cids) {
100-
t.Fatal("mismatch in number of entries")
101-
}
102-
103-
for _, c := range cids {
104-
if _, ok := out[c.KeyString()]; !ok {
105-
t.Fatal("missing cid: ", c)
106-
}
51+
cases := []struct {
52+
name string
53+
options []Option
54+
}{
55+
{"default", nil},
56+
{"mmap", []Option{WithMMapReader()}},
57+
}
58+
59+
for _, c := range cases {
60+
t.Run(c.name, func(t *testing.T) {
61+
dir, fs := newTestFilestore(t, c.options...)
62+
63+
buf := make([]byte, 1000)
64+
rand.Read(buf)
65+
66+
fname, err := makeFile(dir, buf)
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
71+
var cids []cid.Cid
72+
for i := 0; i < 100; i++ {
73+
n := &posinfo.FilestoreNode{
74+
PosInfo: &posinfo.PosInfo{
75+
FullPath: fname,
76+
Offset: uint64(i * 10),
77+
},
78+
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
79+
}
80+
81+
err := fs.Put(bg, n)
82+
if err != nil {
83+
t.Fatal(err)
84+
}
85+
cids = append(cids, n.Node.Cid())
86+
}
87+
88+
for i, c := range cids {
89+
blk, err := fs.Get(bg, c)
90+
if err != nil {
91+
t.Fatal(err)
92+
}
93+
94+
if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
95+
t.Fatal("data didnt match on the way out")
96+
}
97+
}
98+
99+
kch, err := fs.AllKeysChan(context.Background())
100+
if err != nil {
101+
t.Fatal(err)
102+
}
103+
104+
out := make(map[string]struct{})
105+
for c := range kch {
106+
out[c.KeyString()] = struct{}{}
107+
}
108+
109+
if len(out) != len(cids) {
110+
t.Fatal("mismatch in number of entries")
111+
}
112+
113+
for _, c := range cids {
114+
if _, ok := out[c.KeyString()]; !ok {
115+
t.Fatal("missing cid: ", c)
116+
}
117+
}
118+
})
107119
}
108120
}
109121

filestore/fsrefstore.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
// FilestorePrefix identifies the key prefix for FileManager blocks.
2626
var FilestorePrefix = ds.NewKey("filestore")
2727

28+
// Option is a functional option that customizes a FileManager; NewFileManager
// applies the options it is given, in order, after setting its defaults.
type Option func(*FileManager)
29+
2830
// FileManager is a blockstore implementation which stores special
2931
// blocks FilestoreNode type. These nodes only contain a reference
3032
// to the actual location of the block data in the filesystem
@@ -34,6 +36,7 @@ type FileManager struct {
3436
AllowUrls bool
3537
ds ds.Batching
3638
root string
39+
makeReader func(path string) (FileReader, error)
3740
}
3841

3942
// CorruptReferenceError implements the error interface.
@@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string {
5154
return c.Err.Error()
5255
}
5356

57+
// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O.
58+
// On Windows, when reading and writing to a file simultaneously, the system would consume
59+
// a significant amount of memory due to caching. This memory usage is not reflected in
60+
// the application but in the system. Using memory-mapped files (implemented with
61+
// CreateFileMapping on Windows) avoids this issue.
62+
func WithMMapReader() Option {
63+
return func(f *FileManager) {
64+
f.makeReader = newMmapReader
65+
}
66+
}
67+
5468
// NewFileManager initializes a new file manager with the given
5569
// datastore and root. All FilestoreNodes paths are relative to the
5670
// root path given here, which is prepended for any operations.
57-
func NewFileManager(ds ds.Batching, root string) *FileManager {
58-
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
71+
func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager {
72+
f := &FileManager{
73+
ds: dsns.Wrap(ds, FilestorePrefix),
74+
root: root,
75+
makeReader: newStdReader,
76+
}
77+
78+
for _, option := range options {
79+
option(f)
80+
}
81+
82+
return f
5983
}
6084

6185
// AllKeysChan returns a channel from which to read the keys stored in
@@ -175,21 +199,16 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
175199
p := filepath.FromSlash(d.GetFilePath())
176200
abspath := filepath.Join(f.root, p)
177201

178-
fi, err := os.Open(abspath)
202+
fi, err := f.makeReader(abspath)
179203
if os.IsNotExist(err) {
180204
return nil, &CorruptReferenceError{StatusFileNotFound, err}
181205
} else if err != nil {
182206
return nil, &CorruptReferenceError{StatusFileError, err}
183207
}
184208
defer fi.Close()
185209

186-
_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
187-
if err != nil {
188-
return nil, &CorruptReferenceError{StatusFileError, err}
189-
}
190-
191210
outbuf := make([]byte, d.GetSize_())
192-
_, err = io.ReadFull(fi, outbuf)
211+
_, err = fi.ReadAt(outbuf, int64(d.GetOffset()))
193212
if err == io.EOF || err == io.ErrUnexpectedEOF {
194213
return nil, &CorruptReferenceError{StatusFileChanged, err}
195214
} else if err != nil {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ require (
7575
go.opentelemetry.io/otel/trace v1.31.0
7676
go.uber.org/multierr v1.11.0
7777
go.uber.org/zap v1.27.0
78+
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
7879
golang.org/x/oauth2 v0.23.0
7980
golang.org/x/sync v0.8.0
8081
golang.org/x/sys v0.26.0
@@ -185,7 +186,6 @@ require (
185186
go.uber.org/fx v1.23.0 // indirect
186187
go.uber.org/mock v0.5.0 // indirect
187188
golang.org/x/crypto v0.28.0 // indirect
188-
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
189189
golang.org/x/mod v0.21.0 // indirect
190190
golang.org/x/net v0.30.0 // indirect
191191
golang.org/x/text v0.19.0 // indirect

0 commit comments

Comments
 (0)