Skip to content

Commit 89d7625

Browse files
committed
change pinning to happen in a callback
1 parent f0276dc commit 89d7625

File tree

12 files changed

+91
-45
lines changed

12 files changed

+91
-45
lines changed

core/commands/add.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"strings"
88

99
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
10-
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1110

1211
cmds "github.com/ipfs/go-ipfs/commands"
1312
files "github.com/ipfs/go-ipfs/commands/files"
@@ -16,6 +15,7 @@ import (
1615
importer "github.com/ipfs/go-ipfs/importer"
1716
"github.com/ipfs/go-ipfs/importer/chunk"
1817
dag "github.com/ipfs/go-ipfs/merkledag"
18+
pin "github.com/ipfs/go-ipfs/pin"
1919
ft "github.com/ipfs/go-ipfs/unixfs"
2020
u "github.com/ipfs/go-ipfs/util"
2121
)
@@ -113,12 +113,16 @@ remains to be implemented.
113113
return
114114
}
115115

116-
err = n.Pinning.Pin(context.Background(), rootnd, true)
116+
rnk, err := rootnd.Key()
117117
if err != nil {
118118
res.SetError(err, cmds.ErrNormal)
119119
return
120120
}
121121

122+
mp := n.Pinning.GetManual()
123+
mp.RemovePinWithMode(rnk, pin.Indirect)
124+
mp.PinWithMode(rnk, pin.Recursive)
125+
122126
err = n.Pinning.Flush()
123127
if err != nil {
124128
res.SetError(err, cmds.ErrNormal)
@@ -214,7 +218,12 @@ remains to be implemented.
214218
}
215219

216220
func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) {
217-
node, err := importer.BuildDagFromReader(reader, n.DAG, nil, chunk.DefaultSplitter)
221+
node, err := importer.BuildDagFromReader(
222+
reader,
223+
n.DAG,
224+
chunk.DefaultSplitter,
225+
importer.PinIndirectCB(n.Pinning.GetManual()),
226+
)
218227
if err != nil {
219228
return nil, err
220229
}

core/corehttp/gateway_handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) {
7272
// TODO(cryptix): change and remove this helper once PR1136 is merged
7373
// return ufs.AddFromReader(i.node, r.Body)
7474
return importer.BuildDagFromReader(
75-
r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter)
75+
r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual()))
7676
}
7777

7878
// TODO(btc): break this apart into separate handlers using a more expressive muxer

core/coreunix/add.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package coreunix
22

33
import (
4-
"errors"
54
"io"
65
"io/ioutil"
76
"os"
@@ -18,6 +17,7 @@ import (
1817
"github.com/ipfs/go-ipfs/pin"
1918
"github.com/ipfs/go-ipfs/thirdparty/eventlog"
2019
unixfs "github.com/ipfs/go-ipfs/unixfs"
20+
u "github.com/ipfs/go-ipfs/util"
2121
)
2222

2323
var log = eventlog.Logger("coreunix")
@@ -29,15 +29,12 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
2929
dagNode, err := importer.BuildDagFromReader(
3030
r,
3131
n.DAG,
32-
n.Pinning.GetManual(), // Fix this interface
3332
chunk.DefaultSplitter,
33+
importer.BasicPinnerCB(n.Pinning.GetManual()),
3434
)
3535
if err != nil {
3636
return "", err
3737
}
38-
if err := n.Pinning.Flush(); err != nil {
39-
return "", err
40-
}
4138
k, err := dagNode.Key()
4239
if err != nil {
4340
return "", err
@@ -53,18 +50,28 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
5350
return "", err
5451
}
5552
defer f.Close()
53+
5654
ff, err := files.NewSerialFile(root, f)
5755
if err != nil {
5856
return "", err
5957
}
58+
6059
dagnode, err := addFile(n, ff)
6160
if err != nil {
6261
return "", err
6362
}
63+
6464
k, err := dagnode.Key()
6565
if err != nil {
6666
return "", err
6767
}
68+
69+
n.Pinning.GetManual().RemovePinWithMode(k, pin.Indirect)
70+
err = n.Pinning.Flush()
71+
if err != nil {
72+
return "", err
73+
}
74+
6875
return k.String(), nil
6976
}
7077

