Skip to content

Commit 0a566c9

Browse files
authored
routing/http: return PeerRecord for FindPeers (#490)
1 parent aa6fa14 commit 0a566c9

File tree

10 files changed

+109
-68
lines changed

10 files changed

+109
-68
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The following emojis are used to highlight certain changes:
4040
- Eliminate `..` elements that begin a rooted path: that is, replace "`/..`" by "`/`" at the beginning of a path.
4141
* 🛠 The signature of `CoreAPI.ResolvePath` in `coreiface` has changed to now return
4242
the remainder segments as a second return value, matching the signature of `resolver.ResolveToLastNode`.
43+
* 🛠 `routing/http/client.FindPeers` now returns `iter.ResultIter[types.PeerRecord]` instead of `iter.ResultIter[types.Record]`. The specification indicates that records for this method will always be Peer Records.
4344

4445
### Removed
4546

examples/routing/delegated-routing-client/main.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,6 @@ func findProviders(w io.Writer, ctx context.Context, client *client.Client, cidS
6969
return printIter(w, recordsIter)
7070
}
7171

72-
func findPeers(w io.Writer, ctx context.Context, client *client.Client, pidStr string) error {
73-
// Parses the given Peer ID to lookup the information for.
74-
pid, err := peer.Decode(pidStr)
75-
if err != nil {
76-
return err
77-
}
78-
79-
// Ask for information about the peer with the given peer ID.
80-
recordsIter, err := client.FindPeers(ctx, pid)
81-
if err != nil {
82-
return err
83-
}
84-
defer recordsIter.Close()
85-
return printIter(w, recordsIter)
86-
}
87-
8872
func printIter(w io.Writer, iter iter.ResultIter[types.Record]) error {
8973
// The response is streamed. Alternatively, you could use [iter.ReadAll]
9074
// to fetch all the results all at once, instead of iterating as they are
@@ -118,6 +102,44 @@ func printIter(w io.Writer, iter iter.ResultIter[types.Record]) error {
118102
return nil
119103
}
120104

105+
func findPeers(w io.Writer, ctx context.Context, client *client.Client, pidStr string) error {
106+
// Parses the given Peer ID to lookup the information for.
107+
pid, err := peer.Decode(pidStr)
108+
if err != nil {
109+
return err
110+
}
111+
112+
// Ask for information about the peer with the given peer ID.
113+
recordsIter, err := client.FindPeers(ctx, pid)
114+
if err != nil {
115+
return err
116+
}
117+
defer recordsIter.Close()
118+
119+
// The response is streamed. Alternatively, you could use [iter.ReadAll]
120+
// to fetch all the results all at once, instead of iterating as they are
121+
// streamed.
122+
for recordsIter.Next() {
123+
res := recordsIter.Val()
124+
125+
// Check for error, but do not complain if we exceeded the timeout. We are
126+
// expecting that to happen: we explicitly defined a timeout.
127+
if res.Err != nil {
128+
if !errors.Is(res.Err, context.DeadlineExceeded) {
129+
return res.Err
130+
}
131+
132+
return nil
133+
}
134+
135+
fmt.Fprintln(w, res.Val.ID)
136+
fmt.Fprintln(w, "\tProtocols:", res.Val.Protocols)
137+
fmt.Fprintln(w, "\tAddresses:", res.Val.Addrs)
138+
}
139+
140+
return nil
141+
}
142+
121143
func findIPNS(w io.Writer, ctx context.Context, client *client.Client, nameStr string) error {
122144
// Parses the given name string to get a record for.
123145
name, err := ipns.NameFromString(nameStr)

routing/http/client/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ func (c *measuringIter[T]) Close() error {
160160
return c.Iter.Close()
161161
}
162162

163+
// FindProviders searches for providers that are able to provide the given [cid.Cid].
164+
// In a more generic way, it is also used as a mapping between CIDs and relevant metadata.
163165
func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter.ResultIter[types.Record], err error) {
164166
// TODO test measurements
165167
m := newMeasurement("FindProviders")
@@ -332,7 +334,8 @@ func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
332334
return 0, nil
333335
}
334336

335-
func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) {
337+
// FindPeers searches for information for the given [peer.ID].
338+
func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) {
336339
m := newMeasurement("FindPeers")
337340

338341
url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String()
@@ -359,7 +362,7 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
359362
if resp.StatusCode == http.StatusNotFound {
360363
resp.Body.Close()
361364
m.record(ctx)
362-
return iter.FromSlice[iter.Result[types.Record]](nil), nil
365+
return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil
363366
}
364367

365368
if resp.StatusCode != http.StatusOK {
@@ -387,24 +390,27 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
387390
}
388391
}()
389392

390-
var it iter.ResultIter[types.Record]
393+
var it iter.ResultIter[*types.PeerRecord]
391394
switch mediaType {
392395
case mediaTypeJSON:
393396
parsedResp := &jsontypes.PeersResponse{}
394397
err = json.NewDecoder(resp.Body).Decode(parsedResp)
395-
var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Peers)
398+
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
396399
it = iter.ToResultIter(sliceIt)
397400
case mediaTypeNDJSON:
398401
skipBodyClose = true
399-
it = ndjson.NewRecordsIter(resp.Body)
402+
it = ndjson.NewPeerRecordsIter(resp.Body)
400403
default:
401404
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
402405
return nil, errors.New("unknown content type")
403406
}
404407

