1
1
//! The server-side representation of an ongoing client relaying connection.
2
2
3
- use std:: { future:: Future , num:: NonZeroU32 , pin:: Pin , sync:: Arc , task:: Poll , time:: Duration } ;
3
+ use std:: {
4
+ collections:: HashSet , future:: Future , num:: NonZeroU32 , pin:: Pin , sync:: Arc , task:: Poll ,
5
+ time:: Duration ,
6
+ } ;
4
7
5
8
use anyhow:: { bail, Context , Result } ;
6
9
use bytes:: Bytes ;
7
10
use iroh_base:: NodeId ;
8
11
use iroh_metrics:: { inc, inc_by} ;
9
12
use n0_future:: { FutureExt , Sink , SinkExt , Stream , StreamExt } ;
10
13
use rand:: Rng ;
14
+ use time:: { Date , OffsetDateTime } ;
11
15
use tokio:: {
12
16
sync:: mpsc:: { self , error:: TrySendError } ,
13
17
time:: MissedTickBehavior ,
@@ -105,6 +109,7 @@ impl Client {
105
109
node_id,
106
110
connection_id,
107
111
clients : clients. clone ( ) ,
112
+ client_counter : ClientCounter :: default ( ) ,
108
113
ping_tracker : PingTracker :: default ( ) ,
109
114
} ;
110
115
@@ -205,6 +210,8 @@ struct Actor {
205
210
connection_id : u64 ,
206
211
/// Reference to the other connected clients.
207
212
clients : Clients ,
213
+ /// Statistics about the connected clients
214
+ client_counter : ClientCounter ,
208
215
ping_tracker : PingTracker ,
209
216
}
210
217
@@ -214,6 +221,9 @@ impl Actor {
214
221
// connection is accepted long before this in the HTTP server, but it is clearer to
215
222
// handle the metric here.
216
223
inc ! ( Metrics , accepts) ;
224
+ if self . client_counter . update ( self . node_id ) {
225
+ inc ! ( Metrics , unique_client_keys) ;
226
+ }
217
227
match self . run_inner ( done) . await {
218
228
Err ( e) => {
219
229
warn ! ( "actor errored {e:#?}, exiting" ) ;
@@ -557,6 +567,38 @@ impl Sink<Frame> for RateLimitedRelayedStream {
557
567
}
558
568
}
559
569
570
+ /// Tracks how many unique nodes have been seen during the last day.
571
+ #[ derive( Debug ) ]
572
+ struct ClientCounter {
573
+ clients : HashSet < NodeId > ,
574
+ last_clear_date : Date ,
575
+ }
576
+
577
+ impl Default for ClientCounter {
578
+ fn default ( ) -> Self {
579
+ Self {
580
+ clients : HashSet :: new ( ) ,
581
+ last_clear_date : OffsetDateTime :: now_utc ( ) . date ( ) ,
582
+ }
583
+ }
584
+ }
585
+
586
+ impl ClientCounter {
587
+ fn check_and_clear ( & mut self ) {
588
+ let today = OffsetDateTime :: now_utc ( ) . date ( ) ;
589
+ if today != self . last_clear_date {
590
+ self . clients . clear ( ) ;
591
+ self . last_clear_date = today;
592
+ }
593
+ }
594
+
595
+ /// Marks this node as seen, returns whether it is new today or not.
596
+ fn update ( & mut self , client : NodeId ) -> bool {
597
+ self . check_and_clear ( ) ;
598
+ self . clients . insert ( client)
599
+ }
600
+ }
601
+
560
602
#[ cfg( test) ]
561
603
mod tests {
562
604
use bytes:: Bytes ;
@@ -595,6 +637,7 @@ mod tests {
595
637
connection_id : 0 ,
596
638
node_id,
597
639
clients : clients. clone ( ) ,
640
+ client_counter : ClientCounter :: default ( ) ,
598
641
ping_tracker : PingTracker :: default ( ) ,
599
642
} ;
600
643
0 commit comments