@@ -87,17 +94,17 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkle
8794
}
8895

8996
func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) {
90-
mp, ok := n.Pinning.(pin.ManualPinner)
91-
if !ok {
92-
return nil, errors.New("invalid pinner type! expected manual pinner")
93-
}
94-
95-
node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter)
96-
if err != nil {
97-
return nil, err
98-
}
97+
mp := n.Pinning.GetManual()
9998

100-
err = n.Pinning.Flush()
99+
node, err := importer.BuildDagFromReader(
100+
reader,
101+
n.DAG,
102+
chunk.DefaultSplitter,
103+
func(k u.Key, root bool) error {
104+
mp.PinWithMode(k, pin.Indirect)
105+
return nil
106+
},
107+
)
101108
if err != nil {
102109
return nil, err
103110
}

core/coreunix/metadata_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestMetadata(t *testing.T) {
3737
data := make([]byte, 1000)
3838
u.NewTimeSeededRand().Read(data)
3939
r := bytes.NewReader(data)
40-
nd, err := importer.BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
40+
nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
4141
if err != nil {
4242
t.Fatal(err)
4343
}

fuse/readonly/ipfs_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) {
3434
buf := make([]byte, size)
3535
u.NewTimeSeededRand().Read(buf)
3636
read := bytes.NewReader(buf)
37-
obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, nil, chunk.DefaultSplitter)
37+
obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, chunk.DefaultSplitter, nil)
3838
if err != nil {
3939
t.Fatal(err)
4040
}

importer/helpers/dagbuilder.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@ package helpers
33
import (
44
dag "github.com/ipfs/go-ipfs/merkledag"
55
"github.com/ipfs/go-ipfs/pin"
6+
u "github.com/ipfs/go-ipfs/util"
67
)
78

9+
type BlockCB func(u.Key, bool) error
10+
11+
var nilFunc BlockCB = func(_ u.Key, _ bool) error { return nil }
12+
813
// DagBuilderHelper wraps together a bunch of objects needed to
914
// efficiently create unixfs dag trees
1015
type DagBuilderHelper struct {
@@ -13,6 +18,7 @@ type DagBuilderHelper struct {
1318
in <-chan []byte
1419
nextData []byte // the next item to return.
1520
maxlinks int
21+
bcb BlockCB
1622
}
1723

1824
type DagBuilderParams struct {
@@ -22,18 +28,23 @@ type DagBuilderParams struct {
2228
// DAGService to write blocks to (required)
2329
Dagserv dag.DAGService
2430

25-
// Pinner to use for pinning files (optionally nil)
26-
Pinner pin.ManualPinner
31+
// Callback for each block added
32+
BlockCB BlockCB
2733
}
2834

2935
// Generate a new DagBuilderHelper from the given params, using 'in' as a
3036
// data source
3137
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
38+
bcb := dbp.BlockCB
39+
if bcb == nil {
40+
bcb = nilFunc
41+
}
42+
3243
return &DagBuilderHelper{
3344
dserv: dbp.Dagserv,
34-
mp: dbp.Pinner,
3545
in: in,
3646
maxlinks: dbp.Maxlinks,
47+
bcb: bcb,
3748
}
3849
}
3950

@@ -130,12 +141,10 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
130141
return nil, err
131142
}
132143

133-
if db.mp != nil {
134-
db.mp.PinWithMode(key, pin.Recursive)
135-
err := db.mp.Flush()
136-
if err != nil {
137-
return nil, err
138-
}
144+
// block callback
145+
err = db.bcb(key, true)
146+
if err != nil {
147+
return nil, err
139148
}
140149

141150
return dn, nil

importer/helpers/helpers.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
113113
}
114114

115115
// Pin the child node indirectly
116-
if db.mp != nil {
117-
db.mp.PinWithMode(childkey, pin.Indirect)
116+
err = db.bcb(childkey, false)
117+
if err != nil {
118+
return err
118119
}
119120

120121
return nil

importer/importer.go

+26-7
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ import (
1313
trickle "github.com/ipfs/go-ipfs/importer/trickle"
1414
dag "github.com/ipfs/go-ipfs/merkledag"
1515
"github.com/ipfs/go-ipfs/pin"
16-
"github.com/ipfs/go-ipfs/util"
16+
u "github.com/ipfs/go-ipfs/util"
1717
)
1818

19-
var log = util.Logger("importer")
19+
var log = u.Logger("importer")
2020

2121
// Builds a DAG from the given file, writing created blocks to disk as they are
2222
// created
@@ -36,31 +36,50 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
3636
}
3737
defer f.Close()
3838

