Commit 2f2dd9a

Wire blocks by range coupling to new request types
1 parent f0987ee commit 2f2dd9a

6 files changed (+249 −381 lines)


beacon_node/lighthouse_network/src/rpc/self_limiter.rs

Lines changed: 13 additions & 9 deletions
@@ -212,7 +212,7 @@ mod tests {
     use crate::rpc::rate_limiter::Quota;
     use crate::rpc::self_limiter::SelfRateLimiter;
     use crate::rpc::{Ping, Protocol, RequestType};
-    use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
+    use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
     use libp2p::PeerId;
     use std::time::Duration;
     use types::MainnetEthSpec;
@@ -228,12 +228,16 @@ mod tests {
         let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
             SelfRateLimiter::new(config, log).unwrap();
         let peer_id = PeerId::random();
+        let lookup_id = 0;

         for i in 1..=5u32 {
             let _ = limiter.allows(
                 peer_id,
-                RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
-                    id: i,
+                RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
+                    id: SingleLookupReqId {
+                        lookup_id,
+                        req_id: i,
+                    },
                 })),
                 RequestType::Ping(Ping { data: i as u64 }),
             );
@@ -251,9 +255,9 @@
         for i in 2..=5u32 {
             assert!(matches!(
                 iter.next().unwrap().request_id,
-                RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
-                    id,
-                })) if id == i
+                RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
+                    id: SingleLookupReqId { req_id, .. },
+                })) if req_id == i,
             ));
         }

@@ -276,9 +280,9 @@
         for i in 3..=5 {
             assert!(matches!(
                 iter.next().unwrap().request_id,
-                RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
-                    id
-                })) if id == i
+                RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
+                    id: SingleLookupReqId { req_id, .. },
+                })) if req_id == i,
             ));
         }
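
The updated assertions destructure the nested lookup ID inside `matches!` and guard on its `req_id` field. A minimal, self-contained sketch of that pattern, using stand-in types rather than the real ones from `service::api_types`:

// Stand-in types for illustration; the real definitions live in `service::api_types`.
#[derive(Debug, Clone, Copy)]
struct SingleLookupReqId {
    lookup_id: u32,
    req_id: u32,
}

#[derive(Debug, Clone, Copy)]
enum SyncRequestId {
    SingleBlock { id: SingleLookupReqId },
}

fn main() {
    let request_id = SyncRequestId::SingleBlock {
        id: SingleLookupReqId {
            lookup_id: 0,
            req_id: 3,
        },
    };
    // Destructure the nested id and guard on `req_id`; `..` skips `lookup_id`.
    assert!(matches!(
        request_id,
        SyncRequestId::SingleBlock {
            id: SingleLookupReqId { req_id, .. },
        } if req_id == 3
    ));
}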

beacon_node/lighthouse_network/src/service/api_types.rs

Lines changed: 10 additions & 4 deletions
@@ -31,8 +31,6 @@ pub enum SyncRequestId {
     SingleBlob { id: SingleLookupReqId },
     /// Request searching for a set of data columns given a hash and list of column indices.
     DataColumnsByRoot(DataColumnsByRootRequestId),
-    /// Range request that is composed by both a block range request and a blob range request.
-    RangeBlockAndBlobs { id: Id },
     /// Blocks by range request
     BlocksByRange(BlocksByRangeRequestId),
     /// Blobs by range request
@@ -52,17 +50,25 @@ pub struct DataColumnsByRootRequestId {
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub struct BlocksByRangeRequestId {
     pub id: Id,
-    pub requester: RangeRequestId,
+    pub requester: ComponentsByRangeRequestId,
 }

 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub struct BlobsByRangeRequestId {
     pub id: Id,
-    pub requester: RangeRequestId,
+    pub requester: ComponentsByRangeRequestId,
 }

 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub struct DataColumnsByRangeRequestId {
+    pub id: Id,
+    pub requester: ComponentsByRangeRequestId,
+}
+
+/// Block components by range request for range sync. Includes an ID for downstream consumers to
+/// handle retries and tie all their sub requests together.
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct ComponentsByRangeRequestId {
     pub id: Id,
     pub requester: RangeRequestId,
 }
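
To illustrate how the range request IDs now nest: each by-range sub-request (blocks, blobs, or data columns) carries its own `Id` plus a `requester` pointing at the shared `ComponentsByRangeRequestId`, which in turn points at the original `RangeRequestId`. The types below are stand-ins mirroring this diff; in particular, the `RangeRequestId` variant shown is hypothetical.

// Stand-in types mirroring this diff (not the real module). The
// `RangeRequestId` variant here is hypothetical, for illustration only.
type Id = u32;

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
enum RangeRequestId {
    RangeSync { batch_id: u64 },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct ComponentsByRangeRequestId {
    id: Id,
    requester: RangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct BlocksByRangeRequestId {
    id: Id,
    requester: ComponentsByRangeRequestId,
}

fn main() {
    // The components request ties all sub-requests for one range together.
    let components_req = ComponentsByRangeRequestId {
        id: 7,
        requester: RangeRequestId::RangeSync { batch_id: 42 },
    };
    // Each sub-request gets its own id but keeps a handle on its parent,
    // so a retry can reissue a sub-request without losing the coupling.
    let blocks_req = BlocksByRangeRequestId {
        id: 1,
        requester: components_req,
    };
    assert_eq!(blocks_req.requester.id, 7);
    println!("{blocks_req:?}");
}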

beacon_node/network/src/router.rs

Lines changed: 1 addition & 1 deletion
@@ -600,7 +600,7 @@ impl<T: BeaconChainTypes> Router<T> {
     ) {
         let request_id = match request_id {
             AppRequestId::Sync(sync_id) => match sync_id {
-                id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
+                id @ SyncRequestId::BlocksByRange { .. } => id,
                 other => {
                     crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
                     return;
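
The surviving arm relies on two Rust pattern features: an `id @ ...` binding that keeps the whole matched value, and a `Variant { .. }` pattern, which matches regardless of the variant's field shape (including the tuple variant `BlocksByRange(BlocksByRangeRequestId)`). A small stand-alone sketch with a stand-in enum:

// Stand-in enum; the real `SyncRequestId` has more variants.
#[derive(Debug)]
enum SyncRequestId {
    BlocksByRange(u32),
    Other,
}

fn main() {
    let sync_id = SyncRequestId::BlocksByRange(5);
    let request_id = match sync_id {
        // `id @ ...` binds the whole matched value; `{ .. }` matches the
        // variant even though `BlocksByRange` is a tuple variant.
        id @ SyncRequestId::BlocksByRange { .. } => id,
        other => {
            eprintln!("BlocksByRange response on incorrect request: {other:?}");
            return;
        }
    };
    println!("routing {request_id:?}");
}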

beacon_node/network/src/sync/block_sidecar_coupling.rs

Lines changed: 60 additions & 72 deletions
@@ -1,7 +1,6 @@
 use beacon_chain::{
     block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
 };
-use lighthouse_network::PeerId;
 use ssz_types::VariableList;
 use std::{
     collections::{HashMap, VecDeque},
@@ -29,16 +28,13 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
     /// Used to determine if the number of data columns stream termination this accumulator should
     /// wait for. This may be less than the number of `expects_custody_columns` due to request batching.
     num_custody_column_requests: Option<usize>,
-    /// The peers the request was made to.
-    pub(crate) peer_ids: Vec<PeerId>,
 }

 impl<E: EthSpec> RangeBlockComponentsRequest<E> {
     pub fn new(
         expects_blobs: bool,
         expects_custody_columns: Option<Vec<ColumnIndex>>,
         num_custody_column_requests: Option<usize>,
-        peer_ids: Vec<PeerId>,
     ) -> Self {
         Self {
             blocks: <_>::default(),
@@ -50,38 +46,31 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
             expects_blobs,
             expects_custody_columns,
             num_custody_column_requests,
-            peer_ids,
         }
     }

-    // TODO: This function should be deprecated when simplying the retry mechanism of this range
-    // requests.
-    pub fn get_requirements(&self) -> (bool, Option<Vec<ColumnIndex>>) {
-        (self.expects_blobs, self.expects_custody_columns.clone())
-    }
-
-    pub fn add_block_response(&mut self, block_opt: Option<Arc<SignedBeaconBlock<E>>>) {
-        match block_opt {
-            Some(block) => self.blocks.push_back(block),
-            None => self.is_blocks_stream_terminated = true,
+    pub fn add_blocks(&mut self, blocks: Vec<Arc<SignedBeaconBlock<E>>>) {
+        for block in blocks {
+            self.blocks.push_back(block);
         }
+        self.is_blocks_stream_terminated = true;
     }

-    pub fn add_sidecar_response(&mut self, sidecar_opt: Option<Arc<BlobSidecar<E>>>) {
-        match sidecar_opt {
-            Some(sidecar) => self.blobs.push_back(sidecar),
-            None => self.is_sidecars_stream_terminated = true,
+    pub fn add_blobs(&mut self, blobs: Vec<Arc<BlobSidecar<E>>>) {
+        for blob in blobs {
+            self.blobs.push_back(blob);
         }
+        self.is_sidecars_stream_terminated = true;
     }

-    pub fn add_data_column(&mut self, column_opt: Option<Arc<DataColumnSidecar<E>>>) {
-        match column_opt {
-            Some(column) => self.data_columns.push_back(column),
-            // TODO(das): this mechanism is dangerous, if somehow there are two requests for the
-            // same column index it can terminate early. This struct should track that all requests
-            // for all custody columns terminate.
-            None => self.custody_columns_streams_terminated += 1,
+    pub fn add_custody_columns(&mut self, columns: Vec<Arc<DataColumnSidecar<E>>>) {
+        for column in columns {
+            self.data_columns.push_back(column);
         }
+        // TODO(das): this mechanism is dangerous, if somehow there are two requests for the
+        // same column index it can terminate early. This struct should track that all requests
+        // for all custody columns terminate.
+        self.custody_columns_streams_terminated += 1;
     }

     pub fn into_responses(self, spec: &ChainSpec) -> Result<Vec<RpcBlock<E>>, String> {
@@ -249,14 +238,15 @@
         let mut info = RangeBlockComponentsRequest::<E>::new(false, None, None, vec![peer_id]);
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..4)
-            .map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
+            .map(|_| {
+                generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng)
+                    .0
+                    .into()
+            })
             .collect::<Vec<_>>();

         // Send blocks and complete terminate response
-        for block in blocks {
-            info.add_block_response(Some(block.into()));
-        }
-        info.add_block_response(None);
+        info.add_blocks(blocks);

         // Assert response is finished and RpcBlocks can be constructed
         assert!(info.is_finished());
@@ -271,17 +261,16 @@
         let blocks = (0..4)
             .map(|_| {
                 // Always generate some blobs.
-                generate_rand_block_and_blobs::<E>(ForkName::Deneb, NumBlobs::Number(3), &mut rng).0
+                generate_rand_block_and_blobs::<E>(ForkName::Deneb, NumBlobs::Number(3), &mut rng)
+                    .0
+                    .into()
             })
             .collect::<Vec<_>>();

         // Send blocks and complete terminate response
-        for block in blocks {
-            info.add_block_response(Some(block.into()));
-        }
-        info.add_block_response(None);
+        info.add_blocks(blocks);
         // Expect no blobs returned
-        info.add_sidecar_response(None);
+        info.add_blobs(vec![]);

         // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned.
         // This makes sure we don't expect blobs here when they have expired. Checking this logic should
@@ -294,10 +283,11 @@
     fn rpc_block_with_custody_columns() {
         let spec = test_spec::<E>();
         let expects_custody_columns = vec![1, 2, 3, 4];
+        let custody_column_request_ids = vec![0, 1, 2, 3];
         let mut info = RangeBlockComponentsRequest::<E>::new(
             false,
             Some(expects_custody_columns.clone()),
-            Some(expects_custody_columns.len()),
+            Some(custody_column_request_ids.len()),
             vec![PeerId::random()],
         );
         let mut rng = XorShiftRng::from_seed([42; 16]);
@@ -313,25 +303,18 @@
             .collect::<Vec<_>>();

         // Send blocks and complete terminate response
-        for block in &blocks {
-            info.add_block_response(Some(block.0.clone().into()));
-        }
-        info.add_block_response(None);
+        info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect());
         // Assert response is not finished
         assert!(!info.is_finished());

-        // Send data columns interleaved
-        for block in &blocks {
-            for column in &block.1 {
-                if expects_custody_columns.contains(&column.index) {
-                    info.add_data_column(Some(column.clone()));
-                }
-            }
-        }
-
-        // Terminate the requests
-        for (i, _column_index) in expects_custody_columns.iter().enumerate() {
-            info.add_data_column(None);
+        // Send data columns
+        for (i, &column_index) in expects_custody_columns.iter().enumerate() {
+            info.add_custody_columns(
+                blocks
+                    .iter()
+                    .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
+                    .collect(),
+            );

             if i < expects_custody_columns.len() - 1 {
                 assert!(
@@ -353,12 +336,19 @@
     #[test]
     fn rpc_block_with_custody_columns_batched() {
         let spec = test_spec::<E>();
-        let expects_custody_columns = vec![1, 2, 3, 4];
-        let num_of_data_column_requests = 2;
+        let batched_column_requests = vec![vec![1_u64, 2], vec![3, 4]];
+        let expects_custody_columns = batched_column_requests
+            .iter()
+            .cloned()
+            .flatten()
+            .collect::<Vec<_>>();
+        let custody_column_request_ids =
+            (0..batched_column_requests.len() as u32).collect::<Vec<_>>();
+        let num_of_data_column_requests = custody_column_request_ids.len();
         let mut info = RangeBlockComponentsRequest::<E>::new(
             false,
             Some(expects_custody_columns.clone()),
-            Some(num_of_data_column_requests),
+            Some(custody_column_request_ids.len()),
             vec![PeerId::random()],
         );
         let mut rng = XorShiftRng::from_seed([42; 16]);
@@ -374,25 +364,23 @@
             .collect::<Vec<_>>();

         // Send blocks and complete terminate response
-        for block in &blocks {
-            info.add_block_response(Some(block.0.clone().into()));
-        }
-        info.add_block_response(None);
+        info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect());
         // Assert response is not finished
         assert!(!info.is_finished());

-        // Send data columns interleaved
-        for block in &blocks {
-            for column in &block.1 {
-                if expects_custody_columns.contains(&column.index) {
-                    info.add_data_column(Some(column.clone()));
-                }
-            }
-        }
+        for (i, column_indices) in batched_column_requests.iter().enumerate() {
+            // Send the set of columns in the same batch request
+            info.add_custody_columns(
+                blocks
+                    .iter()
+                    .flat_map(|b| {
+                        b.1.iter()
+                            .filter(|d| column_indices.contains(&d.index))
+                            .cloned()
+                    })
+                    .collect::<Vec<_>>(),
+            );

-        // Terminate the requests
-        for i in 0..num_of_data_column_requests {
-            info.add_data_column(None);
             if i < num_of_data_column_requests - 1 {
                 assert!(
                     !info.is_finished(),
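
The replaced `add_*_response(Option<_>)` methods consumed items one at a time and treated `None` as stream termination; the new `add_blocks` / `add_blobs` / `add_custody_columns` take the whole batch at once and terminate the stream in the same call, with `num_custody_column_requests` counting batched column requests rather than individual columns. A stripped-down sketch of that accounting (simplified stand-in, not the real struct):

use std::collections::VecDeque;

// Stand-in accumulator; field and method names mirror the diff, element
// types are simplified (u64 instead of Arc<SignedBeaconBlock<E>> etc.).
struct RangeBlockComponentsSketch {
    blocks: VecDeque<u64>,
    is_blocks_stream_terminated: bool,
    custody_columns_streams_terminated: usize,
    num_custody_column_requests: Option<usize>,
}

impl RangeBlockComponentsSketch {
    // The full batch arrives in one call, so receiving it also terminates
    // that stream: no more per-item Option plumbing.
    fn add_blocks(&mut self, blocks: Vec<u64>) {
        for block in blocks {
            self.blocks.push_back(block);
        }
        self.is_blocks_stream_terminated = true;
    }

    // One call corresponds to one batched columns-by-range request, so the
    // termination counter advances per request, not per column.
    fn add_custody_columns(&mut self, _columns: Vec<u64>) {
        self.custody_columns_streams_terminated += 1;
    }

    fn is_finished(&self) -> bool {
        self.is_blocks_stream_terminated
            && self
                .num_custody_column_requests
                .map_or(true, |n| self.custody_columns_streams_terminated >= n)
    }
}

fn main() {
    let mut info = RangeBlockComponentsSketch {
        blocks: VecDeque::new(),
        is_blocks_stream_terminated: false,
        custody_columns_streams_terminated: 0,
        // Two batched column requests, e.g. covering columns [1, 2] and [3, 4].
        num_custody_column_requests: Some(2),
    };
    info.add_blocks(vec![1, 2, 3]);
    assert!(!info.is_finished());
    info.add_custody_columns(vec![10, 20]);
    assert!(!info.is_finished());
    info.add_custody_columns(vec![30, 40]);
    assert!(info.is_finished());
}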
