Skip to content

Commit 4ab0678

Browse files
committed
Fix custodial peer assumption on lookup custody requests
1 parent 06329ec commit 4ab0678

File tree

5 files changed

+79
-52
lines changed

5 files changed

+79
-52
lines changed

beacon_node/network/src/sync/block_lookups/common.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
99
use beacon_chain::block_verification_types::RpcBlock;
1010
use beacon_chain::BeaconChainTypes;
1111
use lighthouse_network::service::api_types::Id;
12+
use parking_lot::RwLock;
13+
use std::collections::HashSet;
1214
use std::sync::Arc;
1315
use types::blob_sidecar::FixedBlobSidecarList;
1416
use types::{DataColumnSidecarList, SignedBeaconBlock};
@@ -41,7 +43,7 @@ pub trait RequestState<T: BeaconChainTypes> {
4143
fn make_request(
4244
&self,
4345
id: Id,
44-
peer_id: PeerId,
46+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
4547
expected_blobs: usize,
4648
cx: &mut SyncNetworkContext<T>,
4749
) -> Result<LookupRequestResult, LookupRequestError>;
@@ -76,11 +78,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
7678
fn make_request(
7779
&self,
7880
id: SingleLookupId,
79-
peer_id: PeerId,
81+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
8082
_: usize,
8183
cx: &mut SyncNetworkContext<T>,
8284
) -> Result<LookupRequestResult, LookupRequestError> {
83-
cx.block_lookup_request(id, peer_id, self.requested_block_root)
85+
cx.block_lookup_request(id, lookup_peers, self.requested_block_root)
8486
.map_err(LookupRequestError::SendFailedNetwork)
8587
}
8688

@@ -124,11 +126,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
124126
fn make_request(
125127
&self,
126128
id: Id,
127-
peer_id: PeerId,
129+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
128130
expected_blobs: usize,
129131
cx: &mut SyncNetworkContext<T>,
130132
) -> Result<LookupRequestResult, LookupRequestError> {
131-
cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs)
133+
cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
132134
.map_err(LookupRequestError::SendFailedNetwork)
133135
}
134136

@@ -172,12 +174,11 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
172174
fn make_request(
173175
&self,
174176
id: Id,
175-
// TODO(das): consider selecting peers that have custody but are in this set
176-
_peer_id: PeerId,
177+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
177178
_: usize,
178179
cx: &mut SyncNetworkContext<T>,
179180
) -> Result<LookupRequestResult, LookupRequestError> {
180-
cx.custody_lookup_request(id, self.block_root)
181+
cx.custody_lookup_request(id, self.block_root, lookup_peers)
181182
.map_err(LookupRequestError::SendFailedNetwork)
182183
}
183184