39-
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
39+
return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp))
4040
}
4141

42-
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
42+
func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, bcb h.BlockCB) (*dag.Node, error) {
4343
// Start the splitter
4444
blkch := spl.Split(r)
4545

4646
dbp := h.DagBuilderParams{
4747
Dagserv: ds,
4848
Maxlinks: h.DefaultLinksPerBlock,
49-
Pinner: mp,
49+
BlockCB: bcb,
5050
}
5151

5252
return bal.BalancedLayout(dbp.New(blkch))
5353
}
5454

55-
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
55+
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, bcb h.BlockCB) (*dag.Node, error) {
5656
// Start the splitter
5757
blkch := spl.Split(r)
5858

5959
dbp := h.DagBuilderParams{
6060
Dagserv: ds,
6161
Maxlinks: h.DefaultLinksPerBlock,
62-
Pinner: mp,
62+
BlockCB: bcb,
6363
}
6464

6565
return trickle.TrickleLayout(dbp.New(blkch))
6666
}
67+
68+
func BasicPinnerCB(p pin.ManualPinner) h.BlockCB {
69+
return func(k u.Key, root bool) error {
70+
if root {
71+
p.PinWithMode(k, pin.Recursive)
72+
return p.Flush()
73+
} else {
74+
p.PinWithMode(k, pin.Indirect)
75+
return nil
76+
}
77+
}
78+
}
79+
80+
func PinIndirectCB(p pin.ManualPinner) h.BlockCB {
81+
return func(k u.Key, root bool) error {
82+
p.PinWithMode(k, pin.Indirect)
83+
return nil
84+
}
85+
}

importer/importer_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
1818
ds := mdtest.Mock(t)
1919
r := io.LimitReader(u.NewTimeSeededRand(), size)
20-
nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize})
20+
nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
2121
if err != nil {
2222
t.Fatal(err)
2323
}
@@ -27,7 +27,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe
2727
func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
2828
ds := mdtest.Mock(t)
2929
r := io.LimitReader(u.NewTimeSeededRand(), size)
30-
nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize})
30+
nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
3131
if err != nil {
3232
t.Fatal(err)
3333
}
@@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) {
4040
u.NewTimeSeededRand().Read(buf)
4141
r := bytes.NewReader(buf)
4242

43-
nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
43+
nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
4444
if err != nil {
4545
t.Fatal(err)
4646
}

merkledag/merkledag_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
156156

157157
spl := &chunk.SizeSplitter{512}
158158

159-
root, err := imp.BuildDagFromReader(read, dagservs[0], nil, spl)
159+
root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil)
160160
if err != nil {
161161
t.Fatal(err)
162162
}

unixfs/mod/dagmodifier.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
1212
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1313

14+
imp "github.com/ipfs/go-ipfs/importer"
1415
chunk "github.com/ipfs/go-ipfs/importer/chunk"
1516
help "github.com/ipfs/go-ipfs/importer/helpers"
1617
trickle "github.com/ipfs/go-ipfs/importer/trickle"
@@ -308,7 +309,7 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No
308309
dbp := &help.DagBuilderParams{
309310
Dagserv: dm.dagserv,
310311
Maxlinks: help.DefaultLinksPerBlock,
311-
Pinner: dm.mp,
312+
BlockCB: imp.BasicPinnerCB(dm.mp),
312313
}
313314

314315
return trickle.TrickleAppend(node, dbp.New(blks))

unixfs/mod/dagmodifier_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto
5252

5353
func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) {
5454
in := io.LimitReader(u.NewTimeSeededRand(), size)
55-
node, err := imp.BuildTrickleDagFromReader(in, dserv, pinner, &chunk.SizeSplitter{500})
55+
node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner))
5656
if err != nil {
5757
t.Fatal(err)
5858
}

0 commit comments

Comments
 (0)