@@ -12,6 +12,7 @@ use std::{
12
12
time:: Duration ,
13
13
} ;
14
14
15
+ use anyhow:: Result ;
15
16
use futures_lite:: {
16
17
stream:: { Boxed , StreamExt } ,
17
18
FutureExt ,
@@ -91,6 +92,73 @@ struct Inner {
91
92
republish_delay : Duration ,
92
93
}
93
94
95
+ impl Inner {
96
+ async fn resolve_relay ( & self , key : pkarr:: PublicKey ) -> Option < Result < DiscoveryItem > > {
97
+ tracing:: info!( "resolving {} from relay {:?}" , key. to_z32( ) , self . relay_url) ;
98
+
99
+ let maybe_packet = self
100
+ . pkarr_relay
101
+ . as_ref ( )
102
+ . expect ( "checked" )
103
+ . resolve ( & key)
104
+ . await ;
105
+ match maybe_packet {
106
+ Ok ( Some ( signed_packet) ) => match NodeInfo :: from_pkarr_signed_packet ( & signed_packet) {
107
+ Ok ( node_info) => {
108
+ let node_addr: NodeAddr = node_info. into ( ) ;
109
+
110
+ tracing:: info!( "discovered node info from relay {:?}" , node_addr) ;
111
+ Some ( Ok ( DiscoveryItem {
112
+ node_addr,
113
+ provenance : "relay" ,
114
+ last_updated : None ,
115
+ } ) )
116
+ }
117
+ Err ( _err) => {
118
+ tracing:: debug!( "failed to parse signed packet as node info" ) ;
119
+ None
120
+ }
121
+ } ,
122
+ Ok ( None ) => {
123
+ tracing:: debug!( "no signed packet found in relay" ) ;
124
+ None
125
+ }
126
+ Err ( err) => {
127
+ tracing:: debug!( "failed to get signed packet from relay: {}" , err) ;
128
+ Some ( Err ( err. into ( ) ) )
129
+ }
130
+ }
131
+ }
132
+ async fn resolve_dht ( & self , key : pkarr:: PublicKey ) -> Option < Result < DiscoveryItem > > {
133
+ tracing:: info!( "resolving {} from DHT" , key. to_z32( ) ) ;
134
+
135
+ let maybe_packet = self . pkarr . resolve ( & key) . await ;
136
+ match maybe_packet {
137
+ Ok ( Some ( signed_packet) ) => match NodeInfo :: from_pkarr_signed_packet ( & signed_packet) {
138
+ Ok ( node_info) => {
139
+ let node_addr: NodeAddr = node_info. into ( ) ;
140
+ tracing:: info!( "discovered node info from DHT {:?}" , node_addr) ;
141
+ Some ( Ok ( DiscoveryItem {
142
+ node_addr,
143
+ provenance : "mainline" ,
144
+ last_updated : None ,
145
+ } ) )
146
+ }
147
+ Err ( _err) => {
148
+ tracing:: debug!( "failed to parse signed packet as node info" ) ;
149
+ None
150
+ }
151
+ } ,
152
+ Ok ( None ) => {
153
+ // nothing to do
154
+ tracing:: debug!( "no signed packet found in DHT" ) ;
155
+ None
156
+ }
157
+ Err ( err) => Some ( Err ( err. into ( ) ) ) ,
158
+ }
159
+ }
160
+ }
161
+
94
162
/// Builder for [`DhtDiscovery`].
95
163
///
96
164
/// By default, publishing to the DHT is enabled, and relay publishing is disabled.
@@ -181,7 +249,7 @@ impl Builder {
181
249
}
182
250
183
251
/// Builds the discovery mechanism.
184
- pub fn build ( self ) -> anyhow :: Result < DhtDiscovery > {
252
+ pub fn build ( self ) -> Result < DhtDiscovery > {
185
253
let pkarr = match self . client {
186
254
Some ( client) => client,
187
255
None => PkarrClient :: new ( Default :: default ( ) ) ?,
@@ -319,93 +387,15 @@ impl Discovery for DhtDiscovery {
319
387
320
388
let mut stream = futures_buffered:: FuturesUnorderedBounded :: new ( 2 ) ;
321
389
if self . 0 . pkarr_relay . is_some ( ) {
322
- tracing:: info!(
323
- "resolving {} from relay {:?}" ,
324
- pkarr_public_key. to_z32( ) ,
325
- self . 0 . relay_url
326
- ) ;
327
390
let key = pkarr_public_key. clone ( ) ;
328
391
let discovery = self . 0 . clone ( ) ;
329
- stream. push (
330
- async move {
331
- let maybe_packet = discovery
332
- . pkarr_relay
333
- . as_ref ( )
334
- . expect ( "checked" )
335
- . resolve ( & key)
336
- . await ;
337
- match maybe_packet {
338
- Ok ( Some ( signed_packet) ) => {
339
- match NodeInfo :: from_pkarr_signed_packet ( & signed_packet) {
340
- Ok ( node_info) => {
341
- let node_addr: NodeAddr = node_info. into ( ) ;
342
-
343
- tracing:: info!(
344
- "discovered node info from relay {:?}" ,
345
- node_addr
346
- ) ;
347
- Some ( Ok ( DiscoveryItem {
348
- node_addr,
349
- provenance : "relay" ,
350
- last_updated : None ,
351
- } ) )
352
- }
353
- Err ( _err) => {
354
- tracing:: debug!( "failed to parse signed packet as node info" ) ;
355
- None
356
- }
357
- }
358
- }
359
- Ok ( None ) => {
360
- tracing:: debug!( "no signed packet found in relay" ) ;
361
- None
362
- }
363
- Err ( err) => {
364
- tracing:: debug!( "failed to get signed packet from relay: {}" , err) ;
365
- Some ( Err ( err. into ( ) ) )
366
- }
367
- }
368
- }
369
- . boxed ( ) ,
370
- ) ;
392
+ stream. push ( async move { discovery. resolve_relay ( key) . await } . boxed ( ) ) ;
371
393
}
372
394
373
395
if self . 0 . dht {
374
- tracing:: info!( "resolving {} from DHT" , pkarr_public_key. to_z32( ) ) ;
375
-
376
396
let key = pkarr_public_key. clone ( ) ;
377
397
let discovery = self . 0 . clone ( ) ;
378
- stream. push (
379
- async move {
380
- let maybe_packet = discovery. pkarr . resolve ( & key) . await ;
381
- match maybe_packet {
382
- Ok ( Some ( signed_packet) ) => {
383
- match NodeInfo :: from_pkarr_signed_packet ( & signed_packet) {
384
- Ok ( node_info) => {
385
- let node_addr: NodeAddr = node_info. into ( ) ;
386
- tracing:: info!( "discovered node info from DHT {:?}" , node_addr) ;
387
- Some ( Ok ( DiscoveryItem {
388
- node_addr,
389
- provenance : "mainline" ,
390
- last_updated : None ,
391
- } ) )
392
- }
393
- Err ( _err) => {
394
- tracing:: debug!( "failed to parse signed packet as node info" ) ;
395
- None
396
- }
397
- }
398
- }
399
- Ok ( None ) => {
400
- // nothing to do
401
- tracing:: debug!( "no signed packet found in DHT" ) ;
402
- None
403
- }
404
- Err ( err) => Some ( Err ( err. into ( ) ) ) ,
405
- }
406
- }
407
- . boxed ( ) ,
408
- ) ;
398
+ stream. push ( async move { discovery. resolve_dht ( key) . await } . boxed ( ) ) ;
409
399
}
410
400
411
401
Some ( stream. filter_map ( |t| t) . boxed ( ) )
0 commit comments