Skip to content

Expose send_ping from service.rs for discv5.rs #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ use tracing::{debug, warn};
use libp2p_core::Multiaddr;

// Create lazy static variable for the global permit/ban list
use crate::metrics::{Metrics, METRICS};
use crate::{
metrics::{Metrics, METRICS},
service::Pong,
};

lazy_static! {
pub static ref PERMIT_BAN_LIST: RwLock<crate::PermitBanList> =
RwLock::new(crate::PermitBanList::default());
Expand Down Expand Up @@ -308,6 +312,31 @@ impl<P: ProtocolIdentity> Discv5<P> {
None
}

/// Sends a PING request to a node.
pub fn send_ping(
&mut self,
enr: Enr,
) -> impl Future<Output = Result<Pong, RequestError>> + 'static {
let (callback_send, callback_recv) = oneshot::channel();
let channel = self.clone_channel();

async move {
let channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;

let event = ServiceRequest::Ping(enr, Some(callback_send));

// send the request
channel
.send(event)
.await
.map_err(|_| RequestError::ChannelFailed("Service channel closed".into()))?;
// await the response
callback_recv
.await
.map_err(|e| RequestError::ChannelFailed(e.to_string()))?
}
}

/// Bans a node from the server. This will remove the node from the routing table if it exists
/// and block all incoming packets from the node until the timeout specified. Setting the
/// timeout to `None` creates a permanent ban.
Expand Down
221 changes: 135 additions & 86 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ use futures::prelude::*;
use more_asserts::debug_unreachable;
use parking_lot::RwLock;
use rpc::*;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, task::Poll, time::Instant};
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::Arc,
task::Poll,
time::Instant,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -146,6 +152,8 @@ pub enum ServiceRequest {
Vec<u8>,
oneshot::Sender<Result<Vec<u8>, RequestError>>,
),
/// The PING discv5 RPC function.
Ping(Enr, Option<oneshot::Sender<Result<Pong, RequestError>>>),
/// Sets up an event stream where the discv5 server will return various events such as
/// discovered nodes as it traverses the DHT.
RequestEventStream(oneshot::Sender<mpsc::Receiver<Discv5Event>>),
Expand Down Expand Up @@ -213,12 +221,24 @@ struct ActiveRequest {
pub callback: Option<CallbackResponse>,
}

#[derive(Debug)]
pub struct Pong {
/// The current ENR sequence number of the responder.
pub enr_seq: u64,
/// Our external IP address as observed by the responder.
pub ip: IpAddr,
/// Our external UDP port as observed by the responder.
pub port: u16,
}

/// The kinds of responses we can send back to the discv5 layer.
pub enum CallbackResponse {
/// A response to a requested ENR.
Enr(oneshot::Sender<Result<Enr, RequestError>>),
/// A response from a TALK request
Talk(oneshot::Sender<Result<Vec<u8>, RequestError>>),
/// A response from a Pong request
Pong(oneshot::Sender<Result<Pong, RequestError>>),
}