405-
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
408+
return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
406409
}
407410

411+
// GetIPNS tries to retrieve the [ipns.Record] for the given [ipns.Name]. The record is
412+
// validated against the given name. If validation fails, an error is returned, but no
413+
// record.
408414
func (c *Client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
409415
url := c.baseURL + "/routing/v1/ipns/" + name.String()
410416

@@ -443,6 +449,7 @@ func (c *Client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, err
443449
return record, nil
444450
}
445451

452+
// PutIPNS attempts at putting the given [ipns.Record] for the given [ipns.Name].
446453
func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
447454
url := c.baseURL + "/routing/v1/ipns/" + name.String()
448455

routing/http/client/client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *server.Bits
4242
return args.Get(0).(time.Duration), args.Error(1)
4343
}
4444

45-
func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) {
45+
func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
4646
args := m.Called(ctx, pid, limit)
47-
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
47+
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
4848
}
4949

5050
func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
@@ -486,7 +486,7 @@ func TestClient_Provide(t *testing.T) {
486486

487487
func TestClient_FindPeers(t *testing.T) {
488488
peerRecord := makePeerRecord()
489-
peerRecords := []iter.Result[types.Record]{
489+
peerRecords := []iter.Result[*types.PeerRecord]{
490490
{Val: &peerRecord},
491491
}
492492
pid := *peerRecord.ID
@@ -495,13 +495,13 @@ func TestClient_FindPeers(t *testing.T) {
495495
name string
496496
httpStatusCode int
497497
stopServer bool
498-
routerResult []iter.Result[types.Record]
498+
routerResult []iter.Result[*types.PeerRecord]
499499
routerErr error
500500
clientRequiresStreaming bool
501501
serverStreamingDisabled bool
502502

503503
expErrContains osErrContains
504-
expResult []iter.Result[types.Record]
504+
expResult []iter.Result[*types.PeerRecord]
505505
expStreamingResponse bool
506506
expJSONResponse bool
507507
}{
@@ -606,7 +606,7 @@ func TestClient_FindPeers(t *testing.T) {
606606
resultIter, err := client.FindPeers(ctx, pid)
607607
c.expErrContains.errContains(t, err)
608608

609-
results := iter.ReadAll[iter.Result[types.Record]](resultIter)
609+
results := iter.ReadAll[iter.Result[*types.PeerRecord]](resultIter)
610610
assert.Equal(t, c.expResult, results)
611611
})
612612
}

routing/http/contentrouter/contentrouter.go

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const ttl = 24 * time.Hour
2626
type Client interface {
2727
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
2828
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
29-
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error)
29+
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error)
3030
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
3131
PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error
3232
}
@@ -196,28 +196,15 @@ func (c *contentRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInf
196196
logger.Warnw("error iterating provider responses: %s", res.Err)
197197
continue
198198
}
199-
v := res.Val
200-
if v.GetSchema() == types.SchemaPeer {
201-
result, ok := v.(*types.PeerRecord)
202-
if !ok {
203-
logger.Errorw(
204-
"problem casting find providers result",
205-
"Schema", v.GetSchema(),
206-
"Type", reflect.TypeOf(v).String(),
207-
)
208-
continue
209-
}
210-
211-
var addrs []multiaddr.Multiaddr
212-
for _, a := range result.Addrs {
213-
addrs = append(addrs, a.Multiaddr)
214-
}
215-
216-
return peer.AddrInfo{
217-
ID: *result.ID,
218-
Addrs: addrs,
219-
}, nil
199+
var addrs []multiaddr.Multiaddr
200+
for _, a := range res.Val.Addrs {
201+
addrs = append(addrs, a.Multiaddr)
220202
}
203+
204+
return peer.AddrInfo{
205+
ID: *res.Val.ID,
206+
Addrs: addrs,
207+
}, nil
221208
}
222209

223210
return peer.AddrInfo{}, err

routing/http/contentrouter/contentrouter_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.Resul
3232
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
3333
}
3434

35-
func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[types.Record], error) {
35+
func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[*types.PeerRecord], error) {
3636
args := m.Called(ctx, pid)
37-
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
37+
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
3838
}
3939

