Skip to content

Optimise and refine SingleAttestation conversion #6934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ use std::borrow::Cow;
use strum::AsRefStr;
use tree_hash::TreeHash;
use types::{
Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec,
CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof,
SignedAggregateAndProof, SingleAttestation, Slot, SubnetId,
Attestation, AttestationData, AttestationRef, BeaconCommittee,
BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256,
IndexedAttestation, SelectionProof, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId,
};

pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations};
Expand Down Expand Up @@ -115,6 +115,17 @@ pub enum Error {
///
/// The peer has sent an invalid message.
AggregatorNotInCommittee { aggregator_index: u64 },
/// The `attester_index` for a `SingleAttestation` is not a member of the committee defined
/// by its `beacon_block_root`, `committee_index` and `slot`.
///
/// ## Peer scoring
///
/// The peer has sent an invalid message.
AttesterNotInCommittee {
attester_index: u64,
committee_index: u64,
slot: Slot,
},
/// The aggregator index refers to a validator index that we have not seen.
///
/// ## Peer scoring
Expand Down Expand Up @@ -485,7 +496,11 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
//
// We do not queue future attestations for later processing.
verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?;
verify_propagation_slot_range::<_, T::EthSpec>(
&chain.slot_clock,
attestation.data(),
&chain.spec,
)?;

// Check the attestation's epoch matches its target.
if attestation.data().slot.epoch(T::EthSpec::slots_per_epoch())
Expand Down Expand Up @@ -817,7 +832,11 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> {
// MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance).
//
// We do not queue future attestations for later processing.
verify_propagation_slot_range(&chain.slot_clock, attestation, &chain.spec)?;
verify_propagation_slot_range::<_, T::EthSpec>(
&chain.slot_clock,
attestation.data(),
&chain.spec,
)?;

// Check to ensure that the attestation is "unaggregated". I.e., it has exactly one
// aggregation bit set.
Expand Down Expand Up @@ -1133,10 +1152,10 @@ fn verify_head_block_is_known<T: BeaconChainTypes>(
/// Accounts for `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
pub fn verify_propagation_slot_range<S: SlotClock, E: EthSpec>(
slot_clock: &S,
attestation: AttestationRef<E>,
attestation: &AttestationData,
spec: &ChainSpec,
) -> Result<(), Error> {
let attestation_slot = attestation.data().slot;
let attestation_slot = attestation.slot;
let latest_permissible_slot = slot_clock
.now_with_future_tolerance(spec.maximum_gossip_clock_disparity())
.ok_or(BeaconChainError::UnableToReadSlot)?;
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod pre_finalization_cache;
pub mod proposer_prep_service;
pub mod schema_change;
pub mod shuffling_cache;
pub mod single_attestation;
pub mod state_advance_timer;
pub mod sync_committee_rewards;
pub mod sync_committee_verification;
Expand Down
46 changes: 46 additions & 0 deletions beacon_node/beacon_chain/src/single_attestation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::attestation_verification::Error;
use types::{Attestation, AttestationElectra, BitList, BitVector, EthSpec, SingleAttestation};

pub fn single_attestation_to_attestation<E: EthSpec>(
single_attestation: &SingleAttestation,
committee: &[usize],
) -> Result<Attestation<E>, Error> {
let attester_index = single_attestation.attester_index;
let committee_index = single_attestation.committee_index;
let slot = single_attestation.data.slot;

let aggregation_bit = committee
.iter()
.enumerate()
.find_map(|(i, &validator_index)| {
if attester_index as usize == validator_index {
return Some(i);
}
None
})
.ok_or(Error::AttesterNotInCommittee {
attester_index,
committee_index,
slot,
})?;

let mut committee_bits: BitVector<E::MaxCommitteesPerSlot> = BitVector::default();
committee_bits
.set(committee_index as usize, true)
.map_err(|e| Error::Invalid(e.into()))?;

let mut aggregation_bits =
BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?;
aggregation_bits
.set(aggregation_bit, true)
.map_err(|e| Error::Invalid(e.into()))?;

// TODO(electra): consider eventually allowing conversion to non-Electra attestations as well
// to maintain invertability (`Attestation` -> `SingleAttestation` -> `Attestation`).
Ok(Attestation::Electra(AttestationElectra {
aggregation_bits,
committee_bits,
data: single_attestation.data.clone(),
signature: single_attestation.signature.clone(),
}))
}
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain;
pub use crate::{
beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY},
migrate::MigratorConfig,
single_attestation::single_attestation_to_attestation,
sync_committee_verification::Error as SyncCommitteeError,
validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig},
BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification,
Expand Down Expand Up @@ -1133,7 +1134,8 @@ where
let single_attestation =
attestation.to_single_attestation_with_attester_index(attester_index as u64)?;

let attestation: Attestation<E> = single_attestation.to_attestation(committee.committee)?;
let attestation: Attestation<E> =
single_attestation_to_attestation(&single_attestation, committee.committee).unwrap();

assert_eq!(
single_attestation.committee_index,
Expand Down
45 changes: 36 additions & 9 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::{
Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId,
Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
SingleAttestation, Slot, SubnetId,
};
use types::{EthSpec, Slot};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
Expand Down Expand Up @@ -504,10 +504,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {

/// Items required to verify a batch of unaggregated gossip attestations.
#[derive(Debug)]
pub struct GossipAttestationPackage<E: EthSpec> {
pub struct GossipAttestationPackage<T> {
pub message_id: MessageId,
pub peer_id: PeerId,
pub attestation: Box<Attestation<E>>,
pub attestation: Box<T>,
pub subnet_id: SubnetId,
pub should_import: bool,
pub seen_timestamp: Duration,
Expand Down Expand Up @@ -549,21 +549,32 @@ pub enum BlockingOrAsync {
Blocking(BlockingFn),
Async(AsyncFn),
}
pub type GossipAttestationBatch<E> = Vec<GossipAttestationPackage<Attestation<E>>>;

/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
pub enum Work<E: EthSpec> {
GossipAttestation {
attestation: Box<GossipAttestationPackage<E>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
attestation: Box<GossipAttestationPackage<Attestation<E>>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<Attestation<E>>) + Send + Sync>,
process_batch: Box<dyn FnOnce(GossipAttestationBatch<E>) + Send + Sync>,
},
// Attestation requiring conversion before processing.
//
// For now this is a `SingleAttestation`, but eventually we will switch this around so that
// legacy `Attestation`s are converted and the main processing pipeline operates on
// `SingleAttestation`s.
GossipAttestationToConvert {
attestation: Box<GossipAttestationPackage<SingleAttestation>>,
process_individual:
Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
},
UnknownBlockAttestation {
process_fn: BlockingFn,
},
GossipAttestationBatch {
attestations: Vec<GossipAttestationPackage<E>>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
attestations: GossipAttestationBatch<E>,
process_batch: Box<dyn FnOnce(GossipAttestationBatch<E>) + Send + Sync>,
},
GossipAggregate {
aggregate: Box<GossipAggregatePackage<E>>,
Expand Down Expand Up @@ -639,6 +650,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
GossipAttestation,
GossipAttestationToConvert,
UnknownBlockAttestation,
GossipAttestationBatch,
GossipAggregate,
Expand Down Expand Up @@ -690,6 +702,7 @@ impl<E: EthSpec> Work<E> {
fn to_type(&self) -> WorkType {
match self {
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
Work::GossipAttestationToConvert { .. } => WorkType::GossipAttestationToConvert,
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
Expand Down Expand Up @@ -849,6 +862,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
let mut aggregate_debounce = TimeLatch::default();
let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue);
let mut attestation_debounce = TimeLatch::default();
let mut unknown_block_aggregate_queue =
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
Expand Down Expand Up @@ -1180,6 +1194,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
None
}
}
// Convert any gossip attestations that need to be converted.
} else if let Some(item) = attestation_to_convert_queue.pop() {
Some(item)
// Check sync committee messages after attestations as their rewards are lesser
// and they don't influence fork choice.
} else if let Some(item) = sync_contribution_queue.pop() {
Expand Down Expand Up @@ -1301,6 +1318,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
match work {
_ if can_spawn => self.spawn_worker(work, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
Work::GossipAttestationToConvert { .. } => {
attestation_to_convert_queue.push(work)
}
// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Work::GossipAttestationBatch { .. } => crit!(
Expand Down Expand Up @@ -1431,6 +1451,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
if let Some(modified_queue_id) = modified_queue_id {
let queue_len = match modified_queue_id {
WorkType::GossipAttestation => attestation_queue.len(),
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => aggregate_queue.len(),
Expand Down Expand Up @@ -1563,6 +1584,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
} => task_spawner.spawn_blocking(move || {
process_individual(*attestation);
}),
Work::GossipAttestationToConvert {
attestation,
process_individual,
} => task_spawner.spawn_blocking(move || {
process_individual(*attestation);
}),
Work::GossipAttestationBatch {
attestations,
process_batch,
Expand Down
51 changes: 30 additions & 21 deletions beacon_node/http_api/src/publish_attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
//! attestations and there's no immediate cause for concern.
use crate::task_spawner::{Priority, TaskSpawner};
use beacon_chain::{
validator_monitor::timestamp_now, AttestationError, BeaconChain, BeaconChainError,
BeaconChainTypes,
single_attestation::single_attestation_to_attestation, validator_monitor::timestamp_now,
AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage};
use either::Either;
Expand Down Expand Up @@ -183,10 +183,10 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
attestation: &'a Either<Attestation<T::EthSpec>, SingleAttestation>,
) -> Result<Cow<'a, Attestation<T::EthSpec>>, Error> {
let a = match attestation {
Either::Left(a) => Cow::Borrowed(a),
Either::Right(single_attestation) => chain
.with_committee_cache(
match attestation {
Either::Left(a) => Ok(Cow::Borrowed(a)),
Either::Right(single_attestation) => {
let conversion_result = chain.with_committee_cache(
single_attestation.data.target.root,
single_attestation
.data
Expand All @@ -197,24 +197,33 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>(
single_attestation.data.slot,
single_attestation.committee_index,
) else {
return Err(BeaconChainError::AttestationError(
types::AttestationError::NoCommitteeForSlotAndIndex {
slot: single_attestation.data.slot,
index: single_attestation.committee_index,
},
));
return Ok(Err(AttestationError::NoCommitteeForSlotAndIndex {
slot: single_attestation.data.slot,
index: single_attestation.committee_index,
}));
};

let attestation =
single_attestation.to_attestation::<T::EthSpec>(committee.committee)?;

Ok(Cow::Owned(attestation))
Ok(single_attestation_to_attestation::<T::EthSpec>(
single_attestation,
committee.committee,
)
.map(Cow::Owned))
},
)
.map_err(Error::FailedConversion)?,
};

Ok(a)
);
match conversion_result {
Ok(Ok(attestation)) => Ok(attestation),
Ok(Err(e)) => Err(Error::Validation(e)),
// Map the error returned by `with_committee_cache` for unknown blocks into the
// `UnknownHeadBlock` error that is gracefully handled.
Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => {
Err(Error::Validation(AttestationError::UnknownHeadBlock {
beacon_block_root,
}))
}
Err(e) => Err(Error::FailedConversion(e)),
}
}
}
}

pub async fn publish_attestations<T: BeaconChainTypes>(
Expand Down
Loading