
Fork aware max values in rpc #6847


Merged: 8 commits, Jan 29, 2025
61 changes: 30 additions & 31 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
@@ -576,40 +576,26 @@ fn handle_rpc_request<E: EthSpec>(
BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blocks as usize,
spec.max_request_blocks(current_fork),
)?,
}),
))),
SupportedProtocol::BlocksByRootV1 => Ok(Some(RequestType::BlocksByRoot(
BlocksByRootRequest::V1(BlocksByRootRequestV1 {
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blocks as usize,
spec.max_request_blocks(current_fork),
)?,
}),
))),
SupportedProtocol::BlobsByRangeV1 => {
let req = BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?;
let max_requested_blobs = req
.count
.saturating_mul(spec.max_blobs_per_block_by_fork(current_fork));
// TODO(pawan): change this to max_blobs_per_rpc_request in the alpha10 PR
if max_requested_blobs > spec.max_request_blob_sidecars {
return Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
format!(
"requested exceeded limit. allowed: {}, requested: {}",
spec.max_request_blob_sidecars, max_requested_blobs
),
));
}
Ok(Some(RequestType::BlobsByRange(req)))
}
SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::BlobsByRootV1 => {
Ok(Some(RequestType::BlobsByRoot(BlobsByRootRequest {
blob_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blob_sidecars as usize,
spec.max_request_blob_sidecars(current_fork),
)?,
})))
}
@@ -1097,21 +1083,21 @@ mod tests {
}
}

fn bbroot_request_v1(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], spec)
fn bbroot_request_v1(fork_name: ForkName) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], &fork_context(fork_name))
}

fn bbroot_request_v2(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()], spec)
fn bbroot_request_v2(fork_name: ForkName) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()], &fork_context(fork_name))
}

fn blbroot_request(spec: &ChainSpec) -> BlobsByRootRequest {
fn blbroot_request(fork_name: ForkName) -> BlobsByRootRequest {
BlobsByRootRequest::new(
vec![BlobIdentifier {
block_root: Hash256::zero(),
index: 0,
}],
spec,
&fork_context(fork_name),
)
}

@@ -1909,29 +1895,42 @@ mod tests {

#[test]
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();
let fork_context = fork_context(ForkName::Electra);
let chain_spec = fork_context.spec.clone();

let requests: &[RequestType<Spec>] = &[
RequestType::Ping(ping_message()),
RequestType::Status(status_message()),
RequestType::Goodbye(GoodbyeReason::Fault),
RequestType::BlocksByRange(bbrange_request_v1()),
RequestType::BlocksByRange(bbrange_request_v2()),
RequestType::BlocksByRoot(bbroot_request_v1(&chain_spec)),
RequestType::BlocksByRoot(bbroot_request_v2(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v1()),
RequestType::BlobsByRange(blbrange_request()),
RequestType::BlobsByRoot(blbroot_request(&chain_spec)),
RequestType::DataColumnsByRange(dcbrange_request()),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v2()),
];

for req in requests.iter() {
for fork_name in ForkName::list_all() {
encode_then_decode_request(req.clone(), fork_name, &chain_spec);
}
}

// Request types that have different length limits depending on the fork
// Handled separately to keep `ForkName` consistent across requests and responses
let fork_dependent_requests = |fork_name| {
[
RequestType::BlobsByRoot(blbroot_request(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v1(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v2(fork_name)),
]
};
for fork_name in ForkName::list_all() {
let requests = fork_dependent_requests(fork_name);
for req in requests {
encode_then_decode_request(req.clone(), fork_name, &chain_spec);
}
}
}

/// Test a malicious snappy encoding for a V1 `Status` message where the attacker
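The codec now asks the spec for a fork-dependent limit instead of reading a single constant. The accessor implementations themselves (`max_request_blocks`, `max_request_blob_sidecars`) are not part of this diff; the standalone sketch below uses stand-in types and illustrative numbers rather than Lighthouse's real `ChainSpec`/`RuntimeVariableList`, and only shows the shape of the idea: resolve the length limit from the currently active fork, then use it to bound decoding.

```rust
// Standalone sketch: stand-in types and illustrative values, not Lighthouse's
// actual ChainSpec / ForkName / RuntimeVariableList.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Fork {
    Capella,
    Deneb,
    Electra,
}

struct Limits {
    max_request_blocks_pre_deneb: usize,
    max_request_blocks_deneb: usize,
}

impl Limits {
    /// Fork-aware accessor: the caller passes the active fork and gets the
    /// limit that applies to it (mirroring the shape of `spec.max_request_blocks(fork)`).
    fn max_request_blocks(&self, fork: Fork) -> usize {
        if fork >= Fork::Deneb {
            self.max_request_blocks_deneb
        } else {
            self.max_request_blocks_pre_deneb
        }
    }
}

/// Decode a flat list of 32-byte roots, rejecting payloads longer than the
/// fork-dependent limit (a simplified analogue of bounding
/// `RuntimeVariableList::from_ssz_bytes` with the fork-aware maximum).
fn decode_block_roots(bytes: &[u8], limits: &Limits, fork: Fork) -> Result<Vec<[u8; 32]>, String> {
    if bytes.len() % 32 != 0 {
        return Err("payload length is not a multiple of 32".into());
    }
    let count = bytes.len() / 32;
    let max = limits.max_request_blocks(fork);
    if count > max {
        return Err(format!("requested exceeded limit. allowed: {max}, requested: {count}"));
    }
    Ok(bytes.chunks_exact(32).map(|c| c.try_into().unwrap()).collect())
}

fn main() {
    // Illustrative numbers only; the canonical values live in the beacon spec.
    let limits = Limits { max_request_blocks_pre_deneb: 1024, max_request_blocks_deneb: 128 };
    let payload = vec![0u8; 32 * 256];
    assert!(decode_block_roots(&payload, &limits, Fork::Capella).is_ok());
    assert!(decode_block_roots(&payload, &limits, Fork::Electra).is_err());
}
```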
39 changes: 39 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -855,6 +855,45 @@ where
}

let (req, substream) = substream;
let current_fork = self.fork_context.current_fork();
let spec = &self.fork_context.spec;

match &req {
RequestType::BlocksByRange(request) => {
let max_allowed = spec.max_request_blocks(current_fork) as u64;
[Review thread]
Member: Have we ever thought about making the limit dependent on the protocol version, or on the fork active at the request's slot range? It seems like we could inadvertently reject requests from clients that use different limits for pre-Deneb blocks.

Member: Hi Michael, can you elaborate on what you mean by the request's slot range? If it's pre-Deneb, max_request_blocks will return the pre-Deneb value, right?

Member: We always use the current fork according to wall-clock time, so if the request is for pre-Deneb data but the current fork is post-Deneb, we will use the Deneb value.

Member (author): I think we don't do it that way so that by-root and by-range requests stay consistent (we cannot know the slot when we receive a by-root request). I asked other CL teams how they handle it: Prysm and Lodestar go by slot, while Nimbus uses the current fork. In practice I don't think this matters much, because all clients request one or two epochs per by-range request, which doesn't hit the limits.
[End review thread]

if *request.count() > max_allowed {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: Protocol::BlocksByRange,
error: RPCError::InvalidData(format!(
"requested exceeded limit. allowed: {}, requested: {}",
max_allowed,
request.count()
)),
}));
return self.shutdown(None);
}
}
RequestType::BlobsByRange(request) => {
let max_requested_blobs = request
.count
.saturating_mul(spec.max_blobs_per_block_by_fork(current_fork));
let max_allowed = spec.max_request_blob_sidecars(current_fork) as u64;
if max_requested_blobs > max_allowed {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: Protocol::BlobsByRange,
error: RPCError::InvalidData(format!(
"requested exceeded limit. allowed: {}, requested: {}",
max_allowed, max_requested_blobs
)),
}));
return self.shutdown(None);
}
}
_ => {}
};

let max_responses =
req.max_responses(self.fork_context.current_fork(), &self.fork_context.spec);

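The review thread above turns on whether limits should follow the wall-clock ("current") fork or the fork at the request's slot range; this PR validates against the current fork to keep by-root and by-range handling consistent. The standalone sketch below (toy types, fork schedule, and values, not Lighthouse code) contrasts the two strategies and shows the case the reviewer raised: a large request for pre-Deneb history passes a slot-based check but fails a current-fork check once the chain is past Deneb.

```rust
// Standalone illustration of the two limit-selection strategies discussed in
// the review thread; types, fork schedule, and numbers are stand-ins.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Fork {
    Capella,
    Deneb,
    Electra,
}

/// Illustrative limits: by-range block requests may be much larger before Deneb
/// than after (1024 vs 128 in the beacon spec's default presets).
fn max_request_blocks(fork: Fork) -> u64 {
    if fork >= Fork::Deneb { 128 } else { 1024 }
}

struct BlocksByRange {
    start_slot: u64,
    count: u64,
}

/// Toy fork schedule purely for this example.
fn fork_at_slot(slot: u64) -> Fork {
    if slot >= 200 { Fork::Electra } else if slot >= 100 { Fork::Deneb } else { Fork::Capella }
}

/// Strategy taken in this PR: validate against the wall-clock ("current") fork,
/// which keeps by-root and by-range requests consistent.
fn allowed_by_current_fork(req: &BlocksByRange, current_fork: Fork) -> bool {
    req.count <= max_request_blocks(current_fork)
}

/// Alternative raised in the thread (reportedly what Prysm and Lodestar do):
/// validate against the fork at the request's slot range.
fn allowed_by_request_slot(req: &BlocksByRange) -> bool {
    req.count <= max_request_blocks(fork_at_slot(req.start_slot))
}

fn main() {
    // A large request for pre-Deneb history: accepted under the slot-based rule,
    // rejected under the current-fork rule once the chain is past Deneb.
    let req = BlocksByRange { start_slot: 10, count: 512 };
    assert!(allowed_by_request_slot(&req));
    assert!(!allowed_by_current_fork(&req, Fork::Electra));
}
```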
26 changes: 16 additions & 10 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -15,12 +15,12 @@ use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::ForkName;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot,
};
use types::{ForkContext, ForkName};

/// Maximum length of error message.
pub type MaxErrorLen = U256;
@@ -420,15 +420,19 @@ pub struct BlocksByRootRequest {
}

impl BlocksByRootRequest {
pub fn new(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
pub fn new(block_roots: Vec<Hash256>, fork_context: &ForkContext) -> Self {
let max_request_blocks = fork_context
.spec
.max_request_blocks(fork_context.current_fork());
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
Self::V2(BlocksByRootRequestV2 { block_roots })
}

pub fn new_v1(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
pub fn new_v1(block_roots: Vec<Hash256>, fork_context: &ForkContext) -> Self {
let max_request_blocks = fork_context
.spec
.max_request_blocks(fork_context.current_fork());
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
Self::V1(BlocksByRootRequestV1 { block_roots })
}
}
@@ -441,9 +445,11 @@ pub struct BlobsByRootRequest {
}

impl BlobsByRootRequest {
pub fn new(blob_ids: Vec<BlobIdentifier>, spec: &ChainSpec) -> Self {
let blob_ids =
RuntimeVariableList::from_vec(blob_ids, spec.max_request_blob_sidecars as usize);
pub fn new(blob_ids: Vec<BlobIdentifier>, fork_context: &ForkContext) -> Self {
let max_request_blob_sidecars = fork_context
.spec
.max_request_blob_sidecars(fork_context.current_fork());
let blob_ids = RuntimeVariableList::from_vec(blob_ids, max_request_blob_sidecars);
Self { blob_ids }
}
}
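In `methods.rs` the request constructors now take a `&ForkContext` and cap the list at the fork-aware limit via `RuntimeVariableList::from_vec`. Below is a minimal standalone model of that capping behaviour; it assumes `from_vec`-style truncation of over-long input (the exact semantics belong to Lighthouse's ssz types, not this sketch), and all types and numbers are stand-ins.

```rust
// Minimal standalone model of a length-capped list whose cap follows the
// current fork; stand-in types, not Lighthouse's RuntimeVariableList/ForkContext.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Fork {
    Capella,
    Deneb,
    Electra,
}

struct ForkCtx {
    current: Fork,
}

impl ForkCtx {
    fn max_request_blocks(&self) -> usize {
        // Illustrative values only.
        if self.current >= Fork::Deneb { 128 } else { 1024 }
    }
}

/// A variable-length list whose maximum length is fixed at construction time.
struct CappedList<T> {
    items: Vec<T>,
    max_len: usize,
}

impl<T> CappedList<T> {
    /// Assumed `from_vec`-like semantics: over-long input is truncated to `max_len`.
    fn from_vec(mut items: Vec<T>, max_len: usize) -> Self {
        items.truncate(max_len);
        Self { items, max_len }
    }
}

struct BlocksByRootRequest {
    block_roots: CappedList<[u8; 32]>,
}

impl BlocksByRootRequest {
    /// Analogue of the new constructor: the cap is derived from the fork context,
    /// so callers no longer pick a limit themselves.
    fn new(block_roots: Vec<[u8; 32]>, fork_ctx: &ForkCtx) -> Self {
        let max = fork_ctx.max_request_blocks();
        Self {
            block_roots: CappedList::from_vec(block_roots, max),
        }
    }
}

fn main() {
    let post_deneb = ForkCtx { current: Fork::Electra };
    let capped = BlocksByRootRequest::new(vec![[0u8; 32]; 200], &post_deneb);
    assert_eq!(capped.block_roots.items.len(), 128);
    assert_eq!(capped.block_roots.max_len, 128);

    let pre_deneb = ForkCtx { current: Fork::Capella };
    let kept = BlocksByRootRequest::new(vec![[0u8; 32]; 200], &pre_deneb);
    assert_eq!(kept.block_roots.items.len(), 200); // under the larger pre-Deneb cap
}
```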
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
@@ -282,7 +282,7 @@ impl<E: EthSpec> Network<E> {

let max_topics = ctx.chain_spec.attestation_subnet_count as usize
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
+ ctx.chain_spec.blob_sidecar_subnet_count_electra as usize
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
+ BASE_CORE_TOPICS.len()
+ ALTAIR_CORE_TOPICS.len()
6 changes: 1 addition & 5 deletions beacon_node/lighthouse_network/src/service/utils.rs
@@ -263,11 +263,7 @@ pub(crate) fn create_whitelist_filter(
for id in 0..sync_committee_subnet_count {
add(SyncCommitteeMessage(SyncSubnetId::new(id)));
}
let blob_subnet_count = if spec.electra_fork_epoch.is_some() {
spec.blob_sidecar_subnet_count_electra
} else {
spec.blob_sidecar_subnet_count
};
let blob_subnet_count = spec.blob_sidecar_subnet_count_max();
for id in 0..blob_subnet_count {
add(BlobSidecar(id));
}
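Both this hunk and the `max_topics` computation in `service/mod.rs` above now size for the worst case with `blob_sidecar_subnet_count_max()`. That accessor's implementation isn't shown in this diff; a plausible reading, sketched with stand-in types and illustrative values, is simply the maximum of the per-fork counts:

```rust
// Standalone sketch of the fork-aware count plus a "max over all forks" helper;
// stand-in types and illustrative values, not Lighthouse's ChainSpec.
#[derive(Clone, Copy)]
enum Fork {
    Deneb,
    Electra,
}

struct Limits {
    subnet_count_deneb: u64,
    subnet_count_electra: u64,
}

impl Limits {
    /// Mirrors the shape of `spec.blob_sidecar_subnet_count(fork)`.
    fn blob_sidecar_subnet_count(&self, fork: Fork) -> u64 {
        match fork {
            Fork::Deneb => self.subnet_count_deneb,
            Fork::Electra => self.subnet_count_electra,
        }
    }

    /// Mirrors the shape of `blob_sidecar_subnet_count_max()`: useful when sizing
    /// filters or topic tables up front, regardless of the currently active fork.
    fn blob_sidecar_subnet_count_max(&self) -> u64 {
        [Fork::Deneb, Fork::Electra]
            .into_iter()
            .map(|fork| self.blob_sidecar_subnet_count(fork))
            .max()
            .unwrap_or(0)
    }
}

fn main() {
    let limits = Limits { subnet_count_deneb: 6, subnet_count_electra: 9 };
    assert_eq!(limits.blob_sidecar_subnet_count_max(), 9);
}
```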
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/topics.rs
@@ -51,15 +51,15 @@ pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
ForkName::Deneb => {
// All of deneb blob topics are core topics
let mut deneb_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count {
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) {
deneb_blob_topics.push(GossipKind::BlobSidecar(i));
}
deneb_blob_topics
}
ForkName::Electra => {
// All of electra blob topics are core topics
let mut electra_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count_electra {
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) {
electra_blob_topics.push(GossipKind::BlobSidecar(i));
}
electra_blob_topics
60 changes: 33 additions & 27 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -16,7 +16,7 @@ use tokio::time::sleep;
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkContext, ForkName, Hash256, MinimalEthSpec,
Signature, SignedBeaconBlock, Slot,
RuntimeVariableList, Signature, SignedBeaconBlock, Slot,
};

type E = MinimalEthSpec;
@@ -810,17 +810,20 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
.await;

// BlocksByRoot Request
let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::new(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
&spec,
));
let rpc_request =
RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: RuntimeVariableList::from_vec(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
spec.max_request_blocks_upper_bound(),
),
}));

// BlocksByRoot Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
@@ -953,21 +956,24 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
.await;

// BlocksByRoot Request
let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::new(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
&spec,
));
let rpc_request =
RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: RuntimeVariableList::from_vec(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
spec.max_request_blocks_upper_bound(),
),
}));

// BlocksByRoot Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
18 changes: 0 additions & 18 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -659,24 +659,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot(),
);

// Should not send more than max request blocks
let max_request_size =
self.chain
.epoch()
.map_or(self.chain.spec.max_request_blocks, |epoch| {
if self.chain.spec.fork_name_at_epoch(epoch).deneb_enabled() {
self.chain.spec.max_request_blocks_deneb
} else {
self.chain.spec.max_request_blocks
}
});
if *req.count() > max_request_size {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded max size",
));
}

let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
4 changes: 3 additions & 1 deletion beacon_node/network/src/router.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock};

/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
@@ -90,6 +90,7 @@ impl<T: BeaconChainTypes> Router<T> {
invalid_block_storage: InvalidBlockStorage,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
fork_context: Arc<ForkContext>,
log: slog::Logger,
) -> Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>, String> {
let message_handler_log = log.new(o!("service"=> "router"));
@@ -122,6 +123,7 @@ impl<T: BeaconChainTypes> Router<T> {
network_send.clone(),
network_beacon_processor.clone(),
sync_recv,
fork_context,
sync_logger,
);
