Skip to content

Commit 8dfd533

Browse files
authored
Merge of #6630
2 parents 1fd86f8 + 305276b commit 8dfd533

File tree

5 files changed

+38
-29
lines changed

5 files changed

+38
-29
lines changed

Cargo.lock

Lines changed: 23 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/lighthouse_network/gossipsub/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
1010
categories = ["network-programming", "asynchronous"]
1111

1212
[features]
13-
wasm-bindgen = ["getrandom/js"]
13+
wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"]
1414
rsa = []
1515

1616
[dependencies]
@@ -22,7 +22,6 @@ bytes = "1.5"
2222
either = "1.9"
2323
fnv = "1.0.7"
2424
futures = "0.3.30"
25-
futures-ticker = "0.0.3"
2625
futures-timer = "3.0.2"
2726
getrandom = "0.2.12"
2827
hashlink.workspace = true

beacon_node/lighthouse_network/gossipsub/src/behaviour.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ use std::{
2929
time::Duration,
3030
};
3131

32-
use futures::StreamExt;
33-
use futures_ticker::Ticker;
32+
use futures::FutureExt;
3433
use hashlink::LinkedHashMap;
3534
use prometheus_client::registry::Registry;
3635
use rand::{seq::SliceRandom, thread_rng};
@@ -74,6 +73,7 @@ use super::{
7473
types::RpcOut,
7574
};
7675
use super::{PublishError, SubscriptionError, TopicScoreParams, ValidationError};
76+
use futures_timer::Delay;
7777
use quick_protobuf::{MessageWrite, Writer};
7878
use std::{cmp::Ordering::Equal, fmt::Debug};
7979

@@ -301,7 +301,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
301301
mcache: MessageCache,
302302

303303
/// Heartbeat interval stream.
304-
heartbeat: Ticker,
304+
heartbeat: Delay,
305305

306306
/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
307307
/// clean up -- eg backoff clean up.
@@ -318,7 +318,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
318318
outbound_peers: HashSet<PeerId>,
319319

320320
/// Stores optional peer score data together with thresholds and decay interval.
321-
peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker)>,
321+
peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>,
322322

323323
/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
324324
count_received_ihave: HashMap<PeerId, usize>,
@@ -466,10 +466,7 @@ where
466466
config.backoff_slack(),
467467
),
468468
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
469-
heartbeat: Ticker::new_with_next(
470-
config.heartbeat_interval(),
471-
config.heartbeat_initial_delay(),
472-
),
469+
heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
473470
heartbeat_ticks: 0,
474471
px_peers: HashSet::new(),
475472
outbound_peers: HashSet::new(),
@@ -938,7 +935,7 @@ where
938935
return Err("Peer score set twice".into());
939936
}
940937

941-
let interval = Ticker::new(params.decay_interval);
938+
let interval = Delay::new(params.decay_interval);
942939
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
943940
self.peer_score = Some((peer_score, threshold, interval));
944941
Ok(())
@@ -1208,7 +1205,7 @@ where
12081205
}
12091206

12101207
fn score_below_threshold_from_scores(
1211-
peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker)>,
1208+
peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>,
12121209
peer_id: &PeerId,
12131210
threshold: impl Fn(&PeerScoreThresholds) -> f64,
12141211
) -> (bool, f64) {
@@ -3427,14 +3424,16 @@ where
34273424
}
34283425

34293426
// update scores
3430-
if let Some((peer_score, _, interval)) = &mut self.peer_score {
3431-
while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) {
3427+
if let Some((peer_score, _, delay)) = &mut self.peer_score {
3428+
if delay.poll_unpin(cx).is_ready() {
34323429
peer_score.refresh_scores();
3430+
delay.reset(peer_score.params.decay_interval);
34333431
}
34343432
}
34353433

3436-
while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) {
3434+
if self.heartbeat.poll_unpin(cx).is_ready() {
34373435
self.heartbeat();
3436+
self.heartbeat.reset(self.config.heartbeat_interval());
34383437
}
34393438

34403439
Poll::Pending

beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::subscription_filter::WhitelistSubscriptionFilter;
2525
use crate::types::RpcReceiver;
2626
use crate::{config::ConfigBuilder, types::Rpc, IdentTopic as Topic};
2727
use byteorder::{BigEndian, ByteOrder};
28+
use futures::StreamExt;
2829
use libp2p::core::ConnectedPoint;
2930
use rand::Rng;
3031
use std::net::Ipv4Addr;

beacon_node/lighthouse_network/gossipsub/src/peer_score.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ mod tests;
4444
const TIME_CACHE_DURATION: u64 = 120;
4545

4646
pub(crate) struct PeerScore {
47-
params: PeerScoreParams,
47+
pub(crate) params: PeerScoreParams,
4848
/// The score parameters.
4949
peer_stats: HashMap<PeerId, PeerStats>,
5050
/// Tracking peers per IP.

0 commit comments

Comments
 (0)