Skip to content

Commit 2a1ee3a

Browse files
whyrusleepingjbenet
authored andcommitted
use datastore for local data
1 parent 87739b3 commit 2a1ee3a

File tree

1 file changed

+51
-9
lines changed

1 file changed

+51
-9
lines changed

routing/dht/dht.go

+51-9
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package dht
33
import (
44
"sync"
55

6-
peer "github.com/jbenet/go-ipfs/peer"
7-
swarm "github.com/jbenet/go-ipfs/swarm"
8-
u "github.com/jbenet/go-ipfs/util"
6+
peer "github.com/jbenet/go-ipfs/peer"
7+
swarm "github.com/jbenet/go-ipfs/swarm"
8+
u "github.com/jbenet/go-ipfs/util"
9+
10+
ds "github.com/jbenet/datastore.go"
11+
912
"code.google.com/p/goprotobuf/proto"
1013
)
1114

@@ -18,8 +21,11 @@ type IpfsDHT struct {
1821

1922
network *swarm.Swarm
2023

21-
// local data (TEMPORARY: until we formalize data storage with datastore)
22-
data map[string][]byte
24+
// Local peer (yourself)
25+
self *peer.Peer
26+
27+
// Local data
28+
datastore ds.Datastore
2329

2430
// map of channels waiting for reply messages
2531
listeners map[uint64]chan *swarm.Message
@@ -29,6 +35,15 @@ type IpfsDHT struct {
2935
shutdown chan struct{}
3036
}
3137

38+
func NewDHT(p *peer.Peer) *IpfsDHT {
39+
dht := new(IpfsDHT)
40+
dht.self = p
41+
dht.network = swarm.NewSwarm(p)
42+
dht.listeners = make(map[uint64]chan *swarm.Message)
43+
dht.shutdown = make(chan struct{})
44+
return dht
45+
}
46+
3247
// Read in all messages from swarm and handle them appropriately
3348
// NOTE: this function is just a quick sketch
3449
func (dht *IpfsDHT) handleMessages() {
@@ -61,10 +76,13 @@ func (dht *IpfsDHT) handleMessages() {
6176
case DHTMessage_GET_VALUE:
6277
dht.handleGetValue(mes.Peer, pmes)
6378
case DHTMessage_PUT_VALUE:
79+
dht.handlePutValue(mes.Peer, pmes)
6480
case DHTMessage_FIND_NODE:
81+
dht.handleFindNode(mes.Peer, pmes)
6582
case DHTMessage_ADD_PROVIDER:
6683
case DHTMessage_GET_PROVIDERS:
6784
case DHTMessage_PING:
85+
dht.handleFindNode(mes.Peer, pmes)
6886
}
6987

7088
case <-dht.shutdown:
@@ -74,24 +92,37 @@ func (dht *IpfsDHT) handleMessages() {
7492
}
7593

7694
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
77-
val, found := dht.data[pmes.GetKey()]
78-
if found {
95+
dskey := ds.NewKey(pmes.GetKey())
96+
i_val, err := dht.datastore.Get(dskey)
97+
if err == nil {
7998
isResponse := true
8099
resp := new(DHTMessage)
81100
resp.Response = &isResponse
82101
resp.Id = pmes.Id
83102
resp.Key = pmes.Key
103+
104+
val := i_val.([]byte)
84105
resp.Value = val
85-
} else {
106+
107+
mes := new(swarm.Message)
108+
mes.Peer = p
109+
mes.Data = []byte(resp.String())
110+
} else if err == ds.ErrNotFound {
86111
// Find closest node(s) to desired key and reply with that info
87112
// TODO: this will need some other metadata in the protobuf message
88113
// to signal to the querying node that the data its receiving
89114
// is actually a list of other nodes
90115
}
91116
}
92117

118+
// Store a value in this nodes local storage
93119
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
94-
panic("Not implemented.")
120+
dskey := ds.NewKey(pmes.GetKey())
121+
err := dht.datastore.Put(dskey, pmes.GetValue())
122+
if err != nil {
123+
// For now, just panic, handle this better later maybe
124+
panic(err)
125+
}
95126
}
96127

97128
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
@@ -121,6 +152,17 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
121152
return lchan
122153
}
123154

155+
func (dht *IpfsDHT) Unlisten(mesid uint64) {
156+
dht.listenLock.Lock()
157+
ch, ok := dht.listeners[mesid]
158+
if ok {
159+
delete(dht.listeners, mesid)
160+
}
161+
dht.listenLock.Unlock()
162+
close(ch)
163+
}
164+
165+
124166
// Stop all communications from this node and shut down
125167
func (dht *IpfsDHT) Halt() {
126168
dht.shutdown <- struct{}{}

0 commit comments

Comments
 (0)