Skip to content

Commit 1445a56

Browse files
AgeManningjxs
andauthored
Prevent dual-stack spamming (#275)
* Prevent dual-stack spamming * Fix clippy lint * update majority function * Improve comments and function naming * remove self and make filter_stale_find_most_frequent a free function * fix test --------- Co-authored-by: João Oliveira <[email protected]>
1 parent 9d211f6 commit 1445a56

File tree

3 files changed

+82
-35
lines changed

3 files changed

+82
-35
lines changed

src/kbucket/bucket.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ where
410410
// Adjust `first_connected_pos` accordingly.
411411
match old_status.state {
412412
ConnectionState::Connected => {
413-
if (self.first_connected_pos == Some(pos.0)) && pos.0 == self.nodes.len() {
413+
if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
414414
// It was the last connected node.
415415
self.first_connected_pos = None
416416
}

src/service.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,7 +1504,7 @@ impl Service {
15041504
_ => connection_direction,
15051505
};
15061506

1507-
debug!(node = %node_id, %direction, "Session established with Node");
1507+
debug!(node = %node_id, %direction, %socket, "Session established with Node");
15081508
self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction));
15091509
}
15101510

@@ -1616,13 +1616,16 @@ impl Service {
16161616
let Some(ip_votes) = self.ip_votes.as_mut() else {
16171617
return false;
16181618
};
1619-
match (ip_votes.majority(), is_ipv6) {
1619+
// Here we check the number of non-expired votes, rather than the majority. As if the
1620+
// local router is not SNAT'd we can have many votes but none for the same port and we
1621+
// therefore do excessive pinging.
1622+
match (ip_votes.has_minimum_threshold(), is_ipv6) {
16201623
// We don't have enough ipv4 votes, but this is an IPv4-only node.
1621-
((None, Some(_)), false) |
1624+
((false, true), false) |
16221625
// We don't have enough ipv6 votes, but this is an IPv6 node.
1623-
((Some(_), None), true) |
1626+
((true, false), true) |
16241627
// We don't have enough ipv6 or ipv4 nodes, ping this peer.
1625-
((None, None), _,) => true,
1628+
((false, false), _,) => true,
16261629
// We have enough votes do nothing.
16271630
((_, _), _,) => false,
16281631
}

src/service/ip_vote.rs

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ use enr::NodeId;
22
use fnv::FnvHashMap;
33
use std::{
44
collections::HashMap,
5+
hash::Hash,
56
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
67
time::{Duration, Instant},
78
};
89

910
/// A collection of IP:Ports for our node reported from external peers.
1011
pub(crate) struct IpVote {
11-
/// The current collection of IP:Port votes.
12-
votes: HashMap<NodeId, (SocketAddr, Instant)>,
12+
/// The current collection of IP:Port votes for ipv4.
13+
ipv4_votes: HashMap<NodeId, (SocketAddrV4, Instant)>,
14+
/// The current collection of IP:Port votes for ipv6.
15+
ipv6_votes: HashMap<NodeId, (SocketAddrV6, Instant)>,
1316
/// The minimum number of votes required before an IP/PORT is accepted.
1417
minimum_threshold: usize,
1518
/// The time votes remain valid.
@@ -23,46 +26,86 @@ impl IpVote {
2326
panic!("Setting enr_peer_update_min to a value less than 2 will cause issues with discovery with peers behind NAT");
2427
}
2528
IpVote {
26-
votes: HashMap::new(),
29+
ipv4_votes: HashMap::new(),
30+
ipv6_votes: HashMap::new(),
2731
minimum_threshold,
2832
vote_duration,
2933
}
3034
}
3135

3236
pub fn insert(&mut self, key: NodeId, socket: impl Into<SocketAddr>) {
33-
self.votes
34-
.insert(key, (socket.into(), Instant::now() + self.vote_duration));
37+
match socket.into() {
38+
SocketAddr::V4(socket) => {
39+
self.ipv4_votes
40+
.insert(key, (socket, Instant::now() + self.vote_duration));
41+
}
42+
SocketAddr::V6(socket) => {
43+
self.ipv6_votes
44+
.insert(key, (socket, Instant::now() + self.vote_duration));
45+
}
46+
}
3547
}
3648

37-
/// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None.
38-
pub fn majority(&mut self) -> (Option<SocketAddrV4>, Option<SocketAddrV6>) {
39-
// remove any expired votes
49+
/// Returns true if we have more than the minimum number of non-expired votes for a given ip
50+
/// version.
51+
pub fn has_minimum_threshold(&mut self) -> (bool, bool) {
4052
let instant = Instant::now();
41-
self.votes.retain(|_, v| v.1 > instant);
42-
43-
// count votes, take majority
44-
let mut ip4_count: FnvHashMap<SocketAddrV4, usize> = FnvHashMap::default();
45-
let mut ip6_count: FnvHashMap<SocketAddrV6, usize> = FnvHashMap::default();
46-
for (socket, _) in self.votes.values() {
47-
// NOTE: here we depend on addresses being already cleaned up. No mapped or compat
48-
// addresses should be present. This is done in the codec.
49-
match socket {
50-
SocketAddr::V4(socket) => *ip4_count.entry(*socket).or_insert_with(|| 0) += 1,
51-
SocketAddr::V6(socket) => *ip6_count.entry(*socket).or_insert_with(|| 0) += 1,
53+
self.ipv4_votes.retain(|_, v| v.1 > instant);
54+
self.ipv6_votes.retain(|_, v| v.1 > instant);
55+
56+
(
57+
self.ipv4_votes.len() >= self.minimum_threshold,
58+
self.ipv6_votes.len() >= self.minimum_threshold,
59+
)
60+
}
61+
62+
/// Filter the stale votes and return the majority `SocketAddr` if it exists.
63+
/// If there are not enough votes to meet the threshold this returns None.
64+
fn filter_stale_find_most_frequent<K: Copy + Eq + Hash>(
65+
votes: &HashMap<NodeId, (K, Instant)>,
66+
minimum_threshold: usize,
67+
) -> (HashMap<NodeId, (K, Instant)>, Option<K>) {
68+
let mut updated = HashMap::default();
69+
let mut counter: FnvHashMap<K, usize> = FnvHashMap::default();
70+
let mut max: Option<(K, usize)> = None;
71+
let now = Instant::now();
72+
73+
for (node_id, (vote, instant)) in votes {
74+
// Discard stale votes.
75+
if instant <= &now {
76+
continue;
77+
}
78+
updated.insert(*node_id, (*vote, *instant));
79+
80+
let count = counter.entry(*vote).or_default();
81+
*count += 1;
82+
let current_max = max.map(|(_v, m)| m).unwrap_or_default();
83+
if *count >= current_max && *count >= minimum_threshold {
84+
max = Some((*vote, *count));
5285
}
5386
}
5487

55-
// find the maximum socket addr
56-
let ip4_majority = majority(ip4_count.into_iter(), &self.minimum_threshold);
57-
let ip6_majority = majority(ip6_count.into_iter(), &self.minimum_threshold);
58-
(ip4_majority, ip6_majority)
88+
(updated, max.map(|m| m.0))
5989
}
60-
}
6190

62-
fn majority<K>(iter: impl Iterator<Item = (K, usize)>, threshold: &usize) -> Option<K> {
63-
iter.filter(|(_k, count)| count >= threshold)
64-
.max_by_key(|(_k, count)| *count)
65-
.map(|(k, _count)| k)
91+
/// Returns the majority `SocketAddr`'s of both IPv4 and IPv6 if they exist. If there are not enough votes to meet the threshold this returns None for each stack.
92+
pub fn majority(&mut self) -> (Option<SocketAddrV4>, Option<SocketAddrV6>) {
93+
let (updated_ipv4_votes, ipv4_majority) = Self::filter_stale_find_most_frequent::<
94+
SocketAddrV4,
95+
>(
96+
&self.ipv4_votes, self.minimum_threshold
97+
);
98+
self.ipv4_votes = updated_ipv4_votes;
99+
100+
let (updated_ipv6_votes, ipv6_majority) = Self::filter_stale_find_most_frequent::<
101+
SocketAddrV6,
102+
>(
103+
&self.ipv6_votes, self.minimum_threshold
104+
);
105+
self.ipv6_votes = updated_ipv6_votes;
106+
107+
(ipv4_majority, ipv6_majority)
108+
}
66109
}
67110

68111
#[cfg(test)]
@@ -88,7 +131,8 @@ mod tests {
88131
votes.insert(NodeId::random(), socket_3);
89132
votes.insert(NodeId::random(), socket_3);
90133

91-
assert_eq!(votes.majority(), (Some(socket_2), None));
134+
// Assert that in a draw situation a majority is still chosen.
135+
assert!(votes.majority().0.is_some());
92136
}
93137

94138
#[test]

0 commit comments

Comments
 (0)