4040
func (m *mockClient) Ready(ctx context.Context) (bool, error) {
@@ -183,17 +183,14 @@ func TestFindPeer(t *testing.T) {
183183
crc := NewContentRoutingClient(client)
184184

185185
p1 := peer.ID("peer1")
186-
ais := []types.Record{
187-
&types.UnknownRecord{
188-
Schema: "unknown",
189-
},
190-
&types.PeerRecord{
186+
ais := []*types.PeerRecord{
187+
{
191188
Schema: types.SchemaPeer,
192189
ID: &p1,
193190
Protocols: []string{"transport-bitswap"},
194191
},
195192
}
196-
aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais))
193+
aisIter := iter.ToResultIter[*types.PeerRecord](iter.FromSlice(ais))
197194

198195
client.On("FindPeers", ctx, p1).Return(aisIter, nil)
199196

routing/http/server/server.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type ContentRouter interface {
6363

6464
// FindPeers searches for peers who have the provided [peer.ID].
6565
// Limit indicates the maximum amount of results to return; 0 means unbounded.
66-
FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error)
66+
FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error)
6767

6868
// GetIPNS searches for an [ipns.Record] for the given [ipns.Name].
6969
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
@@ -267,7 +267,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
267267
}
268268

269269
var (
270-
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record])
270+
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord])
271271
recordsLimit int
272272
)
273273

@@ -347,7 +347,7 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) {
347347
writeJSONResult(w, "Provide", resp)
348348
}
349349

350-
func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) {
350+
func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) {
351351
defer peersIter.Close()
352352

353353
peers, err := iter.ReadAllResults(peersIter)
@@ -361,7 +361,7 @@ func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[
361361
})
362362
}
363363

364-
func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) {
364+
func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) {
365365
writeResultsIterNDJSON(w, peersIter)
366366
}
367367

@@ -491,7 +491,7 @@ func logErr(method, msg string, err error) {
491491
logger.Infow(msg, "Method", method, "Error", err)
492492
}
493493

494-
func writeResultsIterNDJSON(w http.ResponseWriter, resultIter iter.ResultIter[types.Record]) {
494+
func writeResultsIterNDJSON[T any](w http.ResponseWriter, resultIter iter.ResultIter[T]) {
495495
defer resultIter.Close()
496496

497497
w.Header().Set("Content-Type", mediaTypeNDJSON)

routing/http/server/server_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func TestPeers(t *testing.T) {
168168
t.Parallel()
169169

170170
_, pid := makePeerID(t)
171-
results := iter.FromSlice([]iter.Result[types.Record]{
171+
results := iter.FromSlice([]iter.Result[*types.PeerRecord]{
172172
{Val: &types.PeerRecord{
173173
Schema: types.SchemaPeer,
174174
ID: &pid,
@@ -203,7 +203,7 @@ func TestPeers(t *testing.T) {
203203
t.Parallel()
204204

205205
_, pid := makePeerID(t)
206-
results := iter.FromSlice([]iter.Result[types.Record]{
206+
results := iter.FromSlice([]iter.Result[*types.PeerRecord]{
207207
{Val: &types.PeerRecord{
208208
Schema: types.SchemaPeer,
209209
ID: &pid,
@@ -374,9 +374,9 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *BitswapWrit
374374
return args.Get(0).(time.Duration), args.Error(1)
375375
}
376376

377-
func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) {
377+
func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
378378
args := m.Called(ctx, pid, limit)
379-
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
379+
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
380380
}
381381

382382
func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {

routing/http/types/json/responses.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type ProvidersResponse struct {
1313

1414
// PeersResponse is the result of a GET Peers request.
1515
type PeersResponse struct {
16-
Peers RecordsArray
16+
Peers []*types.PeerRecord
1717
}
1818

1919
// RecordsArray is an array of [types.Record]

routing/http/types/ndjson/records.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,30 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] {
4444

4545
return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
4646
}
47+
48+
// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given [io.Reader].
49+
// Records with a different schema are safely ignored. If you want to read all records, use
50+
// [NewRecordsIter] instead.
51+
func NewPeerRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.PeerRecord]] {
52+
jsonIter := iter.FromReaderJSON[types.UnknownRecord](r)
53+
mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[*types.PeerRecord] {
54+
var result iter.Result[*types.PeerRecord]
55+
if upr.Err != nil {
56+
result.Err = upr.Err
57+
return result
58+
}
59+
switch upr.Val.Schema {
60+
case types.SchemaPeer:
61+
var prov types.PeerRecord
62+
err := json.Unmarshal(upr.Val.Bytes, &prov)
63+
if err != nil {
64+
result.Err = err
65+
return result
66+
}
67+
result.Val = &prov
68+
}
69+
return result
70+
}
71+
72+
return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
73+
}

0 commit comments

Comments
 (0)