Skip to content

Commit 366bed3

Browse files
committed
SingleAttestation
1 parent fec502d commit 366bed3

File tree

10 files changed

+359
-23
lines changed

10 files changed

+359
-23
lines changed

beacon_node/beacon_chain/src/attestation_verification.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ use std::borrow::Cow;
6060
use strum::AsRefStr;
6161
use tree_hash::TreeHash;
6262
use types::{
63-
Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec,
64-
CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof,
65-
SignedAggregateAndProof, Slot, SubnetId,
63+
attestation::SingleAttestation, Attestation, AttestationRef, BeaconCommittee,
64+
BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256,
65+
IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId,
6666
};
6767

6868
pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations};
@@ -317,12 +317,23 @@ pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> {
317317
attestation: AttestationRef<'a, T::EthSpec>,
318318
indexed_attestation: IndexedAttestation<T::EthSpec>,
319319
subnet_id: SubnetId,
320+
validator_index: usize,
320321
}
321322

322323
impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'_, T> {
323324
pub fn into_indexed_attestation(self) -> IndexedAttestation<T::EthSpec> {
324325
self.indexed_attestation
325326
}
327+
328+
pub fn single_attestation(&self) -> SingleAttestation {
329+
// TODO(single-attestation) unwrap
330+
SingleAttestation {
331+
committee_index: self.attestation.committee_index().unwrap_or(0) as usize,
332+
attester_index: self.validator_index,
333+
data: self.attestation.data().clone(),
334+
signature: self.attestation.signature().clone(),
335+
}
336+
}
326337
}
327338

328339
/// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive
@@ -1035,6 +1046,7 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
10351046
attestation,
10361047
indexed_attestation,
10371048
subnet_id,
1049+
validator_index: validator_index as usize,
10381050
})
10391051
}
10401052

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,9 +2034,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
20342034
// This method is called for API and gossip attestations, so this covers all unaggregated attestation events
20352035
if let Some(event_handler) = self.event_handler.as_ref() {
20362036
if event_handler.has_attestation_subscribers() {
2037-
event_handler.register(EventKind::Attestation(Box::new(
2038-
v.attestation().clone_as_attestation(),
2039-
)));
2037+
let current_fork = self
2038+
.spec
2039+
.fork_name_at_slot::<T::EthSpec>(v.attestation().data().slot);
2040+
if current_fork.electra_enabled() {
2041+
event_handler.register(EventKind::SingleAttestation(Box::new(
2042+
v.single_attestation(),
2043+
)));
2044+
} else {
2045+
event_handler.register(EventKind::Attestation(Box::new(
2046+
v.attestation().clone_as_attestation(),
2047+
)));
2048+
}
20402049
}
20412050
}
20422051
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);

beacon_node/beacon_chain/src/events.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
9090
.attestation_tx
9191
.send(kind)
9292
.map(|count| log_count("attestation", count)),
93+
EventKind::SingleAttestation(_) => self
94+
.attestation_tx
95+
.send(kind)
96+
.map(|count| log_count("attestation", count)),
9397
EventKind::Block(_) => self
9498
.block_tx
9599
.send(kind)

