Skip to content

Fix light client plumbing in beacon processor #6993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 43 additions & 30 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ pub struct BeaconProcessorQueueLengths {
gossip_voluntary_exit_queue: usize,
gossip_proposer_slashing_queue: usize,
gossip_attester_slashing_queue: usize,
finality_update_queue: usize,
optimistic_update_queue: usize,
unknown_light_client_update_queue: usize,
unknown_block_sampling_request_queue: usize,
rpc_block_queue: usize,
Expand All @@ -132,9 +130,11 @@ pub struct BeaconProcessorQueueLengths {
dcbroots_queue: usize,
dcbrange_queue: usize,
gossip_bls_to_execution_change_queue: usize,
lc_gossip_finality_update_queue: usize,
lc_gossip_optimistic_update_queue: usize,
lc_bootstrap_queue: usize,
lc_optimistic_update_queue: usize,
lc_finality_update_queue: usize,
lc_rpc_optimistic_update_queue: usize,
lc_rpc_finality_update_queue: usize,
lc_update_range_queue: usize,
api_request_p0_queue: usize,
api_request_p1_queue: usize,
Expand Down Expand Up @@ -175,15 +175,13 @@ impl BeaconProcessorQueueLengths {
gossip_voluntary_exit_queue: 4096,
gossip_proposer_slashing_queue: 4096,
gossip_attester_slashing_queue: 4096,
finality_update_queue: 1024,
optimistic_update_queue: 1024,
unknown_block_sampling_request_queue: 16384,
unknown_light_client_update_queue: 128,
rpc_block_queue: 1024,
rpc_blob_queue: 1024,
// TODO(das): Placeholder values
rpc_custody_column_queue: 1000,
rpc_verify_data_column_queue: 1000,
unknown_block_sampling_request_queue: 16384,
sampling_result_queue: 1000,
chain_segment_queue: 64,
backfill_chain_segment: 64,
Expand All @@ -200,9 +198,11 @@ impl BeaconProcessorQueueLengths {
dcbroots_queue: 1024,
dcbrange_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
lc_gossip_finality_update_queue: 1024,
lc_gossip_optimistic_update_queue: 1024,
lc_bootstrap_queue: 1024,
lc_optimistic_update_queue: 512,
lc_finality_update_queue: 512,
lc_rpc_optimistic_update_queue: 512,
lc_rpc_finality_update_queue: 512,
lc_update_range_queue: 512,
api_request_p0_queue: 1024,
api_request_p1_queue: 1024,
Expand Down Expand Up @@ -884,21 +884,16 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut gossip_attester_slashing_queue =
FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);

// Using a FIFO queue for light client updates to maintain sequence order.
let mut finality_update_queue = FifoQueue::new(queue_lengths.finality_update_queue);
let mut optimistic_update_queue = FifoQueue::new(queue_lengths.optimistic_update_queue);
let mut unknown_light_client_update_queue =
FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);

// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
let mut rpc_verify_data_column_queue =
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
// TODO(das): the sampling_request_queue is never read
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
Expand All @@ -917,10 +912,18 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut gossip_bls_to_execution_change_queue =
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);

// Using FIFO queues for light client updates to maintain sequence order.
let mut lc_gossip_finality_update_queue =
FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue);
let mut lc_gossip_optimistic_update_queue =
FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue);
let mut unknown_light_client_update_queue =
FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
let mut lc_optimistic_update_queue =
FifoQueue::new(queue_lengths.lc_optimistic_update_queue);
let mut lc_finality_update_queue = FifoQueue::new(queue_lengths.lc_finality_update_queue);
let mut lc_rpc_optimistic_update_queue =
FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue);
let mut lc_rpc_finality_update_queue =
FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue);
let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue);

let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
Expand Down Expand Up @@ -1254,11 +1257,19 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if let Some(item) = backfill_chain_segment.pop() {
Some(item)
// Handle light client requests.
} else if let Some(item) = lc_gossip_finality_update_queue.pop() {
Some(item)
} else if let Some(item) = lc_gossip_optimistic_update_queue.pop() {
Some(item)
} else if let Some(item) = unknown_light_client_update_queue.pop() {
Some(item)
} else if let Some(item) = lc_bootstrap_queue.pop() {
Some(item)
} else if let Some(item) = lc_optimistic_update_queue.pop() {
} else if let Some(item) = lc_rpc_optimistic_update_queue.pop() {
Some(item)
} else if let Some(item) = lc_finality_update_queue.pop() {
} else if let Some(item) = lc_rpc_finality_update_queue.pop() {
Some(item)
} else if let Some(item) = lc_update_range_queue.pop() {
Some(item)
// This statement should always be the final else statement.
} else {
Expand Down Expand Up @@ -1362,10 +1373,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
sync_contribution_queue.push(work)
}
Work::GossipLightClientFinalityUpdate { .. } => {
finality_update_queue.push(work, work_id, &self.log)
lc_gossip_finality_update_queue.push(work, work_id, &self.log)
}
Work::GossipLightClientOptimisticUpdate { .. } => {
optimistic_update_queue.push(work, work_id, &self.log)
lc_gossip_optimistic_update_queue.push(work, work_id, &self.log)
}
Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
rpc_block_queue.push(work, work_id, &self.log)
Expand Down Expand Up @@ -1400,10 +1411,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
lc_bootstrap_queue.push(work, work_id, &self.log)
}
Work::LightClientOptimisticUpdateRequest { .. } => {
lc_optimistic_update_queue.push(work, work_id, &self.log)
lc_rpc_optimistic_update_queue.push(work, work_id, &self.log)
}
Work::LightClientFinalityUpdateRequest { .. } => {
lc_finality_update_queue.push(work, work_id, &self.log)
lc_rpc_finality_update_queue.push(work, work_id, &self.log)
}
Work::LightClientUpdatesByRangeRequest { .. } => {
lc_update_range_queue.push(work, work_id, &self.log)
Expand Down Expand Up @@ -1472,9 +1483,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
WorkType::GossipSyncSignature => sync_message_queue.len(),
WorkType::GossipSyncContribution => sync_contribution_queue.len(),
WorkType::GossipLightClientFinalityUpdate => finality_update_queue.len(),
WorkType::GossipLightClientFinalityUpdate => {
lc_gossip_finality_update_queue.len()
}
WorkType::GossipLightClientOptimisticUpdate => {
optimistic_update_queue.len()
lc_gossip_optimistic_update_queue.len()
}
WorkType::RpcBlock => rpc_block_queue.len(),
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
Expand All @@ -1495,10 +1508,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
WorkType::LightClientOptimisticUpdateRequest => {
lc_optimistic_update_queue.len()
lc_rpc_optimistic_update_queue.len()
}
WorkType::LightClientFinalityUpdateRequest => {
lc_finality_update_queue.len()
lc_rpc_finality_update_queue.len()
}
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
Expand Down