Skip to content

Commit 3de1075

Browse files
committed
embed keytransform.Datastore to support more methods
1 parent fd1232c commit 3de1075

File tree

1 file changed

+15
-155
lines changed

1 file changed

+15
-155
lines changed

keytransform/txndatastore.go

Lines changed: 15 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ func WrapTxnDatastore(child ds.TxnDatastore, t KeyTransform) *TxnDatastore {
1919
panic("child (ds.TxnDatastore) is nil")
2020
}
2121

22-
return &TxnDatastore{child: child, KeyTransform: t}
22+
return &TxnDatastore{ds: Wrap(child, t), child: child, KeyTransform: t}
2323
}
2424

2525
// TxnDatastore keeps a KeyTransform function
2626
type TxnDatastore struct {
27+
ds *Datastore
2728
child ds.TxnDatastore
2829

2930
KeyTransform
@@ -45,205 +46,64 @@ func (d *TxnDatastore) Children() []ds.Datastore {
4546

4647
// Put stores the given value, transforming the key first.
4748
func (d *TxnDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) {
48-
return d.child.Put(ctx, d.ConvertKey(key), value)
49+
return d.ds.Put(ctx, d.ConvertKey(key), value)
4950
}
5051

5152
// Sync implements Datastore.Sync
5253
func (d *TxnDatastore) Sync(ctx context.Context, prefix ds.Key) error {
53-
return d.child.Sync(ctx, d.ConvertKey(prefix))
54+
return d.ds.Sync(ctx, d.ConvertKey(prefix))
5455
}
5556

5657
// Get returns the value for given key, transforming the key first.
5758
func (d *TxnDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
58-
return d.child.Get(ctx, d.ConvertKey(key))
59+
return d.ds.Get(ctx, d.ConvertKey(key))
5960
}
6061

6162
// Has returns whether the datastore has a value for a given key, transforming
6263
// the key first.
6364
func (d *TxnDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
64-
return d.child.Has(ctx, d.ConvertKey(key))
65+
return d.ds.Has(ctx, d.ConvertKey(key))
6566
}
6667

6768
// GetSize returns the size of the value named by the given key, transforming
6869
// the key first.
6970
func (d *TxnDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
70-
return d.child.GetSize(ctx, d.ConvertKey(key))
71+
return d.ds.GetSize(ctx, d.ConvertKey(key))
7172
}
7273

7374
// Delete removes the value for given key
7475
func (d *TxnDatastore) Delete(ctx context.Context, key ds.Key) (err error) {
75-
return d.child.Delete(ctx, d.ConvertKey(key))
76+
return d.ds.Delete(ctx, d.ConvertKey(key))
7677
}
7778

7879
// Query implements Query, inverting keys on the way back out.
7980
func (d *TxnDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
80-
nq, cq := d.prepareQuery(q)
81-
82-
cqr, err := d.child.Query(ctx, cq)
83-
if err != nil {
84-
return nil, err
85-
}
86-
87-
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
88-
Next: func() (dsq.Result, bool) {
89-
r, ok := cqr.NextSync()
90-
if !ok {
91-
return r, false
92-
}
93-
if r.Error == nil {
94-
r.Entry.Key = d.InvertKey(ds.RawKey(r.Entry.Key)).String()
95-
}
96-
return r, true
97-
},
98-
Close: func() error {
99-
return cqr.Close()
100-
},
101-
})
102-
return dsq.NaiveQueryApply(nq, qr), nil
103-
}
104-
105-
// Split the query into a child query and a naive query. That way, we can make
106-
// the child datastore do as much work as possible.
107-
func (d *TxnDatastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {
108-
109-
// First, put everything in the child query. Then, start taking things
110-
// out.
111-
child = q
112-
113-
// Always let the child handle the key prefix.
114-
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()
115-
116-
// Check if the key transform is order-preserving so we can use the
117-
// child datastore's built-in ordering.
118-
orderPreserving := false
119-
switch d.KeyTransform.(type) {
120-
case PrefixTransform, *PrefixTransform:
121-
orderPreserving = true
122-
}
123-
124-
// Try to let the child handle ordering.
125-
orders:
126-
for i, o := range child.Orders {
127-
switch o.(type) {
128-
case dsq.OrderByValue, *dsq.OrderByValue,
129-
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
130-
// Key doesn't matter.
131-
continue
132-
case dsq.OrderByKey, *dsq.OrderByKey,
133-
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
134-
// if the key transform preserves order, we can delegate
135-
// to the child datastore.
136-
if orderPreserving {
137-
// When sorting, we compare with the first
138-
// Order, then, if equal, we compare with the
139-
// second Order, etc. However, keys are _unique_
140-
// so we'll never apply any additional orders
141-
// after ordering by key.
142-
child.Orders = child.Orders[:i+1]
143-
break orders
144-
}
145-
}
146-
147-
// Can't handle this order under transform, punt it to a naive
148-
// ordering.
149-
naive.Orders = q.Orders
150-
child.Orders = nil
151-
naive.Offset = q.Offset
152-
child.Offset = 0
153-
naive.Limit = q.Limit
154-
child.Limit = 0
155-
break
156-
}
157-
158-
// Try to let the child handle the filters.
159-
160-
// don't modify the original filters.
161-
child.Filters = append([]dsq.Filter(nil), child.Filters...)
162-
163-
for i, f := range child.Filters {
164-
switch f := f.(type) {
165-
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
166-
continue
167-
case dsq.FilterKeyCompare:
168-
child.Filters[i] = dsq.FilterKeyCompare{
169-
Op: f.Op,
170-
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
171-
}
172-
continue
173-
case *dsq.FilterKeyCompare:
174-
child.Filters[i] = &dsq.FilterKeyCompare{
175-
Op: f.Op,
176-
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
177-
}
178-
continue
179-
case dsq.FilterKeyPrefix:
180-
child.Filters[i] = dsq.FilterKeyPrefix{
181-
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
182-
}
183-
continue
184-
case *dsq.FilterKeyPrefix:
185-
child.Filters[i] = &dsq.FilterKeyPrefix{
186-
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
187-
}
188-
continue
189-
}
190-
191-
// Not a known filter, defer to the naive implementation.
192-
naive.Filters = q.Filters
193-
child.Filters = nil
194-
naive.Offset = q.Offset
195-
child.Offset = 0
196-
naive.Limit = q.Limit
197-
child.Limit = 0
198-
break
199-
}
200-
return
81+
return d.ds.Query(ctx, q)
20182
}
20283

20384
func (d *TxnDatastore) Close() error {
204-
return d.child.Close()
85+
return d.ds.Close()
20586
}
20687

20788
// DiskUsage implements the PersistentTxnDatastore interface.
20889
func (d *TxnDatastore) DiskUsage(ctx context.Context) (uint64, error) {
209-
return ds.DiskUsage(ctx, d.child)
90+
return d.ds.DiskUsage(ctx)
21091
}
21192

21293
func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) {
213-
bds, ok := d.child.(ds.Batching)
214-
if !ok {
215-
return nil, ds.ErrBatchUnsupported
216-
}
217-
218-
childbatch, err := bds.Batch(ctx)
219-
if err != nil {
220-
return nil, err
221-
}
222-
return &transformBatch{
223-
dst: childbatch,
224-
f: d.ConvertKey,
225-
}, nil
94+
return d.ds.Batch(ctx)
22695
}
22796

22897
func (d *TxnDatastore) Check(ctx context.Context) error {
229-
if c, ok := d.child.(ds.CheckedDatastore); ok {
230-
return c.Check(ctx)
231-
}
232-
return nil
98+
return d.ds.Check(ctx)
23399
}
234100

235101
func (d *TxnDatastore) Scrub(ctx context.Context) error {
236-
if c, ok := d.child.(ds.ScrubbedDatastore); ok {
237-
return c.Scrub(ctx)
238-
}
239-
return nil
102+
return d.ds.Scrub(ctx)
240103
}
241104

242105
func (d *TxnDatastore) CollectGarbage(ctx context.Context) error {
243-
if c, ok := d.child.(ds.GCDatastore); ok {
244-
return c.CollectGarbage(ctx)
245-
}
246-
return nil
106+
return d.ds.CollectGarbage(ctx)
247107
}
248108

249109
func (d *TxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {

0 commit comments

Comments
 (0)