beacon_node/http_api/src/lib.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ use types::{
8686
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
8787
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
8888
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
89-
SyncCommitteeMessage, SyncContributionData,
89+
SyncCommitteeMessage, SyncContributionData, attestation::SingleAttestation
9090
};
9191
use validator::pubkey_to_validator_index;
9292
use version::{
@@ -1831,21 +1831,26 @@ pub fn serve<T: BeaconChainTypes>(
18311831
.and(task_spawner_filter.clone())
18321832
.and(chain_filter.clone());
18331833

1834+
let beacon_pool_path_v2 = eth_v2
1835+
.and(warp::path("beacon"))
1836+
.and(warp::path("pool"))
1837+
.and(task_spawner_filter.clone())
1838+
.and(chain_filter.clone());
1839+
18341840
// POST beacon/pool/attestations
1835-
let post_beacon_pool_attestations = beacon_pool_path_any
1841+
let post_beacon_pool_attestations = beacon_pool_path
18361842
.clone()
18371843
.and(warp::path("attestations"))
18381844
.and(warp::path::end())
18391845
.and(warp_utils::json::json())
18401846
.and(network_tx_filter.clone())
1841-
.and(reprocess_send_filter)
1847+
.and(reprocess_send_filter.clone())
18421848
.and(log_filter.clone())
18431849
.then(
18441850
// V1 and V2 are identical except V2 has a consensus version header in the request.
18451851
// We only require this header for SSZ deserialization, which isn't supported for
18461852
// this endpoint presently.
1847-
|_endpoint_version: EndpointVersion,
1848-
task_spawner: TaskSpawner<T::EthSpec>,
1853+
|task_spawner: TaskSpawner<T::EthSpec>,
18491854
chain: Arc<BeaconChain<T>>,
18501855
attestations: Vec<Attestation<T::EthSpec>>,
18511856
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
@@ -1865,6 +1870,38 @@ pub fn serve<T: BeaconChainTypes>(
18651870
},
18661871
);
18671872

1873+
let post_beacon_pool_attestations_v2 = beacon_pool_path_v2
1874+
.clone()
1875+
.and(warp::path("attestations"))
1876+
.and(warp::path::end())
1877+
.and(warp_utils::json::json())
1878+
.and(network_tx_filter.clone())
1879+
.and(reprocess_send_filter)
1880+
.and(log_filter.clone())
1881+
.then(
1882+
// V1 and V2 are identical except V2 has a consensus version header in the request.
1883+
// We only require this header for SSZ deserialization, which isn't supported for
1884+
// this endpoint presently.
1885+
|task_spawner: TaskSpawner<T::EthSpec>,
1886+
chain: Arc<BeaconChain<T>>,
1887+
attestations: Vec<SingleAttestation>,
1888+
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
1889+
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
1890+
log: Logger| async move {
1891+
let result = crate::publish_attestations::publish_single_attestations(
1892+
task_spawner,
1893+
chain,
1894+
attestations,
1895+
network_tx,
1896+
reprocess_tx,
1897+
log,
1898+
)
1899+
.await
1900+
.map(|()| warp::reply::json(&()));
1901+
convert_rejection(result).await
1902+
},
1903+
);
1904+
18681905
// GET beacon/pool/attestations?committee_index,slot
18691906
let get_beacon_pool_attestations = beacon_pool_path_any
18701907
.clone()
@@ -4732,6 +4769,7 @@ pub fn serve<T: BeaconChainTypes>(
47324769
.uor(post_beacon_blocks_v2)
47334770
.uor(post_beacon_blinded_blocks_v2)
47344771
.uor(post_beacon_pool_attestations)
4772+
.uor(post_beacon_pool_attestations_v2)
47354773
.uor(post_beacon_pool_attester_slashings)
47364774
.uor(post_beacon_pool_proposer_slashings)
47374775
.uor(post_beacon_pool_voluntary_exits)

beacon_node/http_api/src/publish_attestations.rs

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use tokio::sync::{
5050
mpsc::{Sender, UnboundedSender},
5151
oneshot,
5252
};
53-
use types::Attestation;
53+
use types::{attestation::SingleAttestation, Attestation, EthSpec};
5454

5555
// Error variants are only used in `Debug` and considered `dead_code` by the compiler.
5656
#[derive(Debug)]
@@ -82,15 +82,43 @@ fn verify_and_publish_attestation<T: BeaconChainTypes>(
8282
.verify_unaggregated_attestation_for_gossip(attestation, None)
8383
.map_err(Error::Validation)?;
8484

85-
// Publish.
86-
network_tx
87-
.send(NetworkMessage::Publish {
88-
messages: vec![PubsubMessage::Attestation(Box::new((
89-
attestation.subnet_id(),
90-
attestation.attestation().clone_as_attestation(),
91-
)))],
92-
})
93-
.map_err(|_| Error::Publication)?;
85+
match attestation.attestation() {
86+
types::AttestationRef::Base(_) => {
87+
// Publish.
88+
network_tx
89+
.send(NetworkMessage::Publish {
90+
messages: vec![PubsubMessage::Attestation(Box::new((
91+
attestation.subnet_id(),
92+
attestation.attestation().clone_as_attestation(),
93+
)))],
94+
})
95+
.map_err(|_| Error::Publication)?;
96+
}
97+
types::AttestationRef::Electra(attn) => {
98+
chain
99+
.with_committee_cache(
100+
attn.data.target.root,
101+
attn.data.slot.epoch(T::EthSpec::slots_per_epoch()),
102+
|committee_cache, _| {
103+
let committees =
104+
committee_cache.get_beacon_committees_at_slot(attn.data.slot)?;
105+
106+
let single_attestation = attn.to_single_attestation(&committees)?;
107+
108+
network_tx
109+
.send(NetworkMessage::Publish {
110+
messages: vec![PubsubMessage::SingleAttestation(Box::new((
111+
attestation.subnet_id(),
112+
single_attestation,
113+
)))],
114+
})
115+
.map_err(|_| BeaconChainError::UnableToPublish)?;
116+
Ok(())
117+
},
118+
)
119+
.map_err(|_| Error::Publication)?;
120+
}
121+
}
94122

95123
// Notify the validator monitor.
96124
chain
@@ -129,6 +157,48 @@ fn verify_and_publish_attestation<T: BeaconChainTypes>(
129157
}
130158
}
131159

160+
pub async fn publish_single_attestations<T: BeaconChainTypes>(
161+
task_spawner: TaskSpawner<T::EthSpec>,
162+
chain: Arc<BeaconChain<T>>,
163+
single_attestations: Vec<SingleAttestation>,
164+
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
165+
reprocess_send: Option<Sender<ReprocessQueueMessage>>,
166+
log: Logger,
167+
) -> Result<(), warp::Rejection> {
168+
let mut attestations = vec![];
169+
for single_attestation in single_attestations {
170+
let attestation = chain.with_committee_cache(
171+
single_attestation.data.target.root,
172+
single_attestation
173+
.data
174+
.slot
175+
.epoch(T::EthSpec::slots_per_epoch()),
176+
|committee_cache, _| {
177+
let committees =
178+
committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?;
179+
180+
let attestation = single_attestation.to_attestation::<T::EthSpec>(&committees)?;
181+
182+
Ok(attestation)
183+
},
184+
);
185+
186+
if let Ok(attestation) = attestation {
187+
attestations.push(attestation);
188+
}
189+
}
190+
191+
publish_attestations(
192+
task_spawner,
193+
chain,
194+
attestations,
195+
network_tx,
196+
reprocess_send,
197+
log,
198+
)
199+
.await
200+
}
201+
132202
pub async fn publish_attestations<T: BeaconChainTypes>(
133203
task_spawner: TaskSpawner<T::EthSpec>,
134204
chain: Arc<BeaconChain<T>>,

beacon_node/lighthouse_network/src/types/pubsub.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use snap::raw::{decompress_len, Decoder, Encoder};
66
use ssz::{Decode, Encode};
77
use std::io::{Error, ErrorKind};
88
use std::sync::Arc;
9+
use types::attestation::SingleAttestation;
910
use types::{
1011
Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase,
1112
AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec,
@@ -29,6 +30,8 @@ pub enum PubsubMessage<E: EthSpec> {
2930
AggregateAndProofAttestation(Box<SignedAggregateAndProof<E>>),
3031
/// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id.
3132
Attestation(Box<(SubnetId, Attestation<E>)>),
33+
/// Gossipsub message providing notification of a `SingleAttestation`` with its shard id.
34+
SingleAttestation(Box<(SubnetId, SingleAttestation)>),
3235
/// Gossipsub message providing notification of a voluntary exit.
3336
VoluntaryExit(Box<SignedVoluntaryExit>),
3437
/// Gossipsub message providing notification of a new proposer slashing.
@@ -128,6 +131,9 @@ impl<E: EthSpec> PubsubMessage<E> {
128131
PubsubMessage::Attestation(attestation_data) => {
129132
GossipKind::Attestation(attestation_data.0)
130133
}
134+
PubsubMessage::SingleAttestation(attestation_data) => {
135+
GossipKind::Attestation(attestation_data.0)
136+
}
131137
PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit,
132138
PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing,
133139
PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing,
@@ -411,6 +417,7 @@ impl<E: EthSpec> PubsubMessage<E> {
411417
PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(),
412418
PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(),
413419
PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(),
420+
PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(),
414421
PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(),
415422
PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(),
416423
PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(),
@@ -455,6 +462,14 @@ impl<E: EthSpec> std::fmt::Display for PubsubMessage<E> {
455462
data.1.data().slot,
456463
data.1.committee_index(),
457464
),
465+
PubsubMessage::SingleAttestation(data) => write!(
466+
f,
467+
"SingleAttestation: subnet_id: {}, attestation_slot: {}, committee_index: {:?}, attester_index: {:?}",
468+
*data.0,
469+
data.1.data.slot,
470+
data.1.committee_index,
471+
data.1.attester_index,
472+
),
458473
PubsubMessage::VoluntaryExit(_data) => write!(f, "Voluntary Exit"),
459474
PubsubMessage::ProposerSlashing(_data) => write!(f, "Proposer Slashing"),
460475
PubsubMessage::AttesterSlashing(_data) => write!(f, "Attester Slashing"),

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::sync::manager::BlockProcessType;
22
use crate::sync::SamplingId;
33
use crate::{service::NetworkMessage, sync::manager::SyncMessage};
4+
use attestation::SingleAttestation;
45
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
56
use beacon_chain::block_verification_types::RpcBlock;
67
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
@@ -28,7 +29,7 @@ use lighthouse_network::{
2829
Client, MessageId, NetworkGlobals, PeerId, PubsubMessage,
2930
};
3031
use rand::prelude::SliceRandom;
31-
use slog::{debug, error, trace, warn, Logger};
32+
use slog::{debug, error, info, trace, warn, Logger};
3233
use slot_clock::ManualSlotClock;
3334
use std::path::PathBuf;
3435
use std::sync::Arc;
@@ -84,6 +85,49 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
8485
.map_err(Into::into)
8586
}
8687

88+
/// Create a new `Work` event for some `SingleAttestation`.
89+
pub fn send_single_attestation(
90+
self: &Arc<Self>,
91+
message_id: MessageId,
92+
peer_id: PeerId,
93+
single_attestation: SingleAttestation,
94+
subnet_id: SubnetId,
95+
should_import: bool,
96+
seen_timestamp: Duration,
97+
) -> Result<(), Error<T::EthSpec>> {
98+
info!(self.log, "SENDING A SINGLE ATTESTATION");
99+
let result = self.chain.with_committee_cache(
100+
single_attestation.data.target.root,
101+
single_attestation
102+
.data
103+
.slot
104+
.epoch(T::EthSpec::slots_per_epoch()),
105+
|committee_cache, _| {
106+
let committees =
107+
committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?;
108+
109+
let attestation = single_attestation.to_attestation(&committees)?;
110+
111+
Ok(self.send_unaggregated_attestation(
112+
message_id.clone(),
113+
peer_id,
114+
attestation,
115+
subnet_id,
116+
should_import,
117+
seen_timestamp,
118+
))
119+
},
120+
);
121+
122+
match result {
123+
Ok(result) => result,
124+
Err(e) => {
125+
warn!(self.log, "Failed to send SingleAttestation"; "error" => ?e);
126+
Ok(())
127+
}
128+
}
129+
}
130+
87131
/// Create a new `Work` event for some unaggregated attestation.
88132
pub fn send_unaggregated_attestation(
89133
self: &Arc<Self>,

0 commit comments

Comments
 (0)