Skip to content

Commit e3145af

Browse files
authored
Merge of #7298
2 parents 6dbf6e4 + 250b5bc commit e3145af

File tree

5 files changed

+143
-19
lines changed

5 files changed

+143
-19
lines changed

beacon_node/http_api/src/lib.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use slog::{crit, debug, error, info, warn, Logger};
6868
use slot_clock::SlotClock;
6969
use ssz::Encode;
7070
pub use state_id::StateId;
71+
use std::collections::HashSet;
7172
use std::future::Future;
7273
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7374
use std::path::PathBuf;
@@ -85,13 +86,14 @@ use tokio_stream::{
8586
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
8687
StreamExt,
8788
};
89+
use types::AttestationData;
8890
use types::{
89-
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
90-
AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset,
91-
Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData,
92-
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
93-
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
94-
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
91+
fork_versioned_response::EmptyMetadata, Attestation, AttestationShufflingId, AttesterSlashing,
92+
BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
93+
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
94+
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
95+
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
96+
SyncCommitteeMessage, SyncContributionData,
9597
};
9698
use validator::pubkey_to_validator_index;
9799
use version::{
@@ -2032,11 +2034,11 @@ pub fn serve<T: BeaconChainTypes>(
20322034
chain: Arc<BeaconChain<T>>,
20332035
query: api_types::AttestationPoolQuery| {
20342036
task_spawner.blocking_response_task(Priority::P1, move || {
2035-
let query_filter = |data: &AttestationData| {
2037+
let query_filter = |data: &AttestationData, committee_indices: HashSet<u64>| {
20362038
query.slot.is_none_or(|slot| slot == data.slot)
20372039
&& query
20382040
.committee_index
2039-
.is_none_or(|index| index == data.index)
2041+
.is_none_or(|index| committee_indices.contains(&index))
20402042
};
20412043

20422044
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
@@ -2045,7 +2047,9 @@ pub fn serve<T: BeaconChainTypes>(
20452047
.naive_aggregation_pool
20462048
.read()
20472049
.iter()
2048-
.filter(|&att| query_filter(att.data()))
2050+
.filter(|&att| {
2051+
query_filter(att.data(), att.get_committee_indices_map())
2052+
})
20492053
.cloned(),
20502054
);
20512055
// Use the current slot to find the fork version, and convert all messages to the

beacon_node/http_api/tests/tests.rs

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use http_api::{
2828
use lighthouse_network::{types::SyncState, Enr, EnrExt, PeerId};
2929
use logging::test_logger;
3030
use network::NetworkReceivers;
31+
use operation_pool::attestation_storage::CheckpointKey;
3132
use proto_array::ExecutionStatus;
3233
use sensitive_url::SensitiveUrl;
3334
use slot_clock::SlotClock;
@@ -2119,7 +2120,7 @@ impl ApiTester {
21192120
self
21202121
}
21212122

2122-
pub async fn test_get_beacon_pool_attestations(self) -> Self {
2123+
pub async fn test_get_beacon_pool_attestations(self) {
21232124
let result = self
21242125
.client
21252126
.get_beacon_pool_attestations_v1(None, None)
@@ -2138,9 +2139,80 @@ impl ApiTester {
21382139
.await
21392140
.unwrap()
21402141
.data;
2142+
21412143
assert_eq!(result, expected);
21422144

2143-
self
2145+
let result_committee_index_filtered = self
2146+
.client
2147+
.get_beacon_pool_attestations_v1(None, Some(0))
2148+
.await
2149+
.unwrap()
2150+
.data;
2151+
2152+
let expected_committee_index_filtered = expected
2153+
.clone()
2154+
.into_iter()
2155+
.filter(|att| att.get_committee_indices_map().contains(&0))
2156+
.collect::<Vec<_>>();
2157+
2158+
assert_eq!(
2159+
result_committee_index_filtered,
2160+
expected_committee_index_filtered
2161+
);
2162+
2163+
let result_committee_index_filtered = self
2164+
.client
2165+
.get_beacon_pool_attestations_v1(None, Some(1))
2166+
.await
2167+
.unwrap()
2168+
.data;
2169+
2170+
let expected_committee_index_filtered = expected
2171+
.clone()
2172+
.into_iter()
2173+
.filter(|att| att.get_committee_indices_map().contains(&1))
2174+
.collect::<Vec<_>>();
2175+
2176+
assert_eq!(
2177+
result_committee_index_filtered,
2178+
expected_committee_index_filtered
2179+
);
2180+
2181+
let fork_name = self
2182+
.harness
2183+
.chain
2184+
.spec
2185+
.fork_name_at_slot::<E>(self.harness.chain.slot().unwrap());
2186+
2187+
// aggregate electra attestations
2188+
if fork_name.electra_enabled() {
2189+
// Take and drop the lock in a block to avoid clippy complaining
2190+
// about taking locks across await points
2191+
{
2192+
let mut all_attestations = self.chain.op_pool.attestations.write();
2193+
let (prev_epoch_key, curr_epoch_key) =
2194+
CheckpointKey::keys_for_state(&self.harness.get_current_state());
2195+
all_attestations.aggregate_across_committees(prev_epoch_key);
2196+
all_attestations.aggregate_across_committees(curr_epoch_key);
2197+
}
2198+
let result_committee_index_filtered = self
2199+
.client
2200+
.get_beacon_pool_attestations_v2(None, Some(0))
2201+
.await
2202+
.unwrap()
2203+
.data;
2204+
let mut expected = self.chain.op_pool.get_all_attestations();
2205+
expected.extend(self.chain.naive_aggregation_pool.read().iter().cloned());
2206+
let expected_committee_index_filtered = expected
2207+
.clone()
2208+
.into_iter()
2209+
.filter(|att| att.get_committee_indices_map().contains(&0))
2210+
.collect::<Vec<_>>();
2211+
assert_eq!(
2212+
result_committee_index_filtered,
2213+
expected_committee_index_filtered
2214+
);
2215+
}
21442216
}
21452217

21462218
pub async fn test_post_beacon_pool_attester_slashings_valid_v1(mut self) -> Self {
@@ -6463,10 +6535,30 @@ async fn beacon_get_blocks() {
64636535
}
64646536

64656537
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
6466-
async fn beacon_get_pools() {
6538+
async fn test_beacon_pool_attestations_electra() {
6539+
let mut config = ApiTesterConfig::default();
6540+
config.spec.altair_fork_epoch = Some(Epoch::new(0));
6541+
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
6542+
config.spec.capella_fork_epoch = Some(Epoch::new(0));
6543+
config.spec.deneb_fork_epoch = Some(Epoch::new(0));
6544+
config.spec.electra_fork_epoch = Some(Epoch::new(0));
6545+
ApiTester::new_from_config(config)
6546+
.await
6547+
.test_get_beacon_pool_attestations()
6548+
.await;
6549+
}
6550+
6551+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
6552+
async fn test_beacon_pool_attestations_base() {
64676553
ApiTester::new()
64686554
.await
64696555
.test_get_beacon_pool_attestations()
6556+
.await;
6557+
}
6558+
6559+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
6560+
async fn beacon_get_pools() {
6561+
ApiTester::new()
64706562
.await
64716563
.test_get_beacon_pool_attester_slashings()
64726564
.await

beacon_node/operation_pool/src/attestation_storage.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::AttestationStats;
22
use itertools::Itertools;
3-
use std::collections::{BTreeMap, HashMap};
3+
use std::collections::{BTreeMap, HashMap, HashSet};
44
use types::{
55
attestation::{AttestationBase, AttestationElectra},
66
superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector,
@@ -119,6 +119,18 @@ impl<E: EthSpec> CompactAttestationRef<'_, E> {
119119
}
120120
}
121121

122+
pub fn get_committee_indices_map(&self) -> HashSet<u64> {
123+
match self.indexed {
124+
CompactIndexedAttestation::Base(_) => HashSet::from([self.data.index]),
125+
CompactIndexedAttestation::Electra(indexed_att) => indexed_att
126+
.committee_bits
127+
.iter()
128+
.enumerate()
129+
.filter_map(|(index, bit)| if bit { Some(index as u64) } else { None })
130+
.collect(),
131+
}
132+
}
133+
122134
pub fn clone_as_attestation(&self) -> Attestation<E> {
123135
match self.indexed {
124136
CompactIndexedAttestation::Base(indexed_att) => Attestation::Base(AttestationBase {
@@ -268,7 +280,11 @@ impl<E: EthSpec> CompactIndexedAttestationElectra<E> {
268280
}
269281

270282
pub fn committee_index(&self) -> Option<u64> {
271-
self.get_committee_indices().first().copied()
283+
self.committee_bits
284+
.iter()
285+
.enumerate()
286+
.find(|&(_, bit)| bit)
287+
.map(|(index, _)| index as u64)
272288
}
273289

274290
pub fn get_committee_indices(&self) -> Vec<u64> {

beacon_node/operation_pool/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
mod attestation;
2-
mod attestation_storage;
2+
pub mod attestation_storage;
33
mod attester_slashing;
44
mod bls_to_execution_changes;
55
mod max_cover;
@@ -47,7 +47,7 @@ type SyncContributions<E> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeCon
4747
#[derive(Default, Debug)]
4848
pub struct OperationPool<E: EthSpec + Default> {
4949
/// Map from attestation ID (see below) to vectors of attestations.
50-
attestations: RwLock<AttestationMap<E>>,
50+
pub attestations: RwLock<AttestationMap<E>>,
5151
/// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID.
5252
sync_contributions: SyncContributions<E>,
5353
/// Set of attester slashings, and the fork version they were verified against.
@@ -673,12 +673,12 @@ impl<E: EthSpec> OperationPool<E> {
673673
/// This method may return objects that are invalid for block inclusion.
674674
pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<E>>
675675
where
676-
F: Fn(&AttestationData) -> bool,
676+
F: Fn(&AttestationData, HashSet<u64>) -> bool,
677677
{
678678
self.attestations
679679
.read()
680680
.iter()
681-
.filter(|att| filter(&att.attestation_data()))
681+
.filter(|att| filter(&att.attestation_data(), att.get_committee_indices_map()))
682682
.map(|att| att.clone_as_attestation())
683683
.collect()
684684
}

consensus/types/src/attestation.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use derivative::Derivative;
55
use serde::{Deserialize, Serialize};
66
use ssz_derive::{Decode, Encode};
77
use ssz_types::BitVector;
8+
use std::collections::HashSet;
89
use std::hash::{Hash, Hasher};
910
use superstruct::superstruct;
1011
use test_random_derive::TestRandom;
@@ -209,6 +210,13 @@ impl<E: EthSpec> Attestation<E> {
209210
}
210211
}
211212

213+
pub fn get_committee_indices_map(&self) -> HashSet<u64> {
214+
match self {
215+
Attestation::Base(att) => HashSet::from([att.data.index]),
216+
Attestation::Electra(att) => att.get_committee_indices().into_iter().collect(),
217+
}
218+
}
219+
212220
pub fn is_aggregation_bits_zero(&self) -> bool {
213221
match self {
214222
Attestation::Base(att) => att.aggregation_bits.is_zero(),
@@ -292,7 +300,11 @@ impl<E: EthSpec> AttestationRef<'_, E> {
292300

293301
impl<E: EthSpec> AttestationElectra<E> {
294302
pub fn committee_index(&self) -> Option<u64> {
295-
self.get_committee_indices().first().cloned()
303+
self.committee_bits
304+
.iter()
305+
.enumerate()
306+
.find(|&(_, bit)| bit)
307+
.map(|(index, _)| index as u64)
296308
}
297309

298310
pub fn get_aggregation_bits(&self) -> Vec<u64> {

0 commit comments

Comments
 (0)