Skip to content

Commit 3992d6b

Browse files
authored
Fix misc PeerDAS todos (#6862)
Address misc PeerDAS TODOs that are not big enough to warrant a dedicated PR. I'll justify each TODO in an inline comment.
1 parent ec2fe38 commit 3992d6b

File tree

10 files changed

+68
-55
lines changed

10 files changed

+68
-55
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2974,10 +2974,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29742974
/// Only completed sampling results are received. Blocks are unavailable by default and should
29752975
/// be pruned on finalization, on a timeout or by a max count.
29762976
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
2977-
// TODO(das): update fork-choice
2977+
// TODO(das): update fork-choice, act on sampling result, adjust log level
29782978
// NOTE: It is possible that sampling completes before block is imported into fork choice,
29792979
// in that case we may need to update availability cache.
2980-
// TODO(das): These log levels are too high, reduce once DAS matures
29812980
info!(self.log, "Sampling completed"; "block_root" => %block_root);
29822981
}
29832982

beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,6 @@ impl<E: EthSpec> PendingComponents<E> {
110110
self.get_cached_blobs().iter().flatten().count()
111111
}
112112

113-
/// Checks if a data column of a given index exists in the cache.
114-
///
115-
/// Returns:
116-
/// - `true` if a data column for the given index exists.
117-
/// - `false` otherwise.
118-
fn data_column_exists(&self, data_column_index: u64) -> bool {
119-
self.get_cached_data_column(data_column_index).is_some()
120-
}
121-
122113
/// Returns the number of data columns that have been received and are stored in the cache.
123114
pub fn num_received_data_columns(&self) -> usize {
124115
self.verified_data_columns.len()
@@ -182,8 +173,7 @@ impl<E: EthSpec> PendingComponents<E> {
182173
kzg_verified_data_columns: I,
183174
) -> Result<(), AvailabilityCheckError> {
184175
for data_column in kzg_verified_data_columns {
185-
// TODO(das): Add equivalent checks for data columns if necessary
186-
if !self.data_column_exists(data_column.index()) {
176+
if self.get_cached_data_column(data_column.index()).is_none() {
187177
self.verified_data_columns.push(data_column);
188178
}
189179
}

beacon_node/network/src/network_beacon_processor/gossip_methods.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,22 +1477,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
14771477
);
14781478
return None;
14791479
}
1480-
Err(e @ BlockError::InternalError(_)) => {
1480+
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
1481+
Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
14811482
error!(self.log, "Internal block gossip validation error";
14821483
"error" => %e
14831484
);
14841485
return None;
14851486
}
1486-
Err(e @ BlockError::BlobNotRequired(_)) => {
1487-
// TODO(das): penalty not implemented yet as other clients may still send us blobs
1488-
// during early stage of implementation.
1489-
debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer";
1490-
"error" => %e,
1491-
"peer_id" => %peer_id,
1492-
);
1493-
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
1494-
return None;
1495-
}
14961487
};
14971488

14981489
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
@@ -1603,9 +1594,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
16031594
let block = verified_block.block.block_cloned();
16041595
let block_root = verified_block.block_root;
16051596

1606-
// TODO(das) Might be too early to issue a request here. We haven't checked that the block
1607-
// actually includes blob transactions and thus has data. A peer could send a block is
1608-
// garbage commitments, and make us trigger sampling for a block that does not have data.
1597+
// Note: okay to issue sampling request before the block is execution verified. If the
1598+
// proposer sends us a block with invalid blob transactions it can trigger us to issue
1599+
// sampling queries that will never resolve. This attack is equivalent to withholding data.
1600+
// Dismissed proposal to move this block to post-execution: https://github.com/sigp/lighthouse/pull/6492
16091601
if block.num_expected_blobs() > 0 {
16101602
// Trigger sampling for block not yet execution valid. At this point column custodials are
16111603
// unlikely to have received their columns. Triggering sampling so early is only viable with

beacon_node/network/src/network_beacon_processor/sync_methods.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
336336
self: Arc<NetworkBeaconProcessor<T>>,
337337
block_root: Hash256,
338338
custody_columns: DataColumnSidecarList<T::EthSpec>,
339-
_seen_timestamp: Duration,
339+
seen_timestamp: Duration,
340340
process_type: BlockProcessType,
341341
) {
342+
// custody_columns must always have at least one element
343+
let Some(slot) = custody_columns.first().map(|d| d.slot()) else {
344+
return;
345+
};
346+
347+
if let Ok(current_slot) = self.chain.slot() {
348+
if current_slot == slot {
349+
let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
350+
metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
351+
}
352+
}
353+
354+
let mut indices = custody_columns.iter().map(|d| d.index).collect::<Vec<_>>();
355+
indices.sort_unstable();
356+
debug!(
357+
self.log,
358+
"RPC custody data columns received";
359+
"indices" => ?indices,
360+
"block_root" => %block_root,
361+
"slot" => %slot,
362+
);
363+
342364
let mut result = self
343365
.chain
344366
.process_rpc_custody_columns(custody_columns)

beacon_node/network/src/sync/block_lookups/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
479479
// continue_request will send for processing as the request state is AwaitingProcessing
480480
}
481481
Err(e) => {
482-
// TODO(das): is it okay to not log the peer source of request failures? Then we
483-
// should log individual requests failures in the SyncNetworkContext
482+
// No need to log peer source here. When sending a DataColumnsByRoot request we log
483+
// the peer and the request ID which is linked to this `id` value here.
484484
debug!(self.log,
485485
"Received lookup download failure";
486486
"block_root" => ?block_root,

beacon_node/network/src/sync/manager.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,12 +1217,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
12171217
requester: CustodyRequester,
12181218
response: CustodyByRootResult<T::EthSpec>,
12191219
) {
1220-
// TODO(das): get proper timestamp
1221-
let seen_timestamp = timestamp_now();
12221220
self.block_lookups
12231221
.on_download_response::<CustodyRequestState<T::EthSpec>>(
12241222
requester.0,
1225-
response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)),
1223+
response,
12261224
&mut self.network,
12271225
);
12281226
}

