Skip to content

Ensure /eth/v2/beacon/pool/attestations honors committee_index #7298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
pub use state_id::StateId;
use std::collections::HashSet;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
Expand All @@ -85,13 +86,14 @@ use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::AttestationData;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset,
Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
fork_versioned_response::EmptyMetadata, Attestation, AttestationShufflingId, AttesterSlashing,
BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -2032,11 +2034,11 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
query: api_types::AttestationPoolQuery| {
task_spawner.blocking_response_task(Priority::P1, move || {
let query_filter = |data: &AttestationData| {
let query_filter = |data: &AttestationData, committee_indices: HashSet<u64>| {
query.slot.is_none_or(|slot| slot == data.slot)
&& query
.committee_index
.is_none_or(|index| index == data.index)
.is_none_or(|index| committee_indices.contains(&index))
};

let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
Expand All @@ -2045,7 +2047,9 @@ pub fn serve<T: BeaconChainTypes>(
.naive_aggregation_pool
.read()
.iter()
.filter(|&att| query_filter(att.data()))
.filter(|&att| {
query_filter(att.data(), att.get_committee_indices_map())
})
.cloned(),
);
// Use the current slot to find the fork version, and convert all messages to the
Expand Down
98 changes: 95 additions & 3 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use http_api::{
use lighthouse_network::{types::SyncState, Enr, EnrExt, PeerId};
use logging::test_logger;
use network::NetworkReceivers;
use operation_pool::attestation_storage::CheckpointKey;
use proto_array::ExecutionStatus;
use sensitive_url::SensitiveUrl;
use slot_clock::SlotClock;
Expand Down Expand Up @@ -2119,7 +2120,7 @@ impl ApiTester {
self
}

pub async fn test_get_beacon_pool_attestations(self) -> Self {
pub async fn test_get_beacon_pool_attestations(self) {
let result = self
.client
.get_beacon_pool_attestations_v1(None, None)
Expand All @@ -2138,9 +2139,80 @@ impl ApiTester {
.await
.unwrap()
.data;

assert_eq!(result, expected);

self
let result_committee_index_filtered = self
.client
.get_beacon_pool_attestations_v1(None, Some(0))
.await
.unwrap()
.data;

let expected_committee_index_filtered = expected
.clone()
.into_iter()
.filter(|att| att.get_committee_indices_map().contains(&0))
.collect::<Vec<_>>();

assert_eq!(
result_committee_index_filtered,
expected_committee_index_filtered
);

let result_committee_index_filtered = self
.client
.get_beacon_pool_attestations_v1(None, Some(1))
.await
.unwrap()
.data;

let expected_committee_index_filtered = expected
.clone()
.into_iter()
.filter(|att| att.get_committee_indices_map().contains(&1))
.collect::<Vec<_>>();

assert_eq!(
result_committee_index_filtered,
expected_committee_index_filtered
);

let fork_name = self
.harness
.chain
.spec
.fork_name_at_slot::<E>(self.harness.chain.slot().unwrap());

// aggregate electra attestations
if fork_name.electra_enabled() {
// Take and drop the lock in a block to avoid clippy complaining
// about taking locks across await points
{
let mut all_attestations = self.chain.op_pool.attestations.write();
let (prev_epoch_key, curr_epoch_key) =
CheckpointKey::keys_for_state(&self.harness.get_current_state());
all_attestations.aggregate_across_committees(prev_epoch_key);
all_attestations.aggregate_across_committees(curr_epoch_key);
}
let result_committee_index_filtered = self
.client
.get_beacon_pool_attestations_v2(None, Some(0))
.await
.unwrap()
.data;
let mut expected = self.chain.op_pool.get_all_attestations();
expected.extend(self.chain.naive_aggregation_pool.read().iter().cloned());
let expected_committee_index_filtered = expected
.clone()
.into_iter()
.filter(|att| att.get_committee_indices_map().contains(&0))
.collect::<Vec<_>>();
assert_eq!(
result_committee_index_filtered,
expected_committee_index_filtered
);
}
}

pub async fn test_post_beacon_pool_attester_slashings_valid_v1(mut self) -> Self {
Expand Down Expand Up @@ -6463,10 +6535,30 @@ async fn beacon_get_blocks() {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_get_pools() {
async fn test_beacon_pool_attestations_electra() {
let mut config = ApiTesterConfig::default();
config.spec.altair_fork_epoch = Some(Epoch::new(0));
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
config.spec.capella_fork_epoch = Some(Epoch::new(0));
config.spec.deneb_fork_epoch = Some(Epoch::new(0));
config.spec.electra_fork_epoch = Some(Epoch::new(0));
ApiTester::new_from_config(config)
.await
.test_get_beacon_pool_attestations()
.await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_beacon_pool_attestations_base() {
ApiTester::new()
.await
.test_get_beacon_pool_attestations()
.await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_get_pools() {
ApiTester::new()
.await
.test_get_beacon_pool_attester_slashings()
.await
Expand Down
20 changes: 18 additions & 2 deletions beacon_node/operation_pool/src/attestation_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::AttestationStats;
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use types::{
attestation::{AttestationBase, AttestationElectra},
superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector,
Expand Down Expand Up @@ -119,6 +119,18 @@ impl<E: EthSpec> CompactAttestationRef<'_, E> {
}
}

pub fn get_committee_indices_map(&self) -> HashSet<u64> {
match self.indexed {
CompactIndexedAttestation::Base(_) => HashSet::from([self.data.index]),
CompactIndexedAttestation::Electra(indexed_att) => indexed_att
.committee_bits
.iter()
.enumerate()
.filter_map(|(index, bit)| if bit { Some(index as u64) } else { None })
.collect(),
}
}

pub fn clone_as_attestation(&self) -> Attestation<E> {
match self.indexed {
CompactIndexedAttestation::Base(indexed_att) => Attestation::Base(AttestationBase {
Expand Down Expand Up @@ -268,7 +280,11 @@ impl<E: EthSpec> CompactIndexedAttestationElectra<E> {
}

pub fn committee_index(&self) -> Option<u64> {
self.get_committee_indices().first().copied()
self.committee_bits
.iter()
.enumerate()
.find(|&(_, bit)| bit)
.map(|(index, _)| index as u64)
}

pub fn get_committee_indices(&self) -> Vec<u64> {
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod attestation;
mod attestation_storage;
pub mod attestation_storage;
mod attester_slashing;
mod bls_to_execution_changes;
mod max_cover;
Expand Down Expand Up @@ -47,7 +47,7 @@ type SyncContributions<E> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeCon
#[derive(Default, Debug)]
pub struct OperationPool<E: EthSpec + Default> {
/// Map from attestation ID (see below) to vectors of attestations.
attestations: RwLock<AttestationMap<E>>,
pub attestations: RwLock<AttestationMap<E>>,
/// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID.
sync_contributions: SyncContributions<E>,
/// Set of attester slashings, and the fork version they were verified against.
Expand Down Expand Up @@ -673,12 +673,12 @@ impl<E: EthSpec> OperationPool<E> {
/// This method may return objects that are invalid for block inclusion.
pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<E>>
where
F: Fn(&AttestationData) -> bool,
F: Fn(&AttestationData, HashSet<u64>) -> bool,
{
self.attestations
.read()
.iter()
.filter(|att| filter(&att.attestation_data()))
.filter(|att| filter(&att.attestation_data(), att.get_committee_indices_map()))
.map(|att| att.clone_as_attestation())
.collect()
}
Expand Down
14 changes: 13 additions & 1 deletion consensus/types/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use derivative::Derivative;
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use ssz_types::BitVector;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use superstruct::superstruct;
use test_random_derive::TestRandom;
Expand Down Expand Up @@ -209,6 +210,13 @@ impl<E: EthSpec> Attestation<E> {
}
}

pub fn get_committee_indices_map(&self) -> HashSet<u64> {
match self {
Attestation::Base(att) => HashSet::from([att.data.index]),
Attestation::Electra(att) => att.get_committee_indices().into_iter().collect(),
}
}

pub fn is_aggregation_bits_zero(&self) -> bool {
match self {
Attestation::Base(att) => att.aggregation_bits.is_zero(),
Expand Down Expand Up @@ -292,7 +300,11 @@ impl<E: EthSpec> AttestationRef<'_, E> {

impl<E: EthSpec> AttestationElectra<E> {
pub fn committee_index(&self) -> Option<u64> {
self.get_committee_indices().first().cloned()
self.committee_bits
.iter()
.enumerate()
.find(|&(_, bit)| bit)
.map(|(index, _)| index as u64)
}

pub fn get_aggregation_bits(&self) -> Vec<u64> {
Expand Down