From 3c63ff33fb6d780a3e1a50ab01142bf2b9ee737a Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 10 May 2025 23:24:33 -0700 Subject: [PATCH 1/8] Remove reprocess channel --- beacon_node/beacon_processor/src/lib.rs | 29 ++- beacon_node/client/src/builder.rs | 9 +- .../src/compute_light_client_updates.rs | 12 +- beacon_node/http_api/src/lib.rs | 20 +- .../http_api/src/publish_attestations.rs | 19 +- beacon_node/http_api/src/task_spawner.rs | 27 +++ beacon_node/http_api/src/test_utils.rs | 6 - .../http_api/tests/interactive_tests.rs | 2 +- .../gossip_methods.rs | 181 ++++++++++-------- .../src/network_beacon_processor/mod.rs | 33 +--- .../network_beacon_processor/sync_methods.rs | 23 ++- .../src/network_beacon_processor/tests.rs | 1 - beacon_node/network/src/router.rs | 6 +- beacon_node/network/src/service.rs | 6 +- beacon_node/network/src/service/tests.rs | 1 - 15 files changed, 204 insertions(+), 171 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index e864cb1fd91..57db21a767e 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -262,22 +262,16 @@ impl Default for BeaconProcessorConfig { pub struct BeaconProcessorChannels { pub beacon_processor_tx: BeaconProcessorSend, pub beacon_processor_rx: mpsc::Receiver>, - pub work_reprocessing_tx: mpsc::Sender, - pub work_reprocessing_rx: mpsc::Receiver, } impl BeaconProcessorChannels { pub fn new(config: &BeaconProcessorConfig) -> Self { let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(config.max_work_event_queue_len); - let (work_reprocessing_tx, work_reprocessing_rx) = - mpsc::channel(config.max_scheduled_work_queue_len); Self { beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx), beacon_processor_rx, - work_reprocessing_rx, - work_reprocessing_tx, } } } @@ -638,6 +632,7 @@ pub enum Work { LightClientUpdatesByRangeRequest(BlockingFn), ApiRequestP0(BlockingOrAsync), ApiRequestP1(BlockingOrAsync), + Reprocess(ReprocessQueueMessage), } impl fmt::Debug for Work { @@ -691,6 +686,7 @@ pub enum WorkType { LightClientUpdatesByRangeRequest, ApiRequestP0, ApiRequestP1, + Reprocess, } impl Work { @@ -749,6 +745,7 @@ impl Work { } Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0, Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1, + Work::Reprocess { .. } => WorkType::Reprocess, } } } @@ -845,8 +842,6 @@ impl BeaconProcessor { pub fn spawn_manager( mut self, event_rx: mpsc::Receiver>, - work_reprocessing_tx: mpsc::Sender, - work_reprocessing_rx: mpsc::Receiver, work_journal_tx: Option>, slot_clock: S, maximum_gossip_clock_disparity: Duration, @@ -932,9 +927,13 @@ impl BeaconProcessor { // receive them back once they are ready (`ready_work_rx`). 
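// With the standalone reprocessing channel gone, the scheduler's sender half
// (`reprocess_work_tx`, created below) is private to this manager task.
// Callers that previously held a `work_reprocessing_tx` now go through the
// ordinary event channel instead. A minimal sketch of the new submission
// path, assuming `msg` is any `ReprocessQueueMessage`:
//
//     if beacon_processor_send
//         .try_send(WorkEvent {
//             drop_during_sync: false,
//             work: Work::Reprocess(msg),
//         })
//         .is_err()
//     {
//         error!("Failed to queue message for reprocessing");
//     }
//
// On receipt, the manager forwards the inner message to `reprocess_work_tx`
// rather than handing it to a worker.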
let (ready_work_tx, ready_work_rx) = mpsc::channel::(self.config.max_scheduled_work_queue_len); + + let (reprocess_work_tx, reprocess_work_rx) = + mpsc::channel::(self.config.max_scheduled_work_queue_len); + spawn_reprocess_scheduler( ready_work_tx, - work_reprocessing_rx, + reprocess_work_rx, &self.executor, Arc::new(slot_clock), maximum_gossip_clock_disparity, @@ -962,7 +961,7 @@ impl BeaconProcessor { Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => { match QueuedBackfillBatch::try_from(event) { Ok(backfill_batch) => { - match work_reprocessing_tx + match reprocess_work_tx .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch)) { Err(e) => { @@ -1313,6 +1312,14 @@ impl BeaconProcessor { let work_type = work.to_type(); match work { + Work::Reprocess(work_event) => { + if let Err(e) = reprocess_work_tx.try_send(work_event) { + error!( + error = ?e, + "Failed to reprocess work event" + ) + } + } _ if can_spawn => self.spawn_worker(work, idle_tx), Work::GossipAttestation { .. } => attestation_queue.push(work), Work::GossipAttestationToConvert { .. } => { @@ -1482,6 +1489,7 @@ impl BeaconProcessor { WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), WorkType::ApiRequestP0 => api_request_p0_queue.len(), WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::Reprocess => 0, }; metrics::observe_vec( &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, @@ -1638,6 +1646,7 @@ impl BeaconProcessor { | Work::LightClientUpdatesByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } + Work::Reprocess(_) => {} }; } } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 3cb7b33aae2..f50e21c08f2 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -544,7 +544,6 @@ where network_senders: None, network_globals: None, beacon_processor_send: None, - beacon_processor_reprocess_send: None, eth1_service: Some(genesis_service.eth1_service.clone()), sse_logging_components: runtime_context.sse_logging_components.clone(), }); @@ -636,7 +635,6 @@ where context.executor, libp2p_registry.as_mut(), beacon_processor_channels.beacon_processor_tx.clone(), - beacon_processor_channels.work_reprocessing_tx.clone(), ) .await .map_err(|e| format!("Failed to start network: {:?}", e))?; @@ -775,9 +773,6 @@ where network_globals: self.network_globals.clone(), eth1_service: self.eth1_service.clone(), beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), - beacon_processor_reprocess_send: Some( - beacon_processor_channels.work_reprocessing_tx.clone(), - ), sse_logging_components: runtime_context.sse_logging_components.clone(), }); @@ -841,8 +836,6 @@ where } .spawn_manager( beacon_processor_channels.beacon_processor_rx, - beacon_processor_channels.work_reprocessing_tx.clone(), - beacon_processor_channels.work_reprocessing_rx, None, beacon_chain.slot_clock.clone(), beacon_chain.spec.maximum_gossip_clock_disparity(), @@ -916,7 +909,7 @@ where compute_light_client_updates( &inner_chain, light_client_server_rv, - beacon_processor_channels.work_reprocessing_tx, + beacon_processor_channels.beacon_processor_tx, ) .await }, diff --git a/beacon_node/client/src/compute_light_client_updates.rs b/beacon_node/client/src/compute_light_client_updates.rs index fab284c4285..75fa22e7954 100644 --- a/beacon_node/client/src/compute_light_client_updates.rs +++ b/beacon_node/client/src/compute_light_client_updates.rs @@ -1,8 +1,8 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, 
LightClientProducerEvent}; use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use beacon_processor::{BeaconProcessorSend, Work, WorkEvent}; use futures::channel::mpsc::Receiver; use futures::StreamExt; -use tokio::sync::mpsc::Sender; use tracing::error; // Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent @@ -14,7 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32; pub async fn compute_light_client_updates( chain: &BeaconChain, mut light_client_server_rv: Receiver>, - reprocess_tx: Sender, + beacon_processor_send: BeaconProcessorSend, ) { // Should only receive events for recent blocks, import_block filters by blocks close to clock. // @@ -31,7 +31,13 @@ pub async fn compute_light_client_updates( }); let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root }; - if reprocess_tx.try_send(msg).is_err() { + if beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: true, + work: Work::Reprocess(msg), + }) + .is_err() + { error!(%parent_root,"Failed to inform light client update") }; } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 386d9fe33aa..3da388cf84e 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -40,7 +40,7 @@ use beacon_chain::{ validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped, }; -use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; +use beacon_processor::BeaconProcessorSend; pub use block_id::BlockId; use builder_states::get_next_withdrawals; use bytes::Bytes; @@ -133,7 +133,6 @@ pub struct Context { pub network_senders: Option>, pub network_globals: Option>>, pub beacon_processor_send: Option>, - pub beacon_processor_reprocess_send: Option>, pub eth1_service: Option, pub sse_logging_components: Option, } @@ -557,11 +556,6 @@ pub fn serve( .filter(|_| config.enable_beacon_processor); let task_spawner_filter = warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone())); - let beacon_processor_reprocess_send = ctx - .beacon_processor_reprocess_send - .clone() - .filter(|_| config.enable_beacon_processor); - let reprocess_send_filter = warp::any().map(move || beacon_processor_reprocess_send.clone()); let duplicate_block_status_code = ctx.config.duplicate_block_status_code; @@ -1944,20 +1938,18 @@ pub fn serve( .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) - .and(reprocess_send_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, - network_tx: UnboundedSender>, - reprocess_tx: Option>| async move { + network_tx: UnboundedSender>| async move { let attestations = attestations.into_iter().map(Either::Left).collect(); let result = crate::publish_attestations::publish_attestations( task_spawner, chain, attestations, network_tx, - reprocess_tx, + true, ) .await .map(|()| warp::reply::json(&())); @@ -1972,14 +1964,12 @@ pub fn serve( .and(warp_utils::json::json::()) .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) - .and(reprocess_send_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, payload: Value, fork_name: Option, - network_tx: UnboundedSender>, - reprocess_tx: Option>| async move { + network_tx: UnboundedSender>| async move { let attestations = match crate::publish_attestations::deserialize_attestation_payload::( payload, fork_name, @@ -2005,7 
+1995,7 @@ pub fn serve( chain, attestations, network_tx, - reprocess_tx, + true, ) .await .map(|()| warp::reply::json(&())); diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index cd5e912bdf4..078106cfead 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -40,6 +40,7 @@ use beacon_chain::{ AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; +use beacon_processor::{Work, WorkEvent}; use either::Either; use eth2::types::Failure; use lighthouse_network::PubsubMessage; @@ -48,10 +49,7 @@ use serde_json::Value; use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{ - mpsc::{Sender, UnboundedSender}, - oneshot, -}; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tracing::{debug, error, warn}; use types::{Attestation, EthSpec, ForkName, SingleAttestation}; @@ -224,7 +222,7 @@ pub async fn publish_attestations( chain: Arc>, attestations: Vec, SingleAttestation>>, network_tx: UnboundedSender>, - reprocess_send: Option>, + allow_reprocess: bool, ) -> Result<(), warp::Rejection> { // Collect metadata about attestations which we'll use to report failures. We need to // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. @@ -239,6 +237,7 @@ pub async fn publish_attestations( // Gossip validate and publish attestations that can be immediately processed. let seen_timestamp = timestamp_now(); let mut prelim_results = task_spawner + .clone() .blocking_task(Priority::P0, move || { Ok(attestations .into_iter() @@ -253,7 +252,7 @@ pub async fn publish_attestations( Err(Error::Validation(AttestationError::UnknownHeadBlock { beacon_block_root, })) => { - let Some(reprocess_tx) = &reprocess_send else { + if !allow_reprocess { return PublishAttestationResult::Failure(Error::ReprocessDisabled); }; // Re-process. @@ -277,7 +276,13 @@ pub async fn publish_attestations( beacon_block_root, process_fn: Box::new(reprocess_fn), }); - if reprocess_tx.try_send(reprocess_msg).is_err() { + if task_spawner + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { PublishAttestationResult::Failure(Error::ReprocessFull) } else { PublishAttestationResult::Reprocessing(rx) diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index a679b294f65..834cd29971f 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -30,6 +30,7 @@ impl Priority { } /// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor. +#[derive(Clone)] pub struct TaskSpawner { /// Used to send tasks to the `BeaconProcessor`. The tokio executor will be /// used if this is `None`. @@ -155,6 +156,32 @@ impl TaskSpawner { .and_then(|x| x) } } + + pub fn try_send(&self, work_event: WorkEvent) -> Result<(), warp::Rejection> { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + let error_message = match beacon_processor_send.try_send(work_event) { + Ok(()) => None, + Err(TrySendError::Full(_)) => { + Some("The task was dropped. The server is overloaded.") + } + Err(TrySendError::Closed(_)) => { + Some("The task was dropped. 
The server is shutting down.") + } + }; + + if let Some(error_message) = error_message { + return Err(warp_utils::reject::custom_server_error( + error_message.to_string(), + )); + }; + + Ok(()) + } else { + Err(warp_utils::reject::custom_server_error( + "The beacon processor is unavailable".to_string(), + )) + } + } } /// Send a task to the beacon processor and await execution. diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index f78a361dad3..9c285f4039f 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -201,12 +201,9 @@ pub async fn create_api_server_with_config( let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); let beacon_processor_send = beacon_processor_tx; - let reprocess_send = work_reprocessing_tx.clone(); BeaconProcessor { network_globals: network_globals.clone(), executor: test_runtime.task_executor.clone(), @@ -215,8 +212,6 @@ pub async fn create_api_server_with_config( } .spawn_manager( beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, None, chain.slot_clock.clone(), chain.spec.maximum_gossip_clock_disparity(), @@ -241,7 +236,6 @@ pub async fn create_api_server_with_config( network_senders: Some(network_senders), network_globals: Some(network_globals), beacon_processor_send: Some(beacon_processor_send), - beacon_processor_reprocess_send: Some(reprocess_send), eth1_service: Some(eth1_service), sse_logging_components: None, }); diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index bb3086945bc..447cb9c9997 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -945,7 +945,7 @@ async fn queue_attestations_from_http() { .unwrap(); tester .ctx - .beacon_processor_reprocess_send + .beacon_processor_send .as_ref() .unwrap() .send(ReprocessQueueMessage::BlockImported { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index cf0e98cda89..d3bdf15cb13 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -20,6 +20,7 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; +use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use logging::crit; use operation_pool::ReceivedPreCapella; @@ -31,7 +32,6 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; -use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; use types::{ beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef, @@ -211,7 +211,7 @@ impl NetworkBeaconProcessor { attestation: Box>, subnet_id: SubnetId, should_import: bool, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { let result = match self @@ -230,7 +230,7 @@ impl NetworkBeaconProcessor { message_id, peer_id, subnet_id, - reprocess_tx, + allow_reprocess, should_import, seen_timestamp, ); @@ -239,7 +239,7 @@ impl NetworkBeaconProcessor { pub fn 
process_gossip_attestation_batch( self: Arc, packages: GossipAttestationBatch, - reprocess_tx: Option>, + allow_reprocess: bool, ) { let attestations_and_subnets = packages .iter() @@ -295,7 +295,7 @@ impl NetworkBeaconProcessor { package.message_id, package.peer_id, package.subnet_id, - reprocess_tx.clone(), + allow_reprocess, package.should_import, package.seen_timestamp, ); @@ -311,7 +311,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, subnet_id: SubnetId, - reprocess_tx: Option>, + allow_reprocess: bool, should_import: bool, seen_timestamp: Duration, ) { @@ -395,7 +395,7 @@ impl NetworkBeaconProcessor { should_import, seen_timestamp, }, - reprocess_tx, + allow_reprocess, error, seen_timestamp, ); @@ -405,9 +405,10 @@ impl NetworkBeaconProcessor { /// Process an unaggregated attestation requiring conversion. /// - /// This function performs the conversion, and if successfull queues a new message to be + /// This function performs the conversion, and if successful queues a new message to be /// processed by `process_gossip_attestation`. If unsuccessful due to block unavailability, - /// a retry message will be pushed to the `reprocess_tx` if it is `Some`. + /// a retry message will be pushed. + /// TODO(beacon-processor) should we add a bool to enable/disable retry? #[allow(clippy::too_many_arguments)] pub fn process_gossip_attestation_to_convert( self: Arc, @@ -416,7 +417,6 @@ impl NetworkBeaconProcessor { single_attestation: Box, subnet_id: SubnetId, should_import: bool, - reprocess_tx: Option>, seen_timestamp: Duration, ) { let conversion_result = self.chain.with_committee_cache( @@ -469,41 +469,45 @@ impl NetworkBeaconProcessor { // Outermost error (from `with_committee_cache`) indicating that the block is not known // and that this conversion should be retried. Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { - if let Some(sender) = reprocess_tx { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, - ); - // We don't know the block, get the sync manager to handle the block lookup, and - // send the attestation to be scheduled for re-processing. - self.sync_tx - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, - beacon_block_root, - )) - .unwrap_or_else(|_| { - warn!(msg = "UnknownBlockHash", "Failed to send to sync service") - }); - let processor = self.clone(); - // Do not allow this attestation to be re-processed beyond this point. - let reprocess_msg = - ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { - beacon_block_root, - process_fn: Box::new(move || { - processor.process_gossip_attestation_to_convert( - message_id, - peer_id, - single_attestation, - subnet_id, - should_import, - None, - seen_timestamp, - ) - }), - }); - if sender.try_send(reprocess_msg).is_err() { - error!("Failed to send attestation for re-processing") - } - } else { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + // We don't know the block, get the sync manager to handle the block lookup, and + // send the attestation to be scheduled for re-processing. + self.sync_tx + .send(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + beacon_block_root, + )) + .unwrap_or_else(|_| { + warn!(msg = "UnknownBlockHash", "Failed to send to sync service") + }); + let processor = self.clone(); + let msg_id = message_id.clone(); + // Do not allow this attestation to be re-processed beyond this point. 
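// `drop_during_sync` on these reprocess events mirrors the old behaviour of
// each call site: requeued attestations and block-import notifications use
// `false` so they survive while the node is syncing, whereas the light client
// update events elsewhere in this series use `true`, presumably because a
// stale optimistic update is worthless once sync has moved past it.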
+ let reprocess_msg = + ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(move || { + processor.process_gossip_attestation_to_convert( + msg_id, + peer_id, + single_attestation, + subnet_id, + should_import, + seen_timestamp, + ) + }), + }); + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { + error!("Failed to send attestation for re-processing"); // We shouldn't make any further attempts to process this attestation. // // Don't downscore the peer since it's not clear if we requested this head @@ -523,7 +527,7 @@ impl NetworkBeaconProcessor { FailedAtt::SingleUnaggregate { attestation: single_attestation, }, - None, + false, error, seen_timestamp, ); @@ -536,7 +540,7 @@ impl NetworkBeaconProcessor { FailedAtt::SingleUnaggregate { attestation: single_attestation, }, - None, + false, AttnError::BeaconChainError(error), seen_timestamp, ); @@ -556,7 +560,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, aggregate: Box>, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root; @@ -580,7 +584,7 @@ impl NetworkBeaconProcessor { beacon_block_root, message_id, peer_id, - reprocess_tx, + allow_reprocess, seen_timestamp, ); } @@ -588,7 +592,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_aggregate_batch( self: Arc, packages: Vec>, - reprocess_tx: Option>, + allow_reprocess: bool, ) { let aggregates = packages.iter().map(|package| package.aggregate.as_ref()); @@ -642,7 +646,7 @@ impl NetworkBeaconProcessor { package.beacon_block_root, package.message_id, package.peer_id, - reprocess_tx.clone(), + allow_reprocess, package.seen_timestamp, ); } @@ -654,7 +658,7 @@ impl NetworkBeaconProcessor { beacon_block_root: Hash256, message_id: MessageId, peer_id: PeerId, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { match result { @@ -733,7 +737,7 @@ impl NetworkBeaconProcessor { attestation: signed_aggregate, seen_timestamp, }, - reprocess_tx, + allow_reprocess, error, seen_timestamp, ); @@ -1194,6 +1198,7 @@ impl NetworkBeaconProcessor { /// be downloaded. /// /// Raises a log if there are errors. + /// TODO(beacon-processor) allow_reprocess flag? 
#[allow(clippy::too_many_arguments)] pub async fn process_gossip_block( self: Arc, @@ -1201,7 +1206,6 @@ impl NetworkBeaconProcessor { peer_id: PeerId, peer_client: Client, block: Arc>, - reprocess_tx: mpsc::Sender, duplicate_cache: DuplicateCache, invalid_block_storage: InvalidBlockStorage, seen_duration: Duration, @@ -1212,7 +1216,6 @@ impl NetworkBeaconProcessor { peer_id, peer_client, block.clone(), - reprocess_tx.clone(), seen_duration, ) .await @@ -1223,7 +1226,6 @@ impl NetworkBeaconProcessor { self.process_gossip_verified_block( peer_id, gossip_verified_block, - reprocess_tx, invalid_block_storage, seen_duration, ) @@ -1249,7 +1251,6 @@ impl NetworkBeaconProcessor { peer_id: PeerId, peer_client: Client, block: Arc>, - reprocess_tx: mpsc::Sender, seen_duration: Duration, ) -> Option> { let block_delay = @@ -1479,24 +1480,28 @@ impl NetworkBeaconProcessor { let inner_self = self.clone(); let process_fn = Box::pin(async move { - let reprocess_tx = inner_self.reprocess_tx.clone(); let invalid_block_storage = inner_self.invalid_block_storage.clone(); inner_self .process_gossip_verified_block( peer_id, verified_block, - reprocess_tx, invalid_block_storage, seen_duration, ) .await; }); - if reprocess_tx - .try_send(ReprocessQueueMessage::EarlyBlock(QueuedGossipBlock { - beacon_block_slot: block_slot, - beacon_block_root: block_root, - process_fn, - })) + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::EarlyBlock( + QueuedGossipBlock { + beacon_block_slot: block_slot, + beacon_block_root: block_root, + process_fn, + }, + )), + }) .is_err() { error!( @@ -1525,11 +1530,11 @@ impl NetworkBeaconProcessor { /// Process the beacon block that has already passed gossip verification. /// /// Raises a log if there are errors. + /// TODO(beacon-processor) allow reprocess? pub async fn process_gossip_verified_block( self: Arc, peer_id: PeerId, verified_block: GossipVerifiedBlock, - reprocess_tx: mpsc::Sender, invalid_block_storage: InvalidBlockStorage, _seen_duration: Duration, ) { @@ -1579,10 +1584,14 @@ impl NetworkBeaconProcessor { match &result { Ok(AvailabilityProcessingStatus::Imported(block_root)) => { - if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported { - block_root: *block_root, - parent_root: block.message().parent_root(), + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::BlockImported { + block_root: *block_root, + parent_root: block.message().parent_root(), + }), }) .is_err() { @@ -2107,7 +2116,7 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: LightClientOptimisticUpdate, - reprocess_tx: Option>, + allow_reprocess: bool, seen_timestamp: Duration, ) { match self.chain.verify_optimistic_update_for_gossip( @@ -2135,7 +2144,7 @@ impl NetworkBeaconProcessor { "Optimistic update for unknown block" ); - if let Some(sender) = reprocess_tx { + if allow_reprocess { let processor = self.clone(); let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate( QueuedLightClientUpdate { @@ -2145,14 +2154,21 @@ impl NetworkBeaconProcessor { message_id, peer_id, light_client_optimistic_update, - None, // Do not reprocess this message again. + false, // Do not reprocess this message again. 
seen_timestamp, ) }), }, ); - if sender.try_send(msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: true, + work: Work::Reprocess(msg), + }) + .is_err() + { error!("Failed to send optimistic update for re-processing") } } else { @@ -2222,7 +2238,7 @@ impl NetworkBeaconProcessor { peer_id: PeerId, message_id: MessageId, failed_att: FailedAtt, - reprocess_tx: Option>, + allow_reprocess: bool, error: AttnError, seen_timestamp: Duration, ) { @@ -2462,7 +2478,7 @@ impl NetworkBeaconProcessor { block = ?beacon_block_root, "Attestation for unknown block" ); - if let Some(sender) = reprocess_tx { + if allow_reprocess { // We don't know the block, get the sync manager to handle the block lookup, and // send the attestation to be scheduled for re-processing. self.sync_tx @@ -2489,7 +2505,7 @@ impl NetworkBeaconProcessor { message_id, peer_id, attestation, - None, // Do not allow this attestation to be re-processed beyond this point. + false, // Do not allow this attestation to be re-processed beyond this point. seen_timestamp, ) }), @@ -2524,7 +2540,7 @@ impl NetworkBeaconProcessor { attestation, subnet_id, should_import, - None, // Do not allow this attestation to be re-processed beyond this point. + false, // Do not allow this attestation to be re-processed beyond this point. seen_timestamp, ) }), @@ -2532,7 +2548,14 @@ impl NetworkBeaconProcessor { } }; - if sender.try_send(msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(msg), + }) + .is_err() + { error!("Failed to send attestation for re-processing") } } else { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ba681eed14b..a261ce2867b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -12,8 +12,8 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, }; use beacon_processor::{ - work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, - GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, + BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, + WorkEvent as BeaconWorkEvent, }; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, @@ -61,7 +61,6 @@ pub struct NetworkBeaconProcessor { pub chain: Arc>, pub network_tx: mpsc::UnboundedSender>, pub sync_tx: mpsc::UnboundedSender>, - pub reprocess_tx: mpsc::Sender, pub network_globals: Arc>, pub invalid_block_storage: InvalidBlockStorage, pub executor: TaskExecutor, @@ -87,14 +86,12 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_individual = move |package: GossipAttestationPackage| { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_attestation_to_convert( package.message_id, package.peer_id, package.attestation, package.subnet_id, package.should_import, - Some(reprocess_tx), package.seen_timestamp, ) }; @@ -129,24 +126,21 @@ impl NetworkBeaconProcessor { let processor = self.clone(); let process_individual = move |package: GossipAttestationPackage>| { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_attestation( package.message_id, package.peer_id, package.attestation, 
package.subnet_id, package.should_import, - Some(reprocess_tx), + true, package.seen_timestamp, ) }; // Define a closure for processing batches of attestations. let processor = self.clone(); - let process_batch = move |attestations| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation_batch(attestations, Some(reprocess_tx)) - }; + let process_batch = + move |attestations| processor.process_gossip_attestation_batch(attestations, true); self.try_send(BeaconWorkEvent { drop_during_sync: true, @@ -176,22 +170,19 @@ impl NetworkBeaconProcessor { // Define a closure for processing individual attestations. let processor = self.clone(); let process_individual = move |package: GossipAggregatePackage| { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_aggregate( package.message_id, package.peer_id, package.aggregate, - Some(reprocess_tx), + true, package.seen_timestamp, ) }; // Define a closure for processing batches of attestations. let processor = self.clone(); - let process_batch = move |aggregates| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_aggregate_batch(aggregates, Some(reprocess_tx)) - }; + let process_batch = + move |aggregates| processor.process_gossip_aggregate_batch(aggregates, true); let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root; self.try_send(BeaconWorkEvent { @@ -221,7 +212,6 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { - let reprocess_tx = processor.reprocess_tx.clone(); let invalid_block_storage = processor.invalid_block_storage.clone(); let duplicate_cache = processor.duplicate_cache.clone(); processor @@ -230,7 +220,6 @@ impl NetworkBeaconProcessor { peer_id, peer_client, block, - reprocess_tx, duplicate_cache, invalid_block_storage, seen_timestamp, @@ -423,12 +412,11 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move || { - let reprocess_tx = processor.reprocess_tx.clone(); processor.process_gossip_optimistic_update( message_id, peer_id, light_client_optimistic_update, - Some(reprocess_tx), + true, seen_timestamp, ) }; @@ -1165,8 +1153,6 @@ impl NetworkBeaconProcessor> { let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx: _work_reprocessing_rx, } = <_>::default(); let (network_tx, _network_rx) = mpsc::unbounded_channel(); @@ -1177,7 +1163,6 @@ impl NetworkBeaconProcessor> { chain, network_tx, sync_tx, - reprocess_tx: work_reprocessing_tx, network_globals, invalid_block_storage: InvalidBlockStorage::Disabled, executor, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 31b17a41a42..cff6e26165b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -17,11 +17,11 @@ use beacon_processor::{ work_reprocessing_queue::{QueuedRpcBlock, ReprocessQueueMessage}, AsyncFn, BlockingFn, DuplicateCache, }; +use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; -use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; @@ -57,14 +57,12 @@ impl NetworkBeaconProcessor { 
process_type: BlockProcessType, ) -> AsyncFn { let process_fn = async move { - let reprocess_tx = self.reprocess_tx.clone(); let duplicate_cache = self.duplicate_cache.clone(); self.process_rpc_block( block_root, block, seen_timestamp, process_type, - reprocess_tx, duplicate_cache, ) .await; @@ -106,7 +104,6 @@ impl NetworkBeaconProcessor { block: RpcBlock, seen_timestamp: Duration, process_type: BlockProcessType, - reprocess_tx: mpsc::Sender, duplicate_cache: DuplicateCache, ) { // Check if the block is already being imported through another source @@ -131,7 +128,14 @@ impl NetworkBeaconProcessor { ignore_fn, }); - if reprocess_tx.try_send(reprocess_msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { error!(source = "rpc", %block_root,"Failed to inform block import") }; return; @@ -176,7 +180,14 @@ impl NetworkBeaconProcessor { block_root: *hash, parent_root, }; - if reprocess_tx.try_send(reprocess_msg).is_err() { + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { error!( source = "rpc", block_root = %hash, diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 292e894870f..d3de87dadac 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -237,7 +237,6 @@ impl TestRig { chain: harness.chain.clone(), network_tx, sync_tx, - reprocess_tx: work_reprocessing_tx.clone(), network_globals: network_globals.clone(), invalid_block_storage: InvalidBlockStorage::Disabled, executor: executor.clone(), diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 2a7bc597c26..3c91c1adbbf 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -10,9 +10,7 @@ use crate::service::NetworkMessage; use crate::status::status_message; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use beacon_processor::{ - work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, -}; +use beacon_processor::{BeaconProcessorSend, DuplicateCache}; use futures::prelude::*; use lighthouse_network::rpc::*; use lighthouse_network::{ @@ -87,7 +85,6 @@ impl Router { executor: task_executor::TaskExecutor, invalid_block_storage: InvalidBlockStorage, beacon_processor_send: BeaconProcessorSend, - beacon_processor_reprocess_tx: mpsc::Sender, fork_context: Arc, ) -> Result>, String> { trace!("Service starting"); @@ -103,7 +100,6 @@ impl Router { chain: beacon_chain.clone(), network_tx: network_send.clone(), sync_tx: sync_send.clone(), - reprocess_tx: beacon_processor_reprocess_tx, network_globals: network_globals.clone(), invalid_block_storage, executor: executor.clone(), diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 77204b455da..a396b360cbc 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -6,7 +6,7 @@ use crate::router::{Router, RouterMessage}; use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription}; use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; +use beacon_processor::BeaconProcessorSend; use futures::channel::mpsc::Sender; 
use futures::future::OptionFuture; use futures::prelude::*; @@ -204,7 +204,6 @@ impl NetworkService { executor: task_executor::TaskExecutor, libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, - beacon_processor_reprocess_tx: mpsc::Sender, ) -> Result< ( NetworkService, @@ -300,7 +299,6 @@ impl NetworkService { executor.clone(), invalid_block_storage, beacon_processor_send, - beacon_processor_reprocess_tx, fork_context.clone(), )?; @@ -352,7 +350,6 @@ impl NetworkService { executor: task_executor::TaskExecutor, libp2p_registry: Option<&'_ mut Registry>, beacon_processor_send: BeaconProcessorSend, - beacon_processor_reprocess_tx: mpsc::Sender, ) -> Result<(Arc>, NetworkSenders), String> { let (network_service, network_globals, network_senders) = Self::build( beacon_chain, @@ -360,7 +357,6 @@ impl NetworkService { executor.clone(), libp2p_registry, beacon_processor_send, - beacon_processor_reprocess_tx, ) .await?; diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 15c3321e94d..fbc72955815 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -137,7 +137,6 @@ fn test_removing_topic_weight_on_old_topics() { executor.clone(), None, beacon_processor_channels.beacon_processor_tx, - beacon_processor_channels.work_reprocessing_tx, ) .await .unwrap() From a8f3afe7eb79997442b458f67a9fa5e23c183c93 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 10 May 2025 23:38:13 -0700 Subject: [PATCH 2/8] fix tests --- beacon_node/http_api/tests/interactive_tests.rs | 12 +++++++----- .../network/src/network_beacon_processor/tests.rs | 4 ---- beacon_node/network/src/service/tests.rs | 3 --- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 447cb9c9997..cc32dce2b18 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -4,7 +4,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BlockStrategy, LightClientStrategy, SyncCommitteeStrategy}, ChainConfig, }; -use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; +use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, Work, WorkEvent}; use either::Either; use eth2::types::ProduceBlockV3Response; use eth2::types::{DepositContractData, StateId}; @@ -948,11 +948,13 @@ async fn queue_attestations_from_http() { .beacon_processor_send .as_ref() .unwrap() - .send(ReprocessQueueMessage::BlockImported { - block_root, - parent_root, + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::BlockImported { + block_root, + parent_root, + }), }) - .await .unwrap(); attestation_future.await.unwrap(); diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index d3de87dadac..753708a9c76 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -192,8 +192,6 @@ impl TestRig { let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); @@ -251,8 +249,6 @@ impl TestRig { } .spawn_manager( beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx, 
Some(work_journal_tx), harness.chain.slot_clock.clone(), chain.spec.maximum_gossip_clock_disparity(), diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index fbc72955815..db342117473 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -58,8 +58,6 @@ fn test_dht_persistence() { let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx: _beacon_processor_rx, - work_reprocessing_tx, - work_reprocessing_rx: _work_reprocessing_rx, } = <_>::default(); let _network_service = NetworkService::start( @@ -68,7 +66,6 @@ fn test_dht_persistence() { executor, None, beacon_processor_tx, - work_reprocessing_tx, ) .await .unwrap(); From 39c5411aab7584ef0dc43c4170500f3a113f68e7 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 10 May 2025 23:46:03 -0700 Subject: [PATCH 3/8] add allow_reprocess flag --- .../network/src/network_beacon_processor/gossip_methods.rs | 5 ++--- beacon_node/network/src/network_beacon_processor/mod.rs | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index d3bdf15cb13..529b0fdc716 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -408,7 +408,6 @@ impl NetworkBeaconProcessor { /// This function performs the conversion, and if successful queues a new message to be /// processed by `process_gossip_attestation`. If unsuccessful due to block unavailability, /// a retry message will be pushed. - /// TODO(beacon-processor) should we add a bool to enable/disable retry? #[allow(clippy::too_many_arguments)] pub fn process_gossip_attestation_to_convert( self: Arc, @@ -417,6 +416,7 @@ impl NetworkBeaconProcessor { single_attestation: Box, subnet_id: SubnetId, should_import: bool, + should_reprocess: bool, seen_timestamp: Duration, ) { let conversion_result = self.chain.with_committee_cache( @@ -495,6 +495,7 @@ impl NetworkBeaconProcessor { single_attestation, subnet_id, should_import, + false, seen_timestamp, ) }), @@ -1198,7 +1199,6 @@ impl NetworkBeaconProcessor { /// be downloaded. /// /// Raises a log if there are errors. - /// TODO(beacon-processor) allow_reprocess flag? #[allow(clippy::too_many_arguments)] pub async fn process_gossip_block( self: Arc, @@ -1530,7 +1530,6 @@ impl NetworkBeaconProcessor { /// Process the beacon block that has already passed gossip verification. /// /// Raises a log if there are errors. - /// TODO(beacon-processor) allow reprocess? 
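// The TODO above is resolved by leaving the block-import notification
// unconditional: `BlockImported` does not requeue any work itself, it only
// releases work already parked against this block root in the reprocess
// queue, so there is nothing to gate behind a flag.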
pub async fn process_gossip_verified_block( self: Arc, peer_id: PeerId, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index a261ce2867b..eae6000eefa 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -92,6 +92,7 @@ impl NetworkBeaconProcessor { package.attestation, package.subnet_id, package.should_import, + true, package.seen_timestamp, ) }; From 110c4cc1143883b030b1ec0f07c6fdf14cd45fb5 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sat, 10 May 2025 23:55:53 -0700 Subject: [PATCH 4/8] fix --- .../gossip_methods.rs | 84 ++++++++++--------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 529b0fdc716..a2dc788adc5 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -469,46 +469,50 @@ impl NetworkBeaconProcessor { // Outermost error (from `with_committee_cache`) indicating that the block is not known // and that this conversion should be retried. Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, - ); - // We don't know the block, get the sync manager to handle the block lookup, and - // send the attestation to be scheduled for re-processing. - self.sync_tx - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, - beacon_block_root, - )) - .unwrap_or_else(|_| { - warn!(msg = "UnknownBlockHash", "Failed to send to sync service") - }); - let processor = self.clone(); - let msg_id = message_id.clone(); - // Do not allow this attestation to be re-processed beyond this point. - let reprocess_msg = - ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { - beacon_block_root, - process_fn: Box::new(move || { - processor.process_gossip_attestation_to_convert( - msg_id, - peer_id, - single_attestation, - subnet_id, - should_import, - false, - seen_timestamp, - ) - }), - }); - if self - .beacon_processor_send - .try_send(WorkEvent { - drop_during_sync: false, - work: Work::Reprocess(reprocess_msg), - }) - .is_err() - { - error!("Failed to send attestation for re-processing"); + if should_reprocess { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + + // We don't know the block, get the sync manager to handle the block lookup, and + // send the attestation to be scheduled for re-processing. + self.sync_tx + .send(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + beacon_block_root, + )) + .unwrap_or_else(|_| { + warn!(msg = "UnknownBlockHash", "Failed to send to sync service") + }); + let processor = self.clone(); + let msg_id = message_id.clone(); + // Do not allow this attestation to be re-processed beyond this point. 
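// The boxed closure below re-enters `process_gossip_attestation_to_convert`
// with `should_reprocess = false`, so a conversion parked here is retried at
// most once, when the missing block arrives.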
+ let reprocess_msg = + ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(move || { + processor.process_gossip_attestation_to_convert( + msg_id, + peer_id, + single_attestation, + subnet_id, + should_import, + false, + seen_timestamp, + ) + }), + }); + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { + error!("Failed to send attestation for re-processing"); + } + } else { // We shouldn't make any further attempts to process this attestation. // // Don't downscore the peer since it's not clear if we requested this head From e1ae209ade5c303397c51de251217ead611e3aed Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 11 May 2025 00:20:53 -0700 Subject: [PATCH 5/8] fix test --- beacon_node/beacon_processor/src/lib.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 57db21a767e..5c5e6c96ee7 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -770,7 +770,7 @@ struct InboundEvents { /// Used by upstream processes to send new work to the `BeaconProcessor`. event_rx: mpsc::Receiver>, /// Used internally for queuing work ready to be re-processed. - reprocess_work_rx: mpsc::Receiver, + ready_work_rx: mpsc::Receiver, } impl Stream for InboundEvents { @@ -791,7 +791,7 @@ impl Stream for InboundEvents { // Poll for delayed blocks before polling for new work. It might be the case that a delayed // block is required to successfully process some new work. - match self.reprocess_work_rx.poll_recv(cx) { + match self.ready_work_rx.poll_recv(cx) { Poll::Ready(Some(ready_work)) => { return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); } @@ -947,7 +947,7 @@ impl BeaconProcessor { let mut inbound_events = InboundEvents { idle_rx, event_rx, - reprocess_work_rx: ready_work_rx, + ready_work_rx, }; let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting; @@ -1023,8 +1023,10 @@ impl BeaconProcessor { .unwrap_or(WORKER_FREED); // We don't care if this message was successfully sent, we only use the journal - // during testing. - let _ = work_journal_tx.try_send(id); + // during testing. We also ignore reprocess messages to ensure our test cases can pass. + if id != "reprocess" { + let _ = work_journal_tx.try_send(id); + } } let can_spawn = self.current_workers < self.config.max_workers; From dd79afacac4c83ab6d0fa6082acbdf93baeaeba2 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 11 May 2025 17:48:46 -0700 Subject: [PATCH 6/8] Move all work queue structs and init logic to a separate file --- beacon_node/beacon_processor/src/lib.rs | 991 +++++++----------- .../beacon_processor/src/work_queue.rs | 398 +++++++ beacon_node/client/src/builder.rs | 2 +- beacon_node/http_api/src/test_utils.rs | 3 +- .../src/network_beacon_processor/tests.rs | 1 + 5 files changed, 784 insertions(+), 611 deletions(-) create mode 100644 beacon_node/beacon_processor/src/work_queue.rs diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 5c5e6c96ee7..1ec889614be 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,6 +38,7 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. 
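// The model the module docs describe, in miniature (an illustrative sketch
// only; the real loop below also handles backfill rate limiting, the work
// journal and metrics):
//
//     loop {
//         match inbound_events.next().await {
//             Some(InboundEvent::WorkerIdle) => { /* pop a queue, spawn a worker */ }
//             Some(InboundEvent::WorkEvent(ev))
//             | Some(InboundEvent::ReprocessingWork(ev)) => { /* queue or spawn */ }
//             None => break,
//         }
//     }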
+use crate::work_queue::BeaconProcessorQueueLengths; use crate::work_reprocessing_queue::{ QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage, }; @@ -45,12 +46,11 @@ use futures::stream::{Stream, StreamExt}; use futures::task::Poll; use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::crit; -use logging::TimeLatch; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use std::cmp; -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use std::fmt; use std::future::Future; use std::pin::Pin; @@ -63,9 +63,9 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, error, trace, warn}; use types::{ - Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, - SingleAttestation, Slot, SubnetId, + Attestation, EthSpec, Hash256, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; +use work_queue::WorkQueues; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, @@ -73,6 +73,7 @@ use work_reprocessing_queue::{ use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; mod metrics; +pub mod work_queue; pub mod work_reprocessing_queue; /// The maximum size of the channel for work events to the `BeaconProcessor`. @@ -89,128 +90,6 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; -/// Over-provision queues based on active validator count by some factor. The beacon chain has -/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning -/// slightly, we don't need to adjust the queues during the lifetime of a process. -const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; - -/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues -/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that -/// seems reasonable. 
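// Worked example of the dynamic sizing that moves to `work_queue.rs`,
// assuming 500_000 active validators and 32 slots per epoch:
//
//     let overprovisioned = 500_000 * 110 / 100;          // 550_000
//     let attestation_queue = max(550_000 / 32, 128);     // 17_187
//
// i.e. the attestation queues hold roughly one slot's worth of attestations.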
-const MIN_QUEUE_LEN: usize = 128; - -/// Maximum number of queued items that will be stored before dropping them -pub struct BeaconProcessorQueueLengths { - aggregate_queue: usize, - attestation_queue: usize, - unknown_block_aggregate_queue: usize, - unknown_block_attestation_queue: usize, - sync_message_queue: usize, - sync_contribution_queue: usize, - gossip_voluntary_exit_queue: usize, - gossip_proposer_slashing_queue: usize, - gossip_attester_slashing_queue: usize, - unknown_light_client_update_queue: usize, - unknown_block_sampling_request_queue: usize, - rpc_block_queue: usize, - rpc_blob_queue: usize, - rpc_custody_column_queue: usize, - rpc_verify_data_column_queue: usize, - sampling_result_queue: usize, - chain_segment_queue: usize, - backfill_chain_segment: usize, - gossip_block_queue: usize, - gossip_blob_queue: usize, - gossip_data_column_queue: usize, - delayed_block_queue: usize, - status_queue: usize, - bbrange_queue: usize, - bbroots_queue: usize, - blbroots_queue: usize, - blbrange_queue: usize, - dcbroots_queue: usize, - dcbrange_queue: usize, - gossip_bls_to_execution_change_queue: usize, - lc_gossip_finality_update_queue: usize, - lc_gossip_optimistic_update_queue: usize, - lc_bootstrap_queue: usize, - lc_rpc_optimistic_update_queue: usize, - lc_rpc_finality_update_queue: usize, - lc_update_range_queue: usize, - api_request_p0_queue: usize, - api_request_p1_queue: usize, -} - -impl BeaconProcessorQueueLengths { - pub fn from_state( - state: &BeaconState, - spec: &ChainSpec, - ) -> Result { - let active_validator_count = - match state.get_cached_active_validator_indices(RelativeEpoch::Current) { - Ok(indices) => indices.len(), - Err(_) => state - .get_active_validator_indices(state.current_epoch(), spec) - .map_err(|e| format!("Error computing active indices: {:?}", e))? 
- .len(), - }; - let active_validator_count = - (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; - let slots_per_epoch = E::slots_per_epoch() as usize; - - Ok(Self { - aggregate_queue: 4096, - unknown_block_aggregate_queue: 1024, - // Capacity for a full slot's worth of attestations if subscribed to all subnets - attestation_queue: std::cmp::max( - active_validator_count / slots_per_epoch, - MIN_QUEUE_LEN, - ), - // Capacity for a full slot's worth of attestations if subscribed to all subnets - unknown_block_attestation_queue: std::cmp::max( - active_validator_count / slots_per_epoch, - MIN_QUEUE_LEN, - ), - sync_message_queue: 2048, - sync_contribution_queue: 1024, - gossip_voluntary_exit_queue: 4096, - gossip_proposer_slashing_queue: 4096, - gossip_attester_slashing_queue: 4096, - unknown_light_client_update_queue: 128, - rpc_block_queue: 1024, - rpc_blob_queue: 1024, - // TODO(das): Placeholder values - rpc_custody_column_queue: 1000, - rpc_verify_data_column_queue: 1000, - unknown_block_sampling_request_queue: 16384, - sampling_result_queue: 1000, - chain_segment_queue: 64, - backfill_chain_segment: 64, - gossip_block_queue: 1024, - gossip_blob_queue: 1024, - gossip_data_column_queue: 1024, - delayed_block_queue: 1024, - status_queue: 1024, - bbrange_queue: 1024, - bbroots_queue: 1024, - blbroots_queue: 1024, - blbrange_queue: 1024, - // TODO(das): pick proper values - dcbroots_queue: 1024, - dcbrange_queue: 1024, - gossip_bls_to_execution_change_queue: 16384, - lc_gossip_finality_update_queue: 1024, - lc_gossip_optimistic_update_queue: 1024, - lc_bootstrap_queue: 1024, - lc_rpc_optimistic_update_queue: 512, - lc_rpc_finality_update_queue: 512, - lc_update_range_queue: 512, - api_request_p0_queue: 1024, - api_request_p1_queue: 1024, - }) - } -} - /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -282,89 +161,6 @@ impl Default for BeaconProcessorChannels { } } -/// A simple first-in-first-out queue with a maximum length. -struct FifoQueue { - queue: VecDeque, - max_length: usize, -} - -impl FifoQueue { - /// Create a new, empty queue with the given length. - pub fn new(max_length: usize) -> Self { - Self { - queue: VecDeque::default(), - max_length, - } - } - - /// Add a new item to the queue. - /// - /// Drops `item` if the queue is full. - pub fn push(&mut self, item: T, item_desc: &str) { - if self.queue.len() == self.max_length { - error!( - msg = "the system has insufficient resources for load", - queue_len = self.max_length, - queue = item_desc, - "Work queue is full" - ) - } else { - self.queue.push_back(item); - } - } - - /// Remove the next item from the queue. - pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } - - /// Returns the current length of the queue. - pub fn len(&self) -> usize { - self.queue.len() - } -} - -/// A simple last-in-first-out queue with a maximum length. -struct LifoQueue { - queue: VecDeque, - max_length: usize, -} - -impl LifoQueue { - /// Create a new, empty queue with the given length. - pub fn new(max_length: usize) -> Self { - Self { - queue: VecDeque::default(), - max_length, - } - } - - /// Add a new item to the front of the queue. - /// - /// If the queue is full, the item at the back of the queue is dropped. - pub fn push(&mut self, item: T) { - if self.queue.len() == self.max_length { - self.queue.pop_back(); - } - self.queue.push_front(item); - } - - /// Remove the next item from the queue. 
- pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } - - /// Returns `true` if the queue is full. - pub fn is_full(&self) -> bool { - self.queue.len() >= self.max_length - } - - /// Returns the current length of the queue. - pub fn len(&self) -> usize { - self.queue.len() - } -} - /// A handle that sends a message on the provided channel to a receiver when it gets dropped. /// /// The receiver task is responsible for removing the provided `entry` from the `DuplicateCache` @@ -850,78 +646,8 @@ impl BeaconProcessor { // Used by workers to communicate that they are finished a task. let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); - // Using LIFO queues for attestations since validator profits rely upon getting fresh - // attestations into blocks. Additionally, later attestations contain more information than - // earlier ones, so we consider them more valuable. - let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); - let mut aggregate_debounce = TimeLatch::default(); - let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); - let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue); - let mut attestation_debounce = TimeLatch::default(); - let mut unknown_block_aggregate_queue = - LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); - let mut unknown_block_attestation_queue = - LifoQueue::new(queue_lengths.unknown_block_attestation_queue); - - let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); - let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); - - // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have - // a strong feeling about queue type for exits. - let mut gossip_voluntary_exit_queue = - FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue); - - // Using a FIFO queue for slashing to prevent people from flushing their slashings from the - // queues with lots of junk messages. - let mut gossip_proposer_slashing_queue = - FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue); - let mut gossip_attester_slashing_queue = - FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); - - // Using a FIFO queue since blocks need to be imported sequentially. 
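
These queue-type comments carry the design rationale: LIFO favours the freshest attestations (later ones carry more information), while FIFO preserves arrival order for exits, slashings, and blocks. A minimal sketch of the two drop policies, deliberately independent of the crate's own types:

use std::collections::VecDeque;

// LIFO policy (attestations): when full, evict the oldest item at the back so
// the freshest item is always served first.
fn lifo_push<T>(q: &mut VecDeque<T>, max: usize, item: T) {
    if q.len() == max {
        q.pop_back();
    }
    q.push_front(item);
}

// FIFO policy (exits, slashings, blocks): preserve arrival order and simply
// drop the new item when the queue is full.
fn fifo_push<T>(q: &mut VecDeque<T>, max: usize, item: T) {
    if q.len() < max {
        q.push_back(item);
    }
}
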
- let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); - let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); - let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); - let mut rpc_verify_data_column_queue = - FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); - // TODO(das): the sampling_request_queue is never read - let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); - let mut unknown_block_sampling_request_queue = - FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); - let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); - let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); - let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); - let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); - let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); - let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); - - let mut status_queue = FifoQueue::new(queue_lengths.status_queue); - let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue); - let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); - let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); - let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); - let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); - let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); - - let mut gossip_bls_to_execution_change_queue = - FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); - - // Using FIFO queues for light client updates to maintain sequence order. - let mut lc_gossip_finality_update_queue = - FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue); - let mut lc_gossip_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue); - let mut unknown_light_client_update_queue = - FifoQueue::new(queue_lengths.unknown_light_client_update_queue); - let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); - let mut lc_rpc_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue); - let mut lc_rpc_finality_update_queue = - FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue); - let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue); - - let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); - let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); + // Initialize the worker queues. + let mut work_queues: WorkQueues = WorkQueues::new(queue_lengths); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). @@ -1043,234 +769,248 @@ impl BeaconProcessor { None if can_spawn => { // Check for chain segments first, they're the most efficient way to get // blocks into the system. - let work_event: Option> = - if let Some(item) = chain_segment_queue.pop() { - Some(item) - // Check sync blocks before gossip blocks, since we've already explicitly - // requested these blocks. 
- } else if let Some(item) = rpc_block_queue.pop() { - Some(item) - } else if let Some(item) = rpc_blob_queue.pop() { - Some(item) - } else if let Some(item) = rpc_custody_column_queue.pop() { - Some(item) - // TODO(das): decide proper prioritization for sampling columns - } else if let Some(item) = rpc_custody_column_queue.pop() { - Some(item) - } else if let Some(item) = rpc_verify_data_column_queue.pop() { - Some(item) - } else if let Some(item) = sampling_result_queue.pop() { - Some(item) - // Check delayed blocks before gossip blocks, the gossip blocks might rely - // on the delayed ones. - } else if let Some(item) = delayed_block_queue.pop() { - Some(item) - // Check gossip blocks before gossip attestations, since a block might be - // required to verify some attestations. - } else if let Some(item) = gossip_block_queue.pop() { - Some(item) - } else if let Some(item) = gossip_blob_queue.pop() { - Some(item) - } else if let Some(item) = gossip_data_column_queue.pop() { - Some(item) - // Check the priority 0 API requests after blocks and blobs, but before attestations. - } else if let Some(item) = api_request_p0_queue.pop() { - Some(item) - // Check the aggregates, *then* the unaggregates since we assume that - // aggregates are more valuable to local validators and effectively give us - // more information with less signature verification time. - } else if aggregate_queue.len() > 0 { - let batch_size = cmp::min( - aggregate_queue.len(), - self.config.max_gossip_aggregate_batch_size, - ); - - if batch_size < 2 { - // One single aggregate is in the queue, process it individually. - aggregate_queue.pop() - } else { - // Collect two or more aggregates into a batch, so they can take - // advantage of batch signature verification. - // - // Note: this will convert the `Work::GossipAggregate` item into a - // `Work::GossipAggregateBatch` item. - let mut aggregates = Vec::with_capacity(batch_size); - let mut process_batch_opt = None; - for _ in 0..batch_size { - if let Some(item) = aggregate_queue.pop() { - match item { - Work::GossipAggregate { - aggregate, - process_individual: _, - process_batch, - } => { - aggregates.push(*aggregate); - if process_batch_opt.is_none() { - process_batch_opt = Some(process_batch); - } - } - _ => { - error!("Invalid item in aggregate queue"); + let work_event: Option> = if let Some(item) = + work_queues.chain_segment_queue.pop() + { + Some(item) + // Check sync blocks before gossip blocks, since we've already explicitly + // requested these blocks. + } else if let Some(item) = work_queues.rpc_block_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_blob_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { + Some(item) + // TODO(das): decide proper prioritization for sampling columns + } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_verify_data_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.sampling_result_queue.pop() { + Some(item) + // Check delayed blocks before gossip blocks, the gossip blocks might rely + // on the delayed ones. + } else if let Some(item) = work_queues.delayed_block_queue.pop() { + Some(item) + // Check gossip blocks before gossip attestations, since a block might be + // required to verify some attestations. 
+ } else if let Some(item) = work_queues.gossip_block_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.gossip_blob_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.gossip_data_column_queue.pop() { + Some(item) + // Check the priority 0 API requests after blocks and blobs, but before attestations. + } else if let Some(item) = work_queues.api_request_p0_queue.pop() { + Some(item) + // Check the aggregates, *then* the unaggregates since we assume that + // aggregates are more valuable to local validators and effectively give us + // more information with less signature verification time. + } else if !work_queues.aggregate_queue.is_empty() { + let batch_size = cmp::min( + work_queues.aggregate_queue.len(), + self.config.max_gossip_aggregate_batch_size, + ); + + if batch_size < 2 { + // One single aggregate is in the queue, process it individually. + work_queues.aggregate_queue.pop() + } else { + // Collect two or more aggregates into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAggregate` item into a + // `Work::GossipAggregateBatch` item. + let mut aggregates = Vec::with_capacity(batch_size); + let mut process_batch_opt = None; + for _ in 0..batch_size { + if let Some(item) = work_queues.aggregate_queue.pop() { + match item { + Work::GossipAggregate { + aggregate, + process_individual: _, + process_batch, + } => { + aggregates.push(*aggregate); + if process_batch_opt.is_none() { + process_batch_opt = Some(process_batch); } } + _ => { + error!("Invalid item in aggregate queue"); + } } } - - if let Some(process_batch) = process_batch_opt { - // Process all aggregates with a single worker. - Some(Work::GossipAggregateBatch { - aggregates, - process_batch, - }) - } else { - // There is no good reason for this to - // happen, it is a serious logic error. - // Since we only form batches when multiple - // work items exist, we should always have a - // work closure at this point. - crit!("Missing aggregate work"); - None - } } - // Check the unaggregated attestation queue. - // - // Potentially use batching. - } else if attestation_queue.len() > 0 { - let batch_size = cmp::min( - attestation_queue.len(), - self.config.max_gossip_attestation_batch_size, - ); - - if batch_size < 2 { - // One single attestation is in the queue, process it individually. - attestation_queue.pop() + + if let Some(process_batch) = process_batch_opt { + // Process all aggregates with a single worker. + Some(Work::GossipAggregateBatch { + aggregates, + process_batch, + }) } else { - // Collect two or more attestations into a batch, so they can take - // advantage of batch signature verification. - // - // Note: this will convert the `Work::GossipAttestation` item into a - // `Work::GossipAttestationBatch` item. - let mut attestations = Vec::with_capacity(batch_size); - let mut process_batch_opt = None; - for _ in 0..batch_size { - if let Some(item) = attestation_queue.pop() { - match item { - Work::GossipAttestation { - attestation, - process_individual: _, - process_batch, - } => { - attestations.push(*attestation); - if process_batch_opt.is_none() { - process_batch_opt = Some(process_batch); - } + // There is no good reason for this to + // happen, it is a serious logic error. + // Since we only form batches when multiple + // work items exist, we should always have a + // work closure at this point. + crit!("Missing aggregate work"); + None + } + } + // Check the unaggregated attestation queue. 
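
The batching branch just above is the interesting part of this hunk: up to `max_gossip_aggregate_batch_size` queued `GossipAggregate` items are drained and folded into a single `GossipAggregateBatch`, so one worker can verify all the signatures in one batch pass. A simplified sketch of that drain-and-fold shape (`T` stands in for the `Work` variants):

use std::collections::VecDeque;

// Hypothetical reduction of the batch-forming logic: pop up to `max_batch`
// items so a single worker can verify them together, amortising the cost of
// the expensive signature checks.
fn form_batch<T>(queue: &mut VecDeque<T>, max_batch: usize) -> Vec<T> {
    let n = queue.len().min(max_batch);
    (0..n).filter_map(|_| queue.pop_front()).collect()
}
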
+ // + // Potentially use batching. + } else if !work_queues.attestation_queue.is_empty() { + let batch_size = cmp::min( + work_queues.attestation_queue.len(), + self.config.max_gossip_attestation_batch_size, + ); + + if batch_size < 2 { + // One single attestation is in the queue, process it individually. + work_queues.attestation_queue.pop() + } else { + // Collect two or more attestations into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAttestation` item into a + // `Work::GossipAttestationBatch` item. + let mut attestations = Vec::with_capacity(batch_size); + let mut process_batch_opt = None; + for _ in 0..batch_size { + if let Some(item) = work_queues.attestation_queue.pop() { + match item { + Work::GossipAttestation { + attestation, + process_individual: _, + process_batch, + } => { + attestations.push(*attestation); + if process_batch_opt.is_none() { + process_batch_opt = Some(process_batch); } - _ => error!("Invalid item in attestation queue"), } + _ => error!("Invalid item in attestation queue"), } } - - if let Some(process_batch) = process_batch_opt { - // Process all attestations with a single worker. - Some(Work::GossipAttestationBatch { - attestations, - process_batch, - }) - } else { - // There is no good reason for this to - // happen, it is a serious logic error. - // Since we only form batches when multiple - // work items exist, we should always have a - // work closure at this point. - crit!("Missing attestations work"); - None - } } - // Convert any gossip attestations that need to be converted. - } else if let Some(item) = attestation_to_convert_queue.pop() { - Some(item) - // Check sync committee messages after attestations as their rewards are lesser - // and they don't influence fork choice. - } else if let Some(item) = sync_contribution_queue.pop() { - Some(item) - } else if let Some(item) = sync_message_queue.pop() { - Some(item) - // Aggregates and unaggregates queued for re-processing are older and we - // care about fresher ones, so check those first. - } else if let Some(item) = unknown_block_aggregate_queue.pop() { - Some(item) - } else if let Some(item) = unknown_block_attestation_queue.pop() { - Some(item) - // Check RPC methods next. Status messages are needed for sync so - // prioritize them over syncing requests from other peers (BlocksByRange - // and BlocksByRoot) - } else if let Some(item) = status_queue.pop() { - Some(item) - } else if let Some(item) = bbrange_queue.pop() { - Some(item) - } else if let Some(item) = bbroots_queue.pop() { - Some(item) - } else if let Some(item) = blbrange_queue.pop() { - Some(item) - } else if let Some(item) = blbroots_queue.pop() { - Some(item) - } else if let Some(item) = dcbroots_queue.pop() { - Some(item) - } else if let Some(item) = dcbrange_queue.pop() { - Some(item) - // Prioritize sampling requests after block syncing requests - } else if let Some(item) = unknown_block_sampling_request_queue.pop() { - Some(item) - // Check slashings after all other consensus messages so we prioritize - // following head. - // - // Check attester slashings before proposer slashings since they have the - // potential to slash multiple validators at once. - } else if let Some(item) = gossip_attester_slashing_queue.pop() { - Some(item) - } else if let Some(item) = gossip_proposer_slashing_queue.pop() { - Some(item) - // Check exits and address changes late since our validators don't get - // rewards from them. 
- } else if let Some(item) = gossip_voluntary_exit_queue.pop() { - Some(item) - } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { - Some(item) - // Check the priority 1 API requests after we've - // processed all the interesting things from the network - // and things required for us to stay in good repute - // with our P2P peers. - } else if let Some(item) = api_request_p1_queue.pop() { - Some(item) - // Handle backfill sync chain segments. - } else if let Some(item) = backfill_chain_segment.pop() { - Some(item) - // Handle light client requests. - } else if let Some(item) = lc_gossip_finality_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() { - Some(item) - } else if let Some(item) = unknown_light_client_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_bootstrap_queue.pop() { - Some(item) - } else if let Some(item) = lc_rpc_optimistic_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_rpc_finality_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_update_range_queue.pop() { - Some(item) - // This statement should always be the final else statement. - } else { - // Let the journal know that a worker is freed and there's nothing else - // for it to do. - if let Some(work_journal_tx) = &work_journal_tx { - // We don't care if this message was successfully sent, we only use the journal - // during testing. - let _ = work_journal_tx.try_send(NOTHING_TO_DO); + + if let Some(process_batch) = process_batch_opt { + // Process all attestations with a single worker. + Some(Work::GossipAttestationBatch { + attestations, + process_batch, + }) + } else { + // There is no good reason for this to + // happen, it is a serious logic error. + // Since we only form batches when multiple + // work items exist, we should always have a + // work closure at this point. + crit!("Missing attestations work"); + None } - None - }; + } + // Convert any gossip attestations that need to be converted. + } else if let Some(item) = work_queues.attestation_to_convert_queue.pop() { + Some(item) + // Check sync committee messages after attestations as their rewards are lesser + // and they don't influence fork choice. + } else if let Some(item) = work_queues.sync_contribution_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.sync_message_queue.pop() { + Some(item) + // Aggregates and unaggregates queued for re-processing are older and we + // care about fresher ones, so check those first. + } else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.unknown_block_attestation_queue.pop() + { + Some(item) + // Check RPC methods next. 
Status messages are needed for sync so + // prioritize them over syncing requests from other peers (BlocksByRange + // and BlocksByRoot) + } else if let Some(item) = work_queues.status_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.bbrange_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.bbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.blbrange_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.blbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.dcbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.dcbrange_queue.pop() { + Some(item) + // Prioritize sampling requests after block syncing requests + } else if let Some(item) = + work_queues.unknown_block_sampling_request_queue.pop() + { + Some(item) + // Check slashings after all other consensus messages so we prioritize + // following head. + // + // Check attester slashings before proposer slashings since they have the + // potential to slash multiple validators at once. + } else if let Some(item) = work_queues.gossip_attester_slashing_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.gossip_proposer_slashing_queue.pop() + { + Some(item) + // Check exits and address changes late since our validators don't get + // rewards from them. + } else if let Some(item) = work_queues.gossip_voluntary_exit_queue.pop() { + Some(item) + } else if let Some(item) = + work_queues.gossip_bls_to_execution_change_queue.pop() + { + Some(item) + // Check the priority 1 API requests after we've + // processed all the interesting things from the network + // and things required for us to stay in good repute + // with our P2P peers. + } else if let Some(item) = work_queues.api_request_p1_queue.pop() { + Some(item) + // Handle backfill sync chain segments. + } else if let Some(item) = work_queues.backfill_chain_segment.pop() { + Some(item) + // Handle light client requests. + } else if let Some(item) = work_queues.lc_gossip_finality_update_queue.pop() + { + Some(item) + } else if let Some(item) = + work_queues.lc_gossip_optimistic_update_queue.pop() + { + Some(item) + } else if let Some(item) = + work_queues.unknown_light_client_update_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.lc_bootstrap_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.lc_rpc_optimistic_update_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.lc_rpc_finality_update_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.lc_update_range_queue.pop() { + Some(item) + // This statement should always be the final else statement. + } else { + // Let the journal know that a worker is freed and there's nothing else + // for it to do. + if let Some(work_journal_tx) = &work_journal_tx { + // We don't care if this message was successfully sent, we only use the journal + // during testing. + let _ = work_journal_tx.try_send(NOTHING_TO_DO); + } + None + }; if let Some(work_event) = work_event { let work_type = work_event.to_type(); @@ -1323,9 +1063,11 @@ impl BeaconProcessor { } } _ if can_spawn => self.spawn_worker(work, idle_tx), - Work::GossipAttestation { .. } => attestation_queue.push(work), + Work::GossipAttestation { .. } => { + work_queues.attestation_queue.push(work) + } Work::GossipAttestationToConvert { .. 
} => { - attestation_to_convert_queue.push(work) + work_queues.attestation_to_convert_queue.push(work) } // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. @@ -1333,7 +1075,7 @@ impl BeaconProcessor { work_type = "GossipAttestationBatch", "Unsupported inbound event" ), - Work::GossipAggregate { .. } => aggregate_queue.push(work), + Work::GossipAggregate { .. } => work_queues.aggregate_queue.push(work), // Aggregate batches are formed internally within the `BeaconProcessor`, // they are not sent from external services. Work::GossipAggregateBatch { .. } => { @@ -1342,88 +1084,110 @@ impl BeaconProcessor { "Unsupported inbound event" ) } - Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id), - Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id), + Work::GossipBlock { .. } => { + work_queues.gossip_block_queue.push(work, work_id) + } + Work::GossipBlobSidecar { .. } => { + work_queues.gossip_blob_queue.push(work, work_id) + } Work::GossipDataColumnSidecar { .. } => { - gossip_data_column_queue.push(work, work_id) + work_queues.gossip_data_column_queue.push(work, work_id) } Work::DelayedImportBlock { .. } => { - delayed_block_queue.push(work, work_id) + work_queues.delayed_block_queue.push(work, work_id) } Work::GossipVoluntaryExit { .. } => { - gossip_voluntary_exit_queue.push(work, work_id) + work_queues.gossip_voluntary_exit_queue.push(work, work_id) } - Work::GossipProposerSlashing { .. } => { - gossip_proposer_slashing_queue.push(work, work_id) + Work::GossipProposerSlashing { .. } => work_queues + .gossip_proposer_slashing_queue + .push(work, work_id), + Work::GossipAttesterSlashing { .. } => work_queues + .gossip_attester_slashing_queue + .push(work, work_id), + Work::GossipSyncSignature { .. } => { + work_queues.sync_message_queue.push(work) } - Work::GossipAttesterSlashing { .. } => { - gossip_attester_slashing_queue.push(work, work_id) - } - Work::GossipSyncSignature { .. } => sync_message_queue.push(work), Work::GossipSyncContribution { .. } => { - sync_contribution_queue.push(work) - } - Work::GossipLightClientFinalityUpdate { .. } => { - lc_gossip_finality_update_queue.push(work, work_id) - } - Work::GossipLightClientOptimisticUpdate { .. } => { - lc_gossip_optimistic_update_queue.push(work, work_id) + work_queues.sync_contribution_queue.push(work) } + Work::GossipLightClientFinalityUpdate { .. } => work_queues + .lc_gossip_finality_update_queue + .push(work, work_id), + Work::GossipLightClientOptimisticUpdate { .. } => work_queues + .lc_gossip_optimistic_update_queue + .push(work, work_id), Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { - rpc_block_queue.push(work, work_id) + work_queues.rpc_block_queue.push(work, work_id) } - Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id), + Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), Work::RpcCustodyColumn { .. } => { - rpc_custody_column_queue.push(work, work_id) + work_queues.rpc_custody_column_queue.push(work, work_id) } Work::RpcVerifyDataColumn(_) => { - rpc_verify_data_column_queue.push(work, work_id) + work_queues.rpc_verify_data_column_queue.push(work, work_id) + } + Work::SamplingResult(_) => { + work_queues.sampling_result_queue.push(work, work_id) + } + Work::ChainSegment { .. } => { + work_queues.chain_segment_queue.push(work, work_id) } - Work::SamplingResult(_) => sampling_result_queue.push(work, work_id), - Work::ChainSegment { .. 
} => chain_segment_queue.push(work, work_id), Work::ChainSegmentBackfill { .. } => { - backfill_chain_segment.push(work, work_id) + work_queues.backfill_chain_segment.push(work, work_id) } - Work::Status { .. } => status_queue.push(work, work_id), - Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id), - Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id), - Work::BlobsByRangeRequest { .. } => blbrange_queue.push(work, work_id), - Work::LightClientBootstrapRequest { .. } => { - lc_bootstrap_queue.push(work, work_id) + Work::Status { .. } => work_queues.status_queue.push(work, work_id), + Work::BlocksByRangeRequest { .. } => { + work_queues.bbrange_queue.push(work, work_id) + } + Work::BlocksByRootsRequest { .. } => { + work_queues.bbroots_queue.push(work, work_id) } - Work::LightClientOptimisticUpdateRequest { .. } => { - lc_rpc_optimistic_update_queue.push(work, work_id) + Work::BlobsByRangeRequest { .. } => { + work_queues.blbrange_queue.push(work, work_id) } + Work::LightClientBootstrapRequest { .. } => { + work_queues.lc_bootstrap_queue.push(work, work_id) + } + Work::LightClientOptimisticUpdateRequest { .. } => work_queues + .lc_rpc_optimistic_update_queue + .push(work, work_id), Work::LightClientFinalityUpdateRequest { .. } => { - lc_rpc_finality_update_queue.push(work, work_id) + work_queues.lc_rpc_finality_update_queue.push(work, work_id) } Work::LightClientUpdatesByRangeRequest { .. } => { - lc_update_range_queue.push(work, work_id) + work_queues.lc_update_range_queue.push(work, work_id) } Work::UnknownBlockAttestation { .. } => { - unknown_block_attestation_queue.push(work) + work_queues.unknown_block_attestation_queue.push(work) } Work::UnknownBlockAggregate { .. } => { - unknown_block_aggregate_queue.push(work) + work_queues.unknown_block_aggregate_queue.push(work) } - Work::GossipBlsToExecutionChange { .. } => { - gossip_bls_to_execution_change_queue.push(work, work_id) + Work::GossipBlsToExecutionChange { .. } => work_queues + .gossip_bls_to_execution_change_queue + .push(work, work_id), + Work::BlobsByRootsRequest { .. } => { + work_queues.blbroots_queue.push(work, work_id) } - Work::BlobsByRootsRequest { .. } => blbroots_queue.push(work, work_id), Work::DataColumnsByRootsRequest { .. } => { - dcbroots_queue.push(work, work_id) + work_queues.dcbroots_queue.push(work, work_id) } Work::DataColumnsByRangeRequest { .. } => { - dcbrange_queue.push(work, work_id) + work_queues.dcbrange_queue.push(work, work_id) } - Work::UnknownLightClientOptimisticUpdate { .. } => { - unknown_light_client_update_queue.push(work, work_id) + Work::UnknownLightClientOptimisticUpdate { .. } => work_queues + .unknown_light_client_update_queue + .push(work, work_id), + Work::UnknownBlockSamplingRequest { .. } => work_queues + .unknown_block_sampling_request_queue + .push(work, work_id), + Work::ApiRequestP0 { .. } => { + work_queues.api_request_p0_queue.push(work, work_id) } - Work::UnknownBlockSamplingRequest { .. } => { - unknown_block_sampling_request_queue.push(work, work_id) + Work::ApiRequestP1 { .. } => { + work_queues.api_request_p1_queue.push(work, work_id) } - Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id), - Work::ApiRequestP1 { .. 
} => api_request_p1_queue.push(work, work_id), }; Some(work_type) } @@ -1436,61 +1200,85 @@ impl BeaconProcessor { if let Some(modified_queue_id) = modified_queue_id { let queue_len = match modified_queue_id { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), + WorkType::GossipAttestation => work_queues.attestation_queue.len(), + WorkType::GossipAttestationToConvert => { + work_queues.attestation_to_convert_queue.len() + } + WorkType::UnknownBlockAttestation => { + work_queues.unknown_block_attestation_queue.len() + } WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::GossipAggregate => work_queues.aggregate_queue.len(), + WorkType::UnknownBlockAggregate => { + work_queues.unknown_block_aggregate_queue.len() + } WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() + work_queues.unknown_light_client_update_queue.len() } WorkType::UnknownBlockSamplingRequest => { - unknown_block_sampling_request_queue.len() + work_queues.unknown_block_sampling_request_queue.len() } WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipBlock => work_queues.gossip_block_queue.len(), + WorkType::GossipBlobSidecar => work_queues.gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => { + work_queues.gossip_data_column_queue.len() + } + WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => { + work_queues.gossip_voluntary_exit_queue.len() + } + WorkType::GossipProposerSlashing => { + work_queues.gossip_proposer_slashing_queue.len() + } + WorkType::GossipAttesterSlashing => { + work_queues.gossip_attester_slashing_queue.len() + } + WorkType::GossipSyncSignature => work_queues.sync_message_queue.len(), + WorkType::GossipSyncContribution => { + work_queues.sync_contribution_queue.len() + } WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() + work_queues.lc_gossip_finality_update_queue.len() } WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() + work_queues.lc_gossip_optimistic_update_queue.len() + } + WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { + work_queues.rpc_blob_queue.len() + } + WorkType::RpcCustodyColumn => work_queues.rpc_custody_column_queue.len(), + WorkType::RpcVerifyDataColumn => { + work_queues.rpc_verify_data_column_queue.len() } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::RpcVerifyDataColumn => 
rpc_verify_data_column_queue.len(), - WorkType::SamplingResult => sampling_result_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::SamplingResult => work_queues.sampling_result_queue.len(), + WorkType::ChainSegment => work_queues.chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => work_queues.backfill_chain_segment.len(), + WorkType::Status => work_queues.status_queue.len(), + WorkType::BlocksByRangeRequest => work_queues.blbrange_queue.len(), + WorkType::BlocksByRootsRequest => work_queues.blbroots_queue.len(), + WorkType::BlobsByRangeRequest => work_queues.bbrange_queue.len(), + WorkType::BlobsByRootsRequest => work_queues.bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => work_queues.dcbrange_queue.len(), WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() + work_queues.gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => { + work_queues.lc_bootstrap_queue.len() } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() + work_queues.lc_rpc_optimistic_update_queue.len() } WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() + work_queues.lc_rpc_finality_update_queue.len() + } + WorkType::LightClientUpdatesByRangeRequest => { + work_queues.lc_update_range_queue.len() } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::ApiRequestP0 => work_queues.api_request_p0_queue.len(), + WorkType::ApiRequestP1 => work_queues.api_request_p1_queue.len(), WorkType::Reprocess => 0, }; metrics::observe_vec( @@ -1500,18 +1288,21 @@ impl BeaconProcessor { ); } - if aggregate_queue.is_full() && aggregate_debounce.elapsed() { + if work_queues.aggregate_queue.is_full() && work_queues.aggregate_debounce.elapsed() + { error!( msg = "the system has insufficient resources for load", - queue_len = aggregate_queue.max_length, + queue_len = work_queues.aggregate_queue.max_length, "Aggregate attestation queue full" ) } - if attestation_queue.is_full() && attestation_debounce.elapsed() { + if work_queues.attestation_queue.is_full() + && work_queues.attestation_debounce.elapsed() + { error!( msg = "the system has insufficient resources for load", - queue_len = attestation_queue.max_length, + queue_len = work_queues.attestation_queue.max_length, "Attestation queue full" ) } @@ -1719,21 +1510,3 @@ impl Drop for SendOnDrop { } } } - -#[cfg(test)] -mod tests { - use super::*; - use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec}; - - #[test] - fn min_queue_len() { - // State with no validators. 
-        let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::mainnet());
-        let genesis_time = 0;
-        let state = BeaconState::<MainnetEthSpec>::new(genesis_time, Eth1Data::default(), &spec);
-        assert_eq!(state.validators().len(), 0);
-        let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap();
-        assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN);
-        assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN);
-    }
-}
diff --git a/beacon_node/beacon_processor/src/work_queue.rs b/beacon_node/beacon_processor/src/work_queue.rs
new file mode 100644
index 00000000000..a1b08125579
--- /dev/null
+++ b/beacon_node/beacon_processor/src/work_queue.rs
@@ -0,0 +1,398 @@
+use crate::Work;
+use logging::TimeLatch;
+use std::collections::VecDeque;
+use tracing::error;
+use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch};
+
+/// Over-provision queues based on active validator count by some factor. The beacon chain has
+/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning
+/// slightly, we don't need to adjust the queues during the lifetime of a process.
+const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;
+
+/// Minimum size of dynamically sized queues. Due to integer division we don't want 0-length
+/// queues, as the processor won't process that message type. 128 is an arbitrary value >= 1 that
+/// seems reasonable.
+const MIN_QUEUE_LEN: usize = 128;
+
+/// A simple first-in-first-out queue with a maximum length.
+pub struct FifoQueue<T> {
+    queue: VecDeque<T>,
+    max_length: usize,
+}
+
+impl<T> FifoQueue<T> {
+    /// Create a new, empty queue with the given length.
+    pub fn new(max_length: usize) -> Self {
+        Self {
+            queue: VecDeque::default(),
+            max_length,
+        }
+    }
+
+    /// Add a new item to the queue.
+    ///
+    /// Drops `item` if the queue is full.
+    pub fn push(&mut self, item: T, item_desc: &str) {
+        if self.queue.len() == self.max_length {
+            error!(
+                queue = item_desc,
+                queue_len = self.max_length,
+                msg = "the system has insufficient resources for load",
+                "Work queue is full",
+            )
+        } else {
+            self.queue.push_back(item);
+        }
+    }
+
+    /// Remove the next item from the queue.
+    pub fn pop(&mut self) -> Option<T> {
+        self.queue.pop_front()
+    }
+
+    /// Returns the current length of the queue.
+    pub fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+}
+
+/// A simple last-in-first-out queue with a maximum length.
+pub struct LifoQueue<T> {
+    queue: VecDeque<T>,
+    pub max_length: usize,
+}
+
+impl<T> LifoQueue<T> {
+    /// Create a new, empty queue with the given length.
+    pub fn new(max_length: usize) -> Self {
+        Self {
+            queue: VecDeque::default(),
+            max_length,
+        }
+    }
+
+    /// Add a new item to the front of the queue.
+    ///
+    /// If the queue is full, the item at the back of the queue is dropped.
+    pub fn push(&mut self, item: T) {
+        if self.queue.len() == self.max_length {
+            self.queue.pop_back();
+        }
+        self.queue.push_front(item);
+    }
+
+    /// Remove the next item from the queue.
+    pub fn pop(&mut self) -> Option<T> {
+        self.queue.pop_front()
+    }
+
+    /// Returns `true` if the queue is full.
+    pub fn is_full(&self) -> bool {
+        self.queue.len() >= self.max_length
+    }
+
+    /// Returns the current length of the queue.
+    pub fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+}
+
+/// Maximum number of queued items that will be stored before dropping them.
+pub struct BeaconProcessorQueueLengths {
+    aggregate_queue: usize,
+    attestation_queue: usize,
+    unknown_block_aggregate_queue: usize,
+    unknown_block_attestation_queue: usize,
+    sync_message_queue: usize,
+    sync_contribution_queue: usize,
+    gossip_voluntary_exit_queue: usize,
+    gossip_proposer_slashing_queue: usize,
+    gossip_attester_slashing_queue: usize,
+    unknown_light_client_update_queue: usize,
+    unknown_block_sampling_request_queue: usize,
+    rpc_block_queue: usize,
+    rpc_blob_queue: usize,
+    rpc_custody_column_queue: usize,
+    rpc_verify_data_column_queue: usize,
+    sampling_result_queue: usize,
+    chain_segment_queue: usize,
+    backfill_chain_segment: usize,
+    gossip_block_queue: usize,
+    gossip_blob_queue: usize,
+    gossip_data_column_queue: usize,
+    delayed_block_queue: usize,
+    status_queue: usize,
+    bbrange_queue: usize,
+    bbroots_queue: usize,
+    blbroots_queue: usize,
+    blbrange_queue: usize,
+    dcbroots_queue: usize,
+    dcbrange_queue: usize,
+    gossip_bls_to_execution_change_queue: usize,
+    lc_bootstrap_queue: usize,
+    lc_rpc_optimistic_update_queue: usize,
+    lc_rpc_finality_update_queue: usize,
+    lc_gossip_finality_update_queue: usize,
+    lc_gossip_optimistic_update_queue: usize,
+    lc_update_range_queue: usize,
+    api_request_p0_queue: usize,
+    api_request_p1_queue: usize,
+}
+
+impl BeaconProcessorQueueLengths {
+    pub fn from_state<E: EthSpec>(
+        state: &BeaconState<E>,
+        spec: &ChainSpec,
+    ) -> Result<Self, String> {
+        let active_validator_count =
+            match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
+                Ok(indices) => indices.len(),
+                Err(_) => state
+                    .get_active_validator_indices(state.current_epoch(), spec)
+                    .map_err(|e| format!("Error computing active indices: {:?}", e))?
+                    .len(),
+            };
+        let active_validator_count =
+            (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;
+        let slots_per_epoch = E::slots_per_epoch() as usize;
+
+        Ok(Self {
+            aggregate_queue: 4096,
+            unknown_block_aggregate_queue: 1024,
+            // Capacity for a full slot's worth of attestations if subscribed to all subnets
+            attestation_queue: std::cmp::max(
+                active_validator_count / slots_per_epoch,
+                MIN_QUEUE_LEN,
+            ),
+            // Capacity for a full slot's worth of attestations if subscribed to all subnets
+            unknown_block_attestation_queue: std::cmp::max(
+                active_validator_count / slots_per_epoch,
+                MIN_QUEUE_LEN,
+            ),
+            sync_message_queue: 2048,
+            sync_contribution_queue: 1024,
+            gossip_voluntary_exit_queue: 4096,
+            gossip_proposer_slashing_queue: 4096,
+            gossip_attester_slashing_queue: 4096,
+            unknown_block_sampling_request_queue: 16384,
+            unknown_light_client_update_queue: 128,
+            rpc_block_queue: 1024,
+            rpc_blob_queue: 1024,
+            // TODO(das): Placeholder values
+            rpc_custody_column_queue: 1000,
+            rpc_verify_data_column_queue: 1000,
+            sampling_result_queue: 1000,
+            chain_segment_queue: 64,
+            backfill_chain_segment: 64,
+            gossip_block_queue: 1024,
+            gossip_blob_queue: 1024,
+            gossip_data_column_queue: 1024,
+            delayed_block_queue: 1024,
+            status_queue: 1024,
+            bbrange_queue: 1024,
+            bbroots_queue: 1024,
+            blbroots_queue: 1024,
+            blbrange_queue: 1024,
+            // TODO(das): pick proper values
+            dcbroots_queue: 1024,
+            dcbrange_queue: 1024,
+            gossip_bls_to_execution_change_queue: 16384,
+            lc_gossip_finality_update_queue: 1024,
+            lc_gossip_optimistic_update_queue: 1024,
+            lc_bootstrap_queue: 1024,
+            lc_rpc_optimistic_update_queue: 512,
+            lc_rpc_finality_update_queue: 512,
+            lc_update_range_queue: 512,
+            api_request_p0_queue: 1024,
+            api_request_p1_queue: 1024,
+        })
+    }
+}
+
+pub struct WorkQueues<E: EthSpec> {
+    pub aggregate_queue: LifoQueue<Work<E>>,
+    pub aggregate_debounce: TimeLatch,
+    pub attestation_queue: LifoQueue<Work<E>>,
+    pub attestation_to_convert_queue: LifoQueue<Work<E>>,
+    pub attestation_debounce: TimeLatch,
+    pub unknown_block_aggregate_queue: LifoQueue<Work<E>>,
+    pub unknown_block_attestation_queue: LifoQueue<Work<E>>,
+    pub sync_message_queue: LifoQueue<Work<E>>,
+    pub sync_contribution_queue: LifoQueue<Work<E>>,
+    pub gossip_voluntary_exit_queue: FifoQueue<Work<E>>,
+    pub gossip_proposer_slashing_queue: FifoQueue<Work<E>>,
+    pub gossip_attester_slashing_queue: FifoQueue<Work<E>>,
+    pub unknown_light_client_update_queue: FifoQueue<Work<E>>,
+    pub unknown_block_sampling_request_queue: FifoQueue<Work<E>>,
+    pub rpc_block_queue: FifoQueue<Work<E>>,
+    pub rpc_blob_queue: FifoQueue<Work<E>>,
+    pub rpc_custody_column_queue: FifoQueue<Work<E>>,
+    pub rpc_verify_data_column_queue: FifoQueue<Work<E>>,
+    pub sampling_result_queue: FifoQueue<Work<E>>,
+    pub chain_segment_queue: FifoQueue<Work<E>>,
+    pub backfill_chain_segment: FifoQueue<Work<E>>,
+    pub gossip_block_queue: FifoQueue<Work<E>>,
+    pub gossip_blob_queue: FifoQueue<Work<E>>,
+    pub gossip_data_column_queue: FifoQueue<Work<E>>,
+    pub delayed_block_queue: FifoQueue<Work<E>>,
+    pub status_queue: FifoQueue<Work<E>>,
+    pub bbrange_queue: FifoQueue<Work<E>>,
+    pub bbroots_queue: FifoQueue<Work<E>>,
+    pub blbroots_queue: FifoQueue<Work<E>>,
+    pub blbrange_queue: FifoQueue<Work<E>>,
+    pub dcbroots_queue: FifoQueue<Work<E>>,
+    pub dcbrange_queue: FifoQueue<Work<E>>,
+    pub gossip_bls_to_execution_change_queue: FifoQueue<Work<E>>,
+    pub lc_gossip_finality_update_queue: FifoQueue<Work<E>>,
+    pub lc_gossip_optimistic_update_queue: FifoQueue<Work<E>>,
+    pub lc_bootstrap_queue: FifoQueue<Work<E>>,
+    pub lc_rpc_optimistic_update_queue: FifoQueue<Work<E>>,
+    pub lc_rpc_finality_update_queue: FifoQueue<Work<E>>,
+    pub lc_update_range_queue: FifoQueue<Work<E>>,
+    pub api_request_p0_queue: FifoQueue<Work<E>>,
+    pub api_request_p1_queue: FifoQueue<Work<E>>,
+}
+
+impl<E: EthSpec> WorkQueues<E> {
+    pub fn new(queue_lengths: BeaconProcessorQueueLengths) -> Self {
+        let aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
+        let aggregate_debounce = TimeLatch::default();
+        let attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
+        let attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue);
+        let attestation_debounce = TimeLatch::default();
+        let unknown_block_aggregate_queue =
+            LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
+        let unknown_block_attestation_queue =
+            LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
+
+        let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
+        let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
+
+        // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
+        // a strong feeling about queue type for exits.
+        let gossip_voluntary_exit_queue = FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue);
+
+        // Using a FIFO queue for slashing to prevent people from flushing their slashings from the
+        // queues with lots of junk messages.
+        let gossip_proposer_slashing_queue =
+            FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue);
+        let gossip_attester_slashing_queue =
+            FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);
+
+        // Using a FIFO queue for light client updates to maintain sequence order.
+        let unknown_light_client_update_queue =
+            FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
+        let unknown_block_sampling_request_queue =
+            FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
+
+        // Using a FIFO queue since blocks need to be imported sequentially.
+        let rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
+        let rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
+        let rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
+        let rpc_verify_data_column_queue =
+            FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
+        let sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
+        let chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
+        let backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
+        let gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
+        let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
+        let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
+        let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
+
+        let status_queue = FifoQueue::new(queue_lengths.status_queue);
+        let bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
+        let bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
+        let blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
+        let blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);
+        let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
+        let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);
+
+        let gossip_bls_to_execution_change_queue =
+            FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);
+
+        let lc_gossip_optimistic_update_queue =
+            FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue);
+        let lc_gossip_finality_update_queue =
+            FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue);
+        let lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
+        let lc_rpc_optimistic_update_queue =
+            FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue);
+        let lc_rpc_finality_update_queue =
+            FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue);
+        let lc_update_range_queue: FifoQueue<Work<E>> =
+            FifoQueue::new(queue_lengths.lc_update_range_queue);
+
+        let api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
+        let api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue);
+
+        WorkQueues {
+            aggregate_queue,
+            aggregate_debounce,
+            attestation_queue,
+            attestation_to_convert_queue,
+            attestation_debounce,
+            unknown_block_aggregate_queue,
+            unknown_block_attestation_queue,
+            sync_message_queue,
+            sync_contribution_queue,
+            gossip_voluntary_exit_queue,
+            gossip_proposer_slashing_queue,
+            gossip_attester_slashing_queue,
+            unknown_light_client_update_queue,
+            unknown_block_sampling_request_queue,
+            rpc_block_queue,
+            rpc_blob_queue,
+            rpc_custody_column_queue,
+            rpc_verify_data_column_queue,
+            sampling_result_queue,
+            chain_segment_queue,
+            backfill_chain_segment,
+            gossip_block_queue,
+            gossip_blob_queue,
+            gossip_data_column_queue,
+            delayed_block_queue,
+            status_queue,
+            bbrange_queue,
+            bbroots_queue,
+            blbroots_queue,
+            blbrange_queue,
+            dcbroots_queue,
+            dcbrange_queue,
+            gossip_bls_to_execution_change_queue,
+            lc_gossip_optimistic_update_queue,
+            lc_gossip_finality_update_queue,
+            lc_bootstrap_queue,
+            lc_rpc_optimistic_update_queue,
+            lc_rpc_finality_update_queue,
+            lc_update_range_queue,
+            api_request_p0_queue,
+            api_request_p1_queue,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec};
+
+    #[test]
+    fn min_queue_len() {
+        // State with no validators.
+        let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet());
+        let genesis_time = 0;
+        let state = BeaconState::<MainnetEthSpec>::new(genesis_time, Eth1Data::default(), &spec);
+        assert_eq!(state.validators().len(), 0);
+        let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap();
+        assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN);
+        assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN);
+    }
+}
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs
index f50e21c08f2..b3aa5e03f60 100644
--- a/beacon_node/client/src/builder.rs
+++ b/beacon_node/client/src/builder.rs
@@ -18,8 +18,8 @@ use beacon_chain::{
     BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
 };
 use beacon_chain::{Kzg, LightClientProducerEvent};
+use beacon_processor::{work_queue::BeaconProcessorQueueLengths, BeaconProcessorConfig};
 use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
-use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
 use environment::RuntimeContext;
 use eth1::{Config as Eth1Config, Service as Eth1Service};
 use eth2::{
diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs
index 9c285f4039f..5a76721b460 100644
--- a/beacon_node/http_api/src/test_utils.rs
+++ b/beacon_node/http_api/src/test_utils.rs
@@ -4,7 +4,8 @@ use beacon_chain::{
     BeaconChain, BeaconChainTypes,
 };
 use beacon_processor::{
-    BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
+    work_queue::BeaconProcessorQueueLengths, BeaconProcessor, BeaconProcessorChannels,
+    BeaconProcessorConfig,
 };
 use directory::DEFAULT_ROOT_DIR;
 use eth2::{BeaconNodeHttpClient, Timeouts};
diff --git
a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 753708a9c76..26e5d64d007 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -15,6 +15,7 @@ use beacon_chain::test_utils::{ EphemeralHarnessType, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; +use beacon_processor::work_queue::BeaconProcessorQueueLengths; use beacon_processor::{work_reprocessing_queue::*, *}; use itertools::Itertools; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; From 0ddb78262f6a2e46bffd41a0ebd5aa8ed1b656e2 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 11 May 2025 18:20:38 -0700 Subject: [PATCH 7/8] add scheduler mod --- beacon_node/beacon_processor/src/lib.rs | 3 ++- beacon_node/beacon_processor/src/scheduler/mod.rs | 1 + .../src/{ => scheduler}/work_reprocessing_queue.rs | 0 3 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 beacon_node/beacon_processor/src/scheduler/mod.rs rename beacon_node/beacon_processor/src/{ => scheduler}/work_reprocessing_queue.rs (100%) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 5c5e6c96ee7..5c3abcb6b68 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::crit; use logging::TimeLatch; use parking_lot::Mutex; +pub use scheduler::work_reprocessing_queue; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use std::cmp; @@ -73,7 +74,7 @@ use work_reprocessing_queue::{ use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; mod metrics; -pub mod work_reprocessing_queue; +pub mod scheduler; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// diff --git a/beacon_node/beacon_processor/src/scheduler/mod.rs b/beacon_node/beacon_processor/src/scheduler/mod.rs new file mode 100644 index 00000000000..e1a076a7c54 --- /dev/null +++ b/beacon_node/beacon_processor/src/scheduler/mod.rs @@ -0,0 +1 @@ +pub mod work_reprocessing_queue; diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs similarity index 100% rename from beacon_node/beacon_processor/src/work_reprocessing_queue.rs rename to beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs From 39d3f4a33fcae40269f332249d800a3366d956ba Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 11 May 2025 18:38:42 -0700 Subject: [PATCH 8/8] sneak in a few helpers and a small refactor --- beacon_node/beacon_processor/src/lib.rs | 148 ++------------- .../beacon_processor/src/scheduler/mod.rs | 170 ++++++++++++++++++ .../src/{ => scheduler}/work_queue.rs | 0 3 files changed, 182 insertions(+), 136 deletions(-) rename beacon_node/beacon_processor/src/{ => scheduler}/work_queue.rs (100%) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index d72c92715a7..ae29b5fe5c9 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,16 +38,16 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. 
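
The doc comment above summarises the manager/worker contract that this commit reorganises: a worker signals the manager over an idle channel when it finishes, freeing a slot for the next queued item. A minimal sketch of that handoff (assumed shapes, not the crate's real signatures):

use tokio::sync::mpsc;

// Hypothetical worker body: run a blocking job, then notify the manager that
// this worker slot is free again. The manager's event loop surfaces the `()`
// message as a worker-idle event and decrements its live worker count.
async fn run_worker(job: impl FnOnce() + Send + 'static, idle_tx: mpsc::Sender<()>) {
    let _ = tokio::task::spawn_blocking(job).await;
    let _ = idle_tx.send(()).await;
}
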
+use crate::scheduler::InboundEvents; use crate::work_queue::BeaconProcessorQueueLengths; use crate::work_reprocessing_queue::{ QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage, }; -use futures::stream::{Stream, StreamExt}; -use futures::task::Poll; use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::crit; use parking_lot::Mutex; -pub use scheduler::work_reprocessing_queue; +pub use scheduler::{work_queue, work_reprocessing_queue}; +use scheduler::{worker_journal, NextWorkEvent}; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use std::cmp; @@ -56,13 +56,12 @@ use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; use std::time::Duration; use strum::IntoStaticStr; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use tracing::{debug, error, trace, warn}; +use tracing::{error, trace, warn}; use types::{ Attestation, EthSpec, Hash256, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; @@ -72,9 +71,7 @@ use work_reprocessing_queue::{ QueuedUnaggregate, ReadyWork, }; use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; - mod metrics; -pub mod work_queue; pub mod scheduler; /// The maximum size of the channel for work events to the `BeaconProcessor`. @@ -547,71 +544,6 @@ impl Work { } } -/// Unifies all the messages processed by the `BeaconProcessor`. -enum InboundEvent { - /// A worker has completed a task and is free. - WorkerIdle, - /// There is new work to be done. - WorkEvent(WorkEvent), - /// A work event that was queued for re-processing has become ready. - ReprocessingWork(WorkEvent), -} - -/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. -/// -/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained -/// control (specifically in the ordering of event processing). -struct InboundEvents { - /// Used by workers when they finish a task. - idle_rx: mpsc::Receiver<()>, - /// Used by upstream processes to send new work to the `BeaconProcessor`. - event_rx: mpsc::Receiver>, - /// Used internally for queuing work ready to be re-processed. - ready_work_rx: mpsc::Receiver, -} - -impl Stream for InboundEvents { - type Item = InboundEvent; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Always check for idle workers before anything else. This allows us to ensure that a big - // stream of new events doesn't suppress the processing of existing events. - match self.idle_rx.poll_recv(cx) { - Poll::Ready(Some(())) => { - return Poll::Ready(Some(InboundEvent::WorkerIdle)); - } - Poll::Ready(None) => { - return Poll::Ready(None); - } - Poll::Pending => {} - } - - // Poll for delayed blocks before polling for new work. It might be the case that a delayed - // block is required to successfully process some new work. 
-        match self.ready_work_rx.poll_recv(cx) {
-            Poll::Ready(Some(ready_work)) => {
-                return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
-            }
-            Poll::Ready(None) => {
-                return Poll::Ready(None);
-            }
-            Poll::Pending => {}
-        }
-
-        match self.event_rx.poll_recv(cx) {
-            Poll::Ready(Some(event)) => {
-                return Poll::Ready(Some(InboundEvent::WorkEvent(event)));
-            }
-            Poll::Ready(None) => {
-                return Poll::Ready(None);
-            }
-            Poll::Pending => {}
-        }
-
-        Poll::Pending
-    }
-}
-
 /// A multi-threaded processor for messages received on the network
 /// that need to be processed by the `BeaconChain`
 ///
@@ -677,59 +609,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
             ready_work_rx,
         };
 
-        let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
-
         loop {
-            let work_event = match inbound_events.next().await {
-                Some(InboundEvent::WorkerIdle) => {
-                    self.current_workers = self.current_workers.saturating_sub(1);
-                    None
-                }
-                Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
-                    match QueuedBackfillBatch::try_from(event) {
-                        Ok(backfill_batch) => {
-                            match reprocess_work_tx
-                                .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
-                            {
-                                Err(e) => {
-                                    warn!(
-                                        error = %e,
-                                        "Unable to queue backfill work event. Will try to process now."
-                                    );
-                                    match e {
-                                        TrySendError::Full(reprocess_queue_message)
-                                        | TrySendError::Closed(reprocess_queue_message) => {
-                                            match reprocess_queue_message {
-                                                ReprocessQueueMessage::BackfillSync(
-                                                    backfill_batch,
-                                                ) => Some(backfill_batch.into()),
-                                                other => {
-                                                    crit!(
-                                                        message_type = other.as_ref(),
-                                                        "Unexpected queue message type"
-                                                    );
-                                                    // This is an unhandled exception, drop the message.
-                                                    continue;
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                Ok(..) => {
-                                    // backfill work sent to "reprocessing" queue. Process the next event.
-                                    continue;
-                                }
-                            }
-                        }
-                        Err(event) => Some(event),
-                    }
-                }
-                Some(InboundEvent::WorkEvent(event))
-                | Some(InboundEvent::ReprocessingWork(event)) => Some(event),
-                None => {
-                    debug!(msg = "stream ended", "Gossip processor stopped");
-                    break;
-                }
+            let work_event = match inbound_events
+                .next_work_event(&reprocess_work_tx, &mut self)
+                .await
+            {
+                NextWorkEvent::WorkEvent(work_event) => work_event,
+                NextWorkEvent::Continue => continue,
+                NextWorkEvent::Break => break,
             };
 
             let _event_timer =
@@ -743,18 +630,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                 metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL);
             }
 
-            if let Some(work_journal_tx) = &work_journal_tx {
-                let id = work_event
-                    .as_ref()
-                    .map(|event| event.work.str_id())
-                    .unwrap_or(WORKER_FREED);
-
-                // We don't care if this message was successfully sent, we only use the journal
-                // during testing. We also ignore reprocess messages to ensure our test cases can pass.
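Both the inline backfill handling removed from this loop and its new home in `next_work_event` lean on one property of bounded mpsc channels: `try_send` hands the rejected message back inside the error, so rate-limited work can be processed immediately instead of dropped. A minimal sketch of that defer-or-process-now fallback, using an illustrative single-variant enum rather than the real `QueuedBackfillBatch` machinery:

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

// Stand-in for `ReprocessQueueMessage`; the payload type is illustrative.
enum QueueMessage {
    BackfillSync(u64),
}

fn defer_or_process_now(
    tx: &mpsc::Sender<QueueMessage>,
    batch: u64,
    process_now: impl FnOnce(u64),
) {
    match tx.try_send(QueueMessage::BackfillSync(batch)) {
        // Deferred: the scheduler will hand the batch back once it is ready.
        Ok(()) => {}
        // Full or closed: `try_send` returns ownership of the message, so we
        // can unwrap it and fall back to processing the batch right away.
        Err(TrySendError::Full(msg)) | Err(TrySendError::Closed(msg)) => {
            let QueueMessage::BackfillSync(batch) = msg;
            process_now(batch);
        }
    }
}
```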
-                if id != "reprocess" {
-                    let _ = work_journal_tx.try_send(id);
-                }
-            }
+            worker_journal(&work_event, &work_journal_tx);
 
             let can_spawn = self.current_workers < self.config.max_workers;
             let drop_during_sync = work_event
diff --git a/beacon_node/beacon_processor/src/scheduler/mod.rs b/beacon_node/beacon_processor/src/scheduler/mod.rs
index e1a076a7c54..e9ec9fac717 100644
--- a/beacon_node/beacon_processor/src/scheduler/mod.rs
+++ b/beacon_node/beacon_processor/src/scheduler/mod.rs
@@ -1 +1,171 @@
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures::{Stream, StreamExt};
+use logging::crit;
+use tokio::sync::mpsc::{self, error::TrySendError, Sender};
+use tracing::{debug, warn};
+use types::EthSpec;
+use work_reprocessing_queue::{QueuedBackfillBatch, ReadyWork, ReprocessQueueMessage};
+
+use crate::{BeaconProcessor, WorkEvent, WORKER_FREED};
+
+pub mod work_queue;
 pub mod work_reprocessing_queue;
+
+pub enum NextWorkEvent<E: EthSpec> {
+    WorkEvent(Option<WorkEvent<E>>),
+    Continue,
+    Break,
+}
+
+/// Unifies all the messages processed by the `BeaconProcessor`.
+pub enum InboundEvent<E: EthSpec> {
+    /// A worker has completed a task and is free.
+    WorkerIdle,
+    /// There is new work to be done.
+    WorkEvent(WorkEvent<E>),
+    /// A work event that was queued for re-processing has become ready.
+    ReprocessingWork(WorkEvent<E>),
+}
+
+/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
+///
+/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained
+/// control (specifically in the ordering of event processing).
+pub struct InboundEvents<E: EthSpec> {
+    /// Used by workers when they finish a task.
+    pub idle_rx: mpsc::Receiver<()>,
+    /// Used by upstream processes to send new work to the `BeaconProcessor`.
+    pub event_rx: mpsc::Receiver<WorkEvent<E>>,
+    /// Used internally for queuing work ready to be re-processed.
+    pub ready_work_rx: mpsc::Receiver<ReadyWork>,
+}
+
+impl<E: EthSpec> Stream for InboundEvents<E> {
+    type Item = InboundEvent<E>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Always check for idle workers before anything else. This allows us to ensure that a big
+        // stream of new events doesn't suppress the processing of existing events.
+        match self.idle_rx.poll_recv(cx) {
+            Poll::Ready(Some(())) => {
+                return Poll::Ready(Some(InboundEvent::WorkerIdle));
+            }
+            Poll::Ready(None) => {
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {}
+        }
+
+        // Poll for delayed blocks before polling for new work. It might be the case that a delayed
+        // block is required to successfully process some new work.
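Making `InboundEvents` and its fields `pub` is what lets `spawn_manager` in `lib.rs` keep assembling the stream directly, as the earlier hunk shows. A construction sketch; the capacities are placeholders for the values that really come from `BeaconProcessorConfig`, and a real caller would retain the senders instead of dropping them:

```rust
use beacon_processor::scheduler::{work_reprocessing_queue::ReadyWork, InboundEvents};
use beacon_processor::WorkEvent;
use tokio::sync::mpsc;
use types::MainnetEthSpec;

fn build_inbound_events() -> InboundEvents<MainnetEthSpec> {
    // Placeholder capacities; `BeaconProcessorConfig` supplies the real ones.
    let (_idle_tx, idle_rx) = mpsc::channel::<()>(16_384);
    let (_event_tx, event_rx) = mpsc::channel::<WorkEvent<MainnetEthSpec>>(16_384);
    let (_ready_tx, ready_work_rx) = mpsc::channel::<ReadyWork>(12_288);

    InboundEvents {
        idle_rx,
        event_rx,
        ready_work_rx,
    }
}
```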
+        match self.ready_work_rx.poll_recv(cx) {
+            Poll::Ready(Some(ready_work)) => {
+                return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
+            }
+            Poll::Ready(None) => {
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {}
+        }
+
+        match self.event_rx.poll_recv(cx) {
+            Poll::Ready(Some(event)) => {
+                return Poll::Ready(Some(InboundEvent::WorkEvent(event)));
+            }
+            Poll::Ready(None) => {
+                return Poll::Ready(None);
+            }
+            Poll::Pending => {}
+        }
+
+        Poll::Pending
+    }
+}
+
+impl<E: EthSpec> InboundEvents<E> {
+    pub async fn next_work_event(
+        &mut self,
+        reprocess_work_tx: &Sender<ReprocessQueueMessage>,
+        beacon_processor: &mut BeaconProcessor<E>,
+    ) -> NextWorkEvent<E> {
+        match self.next().await {
+            Some(InboundEvent::WorkerIdle) => {
+                beacon_processor.current_workers =
+                    beacon_processor.current_workers.saturating_sub(1);
+                NextWorkEvent::WorkEvent(None)
+            }
+            Some(InboundEvent::WorkEvent(event))
+                if beacon_processor.config.enable_backfill_rate_limiting =>
+            {
+                match QueuedBackfillBatch::try_from(event) {
+                    Ok(backfill_batch) => {
+                        match reprocess_work_tx
+                            .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
+                        {
+                            Err(e) => {
+                                warn!(
+                                    error = ?e,
+                                    "Unable to queue backfill work event. Will try to process now."
+                                );
+                                match e {
+                                    TrySendError::Full(reprocess_queue_message)
+                                    | TrySendError::Closed(reprocess_queue_message) => {
+                                        match reprocess_queue_message {
+                                            ReprocessQueueMessage::BackfillSync(backfill_batch) => {
+                                                NextWorkEvent::WorkEvent(Some(
+                                                    backfill_batch.into(),
+                                                ))
+                                            }
+                                            other => {
+                                                crit!(
+                                                    message_type = other.as_ref(),
+                                                    "Unexpected queue message type"
+                                                );
+                                                // This is an unhandled exception, drop the message.
+                                                NextWorkEvent::Continue
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            Ok(..) => {
+                                // backfill work sent to "reprocessing" queue. Process the next event.
+                                NextWorkEvent::Continue
+                            }
+                        }
+                    }
+                    Err(event) => NextWorkEvent::WorkEvent(Some(event)),
+                }
+            }
+            Some(InboundEvent::WorkEvent(event)) | Some(InboundEvent::ReprocessingWork(event)) => {
+                NextWorkEvent::WorkEvent(Some(event))
+            }
+            None => {
+                debug!(msg = "stream ended", "Gossip processor stopped");
+                NextWorkEvent::Break
+            }
+        }
+    }
+}
+
+pub fn worker_journal<E: EthSpec>(
+    work_event: &Option<WorkEvent<E>>,
+    work_journal_tx: &Option<Sender<&'static str>>,
+) {
+    if let Some(work_journal_tx) = work_journal_tx {
+        let id = work_event
+            .as_ref()
+            .map(|event| event.work.str_id())
+            .unwrap_or(WORKER_FREED);
+
+        // We don't care if this message was successfully sent, we only use the journal
+        // during testing. We also ignore reprocess messages to ensure our test cases can pass.
+        if id != "reprocess" {
+            let _ = work_journal_tx.try_send(id);
+        }
+    }
+}
diff --git a/beacon_node/beacon_processor/src/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs
similarity index 100%
rename from beacon_node/beacon_processor/src/work_queue.rs
rename to beacon_node/beacon_processor/src/scheduler/work_queue.rs
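`worker_journal` is now small enough to exercise on its own: with no work event it records `WORKER_FREED`, and per the comment above it swallows `reprocess` ids so reprocess scheduling noise cannot flake the journal-based tests. A hedged test sketch covering only the `None` case, assuming the function stays generic over `EthSpec` and that `WORKER_FREED` remains publicly exported (as the glob import in the tests hunk earlier suggests); building a real `WorkEvent` needs the full harness:

```rust
use beacon_processor::{scheduler::worker_journal, WORKER_FREED};
use types::MainnetEthSpec;

#[test]
fn worker_journal_reports_worker_freed() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    // `None` means a worker just went idle; the journal should record that.
    worker_journal::<MainnetEthSpec>(&None, &Some(tx));
    assert_eq!(rx.try_recv().unwrap(), WORKER_FREED);
}
```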