@@ -29,8 +29,7 @@ use std::{
29
29
time:: Duration ,
30
30
} ;
31
31
32
- use futures:: StreamExt ;
33
- use futures_ticker:: Ticker ;
32
+ use futures:: FutureExt ;
34
33
use hashlink:: LinkedHashMap ;
35
34
use prometheus_client:: registry:: Registry ;
36
35
use rand:: { seq:: SliceRandom , thread_rng} ;
@@ -74,6 +73,7 @@ use super::{
74
73
types:: RpcOut ,
75
74
} ;
76
75
use super :: { PublishError , SubscriptionError , TopicScoreParams , ValidationError } ;
76
+ use futures_timer:: Delay ;
77
77
use quick_protobuf:: { MessageWrite , Writer } ;
78
78
use std:: { cmp:: Ordering :: Equal , fmt:: Debug } ;
79
79
@@ -301,7 +301,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
301
301
mcache : MessageCache ,
302
302
303
303
/// Heartbeat interval stream.
304
- heartbeat : Ticker ,
304
+ heartbeat : Delay ,
305
305
306
306
/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
307
307
/// clean up -- eg backoff clean up.
@@ -318,7 +318,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
318
318
outbound_peers : HashSet < PeerId > ,
319
319
320
320
/// Stores optional peer score data together with thresholds and decay interval.
321
- peer_score : Option < ( PeerScore , PeerScoreThresholds , Ticker ) > ,
321
+ peer_score : Option < ( PeerScore , PeerScoreThresholds , Delay ) > ,
322
322
323
323
/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
324
324
count_received_ihave : HashMap < PeerId , usize > ,
@@ -466,10 +466,7 @@ where
466
466
config. backoff_slack ( ) ,
467
467
) ,
468
468
mcache : MessageCache :: new ( config. history_gossip ( ) , config. history_length ( ) ) ,
469
- heartbeat : Ticker :: new_with_next (
470
- config. heartbeat_interval ( ) ,
471
- config. heartbeat_initial_delay ( ) ,
472
- ) ,
469
+ heartbeat : Delay :: new ( config. heartbeat_interval ( ) + config. heartbeat_initial_delay ( ) ) ,
473
470
heartbeat_ticks : 0 ,
474
471
px_peers : HashSet :: new ( ) ,
475
472
outbound_peers : HashSet :: new ( ) ,
@@ -938,7 +935,7 @@ where
938
935
return Err ( "Peer score set twice" . into ( ) ) ;
939
936
}
940
937
941
- let interval = Ticker :: new ( params. decay_interval ) ;
938
+ let interval = Delay :: new ( params. decay_interval ) ;
942
939
let peer_score = PeerScore :: new_with_message_delivery_time_callback ( params, callback) ;
943
940
self . peer_score = Some ( ( peer_score, threshold, interval) ) ;
944
941
Ok ( ( ) )
@@ -1208,7 +1205,7 @@ where
1208
1205
}
1209
1206
1210
1207
fn score_below_threshold_from_scores (
1211
- peer_score : & Option < ( PeerScore , PeerScoreThresholds , Ticker ) > ,
1208
+ peer_score : & Option < ( PeerScore , PeerScoreThresholds , Delay ) > ,
1212
1209
peer_id : & PeerId ,
1213
1210
threshold : impl Fn ( & PeerScoreThresholds ) -> f64 ,
1214
1211
) -> ( bool , f64 ) {
@@ -3427,14 +3424,16 @@ where
3427
3424
}
3428
3425
3429
3426
// update scores
3430
- if let Some ( ( peer_score, _, interval ) ) = & mut self . peer_score {
3431
- while let Poll :: Ready ( Some ( _ ) ) = interval . poll_next_unpin ( cx ) {
3427
+ if let Some ( ( peer_score, _, delay ) ) = & mut self . peer_score {
3428
+ if delay . poll_unpin ( cx ) . is_ready ( ) {
3432
3429
peer_score. refresh_scores ( ) ;
3430
+ delay. reset ( peer_score. params . decay_interval ) ;
3433
3431
}
3434
3432
}
3435
3433
3436
- while let Poll :: Ready ( Some ( _ ) ) = self . heartbeat . poll_next_unpin ( cx) {
3434
+ if self . heartbeat . poll_unpin ( cx) . is_ready ( ) {
3437
3435
self . heartbeat ( ) ;
3436
+ self . heartbeat . reset ( self . config . heartbeat_interval ( ) ) ;
3438
3437
}
3439
3438
3440
3439
Poll :: Pending
0 commit comments