/// For multiple responses to a FindNodes request, this keeps track of the request count
Expand Down Expand Up @@ -335,6 +355,9 @@ impl Service {
ServiceRequest::Talk(node_contact, protocol, request, callback) => {
self.talk_request(node_contact, protocol, request, callback);
}
ServiceRequest::Ping(enr, callback) => {
self.send_ping(enr, callback);
}
ServiceRequest::RequestEventStream(callback) => {
// the channel size needs to be large to handle many discovered peers
// if we are reporting them on the event stream.
Expand Down Expand Up @@ -428,7 +451,7 @@ impl Service {
};

if let Some(enr) = enr {
self.send_ping(enr);
self.send_ping(enr, None);
}
}
}
Expand Down Expand Up @@ -770,102 +793,117 @@ impl Service {
self.discovered(&node_id, nodes, active_request.query_id);
}
ResponseBody::Pong { enr_seq, ip, port } => {
let socket = SocketAddr::new(ip, port);
// perform ENR majority-based update if required.
// Send the response to the user, if they are who asked
if let Some(CallbackResponse::Pong(callback)) = active_request.callback {
let response = Pong { enr_seq, ip, port };
if let Err(e) = callback.send(Ok(response)) {
warn!("Failed to send callback response {:?}", e)
};
} else {
let socket = SocketAddr::new(ip, port);
// perform ENR majority-based update if required.

// Only count votes that from peers we have contacted.
let key: kbucket::Key<NodeId> = node_id.into();
let should_count = matches!(
// Only count votes that from peers we have contacted.
let key: kbucket::Key<NodeId> = node_id.into();
let should_count = matches!(
self.kbuckets.write().entry(&key),
kbucket::Entry::Present(_, status)
if status.is_connected() && !status.is_incoming());

if should_count {
// get the advertised local addresses
let (local_ip4_socket, local_ip6_socket) = {
let local_enr = self.local_enr.read();
(local_enr.udp4_socket(), local_enr.udp6_socket())
};

if let Some(ref mut ip_votes) = self.ip_votes {
ip_votes.insert(node_id, socket);
let (maybe_ip4_majority, maybe_ip6_majority) = ip_votes.majority();
if should_count {
// get the advertised local addresses
let (local_ip4_socket, local_ip6_socket) = {
let local_enr = self.local_enr.read();
(local_enr.udp4_socket(), local_enr.udp6_socket())
};

let new_ip4 = maybe_ip4_majority.and_then(|majority| {
if Some(majority) != local_ip4_socket {
Some(majority)
} else {
None
}
});
let new_ip6 = maybe_ip6_majority.and_then(|majority| {
if Some(majority) != local_ip6_socket {
Some(majority)
} else {
None
}
});
if let Some(ref mut ip_votes) = self.ip_votes {
ip_votes.insert(node_id, socket);
let (maybe_ip4_majority, maybe_ip6_majority) = ip_votes.majority();

if new_ip4.is_some() || new_ip6.is_some() {
let mut updated = false;

// Check if our advertised IPV6 address needs to be updated.
if let Some(new_ip6) = new_ip6 {
let new_ip6: SocketAddr = new_ip6.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip6, &self.enr_key.read());
match result {
Ok(_) => {
updated = true;
info!("Local UDP ip6 socket updated to: {}", new_ip6);
self.send_event(Discv5Event::SocketUpdated(new_ip6));
}
Err(e) => {
warn!("Failed to update local UDP ip6 socket. ip6: {}, error: {:?}", new_ip6, e);
}
let new_ip4 = maybe_ip4_majority.and_then(|majority| {
if Some(majority) != local_ip4_socket {
Some(majority)
} else {
None
}
}
if let Some(new_ip4) = new_ip4 {
let new_ip4: SocketAddr = new_ip4.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip4, &self.enr_key.read());
match result {
Ok(_) => {
updated = true;
info!("Local UDP socket updated to: {}", new_ip4);
self.send_event(Discv5Event::SocketUpdated(new_ip4));
});
let new_ip6 = maybe_ip6_majority.and_then(|majority| {
if Some(majority) != local_ip6_socket {
Some(majority)
} else {
None
}
});

if new_ip4.is_some() || new_ip6.is_some() {
let mut updated = false;

// Check if our advertised IPV6 address needs to be updated.
if let Some(new_ip6) = new_ip6 {
let new_ip6: SocketAddr = new_ip6.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip6, &self.enr_key.read());
match result {
Ok(_) => {
updated = true;
info!(
"Local UDP ip6 socket updated to: {}",
new_ip6,
);
self.send_event(Discv5Event::SocketUpdated(
new_ip6,
));
}
Err(e) => {
warn!("Failed to update local UDP ip6 socket. ip6: {}, error: {:?}", new_ip6, e);
}
}
Err(e) => {
warn!("Failed to update local UDP socket. ip: {}, error: {:?}", new_ip4, e);
}
if let Some(new_ip4) = new_ip4 {
let new_ip4: SocketAddr = new_ip4.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip4, &self.enr_key.read());
match result {
Ok(_) => {
updated = true;
info!("Local UDP socket updated to: {}", new_ip4);
self.send_event(Discv5Event::SocketUpdated(
new_ip4,
));
}
Err(e) => {
warn!("Failed to update local UDP socket. ip: {}, error: {:?}", new_ip4, e);
}
}
}
}
if updated {
self.ping_connected_peers();
if updated {
self.ping_connected_peers();
}
}
}
}
}

// check if we need to request a new ENR
if let Some(enr) = self.find_enr(&node_id) {
if enr.seq() < enr_seq {
// request an ENR update
debug!("Requesting an ENR update from: {}", active_request.contact);
let request_body = RequestBody::FindNode { distances: vec![0] };
let active_request = ActiveRequest {
contact: active_request.contact,
request_body,
query_id: None,
callback: None,
};
self.send_rpc_request(active_request);
// check if we need to request a new ENR
if let Some(enr) = self.find_enr(&node_id) {
if enr.seq() < enr_seq {
// request an ENR update
debug!("Requesting an ENR update from: {}", active_request.contact);
let request_body = RequestBody::FindNode { distances: vec![0] };
let active_request = ActiveRequest {
contact: active_request.contact,
request_body,
query_id: None,
callback: None,
};
self.send_rpc_request(active_request);
}
self.connection_updated(node_id, ConnectionStatus::PongReceived(enr));
}
self.connection_updated(node_id, ConnectionStatus::PongReceived(enr));
}
}
ResponseBody::Talk { response } => {
Expand Down Expand Up @@ -897,7 +935,11 @@ impl Service {
// Send RPC Requests //

/// Sends a PING request to a node.
fn send_ping(&mut self, enr: Enr) {
fn send_ping(
&mut self,
enr: Enr,
callback: Option<oneshot::Sender<Result<Pong, RequestError>>>,
) {
match NodeContact::try_from_enr(enr, self.config.ip_mode) {
Ok(contact) => {
let request_body = RequestBody::Ping {
Expand All @@ -907,7 +949,7 @@ impl Service {
contact,
request_body,
query_id: None,
callback: None,
callback: callback.map(CallbackResponse::Pong),
};
self.send_rpc_request(active_request);
}
Expand All @@ -933,7 +975,7 @@ impl Service {
};

for enr in connected_peers {
self.send_ping(enr.clone());
self.send_ping(enr.clone(), None);
}
}

Expand Down Expand Up @@ -1341,7 +1383,7 @@ impl Service {
}
};
if let Some(enr) = optional_enr {
self.send_ping(enr)
self.send_ping(enr, None)
}
}
}
Expand Down Expand Up @@ -1383,6 +1425,13 @@ impl Service {
.unwrap_or_else(|_| debug!("Couldn't send TALK error response to user"));
return;
}
Some(CallbackResponse::Pong(callback)) => {
// return the error
callback
.send(Err(error))
.unwrap_or_else(|_| debug!("Couldn't send Pong error response to user"));
return;
}
None => {
// no callback to send too
}
Expand Down