Stricter match of BlockError in lookup sync #6321

Closed · wants to merge 2 commits
6 changes: 4 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -39,7 +39,9 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{self, error::TrySendError};
use types::*;

pub use sync_methods::ChainSegmentProcessId;
pub use sync_methods::{
ChainSegmentProcessId, ErrorCategory as LookupSyncErrorCategory, LookupSyncProcessingResult,
};
use types::blob_sidecar::FixedBlobSidecarList;

pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
@@ -983,7 +985,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"result" => "imported block and custody columns",
"block_hash" => %hash,
);
self.chain.recompute_head_at_current_slot().await;
// Head will be recomputed in `handle_lookup_sync_processing_result`
Member: This doesn't cover the gossip case right?

Member: Feels like it might be easier to move the recompute_head logic back here, otherwise we'll have to either do it in two places or pass a flag here and conditionally recompute head.

}
AvailabilityProcessingStatus::MissingComponents(_, _) => {
debug!(
287 changes: 207 additions & 80 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -9,6 +9,7 @@ use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::data_availability_checker::MaybeAvailableBlock;
use beacon_chain::data_column_verification::verify_kzg_for_data_column_list;
use beacon_chain::ExecutionPayloadError;
use beacon_chain::{
validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainTypes,
BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer,
@@ -44,6 +45,60 @@ struct ChainSegmentFailed {
peer_action: Option<PeerAction>,
}

#[derive(Debug)]
pub enum LookupSyncProcessingResult {
FullyImported,
ImportedMissingComponents,
Error(ErrorCategory),
ParentUnknown(Hash256),
Ignored,
}

#[derive(Debug, PartialEq, Eq)]
pub enum ErrorCategory {
/// Internal Errors (not caused by peers).
///
/// An internal no-retry error is permanent and block processing should not be
/// re-attempted.
Internal { retry: bool },
/// Errors caused by faulty / malicious peers.
///
/// No-retry errors are deterministic on the block's root. Re-downloading data
/// keyed by block root MUST result in the same no-retry error (e.g. invalid parent,
/// invalid state root, etc.).
///
/// The error also indicates which block component index is malicious if applicable.
Malicious { retry: bool, index: usize },
Comment on lines +70 to +71

Member: I found index slightly confusing - what do you think about calling this sidecar_index or data_sidecar_index, and potentially making this an Option, given we're using this for blocks as well?

Member: Does it make sense to call this Faulty instead? We don't know if the peer is malicious - I think it might be more consistent with the terminology used in sync, although it doesn't really matter; a faulty peer and a malicious peer both send invalid data.
}

impl From<ErrorCategory> for LookupSyncProcessingResult {
fn from(e: ErrorCategory) -> Self {
Self::Error(e)
}
}

impl ErrorCategory {
// Helper functions for readability in large match statements
pub fn internal_no_retry() -> Self {
Self::Internal { retry: false }
}
pub fn internal_retry() -> Self {
Self::Internal { retry: true }
}
pub fn malicious_no_retry() -> Self {
Self::Malicious {
retry: false,
index: 0,
Member: I think it makes sense to make this an Option, otherwise we can't distinguish between index 0 and None.

}
}
pub fn malicious_retry() -> Self {
Self::Malicious {
retry: true,
index: 0,
}
}
}
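The review comments above suggest two changes to this shape: renaming `Malicious` to `Faulty`, and making the index an `Option<usize>` so index 0 and "no index" are distinguishable. A minimal standalone sketch of that suggested shape — the `next_action` helper and its `(retry, penalize_peer)` split are hypothetical illustrations of how sync-side code might consume the category, not the PR's actual API:

```rust
// Simplified, hypothetical mirror of ErrorCategory incorporating the
// reviewers' suggestions (`Faulty` naming, `Option<usize>` sidecar index).
#[derive(Debug, PartialEq, Eq)]
enum ErrorCategory {
    /// Internal errors (not caused by peers).
    Internal { retry: bool },
    /// Errors caused by faulty peers; `sidecar_index` identifies the bad
    /// component when one is known, and is `None` for block-level errors.
    Faulty {
        retry: bool,
        sidecar_index: Option<usize>,
    },
}

/// Illustrative consumer: returns (should_retry, should_penalize_peer).
fn next_action(err: &ErrorCategory) -> (bool, bool) {
    match err {
        // Internal errors are never the peer's fault: never penalize.
        ErrorCategory::Internal { retry } => (*retry, false),
        // Faulty-peer errors always warrant a peer penalty.
        ErrorCategory::Faulty { retry, .. } => (*retry, true),
    }
}
```

With `Option<usize>`, `Some(0)` (the first sidecar is bad) is no longer conflated with "no component identified", which is the ambiguity the reviewer points out.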

impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Returns an async closure which processes a beacon block received via RPC.
///
@@ -92,7 +147,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: crate::sync::manager::BlockProcessingResult::Ignored,
result: LookupSyncProcessingResult::Ignored,
});
};
(process_fn, Box::new(ignore_fn))
@@ -214,7 +269,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: result.into(),
result: self
.handle_lookup_sync_processing_result(block_root, result)
Member: Looks like we're potentially recomputing head twice here - once above at `self.chain.recompute_head_at_current_slot().await;` and once in `self.handle_lookup_sync_processing_result`.

.await,
});

// Drop the handle to remove the entry from the cache
@@ -287,48 +344,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await;

match &result {
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
debug!(
self.log,
"Block components retrieved";
"result" => "imported block and blobs",
"slot" => %slot,
"block_hash" => %hash,
);
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
debug!(
self.log,
"Missing components over rpc";
"block_hash" => %block_root,
"slot" => %slot,
);
}
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
"Blobs have already been imported";
"block_hash" => %block_root,
"slot" => %slot,
);
}
Err(e) => {
warn!(
self.log,
"Error when importing rpc blobs";
"error" => ?e,
"block_hash" => %block_root,
"slot" => %slot,
);
}
}

// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: result.into(),
result: self
.handle_lookup_sync_processing_result(block_root, result)
.await,
});
}

@@ -344,53 +365,159 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_rpc_custody_columns(custody_columns)
.await;

match &result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(hash) => {
debug!(
self.log,
"Block components retrieved";
"result" => "imported block and custody columns",
"block_hash" => %hash,
);
self.chain.recompute_head_at_current_slot().await;
}
AvailabilityProcessingStatus::MissingComponents(_, _) => {
debug!(
self.log,
"Missing components over rpc";
"block_hash" => %block_root,
);
// Attempt reconstruction here before notifying sync, to avoid sending out more requests
// that we may no longer need.
if let Some(availability) =
self.attempt_data_column_reconstruction(block_root).await
{
result = Ok(availability)
}
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
if let &Ok(AvailabilityProcessingStatus::MissingComponents { .. }) = &result {
// Attempt reconstruction here before notifying sync, to avoid sending out more requests
// that we may no longer need.
if let Some(availability) = self.attempt_data_column_reconstruction(block_root).await {
result = Ok(availability)
}
}

self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: self
.handle_lookup_sync_processing_result(block_root, result)
.await,
});
}

async fn handle_lookup_sync_processing_result(
&self,
block_root: Hash256,
result: Result<AvailabilityProcessingStatus, BlockError>,
) -> LookupSyncProcessingResult {
match result {
Ok(AvailabilityProcessingStatus::Imported { .. }) => {
debug!(
self.log,
"Custody columns have already been imported";
"block_hash" => %block_root,
"Fully imported block components over RPC";
"block_root" => %block_root,
);
self.chain.recompute_head_at_current_slot().await;
LookupSyncProcessingResult::FullyImported
}
Err(e) => {
warn!(
Ok(AvailabilityProcessingStatus::MissingComponents { .. }) => {
debug!(
self.log,
"Error when importing rpc custody columns";
"error" => ?e,
"block_hash" => %block_root,
"Missing components over RPC";
"block_root" => %block_root,
);
LookupSyncProcessingResult::ImportedMissingComponents
}
Err(err) => {
debug!(
self.log,
"Error processing block component";
"block_root" => %block_root,
"err" => ?err,
);
Collaborator (Author): Do we want to log internal errors at a higher level than debug? If yes, I can move the log statement below and match on the type of ErrorCategory.

match err {
BlockError::ParentUnknown { parent_root } => {
LookupSyncProcessingResult::ParentUnknown(parent_root)
}
// A peer may craft a block that is at a future slot. It's possible that
// eventually the slot will no longer be in the future. However, since it's a
// malicious action to serve an RPC block with a future slot, we will not retry.
BlockError::FutureSlot { .. } => ErrorCategory::malicious_no_retry().into(),
// All these variants are invalid block errors deterministic on the block root,
// no need to retry
BlockError::StateRootMismatch { .. }
| BlockError::GenesisBlock
| BlockError::WouldRevertFinalizedSlot { .. }
| BlockError::NotFinalizedDescendant { .. }
| BlockError::BlockSlotLimitReached
| BlockError::IncorrectBlockProposer { .. }
| BlockError::UnknownValidator { .. }
| BlockError::BlockIsNotLaterThanParent { .. }
| BlockError::PerBlockProcessingError(_)
| BlockError::InconsistentFork(_)
| BlockError::WeakSubjectivityConflict => {
ErrorCategory::malicious_no_retry().into()
}
BlockError::DuplicateFullyImported { .. } => {
LookupSyncProcessingResult::FullyImported
}
BlockError::DuplicateImportStatusUnknown { .. } => {
// This is unreachable because RPC blocks do not undergo gossip verification, and
// this error can *only* come from gossip verification.
ErrorCategory::internal_no_retry().into()
}
// TODO (InvalidSignature): Labeling as recoverable as it may be the proposer signature. We should check
// which one it is and label as non-recoverable if the proposer signature is correct.
BlockError::ProposalSignatureInvalid | BlockError::InvalidSignature => {
ErrorCategory::malicious_retry().into()
}
BlockError::ExecutionPayloadError(e) => match e {
// The peer has nothing to do with this error, do not penalize them.
ExecutionPayloadError::NoExecutionConnection => {
ErrorCategory::internal_no_retry()
}
// The peer has nothing to do with this error, do not penalize them.
ExecutionPayloadError::RequestFailed(_) => ErrorCategory::internal_retry(),
// Execution payload is invalid
ExecutionPayloadError::RejectedByExecutionEngine { .. }
| ExecutionPayloadError::InvalidPayloadTimestamp { .. }
| ExecutionPayloadError::InvalidTerminalPoWBlock { .. }
| ExecutionPayloadError::InvalidActivationEpoch { .. }
| ExecutionPayloadError::InvalidTerminalBlockHash { .. } => {
ErrorCategory::malicious_no_retry()
}
// Do not penalize the peer since it's not their fault that *we're* optimistic.
ExecutionPayloadError::UnverifiedNonOptimisticCandidate => {
ErrorCategory::internal_retry()
}
}
.into(),
BlockError::ParentExecutionPayloadInvalid { .. } => {
ErrorCategory::malicious_no_retry().into()
}
// TODO: Review AvailabilityCheckError variants
BlockError::AvailabilityCheck(e) => match e {
AvailabilityCheckError::SszTypes(_)
| AvailabilityCheckError::StoreError(_)
| AvailabilityCheckError::Unexpected
| AvailabilityCheckError::ParentStateMissing(_)
| AvailabilityCheckError::BlockReplayError(_)
| AvailabilityCheckError::RebuildingStateCaches(_)
| AvailabilityCheckError::SlotClockError => ErrorCategory::internal_retry(),
AvailabilityCheckError::InvalidColumn(index, _) => {
ErrorCategory::Malicious {
retry: true,
index: index as usize,
}
}
AvailabilityCheckError::InvalidBlobs { .. }
| AvailabilityCheckError::MissingBlobs
| AvailabilityCheckError::MissingCustodyColumns
| AvailabilityCheckError::DecodeError(_)
| AvailabilityCheckError::ReconstructColumnsError { .. }
| AvailabilityCheckError::BlobIndexInvalid(_)
| AvailabilityCheckError::DataColumnIndexInvalid(_)
| AvailabilityCheckError::KzgCommitmentMismatch { .. } => {
ErrorCategory::malicious_retry()
} // Do not use a fallback match, handle all errors explicitly
}
.into(),
// The proposer making a slashable block is not the peer's fault nor ours. Mark
// as internal (don't penalize peer), and no retry (the block will forever be
// slashable).
BlockError::Slashable => ErrorCategory::internal_no_retry().into(),
// TODO: BeaconChainError should be retried?
BlockError::BeaconChainError(_) | BlockError::InternalError(_) => {
ErrorCategory::internal_no_retry().into()
Member: Both of these variants cover quite a few scenarios; I'm not sure it's safe to say no retry for them?

}
// unreachable, this error is only part of gossip
BlockError::BlobNotRequired(_) => ErrorCategory::malicious_retry().into(),
// Unreachable: These variants never happen in lookup sync, only in range sync.
// Does not matter what we set here, just setting `internal_recoverable` to
// put something.

Member (suggested change): `internal_recoverable` → `internal_retry` in the comment above.
BlockError::NonLinearParentRoots | BlockError::NonLinearSlots => {
Member: For a code path that's expected to be unreachable, is it worth logging a warning here?

ErrorCategory::internal_retry().into()
}
// Do not use a fallback match, handle all errors explicitly
}
}
}

self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: result.into(),
});
}
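The repeated "do not use a fallback match" comments in this function are about exhaustiveness: with no `_` arm, adding a new `BlockError` variant becomes a compile error until it is explicitly categorized, rather than being silently swallowed by a default case. A toy illustration of the pattern — the enum and its classifications here are invented stand-ins, not Lighthouse's actual types:

```rust
// Toy stand-in for `BlockError`, matched exhaustively with no `_` arm.
enum ToyBlockError {
    ParentUnknown,
    FutureSlot,
    Slashable,
}

/// Classify whether processing should be retried for this error.
fn retryable(err: &ToyBlockError) -> bool {
    // No `_` fallback arm: if a variant is added to `ToyBlockError`,
    // this match stops compiling until the new variant is classified.
    match err {
        ToyBlockError::ParentUnknown => true, // may succeed once the parent arrives
        ToyBlockError::FutureSlot => false,   // deterministic for this block
        ToyBlockError::Slashable => false,    // the block is forever slashable
    }
}
```

This trades a little verbosity for a compiler-enforced guarantee that every error path has been deliberately assigned a category, which is exactly the property the PR is after.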

/// Validate a list of data columns received from RPC requests