Skip to content

Make range sync chain Id sequential #6868

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use rand::seq::SliceRandom;
use rand::Rng;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use strum::IntoStaticStr;
use types::{Epoch, EthSpec, Hash256, Slot};

Expand Down Expand Up @@ -56,7 +55,7 @@ pub enum RemoveChain {
pub struct KeepChain;

/// A chain identifier
pub type ChainId = u64;
pub type ChainId = Id;
pub type BatchId = Epoch;

#[derive(Debug, Copy, Clone, IntoStaticStr)]
Expand Down Expand Up @@ -127,14 +126,9 @@ pub enum ChainSyncingState {
}

impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
(target_root, target_slot).hash(&mut hasher);
hasher.finish()
}

#[allow(clippy::too_many_arguments)]
pub fn new(
id: Id,
start_epoch: Epoch,
target_head_slot: Slot,
target_head_root: Hash256,
Expand All @@ -145,8 +139,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let mut peers = FnvHashMap::default();
peers.insert(peer_id, Default::default());

let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);

SyncingChain {
id,
chain_type,
Expand All @@ -165,6 +157,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}

/// Returns true if this chain has the same target
pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool {
self.target_head_slot == target_head_slot && self.target_head_root == target_head_root
}

/// Check if the chain has peers from which to process batches.
pub fn available_peers(&self) -> usize {
self.peers.len()
Expand Down Expand Up @@ -1258,7 +1255,7 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
use slog::Value;
serializer.emit_u64("id", self.id)?;
serializer.emit_u32("id", self.id)?;
Value::serialize(&self.start_epoch, record, "from", serializer)?;
Value::serialize(
&self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()),
Expand Down
31 changes: 18 additions & 13 deletions beacon_node/network/src/sync/range_sync/chain_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::metrics;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId;
use lighthouse_network::SyncInfo;
use slog::{crit, debug, error};
Expand All @@ -29,9 +30,9 @@ const MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS: u64 = 10;
#[derive(Clone)]
pub enum RangeSyncState {
/// A finalized chain is being synced.
Finalized(u64),
Finalized(Id),
/// There are no finalized chains and we are syncing one more head chains.
Head(SmallVec<[u64; PARALLEL_HEAD_CHAINS]>),
Head(SmallVec<[Id; PARALLEL_HEAD_CHAINS]>),
/// There are no head or finalized chains and no long range sync is in progress.
Idle,
}
Expand Down Expand Up @@ -74,7 +75,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
if syncing_id == id {
// the finalized chain that was syncing was removed
debug_assert!(was_syncing && sync_type == RangeSyncType::Finalized);
let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self
let syncing_head_ids: SmallVec<[Id; PARALLEL_HEAD_CHAINS]> = self
.head_chains
.iter()
.filter(|(_id, chain)| chain.is_syncing())
Expand Down Expand Up @@ -355,7 +356,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.collect::<Vec<_>>();
preferred_ids.sort_unstable();

let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new();
let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new();
for (_, _, id) in preferred_ids {
let chain = self.head_chains.get_mut(&id).expect("known chain");
if syncing_chains.len() < PARALLEL_HEAD_CHAINS {
Expand Down Expand Up @@ -465,15 +466,17 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
sync_type: RangeSyncType,
network: &mut SyncNetworkContext<T>,
) {
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
let collection = if let RangeSyncType::Finalized = sync_type {
&mut self.finalized_chains
} else {
&mut self.head_chains
};
match collection.entry(id) {
Entry::Occupied(mut entry) => {
let chain = entry.get_mut();

match collection
.iter_mut()
.find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have multiple chains with the same target, this will return the first one.
But this isn't possible I think because we only create a new chain when we cannot find one matching the same target (the None case), so I think this is fine.

{
Some((&id, chain)) => {
debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
debug_assert_eq!(chain.target_head_root, target_head_root);
debug_assert_eq!(chain.target_head_slot, target_head_slot);
Expand All @@ -483,23 +486,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
} else {
error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
}
let chain = entry.remove();
self.on_chain_removed(&id, chain.is_syncing(), sync_type);
let is_syncing = chain.is_syncing();
collection.remove(&id);
self.on_chain_removed(&id, is_syncing, sync_type);
}
}
Entry::Vacant(entry) => {
None => {
let peer_rpr = peer.to_string();
let id = network.next_id();
let new_chain = SyncingChain::new(
id,
start_epoch,
target_head_slot,
target_head_root,
peer,
sync_type.into(),
&self.log,
);
debug_assert_eq!(new_chain.get_id(), id);
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
entry.insert(new_chain);
collection.insert(id, new_chain);
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]);
self.update_metrics();
}
Expand Down
Loading