beacon_node/network/src/sync/network_context.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ impl<T> RpcEvent<T> {
6868

6969
pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
7070

71-
pub type CustodyByRootResult<T> = Result<(DataColumnSidecarList<T>, PeerGroup), RpcResponseError>;
71+
/// Duration = latest seen timestamp of all received data columns
72+
pub type CustodyByRootResult<T> =
73+
Result<(DataColumnSidecarList<T>, PeerGroup, Duration), RpcResponseError>;
7274

7375
#[derive(Debug)]
7476
pub enum RpcResponseError {
@@ -1190,7 +1192,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
11901192
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
11911193
// an Option first to use in an `if let Some() { act on result }` block.
11921194
match result.as_ref() {
1193-
Some(Ok((columns, peer_group))) => {
1195+
Some(Ok((columns, peer_group, _))) => {
11941196
debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
11951197
}
11961198
Some(Err(e)) => {
@@ -1208,7 +1210,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
12081210
id: Id,
12091211
block_root: Hash256,
12101212
block: RpcBlock<T::EthSpec>,
1211-
duration: Duration,
1213+
seen_timestamp: Duration,
12121214
) -> Result<(), SendErrorProcessor> {
12131215
let beacon_processor = self
12141216
.beacon_processor_if_enabled()
@@ -1221,7 +1223,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
12211223
.send_rpc_beacon_block(
12221224
block_root,
12231225
block,
1224-
duration,
1226+
seen_timestamp,
12251227
BlockProcessType::SingleBlock { id },
12261228
)
12271229
.map_err(|e| {
@@ -1239,7 +1241,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
12391241
id: Id,
12401242
block_root: Hash256,
12411243
blobs: FixedBlobSidecarList<T::EthSpec>,
1242-
duration: Duration,
1244+
seen_timestamp: Duration,
12431245
) -> Result<(), SendErrorProcessor> {
12441246
let beacon_processor = self
12451247
.beacon_processor_if_enabled()
@@ -1252,7 +1254,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
12521254
.send_rpc_blobs(
12531255
block_root,
12541256
blobs,
1255-
duration,
1257+
seen_timestamp,
12561258
BlockProcessType::SingleBlob { id },
12571259
)
12581260
.map_err(|e| {
@@ -1270,7 +1272,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
12701272
_id: Id,
12711273
block_root: Hash256,
12721274
custody_columns: DataColumnSidecarList<T::EthSpec>,
1273-
duration: Duration,
1275+
seen_timestamp: Duration,
12741276
process_type: BlockProcessType,
12751277
) -> Result<(), SendErrorProcessor> {
12761278
let beacon_processor = self
@@ -1280,7 +1282,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
12801282
debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type);
12811283

12821284
beacon_processor
1283-
.send_rpc_custody_columns(block_root, custody_columns, duration, process_type)
1285+
.send_rpc_custody_columns(block_root, custody_columns, seen_timestamp, process_type)
12841286
.map_err(|e| {
12851287
error!(
12861288
self.log,

beacon_node/network/src/sync/network_context/custody.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::sync::network_context::{
22
DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest,
33
};
4-
4+
use beacon_chain::validator_monitor::timestamp_now;
55
use beacon_chain::BeaconChainTypes;
66
use fnv::FnvHashMap;
77
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
@@ -61,7 +61,8 @@ struct ActiveBatchColumnsRequest {
6161
indices: Vec<ColumnIndex>,
6262
}
6363

64-
pub type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
64+
pub type CustodyRequestResult<E> =
65+
Result<Option<(DataColumnSidecarList<E>, PeerGroup, Duration)>, Error>;
6566

6667
impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
6768
pub(crate) fn new(
@@ -102,8 +103,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
102103
resp: RpcResponseResult<DataColumnSidecarList<T::EthSpec>>,
103104
cx: &mut SyncNetworkContext<T>,
104105
) -> CustodyRequestResult<T::EthSpec> {
105-
// TODO(das): Should downscore peers for verify errors here
106-
107106
let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else {
108107
warn!(self.log,
109108
"Received custody column response for unrequested index";
@@ -115,7 +114,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
115114
};
116115

117116
match resp {
118-
Ok((data_columns, _seen_timestamp)) => {
117+
Ok((data_columns, seen_timestamp)) => {
119118
debug!(self.log,
120119
"Custody column download success";
121120
"id" => ?self.custody_id,
@@ -141,7 +140,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
141140
.ok_or(Error::BadState("unknown column_index".to_owned()))?;
142141

143142
if let Some(data_column) = data_columns.remove(column_index) {
144-
column_request.on_download_success(req_id, peer_id, data_column)?;
143+
column_request.on_download_success(
144+
req_id,
145+
peer_id,
146+
data_column,
147+
seen_timestamp,
148+
)?;
145149
} else {
146150
// Peer does not have the requested data.
147151
// TODO(das) do not consider this case a success. We know for sure the block has
@@ -204,20 +208,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
204208
if self.column_requests.values().all(|r| r.is_downloaded()) {
205209
// All requests have completed successfully.
206210
let mut peers = HashMap::<PeerId, Vec<usize>>::new();
211+
let mut seen_timestamps = vec![];
207212
let columns = std::mem::take(&mut self.column_requests)
208213
.into_values()
209214
.map(|request| {
210-
let (peer, data_column) = request.complete()?;
215+
let (peer, data_column, seen_timestamp) = request.complete()?;
211216
peers
212217
.entry(peer)
213218
.or_default()
214219
.push(data_column.index as usize);
220+
seen_timestamps.push(seen_timestamp);
215221
Ok(data_column)
216222
})
217223
.collect::<Result<Vec<_>, _>>()?;
218224

219225
let peer_group = PeerGroup::from_set(peers);
220-
return Ok(Some((columns, peer_group)));
226+
let max_seen_timestamp = seen_timestamps.into_iter().max().unwrap_or(timestamp_now());
227+
return Ok(Some((columns, peer_group, max_seen_timestamp)));
221228
}
222229

223230
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
@@ -335,7 +342,7 @@ struct ColumnRequest<E: EthSpec> {
335342
enum Status<E: EthSpec> {
336343
NotStarted(Instant),
337344
Downloading(DataColumnsByRootRequestId),
338-
Downloaded(PeerId, Arc<DataColumnSidecar<E>>),
345+
Downloaded(PeerId, Arc<DataColumnSidecar<E>>, Duration),
339346
}
340347

341348
impl<E: EthSpec> ColumnRequest<E> {
@@ -404,6 +411,7 @@ impl<E: EthSpec> ColumnRequest<E> {
404411
req_id: DataColumnsByRootRequestId,
405412
peer_id: PeerId,
406413
data_column: Arc<DataColumnSidecar<E>>,
414+
seen_timestamp: Duration,
407415
) -> Result<(), Error> {
408416
match &self.status {
409417
Status::Downloading(expected_req_id) => {
@@ -413,7 +421,7 @@ impl<E: EthSpec> ColumnRequest<E> {
413421
req_id,
414422
});
415423
}
416-
self.status = Status::Downloaded(peer_id, data_column);
424+
self.status = Status::Downloaded(peer_id, data_column, seen_timestamp);
417425
Ok(())
418426
}
419427
other => Err(Error::BadState(format!(
@@ -422,9 +430,11 @@ impl<E: EthSpec> ColumnRequest<E> {
422430
}
423431
}
424432

425-
fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>), Error> {
433+
fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>, Duration), Error> {
426434
match self.status {
427-
Status::Downloaded(peer_id, data_column) => Ok((peer_id, data_column)),
435+
Status::Downloaded(peer_id, data_column, seen_timestamp) => {
436+
Ok((peer_id, data_column, seen_timestamp))
437+
}
428438
other => Err(Error::BadState(format!(
429439
"bad state complete expected Downloaded got {other:?}"
430440
))),

beacon_node/network/src/sync/tests/lookups.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,6 @@ impl TestRig {
713713
self.complete_data_columns_by_root_request(id, data_columns);
714714

715715
// Expect work event
716-
// TODO(das): worth it to append sender id to the work event for stricter assertion?
717716
self.expect_rpc_sample_verify_work_event();
718717

719718
// Respond with valid result
@@ -755,7 +754,6 @@ impl TestRig {
755754
}
756755

757756
// Expect work event
758-
// TODO(das): worth it to append sender id to the work event for stricter assertion?
759757
self.expect_rpc_custody_column_work_event();
760758

761759
// Respond with valid result

consensus/types/src/data_column_custody_group.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub enum DataColumnCustodyGroupError {
1717
/// The `get_custody_groups` function is used to determine the custody groups that a node is
1818
/// assigned to.
1919
///
20+
/// Note: `get_custody_groups(node_id, x)` is a subset of `get_custody_groups(node_id, y)` if `x < y`.
21+
///
2022
/// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#get_custody_groups
2123
pub fn get_custody_groups(
2224
raw_node_id: [u8; 32],

0 commit comments

Comments
 (0)