@@ -8,7 +8,6 @@ use std::{
8
8
9
9
use current_mapping:: CurrentMapping ;
10
10
use futures_lite:: StreamExt ;
11
- use iroh_metrics:: inc;
12
11
use netwatch:: interfaces:: HomeRouter ;
13
12
use tokio:: sync:: { mpsc, oneshot, watch} ;
14
13
use tokio_util:: task:: AbortOnDropHandle ;
@@ -141,6 +140,8 @@ pub struct Client {
141
140
port_mapping : watch:: Receiver < Option < SocketAddrV4 > > ,
142
141
/// Channel used to communicate with the port mapping service.
143
142
service_tx : mpsc:: Sender < Message > ,
143
+ /// Metrics collected by the service.
144
+ metrics : Metrics ,
144
145
/// A handle to the service that will cancel the spawned task once the client is dropped.
145
146
_service_handle : std:: sync:: Arc < AbortOnDropHandle < ( ) > > ,
146
147
}
@@ -154,9 +155,14 @@ impl Default for Client {
154
155
impl Client {
155
156
/// Create a new port mapping client.
156
157
pub fn new ( config : Config ) -> Self {
158
+ Self :: with_metrics ( config, Default :: default ( ) )
159
+ }
160
+
161
+ /// Creates a new port mapping client with a previously created metrics collector.
162
+ pub fn with_metrics ( config : Config , metrics : Metrics ) -> Self {
157
163
let ( service_tx, service_rx) = mpsc:: channel ( SERVICE_CHANNEL_CAPACITY ) ;
158
164
159
- let ( service, watcher) = Service :: new ( config, service_rx) ;
165
+ let ( service, watcher) = Service :: new ( config, service_rx, metrics . clone ( ) ) ;
160
166
161
167
let handle = AbortOnDropHandle :: new ( tokio:: spawn (
162
168
async move { service. run ( ) . await } . instrument ( info_span ! ( "portmapper.service" ) ) ,
@@ -165,6 +171,7 @@ impl Client {
165
171
Client {
166
172
port_mapping : watcher,
167
173
service_tx,
174
+ metrics,
168
175
_service_handle : std:: sync:: Arc :: new ( handle) ,
169
176
}
170
177
}
@@ -232,6 +239,11 @@ impl Client {
232
239
pub fn watch_external_address ( & self ) -> watch:: Receiver < Option < SocketAddrV4 > > {
233
240
self . port_mapping . clone ( )
234
241
}
242
+
243
+ /// Returns the metrics collected by the service.
244
+ pub fn metrics ( & self ) -> & Metrics {
245
+ & self . metrics
246
+ }
235
247
}
236
248
237
249
/// Port mapping protocol information obtained during a probe.
@@ -263,6 +275,7 @@ impl Probe {
263
275
output : ProbeOutput ,
264
276
local_ip : Ipv4Addr ,
265
277
gateway : Ipv4Addr ,
278
+ metrics : Metrics ,
266
279
) -> Probe {
267
280
let ProbeOutput { upnp, pcp, nat_pmp } = output;
268
281
let Config {
@@ -282,8 +295,9 @@ impl Probe {
282
295
283
296
let mut pcp_probing_task = util:: MaybeFuture {
284
297
inner : ( enable_pcp && !pcp) . then ( || {
285
- Box :: pin ( async {
286
- inc ! ( Metrics , pcp_probes) ;
298
+ let metrics = metrics. clone ( ) ;
299
+ Box :: pin ( async move {
300
+ metrics. pcp_probes . inc ( ) ;
287
301
pcp:: probe_available ( local_ip, gateway)
288
302
. await
289
303
. then ( Instant :: now)
@@ -302,7 +316,7 @@ impl Probe {
302
316
} ;
303
317
304
318
if upnp_probing_task. inner . is_some ( ) {
305
- inc ! ( Metrics , upnp_probes ) ;
319
+ metrics . upnp_probes . inc ( ) ;
306
320
}
307
321
308
322
let mut upnp_done = upnp_probing_task. inner . is_none ( ) ;
@@ -361,15 +375,15 @@ impl Probe {
361
375
}
362
376
363
377
/// Updates a probe with the `Some` values of another probe that is _assumed_ newer.
364
- fn update ( & mut self , probe : Probe ) {
378
+ fn update ( & mut self , probe : Probe , metrics : & Metrics ) {
365
379
let Probe {
366
380
last_probe,
367
381
last_upnp_gateway_addr,
368
382
last_pcp,
369
383
last_nat_pmp,
370
384
} = probe;
371
385
if last_upnp_gateway_addr. is_some ( ) {
372
- inc ! ( Metrics , upnp_available ) ;
386
+ metrics . upnp_available . inc ( ) ;
373
387
let new_gateway = last_upnp_gateway_addr
374
388
. as_ref ( )
375
389
. map ( |( addr, _last_seen) | addr) ;
@@ -378,7 +392,7 @@ impl Probe {
378
392
. as_ref ( )
379
393
. map ( |( addr, _last_seen) | addr) ;
380
394
if new_gateway != old_gateway {
381
- inc ! ( Metrics , upnp_gateway_updated ) ;
395
+ metrics . upnp_gateway_updated . inc ( ) ;
382
396
debug ! (
383
397
"upnp gateway changed {:?} -> {:?}" ,
384
398
old_gateway
@@ -392,7 +406,7 @@ impl Probe {
392
406
self . last_upnp_gateway_addr = last_upnp_gateway_addr;
393
407
}
394
408
if last_pcp. is_some ( ) {
395
- inc ! ( Metrics , pcp_available ) ;
409
+ metrics . pcp_available . inc ( ) ;
396
410
self . last_pcp = last_pcp;
397
411
}
398
412
if last_nat_pmp. is_some ( ) {
@@ -430,12 +444,14 @@ pub struct Service {
430
444
/// Requests for a probe that arrive while this task is still in progress will receive the same
431
445
/// result.
432
446
probing_task : Option < ( AbortOnDropHandle < Probe > , Vec < oneshot:: Sender < ProbeResult > > ) > ,
447
+ metrics : Metrics ,
433
448
}
434
449
435
450
impl Service {
436
451
fn new (
437
452
config : Config ,
438
453
rx : mpsc:: Receiver < Message > ,
454
+ metrics : Metrics ,
439
455
) -> ( Self , watch:: Receiver < Option < SocketAddrV4 > > ) {
440
456
let ( current_mapping, watcher) = CurrentMapping :: new ( ) ;
441
457
let mut full_probe = Probe :: empty ( ) ;
@@ -454,6 +470,7 @@ impl Service {
454
470
full_probe,
455
471
mapping_task : None ,
456
472
probing_task : None ,
473
+ metrics,
457
474
} ;
458
475
459
476
( service, watcher)
@@ -518,7 +535,7 @@ impl Service {
518
535
receivers : Vec < oneshot:: Sender < ProbeResult > > ,
519
536
) {
520
537
let result = result. map ( |probe| {
521
- self . full_probe . update ( probe) ;
538
+ self . full_probe . update ( probe, & self . metrics ) ;
522
539
// TODO(@divma): the gateway of the current mapping could have changed. Tailscale
523
540
// still assumes the current mapping is valid/active and will return it even after
524
541
// this
@@ -542,11 +559,11 @@ impl Service {
542
559
}
543
560
Ok ( Err ( e) ) => {
544
561
debug ! ( "failed to get a port mapping {e}" ) ;
545
- inc ! ( Metrics , mapping_failures ) ;
562
+ self . metrics . mapping_failures . inc ( ) ;
546
563
}
547
564
Err ( e) => {
548
565
debug ! ( "failed to get a port mapping {e}" ) ;
549
- inc ! ( Metrics , mapping_failures ) ;
566
+ self . metrics . mapping_failures . inc ( ) ;
550
567
}
551
568
}
552
569
}
@@ -566,7 +583,7 @@ impl Service {
566
583
async fn update_local_port ( & mut self , local_port : Option < NonZeroU16 > ) {
567
584
// ignore requests to update the local port in a way that does not produce a change
568
585
if local_port != self . local_port {
569
- inc ! ( Metrics , local_port_updates ) ;
586
+ self . metrics . local_port_updates . inc ( ) ;
570
587
let old_port = std:: mem:: replace ( & mut self . local_port , local_port) ;
571
588
572
589
// clear the current mapping task if any
@@ -604,7 +621,7 @@ impl Service {
604
621
605
622
fn get_mapping ( & mut self , external_addr : Option < ( Ipv4Addr , NonZeroU16 ) > ) {
606
623
if let Some ( local_port) = self . local_port {
607
- inc ! ( Metrics , mapping_attempts ) ;
624
+ self . metrics . mapping_attempts . inc ( ) ;
608
625
609
626
let ( local_ip, gateway) = match ip_and_gateway ( ) {
610
627
Ok ( ip_and_gw) => ip_and_gw,
@@ -683,7 +700,7 @@ impl Service {
683
700
// we don't care if the requester is no longer there
684
701
let _ = result_tx. send ( Ok ( probe_output) ) ;
685
702
} else {
686
- inc ! ( Metrics , probes_started ) ;
703
+ self . metrics . probes_started . inc ( ) ;
687
704
688
705
let ( local_ip, gateway) = match ip_and_gateway ( ) {
689
706
Ok ( ip_and_gw) => ip_and_gw,
@@ -696,13 +713,14 @@ impl Service {
696
713
} ;
697
714
698
715
let config = self . config . clone ( ) ;
699
- let handle =
700
- tokio:: spawn (
701
- async move {
702
- Probe :: from_output ( config, probe_output, local_ip, gateway) . await
703
- }
704
- . instrument ( info_span ! ( "portmapper.probe" ) ) ,
705
- ) ;
716
+ let metrics = self . metrics . clone ( ) ;
717
+ let handle = tokio:: spawn (
718
+ async move {
719
+ Probe :: from_output ( config, probe_output, local_ip, gateway, metrics)
720
+ . await
721
+ }
722
+ . instrument ( info_span ! ( "portmapper.probe" ) ) ,
723
+ ) ;
706
724
let receivers = vec ! [ result_tx] ;
707
725
self . probing_task = Some ( ( AbortOnDropHandle :: new ( handle) , receivers) ) ;
708
726
}
0 commit comments