Skip to content

Commit aa123dd

Browse files
committed
blockstore: implement UseWholeCIDs
Update the tests too, as a few expected whole CIDs. For now, just make them use the option. This required index.Index to gain a new method, GetAll, so that when using whole CIDs, methods like Get can return true if any of the matching indexed CIDs are an exact whole-CID match, and not just looking at the first matching indexed CID. GetAll is akin to an iteration over all matching indexed CIDs, and its callback returns a boolean to say if the iteration should continue. This allows stopping as soon as we're done. We also remove Index.Get, instead replacing it with a helper called GetFirst, which simply makes simple uses of GetAll a single line. We remove the non-specced and unused index implementations, too. They were left in place in case they were useful again, but they haven't been so far, and their code is still in git. Keeping them around just means updating more code when refactoring. While at it, make ZeroLengthSectionAsEOF take a boolean and return an option, just like the other boolean options, for consistency. Fixes #130. This commit was moved from ipld/go-car@c2e497e
1 parent 523c604 commit aa123dd

15 files changed

+281
-244
lines changed

ipld/car/v2/blockstore/example_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ func ExampleOpenReadOnly() {
2020
// Note, `OpenReadOnly` accepts both CARv1 and CARv2 formats and transparently generates an index
2121
// in the background if necessary.
2222
// This instance sets the ZeroLengthSectionAsEOF option to treat zero-sized sections in the file as EOF.
23-
robs, err := blockstore.OpenReadOnly("../testdata/sample-v1.car", carv2.ZeroLengthSectionAsEOF)
23+
robs, err := blockstore.OpenReadOnly("../testdata/sample-v1.car",
24+
blockstore.UseWholeCIDs(true),
25+
carv2.ZeroLengthSectionAsEOF(true),
26+
)
2427
if err != nil {
2528
panic(err)
2629
}

ipld/car/v2/blockstore/insertionindex.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func newRecordFromCid(c cid.Cid, at uint64) recordDigest {
5555
panic(err)
5656
}
5757

58-
return recordDigest{d.Digest, index.Record{Cid: c, Idx: at}}
58+
return recordDigest{d.Digest, index.Record{Cid: c, Offset: at}}
5959
}
6060

6161
func (ii *insertionIndex) insertNoReplace(key cid.Cid, n uint64) {
@@ -77,7 +77,31 @@ func (ii *insertionIndex) Get(c cid.Cid) (uint64, error) {
7777
return 0, errUnsupported
7878
}
7979

80-
return r.Record.Idx, nil
80+
return r.Record.Offset, nil
81+
}
82+
83+
func (ii *insertionIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
84+
d, err := multihash.Decode(c.Hash())
85+
if err != nil {
86+
return err
87+
}
88+
entry := recordDigest{digest: d.Digest}
89+
90+
any := false
91+
iter := func(i llrb.Item) bool {
92+
existing := i.(recordDigest)
93+
if !bytes.Equal(existing.digest, entry.digest) {
94+
// We've already looked at all entries with matching digests.
95+
return false
96+
}
97+
any = true
98+
return fn(existing.Record.Offset)
99+
}
100+
ii.items.AscendGreaterOrEqual(entry, iter)
101+
if !any {
102+
return index.ErrNotFound
103+
}
104+
return nil
81105
}
82106