beacon_node/network/src/sync/block_lookups/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
283283
.find(|(_, l)| l.block_root() == parent_chain_tip)
284284
{
285285
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
286-
peers: lookup.all_peers().copied().collect(),
286+
peers: lookup.all_peers(),
287287
head_slot: tip_lookup.peek_downloaded_block_slot(),
288288
head_root: parent_chain_tip,
289289
});
@@ -682,7 +682,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
682682
lookup.continue_requests(cx)
683683
}
684684
Action::ParentUnknown { parent_root } => {
685-
let peers = lookup.all_peers().copied().collect::<Vec<_>>();
685+
let peers = lookup.all_peers();
686686
lookup.set_awaiting_parent(parent_root);
687687
debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
688688
self.search_parent_of_child(parent_root, block_root, &peers, cx);

beacon_node/network/src/sync/block_lookups/single_block_lookup.rs

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::sync::network_context::{
77
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
88
use derivative::Derivative;
99
use lighthouse_network::service::api_types::Id;
10-
use rand::seq::IteratorRandom;
10+
use parking_lot::RwLock;
1111
use std::collections::HashSet;
1212
use std::fmt::Debug;
1313
use std::sync::Arc;
@@ -33,8 +33,6 @@ pub enum LookupRequestError {
3333
/// The failed attempts were primarily due to processing failures.
3434
cannot_process: bool,
3535
},
36-
/// No peers left to serve this lookup
37-
NoPeers,
3836
/// Error sending event to network
3937
SendFailedNetwork(RpcRequestSendError),
4038
/// Error sending event to processor
@@ -63,9 +61,12 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
6361
pub id: Id,
6462
pub block_request_state: BlockRequestState<T::EthSpec>,
6563
pub component_requests: ComponentRequests<T::EthSpec>,
66-
/// Peers that claim to have imported this set of block components
64+
/// Peers that claim to have imported this set of block components. This state is shared with
65+
/// the custody request to have an updated view of the peers that claim to have imported the
66+
/// block associated with this lookup. The peer set of a lookup can change rapidly, and faster
67+
/// than the lifetime of a custody request.
6768
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
68-
peers: HashSet<PeerId>,
69+
peers: Arc<RwLock<HashSet<PeerId>>>,
6970
block_root: Hash256,
7071
awaiting_parent: Option<Hash256>,
7172
created: Instant,
@@ -92,7 +93,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
9293
id,
9394
block_request_state: BlockRequestState::new(requested_block_root),
9495
component_requests: ComponentRequests::WaitingForBlock,
95-
peers: HashSet::from_iter(peers.iter().copied()),
96+
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
9697
block_root: requested_block_root,
9798
awaiting_parent,
9899
created: Instant::now(),
@@ -283,24 +284,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
283284
return Err(LookupRequestError::TooManyAttempts { cannot_process });
284285
}
285286

286-
let Some(peer_id) = self.use_rand_available_peer() else {
287-
// Allow lookup to not have any peers and do nothing. This is an optimization to not
288-
// lose progress of lookups created from a block with unknown parent before we receive
289-
// attestations for said block.
290-
// Lookup sync event safety: If a lookup requires peers to make progress, and does
291-
// not receive any new peers for some time it will be dropped. If it receives a new
292-
// peer it must attempt to make progress.
293-
R::request_state_mut(self)
294-
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
295-
.get_state_mut()
296-
.update_awaiting_download_status("no peers");
297-
return Ok(());
298-
};
299-
287+
let peers = self.peers.clone();
300288
let request = R::request_state_mut(self)
301289
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
302290

303-
match request.make_request(id, peer_id, expected_blobs, cx)? {
291+
match request.make_request(id, peers, expected_blobs, cx)? {
304292
LookupRequestResult::RequestSent(req_id) => {
305293
// Lookup sync event safety: If make_request returns `RequestSent`, we are
306294
// guaranteed that `BlockLookups::on_download_response` will be called exactly
@@ -348,29 +336,24 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
348336
}
349337

350338
/// Get all unique peers that claim to have imported this set of block components
351-
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
352-
self.peers.iter()
339+
pub fn all_peers(&self) -> Vec<PeerId> {
340+
self.peers.read().iter().copied().collect()
353341
}
354342

355343
/// Add peer to all request states. The peer must be able to serve this request.
356344
/// Returns true if the peer was newly inserted into some request state.
357345
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
358-
self.peers.insert(peer_id)
346+
self.peers.write().insert(peer_id)
359347
}
360348

361349
/// Remove peer from available peers.
362350
pub fn remove_peer(&mut self, peer_id: &PeerId) {
363-
self.peers.remove(peer_id);
351+
self.peers.write().remove(peer_id);
364352
}
365353

366354
/// Returns true if this lookup has zero peers
367355
pub fn has_no_peers(&self) -> bool {
368-
self.peers.is_empty()
369-
}
370-
371-
/// Selects a random peer from available peers if any
372-
fn use_rand_available_peer(&mut self) -> Option<PeerId> {
373-
self.peers.iter().choose(&mut rand::thread_rng()).copied()
356+
self.peers.read().is_empty()
374357
}
375358
}
376359

@@ -689,8 +672,8 @@ impl<T: Clone> std::fmt::Debug for State<T> {
689672
}
690673

691674
fn fmt_peer_set_as_len(
692-
peer_set: &HashSet<PeerId>,
675+
peer_set: &Arc<RwLock<HashSet<PeerId>>>,
693676
f: &mut std::fmt::Formatter,
694677
) -> Result<(), std::fmt::Error> {
695-
write!(f, "{}", peer_set.len())
678+
write!(f, "{}", peer_set.read().len())
696679
}

beacon_node/network/src/sync/network_context.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ use lighthouse_network::service::api_types::{
2727
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
2828
};
2929
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
30-
use rand::seq::SliceRandom;
30+
use parking_lot::RwLock;
31+
use rand::prelude::IteratorRandom;
3132
use rand::thread_rng;
3233
pub use requests::LookupVerifyError;
3334
use requests::{
@@ -308,8 +309,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
308309

309310
pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> {
310311
self.get_custodial_peers(column_index)
312+
.into_iter()
311313
.choose(&mut thread_rng())
312-
.cloned()
313314
}
314315

315316
pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -562,9 +563,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
562563
pub fn block_lookup_request(
563564
&mut self,
564565
lookup_id: SingleLookupId,
565-
peer_id: PeerId,
566+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
566567
block_root: Hash256,
567568
) -> Result<LookupRequestResult, RpcRequestSendError> {
569+
let Some(peer_id) = lookup_peers
570+
.read()
571+
.iter()
572+
.choose(&mut rand::thread_rng())
573+
.copied()
574+
else {
575+
// Allow lookup to not have any peers and do nothing. This is an optimization to not
576+
// lose progress of lookups created from a block with unknown parent before we receive
577+
// attestations for said block.
578+
// Lookup sync event safety: If a lookup requires peers to make progress, and does
579+
// not receive any new peers for some time it will be dropped. If it receives a new
580+
// peer it must attempt to make progress.
581+
return Ok(LookupRequestResult::Pending("no peers"));
582+
};
583+
568584
match self.chain.get_block_process_status(&block_root) {
569585
// Unknown block, continue request to download
570586
BlockProcessStatus::Unknown => {}
@@ -634,10 +650,25 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
634650
pub fn blob_lookup_request(
635651
&mut self,
636652
lookup_id: SingleLookupId,
637-
peer_id: PeerId,
653+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
638654
block_root: Hash256,
639655
expected_blobs: usize,
640656
) -> Result<LookupRequestResult, RpcRequestSendError> {
657+
let Some(peer_id) = lookup_peers
658+
.read()
659+
.iter()
660+
.choose(&mut rand::thread_rng())
661+
.copied()
662+
else {
663+
// Allow lookup to not have any peers and do nothing. This is an optimization to not
664+
// lose progress of lookups created from a block with unknown parent before we receive
665+
// attestations for said block.
666+
// Lookup sync event safety: If a lookup requires peers to make progress, and does
667+
// not receive any new peers for some time it will be dropped. If it receives a new
668+
// peer it must attempt to make progress.
669+
return Ok(LookupRequestResult::Pending("no peers"));
670+
};
671+
641672
let imported_blob_indexes = self
642673
.chain
643674
.data_availability_checker
@@ -740,6 +771,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
740771
&mut self,
741772
lookup_id: SingleLookupId,
742773
block_root: Hash256,
774+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
743775
) -> Result<LookupRequestResult, RpcRequestSendError> {
744776
let custody_indexes_imported = self
745777
.chain
@@ -777,6 +809,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
777809
block_root,
778810
CustodyId { requester },
779811
&custody_indexes_to_fetch,
812+
lookup_peers,
780813
self.log.clone(),
781814
);
782815

beacon_node/network/src/sync/network_context/custody.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ use fnv::FnvHashMap;
77
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
88
use lighthouse_network::PeerId;
99
use lru_cache::LRUTimeCache;
10+
use parking_lot::RwLock;
1011
use rand::Rng;
1112
use slog::{debug, warn};
13+
use std::collections::HashSet;
1214
use std::time::{Duration, Instant};
1315
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
1416
use types::EthSpec;
@@ -32,6 +34,8 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
3234
/// Peers that have recently failed to successfully respond to a columns by root request.
3335
/// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
3436
failed_peers: LRUTimeCache<PeerId>,
37+
/// Set of peers that claim to have imported this block and their custody columns
38+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
3539
/// Logger for the `SyncNetworkContext`.
3640
pub log: slog::Logger,
3741
_phantom: PhantomData<T>,
@@ -64,6 +68,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
6468
block_root: Hash256,
6569
custody_id: CustodyId,
6670
column_indices: &[ColumnIndex],
71+
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
6772
log: slog::Logger,
6873
) -> Self {
6974
Self {
@@ -76,6 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
7681
),
7782
active_batch_columns_requests: <_>::default(),
7883
failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
84+
lookup_peers,
7985
log,
8086
_phantom: PhantomData,
8187
}
@@ -215,6 +221,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
215221
}
216222

217223
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
224+
let lookup_peers = self.lookup_peers.read();
218225

219226
// Need to:
220227
// - track how many active requests a peer has for load balancing
@@ -244,6 +251,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
244251
.iter()
245252
.map(|peer| {
246253
(
254+
// Prioritize peers that claim to know have imported this block
255+
if lookup_peers.contains(peer) { 0 } else { 1 },
247256
// De-prioritize peers that have failed to successfully respond to
248257
// requests recently
249258
self.failed_peers.contains(peer),
@@ -257,7 +266,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
257266
.collect::<Vec<_>>();
258267
priorized_peers.sort_unstable();
259268

260-
if let Some((_, _, _, peer_id)) = priorized_peers.first() {
269+
if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
261270
columns_to_request_by_peer
262271
.entry(*peer_id)
263272
.or_default()
@@ -283,10 +292,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
283292
block_root: self.block_root,
284293
indices: indices.clone(),
285294
},
286-
// true = enforce max_requests are returned data_columns_by_root. We only issue requests
287-
// for blocks after we know the block has data, and only request peers after they claim to
288-
// have imported the block+columns and claim to be custodians
289-
true,
295+
// If peer is in the lookup peer set, it claims to have imported the block and
296+
// must have its columns in custody. In that case, set `true = enforce max_requests`
297+
// and downscore if data_columns_by_root does not returned the expected custody
298+
// columns. For the rest of peers, don't downscore if columns are missing.
299+
lookup_peers.contains(&peer_id),
290300
)
291301
.map_err(Error::SendFailed)?;
292302

0 commit comments

Comments
 (0)