Skip to content

Commit 99e4972

Browse files
committed
pagecache: more proper integration
1 parent 96e2dc7 commit 99e4972

27 files changed

+259
-215
lines changed

catfs/fs.go

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/sahib/brig/catfs/db"
2525
ie "github.com/sahib/brig/catfs/errors"
2626
"github.com/sahib/brig/catfs/mio"
27+
"github.com/sahib/brig/catfs/mio/pagecache"
2728
n "github.com/sahib/brig/catfs/nodes"
2829
"github.com/sahib/brig/catfs/vcs"
2930
"github.com/sahib/brig/repo/hints"
@@ -101,6 +102,10 @@ type FS struct {
101102

102103
// interface to load stream hints
103104
hintManager HintManager
105+
106+
// cache for storing pages written to catfs.Handle
107+
// (may be nil if not used, e.g. for tests)
108+
pageCache pagecache.Cache
104109
}
105110

106111
// ErrReadOnly is returned when a file system was created in read only mode
@@ -354,6 +359,7 @@ func NewFilesystem(
354359
readOnly bool,
355360
fsCfg *config.Config,
356361
hintManager HintManager,
362+
pageCache pagecache.Cache,
357363
) (*FS, error) {
358364
kv, err := db.NewBadgerDatabase(dbPath)
359365
if err != nil {
@@ -394,6 +400,7 @@ func NewFilesystem(
394400
repinControl: make(chan string, 1),
395401
pinner: pinCache,
396402
hintManager: hintManager,
403+
pageCache: pageCache,
397404
}
398405

399406
// Start the garbage collection background task.

catfs/fs_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func withDummyFSReadOnly(t *testing.T, readOnly bool, fn func(fs *FS)) {
5050

5151
fsCfg := cfg.Section("fs")
5252

53-
fs, err := NewFilesystem(backend, dbPath, owner, readOnly, fsCfg, nil)
53+
fs, err := NewFilesystem(backend, dbPath, owner, readOnly, fsCfg, nil, nil)
5454
if err != nil {
5555
t.Fatalf("Failed to create filesystem: %v", err)
5656
}

catfs/handle.go

+28-52
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"sync"
88

99
"github.com/sahib/brig/catfs/mio"
10-
"github.com/sahib/brig/catfs/mio/overlay"
10+
"github.com/sahib/brig/catfs/mio/pagecache"
1111
n "github.com/sahib/brig/catfs/nodes"
1212
)
1313

@@ -24,7 +24,7 @@ type Handle struct {
2424
fs *FS
2525
file *n.File
2626
lock sync.Mutex
27-
layer *overlay.Layer
27+
layer *pagecache.Layer
2828
stream mio.Stream
2929
wasModified bool
3030
isClosed bool
@@ -40,6 +40,10 @@ func newHandle(fs *FS, file *n.File, readOnly bool) *Handle {
4040
}
4141

4242
func (hdl *Handle) initStreamIfNeeded() error {
43+
if hdl.fs.pageCache == nil {
44+
return errors.New("no page cache was initialized")
45+
}
46+
4347
if hdl.stream != nil {
4448
return nil
4549
}
@@ -60,18 +64,20 @@ func (hdl *Handle) initStreamIfNeeded() error {
6064
return err
6165
}
6266

63-
hdl.layer = overlay.NewLayer(hdl.stream)
64-
hdl.layer.Truncate(int64(hdl.file.Size()))
65-
hdl.layer.SetSize(int64(hdl.file.Size()))
66-
return nil
67+
hdl.layer, err = pagecache.NewLayer(
68+
hdl.stream,
69+
hdl.fs.pageCache,
70+
int64(hdl.file.Inode()),
71+
int64(hdl.file.Size()),
72+
)
73+
74+
return err
6775
}
6876

6977
// Read will try to fill `buf` as much as possible.
7078
// The seek pointer will be advanced by the number of bytes written.
7179
// Take care, `buf` might still have contents, even if io.EOF was returned.
7280
func (hdl *Handle) Read(buf []byte) (int, error) {
73-
var err error
74-
7581
hdl.lock.Lock()
7682
defer hdl.lock.Unlock()
7783

@@ -83,19 +89,23 @@ func (hdl *Handle) Read(buf []byte) (int, error) {
8389
return 0, err
8490
}
8591

86-
// TODO: not sure if that makes sense...
87-
// we should just read whatever the underlying stream thinks it has.
88-
n, err := io.ReadFull(hdl.layer, buf)
89-
isEOF := (err == io.ErrUnexpectedEOF || err == io.EOF)
90-
if err != nil && !isEOF {
91-
return 0, err
92+
return hdl.layer.Read(buf)
93+
}
94+
95+
// ReadAt reads from the overlay at `off` into `buf`.
96+
func (hdl *Handle) ReadAt(buf []byte, off int64) (int, error) {
97+
hdl.lock.Lock()
98+
defer hdl.lock.Unlock()
99+
100+
if hdl.isClosed {
101+
return 0, ErrIsClosed
92102
}
93103

94-
if isEOF {
95-
return n, io.EOF
104+
if err := hdl.initStreamIfNeeded(); err != nil {
105+
return 0, err
96106
}
97107

98-
return n, nil
108+
return hdl.layer.ReadAt(buf, off)
99109
}
100110

101111
// Write will write the contents of `buf` to the current position.
@@ -112,42 +122,8 @@ func (hdl *Handle) Write(buf []byte) (int, error) {
112122
return 0, ErrIsClosed
113123
}
114124

115-
if err := hdl.initStreamIfNeeded(); err != nil {
116-
return 0, err
117-
}
118-
119-
// Currently, we do not check if the file was actually modified
120-
// (i.e. data changed compared to before)
121125
hdl.wasModified = true
122-
123-
n, err := hdl.layer.Write(buf)
124-
if err != nil {
125-
return n, err
126-
}
127-
128-
// Advance the write pointer when writing things to the buffer.
129-
if _, err := hdl.stream.Seek(int64(n), io.SeekCurrent); err != nil && err != io.EOF {
130-
return n, err
131-
}
132-
133-
minSize := uint64(hdl.layer.MinSize())
134-
if hdl.file.Size() < minSize {
135-
hdl.fs.mu.Lock()
136-
hdl.file.SetSize(minSize)
137-
138-
// Make sure to save the size change:
139-
if err := hdl.fs.lkr.StageNode(hdl.file); err != nil {
140-
hdl.fs.mu.Unlock()
141-
return 0, err
142-
}
143-
144-
hdl.fs.mu.Unlock()
145-
146-
// Also auto-truncate on every write.
147-
hdl.layer.Truncate(int64(minSize))
148-
}
149-
150-
return n, nil
126+
return hdl.layer.Write(buf)
151127
}
152128

153129
// WriteAt writes data from `buf` at offset `off` counted from the start (0 offset).

catfs/mio/pagecache/cache.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package overlay
1+
package pagecache
22

33
import (
44
"github.com/sahib/brig/catfs/mio/pagecache/page"

catfs/mio/pagecache/doc.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Package overlay implements a io.ReaderAt and io.WriterAt that is similar in
1+
// Package pagecache implements a io.ReaderAt and io.WriterAt that is similar in
22
// function to the OverlayFS of Linux. It overlays a read-only stream and
33
// enables write support. The writes will take priority on the data in stream
44
// and will therefore be visible when calling ReadAt() of the overlay.
@@ -28,4 +28,4 @@
2828
//
2929
// NOTE: Whenever uint32 is used in this code, it refers to per-page offsets or
3030
// size. When int64 is used the content is an offset of the underlying offset.
31-
package overlay
31+
package pagecache

catfs/mio/pagecache/mdcache/l1.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
// 2. We must be able to set a max memory bound.
1919
// 3. We must avoid copying of pages due to performance reasons.
2020
//
21-
// The most popular libraries fail always one of the criterias:
21+
// The most popular libraries fail always one of the criteria:
2222
//
2323
// - fastcache: fails 1 and 3.
2424
// - ristretto: fails 1.
@@ -109,16 +109,14 @@ func (c *l1cache) Get(pk pageKey) (*page.Page, error) {
109109
return item.Page, nil
110110
}
111111

112-
func (c *l1cache) Del(pks []pageKey) error {
112+
func (c *l1cache) Del(pks []pageKey) {
113113
for _, pk := range pks {
114114
delItem, ok := c.m[pk]
115115
if ok {
116116
c.k.Remove(delItem.Link)
117117
delete(c.m, pk)
118118
}
119119
}
120-
121-
return nil
122120
}
123121

124122
func (c *l1cache) Close() error {

catfs/mio/pagecache/mdcache/l1_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestL1GetSetDel(t *testing.T) {
3737
require.Equal(t, pset.Data, pgot.Data)
3838
require.Equal(t, pset.Extents, pgot.Extents)
3939

40-
require.NoError(t, l1.Del([]pageKey{pk}))
40+
l1.Del([]pageKey{pk})
4141
_, err = l1.Get(pk)
4242
require.Error(t, page.ErrCacheMiss)
4343
})

catfs/mio/pagecache/mdcache/l2.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ func (c *l2cache) Get(pk pageKey) (*page.Page, error) {
5858
return page.FromBytes(pdata)
5959
}
6060

61-
func (c *l2cache) Del(pks []pageKey) error {
61+
func (c *l2cache) Del(pks []pageKey) {
6262
if c == nil {
63-
return nil
63+
return
6464
}
6565

6666
for _, pk := range pks {
@@ -70,8 +70,6 @@ func (c *l2cache) Del(pks []pageKey) error {
7070
log.Warnf("page l2: failed to delete %s", path)
7171
}
7272
}
73-
74-
return nil
7573
}
7674

7775
func (c *l2cache) Close() error {

catfs/mio/pagecache/mdcache/l2_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestL2GetSetDel(t *testing.T) {
4646
require.Equal(t, pset.Data, pgot.Data)
4747
require.Equal(t, pset.Extents, pgot.Extents)
4848

49-
require.NoError(t, l2.Del([]pageKey{pk}))
49+
l2.Del([]pageKey{pk})
5050
_, err = l2.Get(pk)
5151
require.Error(t, page.ErrCacheMiss)
5252
})
@@ -61,5 +61,5 @@ func TestL2Nil(t *testing.T) {
6161
require.Error(t, page.ErrCacheMiss)
6262

6363
require.NoError(t, l2.Set(pageKey{0, 1}, dummyPage(0, 1024)))
64-
require.NoError(t, l2.Del([]pageKey{{0, 1}}))
64+
l2.Del([]pageKey{{0, 1}})
6565
}

catfs/mio/pagecache/mdcache/mdcache.go

+23-17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// Package mdcache implements a leveled memory/disk cache combination.
12
package mdcache
23

34
import (
@@ -9,9 +10,21 @@ import (
910
log "github.com/sirupsen/logrus"
1011
)
1112

13+
// Options give room for finetuning the behavior of Memory/Disk cache.
1214
type Options struct {
15+
// MaxMemoryUsage of L1 in bytes
1316
MaxMemoryUsage int64
14-
SwapDirectory string
17+
18+
// SwapDirectory specifies where L2 pages are stored.
19+
// If empty, no l2 cache is used. Instead another l1 cache
20+
// is used in its place, rendering MaxMemoryUsage useless.
21+
// You have to set both for an effect.
22+
SwapDirectory string
23+
24+
// L1CacheMissRefill will propagate
25+
// data from L2 to L1 if it could be found
26+
// successfully.
27+
L1CacheMissRefill bool
1528

1629
// TODO: Those need to be still implemented.
1730

@@ -23,17 +36,12 @@ type Options struct {
2336
// on load. Reduces storage, but increases CPU usage if you're swapping.
2437
// Since swapping is slow anyways this is recommended.
2538
L2Compress bool
26-
27-
// L1CacheMissRefill will propagate
28-
// data from L2 to L1 if it could be found
29-
// successfully.
30-
L1CacheMissRefill bool
3139
}
3240

3341
type cacheLayer interface {
3442
Get(pk pageKey) (*page.Page, error)
3543
Set(pk pageKey, p *page.Page) error
36-
Del(pks []pageKey) error
44+
Del(pks []pageKey)
3745
Close() error
3846
}
3947

@@ -62,7 +70,8 @@ func (pk pageKey) String() string {
6270
return filepath.Join(string(s[:2]), string(s[2:]))
6371
}
6472

65-
func NewDirCache(opts Options) (*MDCache, error) {
73+
// New returns a new Memory/Disk cache
74+
func New(opts Options) (*MDCache, error) {
6675
l2, err := newL2Cache(opts.SwapDirectory)
6776
if err != nil {
6877
return nil, err
@@ -89,6 +98,7 @@ func NewDirCache(opts Options) (*MDCache, error) {
8998
}, nil
9099
}
91100

101+
// Lookup implements pagecache.Cache
92102
func (dc *MDCache) Lookup(inode int64, pageIdx uint32) (*page.Page, error) {
93103
dc.mu.Lock()
94104
defer dc.mu.Unlock()
@@ -120,6 +130,7 @@ func (dc *MDCache) get(pk pageKey) (*page.Page, error) {
120130
}
121131
}
122132

133+
// Merge implements pagecache.Cache
123134
func (dc *MDCache) Merge(inode int64, pageIdx, off uint32, write []byte) error {
124135
dc.mu.Lock()
125136
defer dc.mu.Unlock()
@@ -149,6 +160,7 @@ func (dc *MDCache) Merge(inode int64, pageIdx, off uint32, write []byte) error {
149160
return dc.l1.Set(pk, p)
150161
}
151162

163+
// Evict implements pagecache.Cache
152164
func (dc *MDCache) Evict(inode, size int64) error {
153165
dc.mu.Lock()
154166
defer dc.mu.Unlock()
@@ -164,18 +176,12 @@ func (dc *MDCache) Evict(inode, size int64) error {
164176
pks = append(pks, pageKey{inode: inode, pageIdx: pageIdx})
165177
}
166178

167-
if err := dc.l1.Del(pks); err != nil {
168-
log.WithError(err).Warnf("l1 delete failed for %v", pks)
169-
}
170-
171-
// TODO: This will spam logs in case of no page:
172-
if err := dc.l2.Del(pks); err != nil {
173-
log.WithError(err).Warnf("l2 delete failed for %v", pks)
174-
}
175-
179+
dc.l1.Del(pks)
180+
dc.l2.Del(pks)
176181
return nil
177182
}
178183

184+
// Close closes the cache contents and cleans up resources.
179185
func (dc *MDCache) Close() error {
180186
dc.mu.Lock()
181187
defer dc.mu.Unlock()

catfs/mio/pagecache/mdcache/mdcache_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func withMDCache(t *testing.T, fn func(mdc *MDCache)) {
1515
require.NoError(t, err)
1616
defer os.RemoveAll(tmpDir)
1717

18-
md, err := NewDirCache(Options{
18+
md, err := New(Options{
1919
MaxMemoryUsage: 4 * page.Size,
2020
SwapDirectory: tmpDir,
2121
L1CacheMissRefill: true,

0 commit comments

Comments
 (0)