83107
func (ii *insertionIndex) Marshal(w io.Writer) error {
@@ -152,6 +176,10 @@ func (ii *insertionIndex) flatten() (index.Index, error) {
152176
return si, nil
153177
}
154178

179+
// note that hasExactCID is very similar to GetAll,
180+
// but it's separate as it allows us to compare Record.Cid directly,
181+
// whereas GetAll just provides Record.Offset.
182+
155183
func (ii *insertionIndex) hasExactCID(c cid.Cid) bool {
156184
d, err := multihash.Decode(c.Hash())
157185
if err != nil {

ipld/car/v2/blockstore/readonly.go

+92-35
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ type ReadOnly struct {
6565
// go-car/v2 package.
6666
func UseWholeCIDs(enable bool) carv2.ReadOption {
6767
return func(o *carv2.ReadOptions) {
68-
// TODO: update methods like Get, Has, and AllKeysChan to obey this.
6968
o.BlockstoreUseWholeCIDs = enable
7069
}
7170
}
@@ -177,68 +176,121 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
177176
b.mu.RLock()
178177
defer b.mu.RUnlock()
179178

180-
offset, err := b.idx.Get(key)
179+
var fnFound bool
180+
var fnErr error
181+
err := b.idx.GetAll(key, func(offset uint64) bool {
182+
uar := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
183+
var err error
184+
_, err = varint.ReadUvarint(uar)
185+
if err != nil {
186+
fnErr = err
187+
return false
188+
}
189+
_, readCid, err := cid.CidFromReader(uar)
190+
if err != nil {
191+
fnErr = err
192+
return false
193+
}
194+
if b.ropts.BlockstoreUseWholeCIDs {
195+
fnFound = readCid.Equals(key)
196+
return !fnFound // continue looking if we haven't found it
197+
} else {
198+
fnFound = bytes.Equal(readCid.Hash(), key.Hash())
199+
return false
200+
}
201+
})
181202
if errors.Is(err, index.ErrNotFound) {
182203
return false, nil
183204
} else if err != nil {
184205
return false, err
185206
}
186-
uar := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
187-
_, err = varint.ReadUvarint(uar)
188-
if err != nil {
189-
return false, err
190-
}
191-
_, c, err := cid.CidFromReader(uar)
192-
if err != nil {
193-
return false, err
194-
}
195-
return bytes.Equal(key.Hash(), c.Hash()), nil
207+
return fnFound, fnErr
196208
}
197209

198210
// Get gets a block corresponding to the given key.
199211
func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
200212
b.mu.RLock()
201213
defer b.mu.RUnlock()
202214

203-
offset, err := b.idx.Get(key)
204-
if err != nil {
205-
if err == index.ErrNotFound {
206-
err = blockstore.ErrNotFound
215+
var fnData []byte
216+
var fnErr error
217+
err := b.idx.GetAll(key, func(offset uint64) bool {
218+
readCid, data, err := b.readBlock(int64(offset))
219+
if err != nil {
220+
fnErr = err
221+
return false
207222
}
223+
if b.ropts.BlockstoreUseWholeCIDs {
224+
if readCid.Equals(key) {
225+
fnData = data
226+
return false
227+
} else {
228+
return true // continue looking
229+
}
230+
} else {
231+
if bytes.Equal(readCid.Hash(), key.Hash()) {
232+
fnData = data
233+
}
234+
return false
235+
}
236+
})
237+
if errors.Is(err, index.ErrNotFound) {
238+
return nil, blockstore.ErrNotFound
239+
} else if err != nil {
208240
return nil, err
241+
} else if fnErr != nil {
242+
return nil, fnErr
209243
}
210-
entry, data, err := b.readBlock(int64(offset))
211-
if err != nil {
212-
return nil, err
213-
}
214-
if !bytes.Equal(key.Hash(), entry.Hash()) {
244+
if fnData == nil {
215245
return nil, blockstore.ErrNotFound
216246
}
217-
return blocks.NewBlockWithCid(data, key)
247+
return blocks.NewBlockWithCid(fnData, key)
218248
}
219249

220250
// GetSize gets the size of an item corresponding to the given key.
221251
func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
222252
b.mu.RLock()
223253
defer b.mu.RUnlock()
224254

225-
idx, err := b.idx.Get(key)
226-
if err != nil {
227-
return -1, err
228-
}
229-
rdr := internalio.NewOffsetReadSeeker(b.backing, int64(idx))
230-
sectionLen, err := varint.ReadUvarint(rdr)
231-
if err != nil {
255+
var fnSize int = -1
256+
var fnErr error
257+
err := b.idx.GetAll(key, func(offset uint64) bool {
258+
rdr := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
259+
sectionLen, err := varint.ReadUvarint(rdr)
260+
if err != nil {
261+
fnErr = err
262+
return false
263+
}
264+
cidLen, readCid, err := cid.CidFromReader(rdr)
265+
if err != nil {
266+
fnErr = err
267+
return false
268+
}
269+
if b.ropts.BlockstoreUseWholeCIDs {
270+
if readCid.Equals(key) {
271+
fnSize = int(sectionLen) - cidLen
272+
return false
273+
} else {
274+
return true // continue looking
275+
}
276+
} else {
277+
if bytes.Equal(readCid.Hash(), key.Hash()) {
278+
fnSize = int(sectionLen) - cidLen
279+
}
280+
return false
281+
}
282+
})
283+
if errors.Is(err, index.ErrNotFound) {
232284
return -1, blockstore.ErrNotFound
285+
} else if err != nil {
286+
return -1, err
287+
} else if fnErr != nil {
288+
return -1, fnErr
233289
}
234-
cidLen, readCid, err := cid.CidFromReader(rdr)
235-
if err != nil {
236-
return 0, err
237-
}
238-
if !readCid.Equals(key) {
290+
if fnSize == -1 {
239291
return -1, blockstore.ErrNotFound
240292
}
241-
return int(sectionLen) - cidLen, err
293+
return fnSize, nil
242294
}
243295

244296
// Put is not supported and always returns an error.
@@ -304,6 +356,11 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
304356
return // TODO: log this error
305357
}
306358

359+
// If we're just using multihashes, flatten to the "raw" codec.
360+
if !b.ropts.BlockstoreUseWholeCIDs {
361+
c = cid.NewCidV1(cid.Raw, c.Hash())
362+
}
363+
307364
select {
308365
case ch <- c:
309366
case <-ctx.Done():

ipld/car/v2/blockstore/readonly_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ func TestReadOnly(t *testing.T) {
4949
}
5050
for _, tt := range tests {
5151
t.Run(tt.name, func(t *testing.T) {
52-
subject, err := OpenReadOnly(tt.v1OrV2path)
52+
subject, err := OpenReadOnly(tt.v1OrV2path,
53+
UseWholeCIDs(true),
54+
)
5355
t.Cleanup(func() { subject.Close() })
5456
require.NoError(t, err)
5557

ipld/car/v2/blockstore/readwrite.go

-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func AllowDuplicatePuts(allow bool) carv2.WriteOption {
8989
// Resuming from finalized files is allowed. However, resumption will regenerate the index
9090
// regardless by scanning every existing block in file.
9191
func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) (*ReadWrite, error) {
92-
// TODO: enable deduplication by default now that resumption is automatically attempted.
9392
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o666) // TODO: Should the user be able to configure FileMode permissions?
9493
if err != nil {
9594
return nil, fmt.Errorf("could not open read/write file: %w", err)

0 commit comments

Comments
 (0)