diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 0f324071a1e..cabaa1a3b66 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -38,41 +38,37 @@ //! checks the queues to see if there are more parcels of work that can be spawned in a new worker //! task. +use crate::scheduler::InboundEvents; +use crate::work_queue::BeaconProcessorQueueLengths; use crate::work_reprocessing_queue::{ QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, }; -use futures::stream::{Stream, StreamExt}; -use futures::task::Poll; use lighthouse_network::{MessageId, NetworkGlobals, PeerId}; use logging::crit; -use logging::TimeLatch; use parking_lot::Mutex; -pub use scheduler::work_reprocessing_queue; +pub use scheduler::{work_queue, work_reprocessing_queue}; +use scheduler::{worker_journal, NextWorkEvent}; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use std::cmp; -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; use std::time::Duration; use strum::IntoStaticStr; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use tracing::{debug, error, trace, warn}; -use types::{ - BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, - SingleAttestation, Slot, SubnetId, -}; +use tracing::{error, trace, warn}; +use types::{EthSpec, Hash256, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId}; +use work_queue::WorkQueues; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; - mod metrics; pub mod scheduler; @@ -90,130 +86,6 @@ const MAX_IDLE_QUEUE_LEN: usize = 16_384; /// The maximum size of the channel for re-processing work events. const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4; -/// Over-provision queues based on active validator count by some factor. The beacon chain has -/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning -/// slightly, we don't need to adjust the queues during the lifetime of a process. -const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; - -/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues -/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that -/// seems reasonable. -const MIN_QUEUE_LEN: usize = 128; - -/// Maximum number of queued items that will be stored before dropping them -pub struct BeaconProcessorQueueLengths { - aggregate_queue: usize, - attestation_queue: usize, - unknown_block_aggregate_queue: usize, - unknown_block_attestation_queue: usize, - sync_message_queue: usize, - sync_contribution_queue: usize, - gossip_voluntary_exit_queue: usize, - gossip_proposer_slashing_queue: usize, - gossip_attester_slashing_queue: usize, - unknown_light_client_update_queue: usize, - unknown_block_sampling_request_queue: usize, - rpc_block_queue: usize, - rpc_blob_queue: usize, - rpc_custody_column_queue: usize, - rpc_verify_data_column_queue: usize, - sampling_result_queue: usize, - column_reconstruction_queue: usize, - chain_segment_queue: usize, - backfill_chain_segment: usize, - gossip_block_queue: usize, - gossip_blob_queue: usize, - gossip_data_column_queue: usize, - delayed_block_queue: usize, - status_queue: usize, - bbrange_queue: usize, - bbroots_queue: usize, - blbroots_queue: usize, - blbrange_queue: usize, - dcbroots_queue: usize, - dcbrange_queue: usize, - gossip_bls_to_execution_change_queue: usize, - lc_gossip_finality_update_queue: usize, - lc_gossip_optimistic_update_queue: usize, - lc_bootstrap_queue: usize, - lc_rpc_optimistic_update_queue: usize, - lc_rpc_finality_update_queue: usize, - lc_update_range_queue: usize, - api_request_p0_queue: usize, - api_request_p1_queue: usize, -} - -impl BeaconProcessorQueueLengths { - pub fn from_state( - state: &BeaconState, - spec: &ChainSpec, - ) -> Result { - let active_validator_count = - match state.get_cached_active_validator_indices(RelativeEpoch::Current) { - Ok(indices) => indices.len(), - Err(_) => state - .get_active_validator_indices(state.current_epoch(), spec) - .map_err(|e| format!("Error computing active indices: {:?}", e))? - .len(), - }; - let active_validator_count = - (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; - let slots_per_epoch = E::slots_per_epoch() as usize; - - Ok(Self { - aggregate_queue: 4096, - unknown_block_aggregate_queue: 1024, - // Capacity for a full slot's worth of attestations if subscribed to all subnets - attestation_queue: std::cmp::max( - active_validator_count / slots_per_epoch, - MIN_QUEUE_LEN, - ), - // Capacity for a full slot's worth of attestations if subscribed to all subnets - unknown_block_attestation_queue: std::cmp::max( - active_validator_count / slots_per_epoch, - MIN_QUEUE_LEN, - ), - sync_message_queue: 2048, - sync_contribution_queue: 1024, - gossip_voluntary_exit_queue: 4096, - gossip_proposer_slashing_queue: 4096, - gossip_attester_slashing_queue: 4096, - unknown_light_client_update_queue: 128, - rpc_block_queue: 1024, - rpc_blob_queue: 1024, - // TODO(das): Placeholder values - rpc_custody_column_queue: 1000, - rpc_verify_data_column_queue: 1000, - unknown_block_sampling_request_queue: 16384, - sampling_result_queue: 1000, - column_reconstruction_queue: 64, - chain_segment_queue: 64, - backfill_chain_segment: 64, - gossip_block_queue: 1024, - gossip_blob_queue: 1024, - gossip_data_column_queue: 1024, - delayed_block_queue: 1024, - status_queue: 1024, - bbrange_queue: 1024, - bbroots_queue: 1024, - blbroots_queue: 1024, - blbrange_queue: 1024, - // TODO(das): pick proper values - dcbroots_queue: 1024, - dcbrange_queue: 1024, - gossip_bls_to_execution_change_queue: 16384, - lc_gossip_finality_update_queue: 1024, - lc_gossip_optimistic_update_queue: 1024, - lc_bootstrap_queue: 1024, - lc_rpc_optimistic_update_queue: 512, - lc_rpc_finality_update_queue: 512, - lc_update_range_queue: 512, - api_request_p0_queue: 1024, - api_request_p1_queue: 1024, - }) - } -} - /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; @@ -285,89 +157,6 @@ impl Default for BeaconProcessorChannels { } } -/// A simple first-in-first-out queue with a maximum length. -struct FifoQueue { - queue: VecDeque, - max_length: usize, -} - -impl FifoQueue { - /// Create a new, empty queue with the given length. - pub fn new(max_length: usize) -> Self { - Self { - queue: VecDeque::default(), - max_length, - } - } - - /// Add a new item to the queue. - /// - /// Drops `item` if the queue is full. - pub fn push(&mut self, item: T, item_desc: &str) { - if self.queue.len() == self.max_length { - error!( - msg = "the system has insufficient resources for load", - queue_len = self.max_length, - queue = item_desc, - "Work queue is full" - ) - } else { - self.queue.push_back(item); - } - } - - /// Remove the next item from the queue. - pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } - - /// Returns the current length of the queue. - pub fn len(&self) -> usize { - self.queue.len() - } -} - -/// A simple last-in-first-out queue with a maximum length. -struct LifoQueue { - queue: VecDeque, - max_length: usize, -} - -impl LifoQueue { - /// Create a new, empty queue with the given length. - pub fn new(max_length: usize) -> Self { - Self { - queue: VecDeque::default(), - max_length, - } - } - - /// Add a new item to the front of the queue. - /// - /// If the queue is full, the item at the back of the queue is dropped. - pub fn push(&mut self, item: T) { - if self.queue.len() == self.max_length { - self.queue.pop_back(); - } - self.queue.push_front(item); - } - - /// Remove the next item from the queue. - pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } - - /// Returns `true` if the queue is full. - pub fn is_full(&self) -> bool { - self.queue.len() >= self.max_length - } - - /// Returns the current length of the queue. - pub fn len(&self) -> usize { - self.queue.len() - } -} - /// A handle that sends a message on the provided channel to a receiver when it gets dropped. /// /// The receiver task is responsible for removing the provided `entry` from the `DuplicateCache` @@ -646,7 +435,6 @@ impl fmt::Debug for Work { #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, - GossipAttestationToConvert, UnknownBlockAttestation, GossipAttestationBatch, GossipAggregate, @@ -752,71 +540,6 @@ impl Work { } } -/// Unifies all the messages processed by the `BeaconProcessor`. -enum InboundEvent { - /// A worker has completed a task and is free. - WorkerIdle, - /// There is new work to be done. - WorkEvent(WorkEvent), - /// A work event that was queued for re-processing has become ready. - ReprocessingWork(WorkEvent), -} - -/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. -/// -/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained -/// control (specifically in the ordering of event processing). -struct InboundEvents { - /// Used by workers when they finish a task. - idle_rx: mpsc::Receiver<()>, - /// Used by upstream processes to send new work to the `BeaconProcessor`. - event_rx: mpsc::Receiver>, - /// Used internally for queuing work ready to be re-processed. - ready_work_rx: mpsc::Receiver, -} - -impl Stream for InboundEvents { - type Item = InboundEvent; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Always check for idle workers before anything else. This allows us to ensure that a big - // stream of new events doesn't suppress the processing of existing events. - match self.idle_rx.poll_recv(cx) { - Poll::Ready(Some(())) => { - return Poll::Ready(Some(InboundEvent::WorkerIdle)); - } - Poll::Ready(None) => { - return Poll::Ready(None); - } - Poll::Pending => {} - } - - // Poll for delayed blocks before polling for new work. It might be the case that a delayed - // block is required to successfully process some new work. - match self.ready_work_rx.poll_recv(cx) { - Poll::Ready(Some(ready_work)) => { - return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); - } - Poll::Ready(None) => { - return Poll::Ready(None); - } - Poll::Pending => {} - } - - match self.event_rx.poll_recv(cx) { - Poll::Ready(Some(event)) => { - return Poll::Ready(Some(InboundEvent::WorkEvent(event))); - } - Poll::Ready(None) => { - return Poll::Ready(None); - } - Poll::Pending => {} - } - - Poll::Pending - } -} - /// A mutli-threaded processor for messages received on the network /// that need to be processed by the `BeaconChain` /// @@ -852,80 +575,8 @@ impl BeaconProcessor { // Used by workers to communicate that they are finished a task. let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); - // Using LIFO queues for attestations since validator profits rely upon getting fresh - // attestations into blocks. Additionally, later attestations contain more information than - // earlier ones, so we consider them more valuable. - let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); - let mut aggregate_debounce = TimeLatch::default(); - let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); - let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue); - let mut attestation_debounce = TimeLatch::default(); - let mut unknown_block_aggregate_queue = - LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); - let mut unknown_block_attestation_queue = - LifoQueue::new(queue_lengths.unknown_block_attestation_queue); - - let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); - let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); - - // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have - // a strong feeling about queue type for exits. - let mut gossip_voluntary_exit_queue = - FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue); - - // Using a FIFO queue for slashing to prevent people from flushing their slashings from the - // queues with lots of junk messages. - let mut gossip_proposer_slashing_queue = - FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue); - let mut gossip_attester_slashing_queue = - FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); - - // Using a FIFO queue since blocks need to be imported sequentially. - let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); - let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); - let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); - let mut rpc_verify_data_column_queue = - FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); - // TODO(das): the sampling_request_queue is never read - let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); - let mut column_reconstruction_queue = - FifoQueue::new(queue_lengths.column_reconstruction_queue); - let mut unknown_block_sampling_request_queue = - FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); - let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); - let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); - let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); - let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); - let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); - let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); - - let mut status_queue = FifoQueue::new(queue_lengths.status_queue); - let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue); - let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); - let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); - let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); - let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); - let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); - - let mut gossip_bls_to_execution_change_queue = - FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); - - // Using FIFO queues for light client updates to maintain sequence order. - let mut lc_gossip_finality_update_queue = - FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue); - let mut lc_gossip_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue); - let mut unknown_light_client_update_queue = - FifoQueue::new(queue_lengths.unknown_light_client_update_queue); - let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); - let mut lc_rpc_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue); - let mut lc_rpc_finality_update_queue = - FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue); - let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue); - - let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); - let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); + // Initialize the worker queues. + let mut work_queues: WorkQueues = WorkQueues::new(queue_lengths); // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to // receive them back once they are ready (`ready_work_rx`). @@ -954,59 +605,14 @@ impl BeaconProcessor { ready_work_rx, }; - let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting; - loop { - let work_event = match inbound_events.next().await { - Some(InboundEvent::WorkerIdle) => { - self.current_workers = self.current_workers.saturating_sub(1); - None - } - Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => { - match QueuedBackfillBatch::try_from(event) { - Ok(backfill_batch) => { - match reprocess_work_tx - .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch)) - { - Err(e) => { - warn!( - error = %e, - "Unable to queue backfill work event. Will try to process now." - ); - match e { - TrySendError::Full(reprocess_queue_message) - | TrySendError::Closed(reprocess_queue_message) => { - match reprocess_queue_message { - ReprocessQueueMessage::BackfillSync( - backfill_batch, - ) => Some(backfill_batch.into()), - other => { - crit!( - message_type = other.as_ref(), - "Unexpected queue message type" - ); - // This is an unhandled exception, drop the message. - continue; - } - } - } - } - } - Ok(..) => { - // backfill work sent to "reprocessing" queue. Process the next event. - continue; - } - } - } - Err(event) => Some(event), - } - } - Some(InboundEvent::WorkEvent(event)) - | Some(InboundEvent::ReprocessingWork(event)) => Some(event), - None => { - debug!(msg = "stream ended", "Gossip processor stopped"); - break; - } + let work_event = match inbound_events + .next_work_event(&reprocess_work_tx, &mut self) + .await + { + NextWorkEvent::WorkEvent(work_event) => work_event, + NextWorkEvent::Continue => continue, + NextWorkEvent::Break => break, }; let _event_timer = @@ -1020,18 +626,7 @@ impl BeaconProcessor { metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL); } - if let Some(work_journal_tx) = &work_journal_tx { - let id = work_event - .as_ref() - .map(|event| event.work.str_id()) - .unwrap_or(WORKER_FREED); - - // We don't care if this message was successfully sent, we only use the journal - // during testing. We also ignore reprocess messages to ensure our test cases can pass. - if id != "reprocess" { - let _ = work_journal_tx.try_send(id); - } - } + worker_journal(&work_event, &work_journal_tx); let can_spawn = self.current_workers < self.config.max_workers; let drop_during_sync = work_event @@ -1047,236 +642,247 @@ impl BeaconProcessor { None if can_spawn => { // Check for chain segments first, they're the most efficient way to get // blocks into the system. - let work_event: Option> = - if let Some(item) = chain_segment_queue.pop() { - Some(item) - // Check sync blocks before gossip blocks, since we've already explicitly - // requested these blocks. - } else if let Some(item) = rpc_block_queue.pop() { - Some(item) - } else if let Some(item) = rpc_blob_queue.pop() { - Some(item) - } else if let Some(item) = rpc_custody_column_queue.pop() { - Some(item) - // TODO(das): decide proper prioritization for sampling columns - } else if let Some(item) = rpc_custody_column_queue.pop() { - Some(item) - } else if let Some(item) = rpc_verify_data_column_queue.pop() { - Some(item) - } else if let Some(item) = sampling_result_queue.pop() { - Some(item) - // Check delayed blocks before gossip blocks, the gossip blocks might rely - // on the delayed ones. - } else if let Some(item) = delayed_block_queue.pop() { - Some(item) - // Check gossip blocks before gossip attestations, since a block might be - // required to verify some attestations. - } else if let Some(item) = gossip_block_queue.pop() { - Some(item) - } else if let Some(item) = gossip_blob_queue.pop() { - Some(item) - } else if let Some(item) = gossip_data_column_queue.pop() { - Some(item) - } else if let Some(item) = column_reconstruction_queue.pop() { - Some(item) - // Check the priority 0 API requests after blocks and blobs, but before attestations. - } else if let Some(item) = api_request_p0_queue.pop() { - Some(item) - // Check the aggregates, *then* the unaggregates since we assume that - // aggregates are more valuable to local validators and effectively give us - // more information with less signature verification time. - } else if aggregate_queue.len() > 0 { - let batch_size = cmp::min( - aggregate_queue.len(), - self.config.max_gossip_aggregate_batch_size, - ); - - if batch_size < 2 { - // One single aggregate is in the queue, process it individually. - aggregate_queue.pop() - } else { - // Collect two or more aggregates into a batch, so they can take - // advantage of batch signature verification. - // - // Note: this will convert the `Work::GossipAggregate` item into a - // `Work::GossipAggregateBatch` item. - let mut aggregates = Vec::with_capacity(batch_size); - let mut process_batch_opt = None; - for _ in 0..batch_size { - if let Some(item) = aggregate_queue.pop() { - match item { - Work::GossipAggregate { - aggregate, - process_individual: _, - process_batch, - } => { - aggregates.push(*aggregate); - if process_batch_opt.is_none() { - process_batch_opt = Some(process_batch); - } - } - _ => { - error!("Invalid item in aggregate queue"); + let work_event: Option> = if let Some(item) = + work_queues.chain_segment_queue.pop() + { + Some(item) + // Check sync blocks before gossip blocks, since we've already explicitly + // requested these blocks. + } else if let Some(item) = work_queues.rpc_block_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_blob_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { + Some(item) + // TODO(das): decide proper prioritization for sampling columns + } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.rpc_verify_data_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.sampling_result_queue.pop() { + Some(item) + // Check delayed blocks before gossip blocks, the gossip blocks might rely + // on the delayed ones. + } else if let Some(item) = work_queues.delayed_block_queue.pop() { + Some(item) + // Check gossip blocks before gossip attestations, since a block might be + // required to verify some attestations. + } else if let Some(item) = work_queues.gossip_block_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.gossip_blob_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.gossip_data_column_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.column_reconstruction_queue.pop() { + Some(item) + // Check the priority 0 API requests after blocks and blobs, but before attestations. + } else if let Some(item) = work_queues.api_request_p0_queue.pop() { + Some(item) + // Check the aggregates, *then* the unaggregates since we assume that + // aggregates are more valuable to local validators and effectively give us + // more information with less signature verification time. + } else if !work_queues.aggregate_queue.is_empty() { + let batch_size = cmp::min( + work_queues.aggregate_queue.len(), + self.config.max_gossip_aggregate_batch_size, + ); + + if batch_size < 2 { + // One single aggregate is in the queue, process it individually. + work_queues.aggregate_queue.pop() + } else { + // Collect two or more aggregates into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAggregate` item into a + // `Work::GossipAggregateBatch` item. + let mut aggregates = Vec::with_capacity(batch_size); + let mut process_batch_opt = None; + for _ in 0..batch_size { + if let Some(item) = work_queues.aggregate_queue.pop() { + match item { + Work::GossipAggregate { + aggregate, + process_individual: _, + process_batch, + } => { + aggregates.push(*aggregate); + if process_batch_opt.is_none() { + process_batch_opt = Some(process_batch); } } + _ => { + error!("Invalid item in aggregate queue"); + } } } - - if let Some(process_batch) = process_batch_opt { - // Process all aggregates with a single worker. - Some(Work::GossipAggregateBatch { - aggregates, - process_batch, - }) - } else { - // There is no good reason for this to - // happen, it is a serious logic error. - // Since we only form batches when multiple - // work items exist, we should always have a - // work closure at this point. - crit!("Missing aggregate work"); - None - } } - // Check the unaggregated attestation queue. - // - // Potentially use batching. - } else if attestation_queue.len() > 0 { - let batch_size = cmp::min( - attestation_queue.len(), - self.config.max_gossip_attestation_batch_size, - ); - - if batch_size < 2 { - // One single attestation is in the queue, process it individually. - attestation_queue.pop() + + if let Some(process_batch) = process_batch_opt { + // Process all aggregates with a single worker. + Some(Work::GossipAggregateBatch { + aggregates, + process_batch, + }) } else { - // Collect two or more attestations into a batch, so they can take - // advantage of batch signature verification. - // - // Note: this will convert the `Work::GossipAttestation` item into a - // `Work::GossipAttestationBatch` item. - let mut attestations = Vec::with_capacity(batch_size); - let mut process_batch_opt = None; - for _ in 0..batch_size { - if let Some(item) = attestation_queue.pop() { - match item { - Work::GossipAttestation { - attestation, - process_individual: _, - process_batch, - } => { - attestations.push(*attestation); - if process_batch_opt.is_none() { - process_batch_opt = Some(process_batch); - } + // There is no good reason for this to + // happen, it is a serious logic error. + // Since we only form batches when multiple + // work items exist, we should always have a + // work closure at this point. + crit!("Missing aggregate work"); + None + } + } + // Check the unaggregated attestation queue. + // + // Potentially use batching. + } else if !work_queues.attestation_queue.is_empty() { + let batch_size = cmp::min( + work_queues.attestation_queue.len(), + self.config.max_gossip_attestation_batch_size, + ); + + if batch_size < 2 { + // One single attestation is in the queue, process it individually. + work_queues.attestation_queue.pop() + } else { + // Collect two or more attestations into a batch, so they can take + // advantage of batch signature verification. + // + // Note: this will convert the `Work::GossipAttestation` item into a + // `Work::GossipAttestationBatch` item. + let mut attestations = Vec::with_capacity(batch_size); + let mut process_batch_opt = None; + for _ in 0..batch_size { + if let Some(item) = work_queues.attestation_queue.pop() { + match item { + Work::GossipAttestation { + attestation, + process_individual: _, + process_batch, + } => { + attestations.push(*attestation); + if process_batch_opt.is_none() { + process_batch_opt = Some(process_batch); } - _ => error!("Invalid item in attestation queue"), } + _ => error!("Invalid item in attestation queue"), } } - - if let Some(process_batch) = process_batch_opt { - // Process all attestations with a single worker. - Some(Work::GossipAttestationBatch { - attestations, - process_batch, - }) - } else { - // There is no good reason for this to - // happen, it is a serious logic error. - // Since we only form batches when multiple - // work items exist, we should always have a - // work closure at this point. - crit!("Missing attestations work"); - None - } } - // Convert any gossip attestations that need to be converted. - } else if let Some(item) = attestation_to_convert_queue.pop() { - Some(item) - // Check sync committee messages after attestations as their rewards are lesser - // and they don't influence fork choice. - } else if let Some(item) = sync_contribution_queue.pop() { - Some(item) - } else if let Some(item) = sync_message_queue.pop() { - Some(item) - // Aggregates and unaggregates queued for re-processing are older and we - // care about fresher ones, so check those first. - } else if let Some(item) = unknown_block_aggregate_queue.pop() { - Some(item) - } else if let Some(item) = unknown_block_attestation_queue.pop() { - Some(item) - // Check RPC methods next. Status messages are needed for sync so - // prioritize them over syncing requests from other peers (BlocksByRange - // and BlocksByRoot) - } else if let Some(item) = status_queue.pop() { - Some(item) - } else if let Some(item) = bbrange_queue.pop() { - Some(item) - } else if let Some(item) = bbroots_queue.pop() { - Some(item) - } else if let Some(item) = blbrange_queue.pop() { - Some(item) - } else if let Some(item) = blbroots_queue.pop() { - Some(item) - } else if let Some(item) = dcbroots_queue.pop() { - Some(item) - } else if let Some(item) = dcbrange_queue.pop() { - Some(item) - // Prioritize sampling requests after block syncing requests - } else if let Some(item) = unknown_block_sampling_request_queue.pop() { - Some(item) - // Check slashings after all other consensus messages so we prioritize - // following head. - // - // Check attester slashings before proposer slashings since they have the - // potential to slash multiple validators at once. - } else if let Some(item) = gossip_attester_slashing_queue.pop() { - Some(item) - } else if let Some(item) = gossip_proposer_slashing_queue.pop() { - Some(item) - // Check exits and address changes late since our validators don't get - // rewards from them. - } else if let Some(item) = gossip_voluntary_exit_queue.pop() { - Some(item) - } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() { - Some(item) - // Check the priority 1 API requests after we've - // processed all the interesting things from the network - // and things required for us to stay in good repute - // with our P2P peers. - } else if let Some(item) = api_request_p1_queue.pop() { - Some(item) - // Handle backfill sync chain segments. - } else if let Some(item) = backfill_chain_segment.pop() { - Some(item) - // Handle light client requests. - } else if let Some(item) = lc_gossip_finality_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() { - Some(item) - } else if let Some(item) = unknown_light_client_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_bootstrap_queue.pop() { - Some(item) - } else if let Some(item) = lc_rpc_optimistic_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_rpc_finality_update_queue.pop() { - Some(item) - } else if let Some(item) = lc_update_range_queue.pop() { - Some(item) - // This statement should always be the final else statement. - } else { - // Let the journal know that a worker is freed and there's nothing else - // for it to do. - if let Some(work_journal_tx) = &work_journal_tx { - // We don't care if this message was successfully sent, we only use the journal - // during testing. - let _ = work_journal_tx.try_send(NOTHING_TO_DO); + + if let Some(process_batch) = process_batch_opt { + // Process all attestations with a single worker. + Some(Work::GossipAttestationBatch { + attestations, + process_batch, + }) + } else { + // There is no good reason for this to + // happen, it is a serious logic error. + // Since we only form batches when multiple + // work items exist, we should always have a + // work closure at this point. + crit!("Missing attestations work"); + None } - None - }; + } + // Check sync committee messages after attestations as their rewards are lesser + // and they don't influence fork choice. + } else if let Some(item) = work_queues.sync_contribution_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.sync_message_queue.pop() { + Some(item) + // Aggregates and unaggregates queued for re-processing are older and we + // care about fresher ones, so check those first. + } else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.unknown_block_attestation_queue.pop() + { + Some(item) + // Check RPC methods next. Status messages are needed for sync so + // prioritize them over syncing requests from other peers (BlocksByRange + // and BlocksByRoot) + } else if let Some(item) = work_queues.status_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.bbrange_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.bbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.blbrange_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.blbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.dcbroots_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.dcbrange_queue.pop() { + Some(item) + // Prioritize sampling requests after block syncing requests + } else if let Some(item) = + work_queues.unknown_block_sampling_request_queue.pop() + { + Some(item) + // Check slashings after all other consensus messages so we prioritize + // following head. + // + // Check attester slashings before proposer slashings since they have the + // potential to slash multiple validators at once. + } else if let Some(item) = work_queues.gossip_attester_slashing_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.gossip_proposer_slashing_queue.pop() + { + Some(item) + // Check exits and address changes late since our validators don't get + // rewards from them. + } else if let Some(item) = work_queues.gossip_voluntary_exit_queue.pop() { + Some(item) + } else if let Some(item) = + work_queues.gossip_bls_to_execution_change_queue.pop() + { + Some(item) + // Check the priority 1 API requests after we've + // processed all the interesting things from the network + // and things required for us to stay in good repute + // with our P2P peers. + } else if let Some(item) = work_queues.api_request_p1_queue.pop() { + Some(item) + // Handle backfill sync chain segments. + } else if let Some(item) = work_queues.backfill_chain_segment.pop() { + Some(item) + // Handle light client requests. + } else if let Some(item) = work_queues.lc_gossip_finality_update_queue.pop() + { + Some(item) + } else if let Some(item) = + work_queues.lc_gossip_optimistic_update_queue.pop() + { + Some(item) + } else if let Some(item) = + work_queues.unknown_light_client_update_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.lc_bootstrap_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.lc_rpc_optimistic_update_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.lc_rpc_finality_update_queue.pop() { + Some(item) + } else if let Some(item) = work_queues.lc_update_range_queue.pop() { + Some(item) + // This statement should always be the final else statement. + } else { + // Let the journal know that a worker is freed and there's nothing else + // for it to do. + if let Some(work_journal_tx) = &work_journal_tx { + // We don't care if this message was successfully sent, we only use the journal + // during testing. + let _ = work_journal_tx.try_send(NOTHING_TO_DO); + } + None + }; if let Some(work_event) = work_event { let work_type = work_event.to_type(); @@ -1329,14 +935,17 @@ impl BeaconProcessor { } } _ if can_spawn => self.spawn_worker(work, idle_tx), - Work::GossipAttestation { .. } => attestation_queue.push(work), + Work::GossipAttestation { .. } => { + work_queues.attestation_queue.push(work) + } + // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. Work::GossipAttestationBatch { .. } => crit!( work_type = "GossipAttestationBatch", "Unsupported inbound event" ), - Work::GossipAggregate { .. } => aggregate_queue.push(work), + Work::GossipAggregate { .. } => work_queues.aggregate_queue.push(work), // Aggregate batches are formed internally within the `BeaconProcessor`, // they are not sent from external services. Work::GossipAggregateBatch { .. } => { @@ -1345,91 +954,113 @@ impl BeaconProcessor { "Unsupported inbound event" ) } - Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id), - Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id), + Work::GossipBlock { .. } => { + work_queues.gossip_block_queue.push(work, work_id) + } + Work::GossipBlobSidecar { .. } => { + work_queues.gossip_blob_queue.push(work, work_id) + } Work::GossipDataColumnSidecar { .. } => { - gossip_data_column_queue.push(work, work_id) + work_queues.gossip_data_column_queue.push(work, work_id) } Work::DelayedImportBlock { .. } => { - delayed_block_queue.push(work, work_id) + work_queues.delayed_block_queue.push(work, work_id) } Work::GossipVoluntaryExit { .. } => { - gossip_voluntary_exit_queue.push(work, work_id) + work_queues.gossip_voluntary_exit_queue.push(work, work_id) } - Work::GossipProposerSlashing { .. } => { - gossip_proposer_slashing_queue.push(work, work_id) + Work::GossipProposerSlashing { .. } => work_queues + .gossip_proposer_slashing_queue + .push(work, work_id), + Work::GossipAttesterSlashing { .. } => work_queues + .gossip_attester_slashing_queue + .push(work, work_id), + Work::GossipSyncSignature { .. } => { + work_queues.sync_message_queue.push(work) } - Work::GossipAttesterSlashing { .. } => { - gossip_attester_slashing_queue.push(work, work_id) - } - Work::GossipSyncSignature { .. } => sync_message_queue.push(work), Work::GossipSyncContribution { .. } => { - sync_contribution_queue.push(work) - } - Work::GossipLightClientFinalityUpdate { .. } => { - lc_gossip_finality_update_queue.push(work, work_id) - } - Work::GossipLightClientOptimisticUpdate { .. } => { - lc_gossip_optimistic_update_queue.push(work, work_id) + work_queues.sync_contribution_queue.push(work) } + Work::GossipLightClientFinalityUpdate { .. } => work_queues + .lc_gossip_finality_update_queue + .push(work, work_id), + Work::GossipLightClientOptimisticUpdate { .. } => work_queues + .lc_gossip_optimistic_update_queue + .push(work, work_id), Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { - rpc_block_queue.push(work, work_id) + work_queues.rpc_block_queue.push(work, work_id) } - Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id), + Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), Work::RpcCustodyColumn { .. } => { - rpc_custody_column_queue.push(work, work_id) + work_queues.rpc_custody_column_queue.push(work, work_id) } Work::RpcVerifyDataColumn(_) => { - rpc_verify_data_column_queue.push(work, work_id) + work_queues.rpc_verify_data_column_queue.push(work, work_id) + } + Work::SamplingResult(_) => { + work_queues.sampling_result_queue.push(work, work_id) + } + Work::ChainSegment { .. } => { + work_queues.chain_segment_queue.push(work, work_id) } - Work::SamplingResult(_) => sampling_result_queue.push(work, work_id), Work::ColumnReconstruction(_) => { - column_reconstruction_queue.push(work, work_id) + work_queues.column_reconstruction_queue.push(work, work_id) } - Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id), Work::ChainSegmentBackfill { .. } => { - backfill_chain_segment.push(work, work_id) + work_queues.backfill_chain_segment.push(work, work_id) } - Work::Status { .. } => status_queue.push(work, work_id), - Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id), - Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id), - Work::BlobsByRangeRequest { .. } => blbrange_queue.push(work, work_id), - Work::LightClientBootstrapRequest { .. } => { - lc_bootstrap_queue.push(work, work_id) + Work::Status { .. } => work_queues.status_queue.push(work, work_id), + Work::BlocksByRangeRequest { .. } => { + work_queues.bbrange_queue.push(work, work_id) + } + Work::BlocksByRootsRequest { .. } => { + work_queues.bbroots_queue.push(work, work_id) + } + Work::BlobsByRangeRequest { .. } => { + work_queues.blbrange_queue.push(work, work_id) } - Work::LightClientOptimisticUpdateRequest { .. } => { - lc_rpc_optimistic_update_queue.push(work, work_id) + Work::LightClientBootstrapRequest { .. } => { + work_queues.lc_bootstrap_queue.push(work, work_id) } + Work::LightClientOptimisticUpdateRequest { .. } => work_queues + .lc_rpc_optimistic_update_queue + .push(work, work_id), Work::LightClientFinalityUpdateRequest { .. } => { - lc_rpc_finality_update_queue.push(work, work_id) + work_queues.lc_rpc_finality_update_queue.push(work, work_id) } Work::LightClientUpdatesByRangeRequest { .. } => { - lc_update_range_queue.push(work, work_id) + work_queues.lc_update_range_queue.push(work, work_id) } Work::UnknownBlockAttestation { .. } => { - unknown_block_attestation_queue.push(work) + work_queues.unknown_block_attestation_queue.push(work) } Work::UnknownBlockAggregate { .. } => { - unknown_block_aggregate_queue.push(work) + work_queues.unknown_block_aggregate_queue.push(work) } - Work::GossipBlsToExecutionChange { .. } => { - gossip_bls_to_execution_change_queue.push(work, work_id) + Work::GossipBlsToExecutionChange { .. } => work_queues + .gossip_bls_to_execution_change_queue + .push(work, work_id), + Work::BlobsByRootsRequest { .. } => { + work_queues.blbroots_queue.push(work, work_id) } - Work::BlobsByRootsRequest { .. } => blbroots_queue.push(work, work_id), Work::DataColumnsByRootsRequest { .. } => { - dcbroots_queue.push(work, work_id) + work_queues.dcbroots_queue.push(work, work_id) } Work::DataColumnsByRangeRequest { .. } => { - dcbrange_queue.push(work, work_id) + work_queues.dcbrange_queue.push(work, work_id) } - Work::UnknownLightClientOptimisticUpdate { .. } => { - unknown_light_client_update_queue.push(work, work_id) + Work::UnknownLightClientOptimisticUpdate { .. } => work_queues + .unknown_light_client_update_queue + .push(work, work_id), + Work::UnknownBlockSamplingRequest { .. } => work_queues + .unknown_block_sampling_request_queue + .push(work, work_id), + Work::ApiRequestP0 { .. } => { + work_queues.api_request_p0_queue.push(work, work_id) } - Work::UnknownBlockSamplingRequest { .. } => { - unknown_block_sampling_request_queue.push(work, work_id) + Work::ApiRequestP1 { .. } => { + work_queues.api_request_p1_queue.push(work, work_id) } - Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id), - Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id), }; Some(work_type) } @@ -1442,62 +1073,85 @@ impl BeaconProcessor { if let Some(modified_queue_id) = modified_queue_id { let queue_len = match modified_queue_id { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), + WorkType::GossipAttestation => work_queues.attestation_queue.len(), + WorkType::UnknownBlockAttestation => { + work_queues.unknown_block_attestation_queue.len() + } WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::GossipAggregate => work_queues.aggregate_queue.len(), + WorkType::UnknownBlockAggregate => { + work_queues.unknown_block_aggregate_queue.len() + } WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() + work_queues.unknown_light_client_update_queue.len() } WorkType::UnknownBlockSamplingRequest => { - unknown_block_sampling_request_queue.len() + work_queues.unknown_block_sampling_request_queue.len() } WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipBlock => work_queues.gossip_block_queue.len(), + WorkType::GossipBlobSidecar => work_queues.gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => { + work_queues.gossip_data_column_queue.len() + } + WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => { + work_queues.gossip_voluntary_exit_queue.len() + } + WorkType::GossipProposerSlashing => { + work_queues.gossip_proposer_slashing_queue.len() + } + WorkType::GossipAttesterSlashing => { + work_queues.gossip_attester_slashing_queue.len() + } + WorkType::GossipSyncSignature => work_queues.sync_message_queue.len(), + WorkType::GossipSyncContribution => { + work_queues.sync_contribution_queue.len() + } WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() + work_queues.lc_gossip_finality_update_queue.len() } WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() + work_queues.lc_gossip_optimistic_update_queue.len() } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(), - WorkType::SamplingResult => sampling_result_queue.len(), - WorkType::ColumnReconstruction => column_reconstruction_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { + work_queues.rpc_blob_queue.len() + } + WorkType::RpcCustodyColumn => work_queues.rpc_custody_column_queue.len(), + WorkType::RpcVerifyDataColumn => { + work_queues.rpc_verify_data_column_queue.len() + } + WorkType::SamplingResult => work_queues.sampling_result_queue.len(), + WorkType::ColumnReconstruction => { + work_queues.column_reconstruction_queue.len() + } + WorkType::ChainSegment => work_queues.chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => work_queues.backfill_chain_segment.len(), + WorkType::Status => work_queues.status_queue.len(), + WorkType::BlocksByRangeRequest => work_queues.blbrange_queue.len(), + WorkType::BlocksByRootsRequest => work_queues.blbroots_queue.len(), + WorkType::BlobsByRangeRequest => work_queues.bbrange_queue.len(), + WorkType::BlobsByRootsRequest => work_queues.bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => work_queues.dcbrange_queue.len(), WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() + work_queues.gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => { + work_queues.lc_bootstrap_queue.len() } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() + work_queues.lc_rpc_optimistic_update_queue.len() } WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() + work_queues.lc_rpc_finality_update_queue.len() } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::LightClientUpdatesByRangeRequest => { + work_queues.lc_update_range_queue.len() + } + WorkType::ApiRequestP0 => work_queues.api_request_p0_queue.len(), + WorkType::ApiRequestP1 => work_queues.api_request_p1_queue.len(), WorkType::Reprocess => 0, }; metrics::observe_vec( @@ -1507,18 +1161,21 @@ impl BeaconProcessor { ); } - if aggregate_queue.is_full() && aggregate_debounce.elapsed() { + if work_queues.aggregate_queue.is_full() && work_queues.aggregate_debounce.elapsed() + { error!( msg = "the system has insufficient resources for load", - queue_len = aggregate_queue.max_length, + queue_len = work_queues.aggregate_queue.max_length, "Aggregate attestation queue full" ) } - if attestation_queue.is_full() && attestation_debounce.elapsed() { + if work_queues.attestation_queue.is_full() + && work_queues.attestation_debounce.elapsed() + { error!( msg = "the system has insufficient resources for load", - queue_len = attestation_queue.max_length, + queue_len = work_queues.attestation_queue.max_length, "Attestation queue full" ) } @@ -1721,21 +1378,3 @@ impl Drop for SendOnDrop { } } } - -#[cfg(test)] -mod tests { - use super::*; - use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec}; - - #[test] - fn min_queue_len() { - // State with no validators. - let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::mainnet()); - let genesis_time = 0; - let state = BeaconState::::new(genesis_time, Eth1Data::default(), &spec); - assert_eq!(state.validators().len(), 0); - let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap(); - assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN); - assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN); - } -} diff --git a/beacon_node/beacon_processor/src/scheduler/mod.rs b/beacon_node/beacon_processor/src/scheduler/mod.rs index e1a076a7c54..e9ec9fac717 100644 --- a/beacon_node/beacon_processor/src/scheduler/mod.rs +++ b/beacon_node/beacon_processor/src/scheduler/mod.rs @@ -1 +1,171 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Stream, StreamExt}; +use logging::crit; +use tokio::sync::mpsc::{self, error::TrySendError, Sender}; +use tracing::{debug, warn}; +use types::EthSpec; +use work_reprocessing_queue::{QueuedBackfillBatch, ReadyWork, ReprocessQueueMessage}; + +use crate::{BeaconProcessor, WorkEvent, WORKER_FREED}; + +pub mod work_queue; pub mod work_reprocessing_queue; + +pub enum NextWorkEvent { + WorkEvent(Option>), + Continue, + Break, +} + +/// Unifies all the messages processed by the `BeaconProcessor`. +pub enum InboundEvent { + /// A worker has completed a task and is free. + WorkerIdle, + /// There is new work to be done. + WorkEvent(WorkEvent), + /// A work event that was queued for re-processing has become ready. + ReprocessingWork(WorkEvent), +} + +/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. +/// +/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained +/// control (specifically in the ordering of event processing). +pub struct InboundEvents { + /// Used by workers when they finish a task. + pub idle_rx: mpsc::Receiver<()>, + /// Used by upstream processes to send new work to the `BeaconProcessor`. + pub event_rx: mpsc::Receiver>, + /// Used internally for queuing work ready to be re-processed. + pub ready_work_rx: mpsc::Receiver, +} + +impl Stream for InboundEvents { + type Item = InboundEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Always check for idle workers before anything else. This allows us to ensure that a big + // stream of new events doesn't suppress the processing of existing events. + match self.idle_rx.poll_recv(cx) { + Poll::Ready(Some(())) => { + return Poll::Ready(Some(InboundEvent::WorkerIdle)); + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => {} + } + + // Poll for delayed blocks before polling for new work. It might be the case that a delayed + // block is required to successfully process some new work. + match self.ready_work_rx.poll_recv(cx) { + Poll::Ready(Some(ready_work)) => { + return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => {} + } + + match self.event_rx.poll_recv(cx) { + Poll::Ready(Some(event)) => { + return Poll::Ready(Some(InboundEvent::WorkEvent(event))); + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => {} + } + + Poll::Pending + } +} + +impl InboundEvents { + pub async fn next_work_event( + &mut self, + reprocess_work_tx: &Sender, + beacon_processor: &mut BeaconProcessor, + ) -> NextWorkEvent { + match self.next().await { + Some(InboundEvent::WorkerIdle) => { + beacon_processor.current_workers = + beacon_processor.current_workers.saturating_sub(1); + NextWorkEvent::WorkEvent(None) + } + Some(InboundEvent::WorkEvent(event)) + if beacon_processor.config.enable_backfill_rate_limiting => + { + match QueuedBackfillBatch::try_from(event) { + Ok(backfill_batch) => { + match reprocess_work_tx + .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch)) + { + Err(e) => { + warn!( + error = ?e, + "Unable to queue backfill work event. Will try to process now." + ); + match e { + TrySendError::Full(reprocess_queue_message) + | TrySendError::Closed(reprocess_queue_message) => { + match reprocess_queue_message { + ReprocessQueueMessage::BackfillSync(backfill_batch) => { + NextWorkEvent::WorkEvent(Some( + backfill_batch.into(), + )) + } + other => { + crit!( + message_type = other.as_ref(), + "Unexpected queue message type" + ); + // This is an unhandled exception, drop the message. + NextWorkEvent::Continue + } + } + } + } + } + Ok(..) => { + // backfill work sent to "reprocessing" queue. Process the next event. + NextWorkEvent::Continue + } + } + } + Err(event) => NextWorkEvent::WorkEvent(Some(event)), + } + } + Some(InboundEvent::WorkEvent(event)) | Some(InboundEvent::ReprocessingWork(event)) => { + NextWorkEvent::WorkEvent(Some(event)) + } + None => { + debug!(msg = "stream ended", "Gossip processor stopped",); + NextWorkEvent::Break + } + } + } +} + +pub fn worker_journal( + work_event: &Option>, + work_journal_tx: &Option>, +) { + if let Some(work_journal_tx) = work_journal_tx { + let id = work_event + .as_ref() + .map(|event| event.work.str_id()) + .unwrap_or(WORKER_FREED); + + // We don't care if this message was successfully sent, we only use the journal + // during testing. We also ignore reprocess messages to ensure our test cases can pass. + if id != "reprocess" { + let _ = work_journal_tx.try_send(id); + } + } +} diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs new file mode 100644 index 00000000000..a2cc9f8184f --- /dev/null +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -0,0 +1,400 @@ +use crate::Work; +use logging::TimeLatch; +use std::collections::VecDeque; +use tracing::error; +use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; + +/// Over-provision queues based on active validator count by some factor. The beacon chain has +/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning +/// slightly, we don't need to adjust the queues during the lifetime of a process. +const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; + +/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues +/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that +/// seems reasonable. +const MIN_QUEUE_LEN: usize = 128; + +/// A simple first-in-first-out queue with a maximum length. +pub struct FifoQueue { + queue: VecDeque, + max_length: usize, +} + +impl FifoQueue { + /// Create a new, empty queue with the given length. + pub fn new(max_length: usize) -> Self { + Self { + queue: VecDeque::default(), + max_length, + } + } + + /// Add a new item to the queue. + /// + /// Drops `item` if the queue is full. + pub fn push(&mut self, item: T, item_desc: &str) { + if self.queue.len() == self.max_length { + error!( + queue = item_desc, + queue_len = self.max_length, + msg = "the system has insufficient resources for load", + "Work queue is full", + ) + } else { + self.queue.push_back(item); + } + } + + /// Remove the next item from the queue. + pub fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + /// Returns the current length of the queue. + pub fn len(&self) -> usize { + self.queue.len() + } + + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } +} + +/// A simple last-in-first-out queue with a maximum length. +pub struct LifoQueue { + queue: VecDeque, + pub max_length: usize, +} + +impl LifoQueue { + /// Create a new, empty queue with the given length. + pub fn new(max_length: usize) -> Self { + Self { + queue: VecDeque::default(), + max_length, + } + } + + /// Add a new item to the front of the queue. + /// + /// If the queue is full, the item at the back of the queue is dropped. + pub fn push(&mut self, item: T) { + if self.queue.len() == self.max_length { + self.queue.pop_back(); + } + self.queue.push_front(item); + } + + /// Remove the next item from the queue. + pub fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + /// Returns `true` if the queue is full. + pub fn is_full(&self) -> bool { + self.queue.len() >= self.max_length + } + + /// Returns the current length of the queue. + pub fn len(&self) -> usize { + self.queue.len() + } + + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } +} + +/// Maximum number of queued items that will be stored before dropping them +pub struct BeaconProcessorQueueLengths { + aggregate_queue: usize, + attestation_queue: usize, + unknown_block_aggregate_queue: usize, + unknown_block_attestation_queue: usize, + sync_message_queue: usize, + sync_contribution_queue: usize, + gossip_voluntary_exit_queue: usize, + gossip_proposer_slashing_queue: usize, + gossip_attester_slashing_queue: usize, + unknown_light_client_update_queue: usize, + unknown_block_sampling_request_queue: usize, + rpc_block_queue: usize, + rpc_blob_queue: usize, + rpc_custody_column_queue: usize, + rpc_verify_data_column_queue: usize, + sampling_result_queue: usize, + column_reconstruction_queue: usize, + chain_segment_queue: usize, + backfill_chain_segment: usize, + gossip_block_queue: usize, + gossip_blob_queue: usize, + gossip_data_column_queue: usize, + delayed_block_queue: usize, + status_queue: usize, + bbrange_queue: usize, + bbroots_queue: usize, + blbroots_queue: usize, + blbrange_queue: usize, + dcbroots_queue: usize, + dcbrange_queue: usize, + gossip_bls_to_execution_change_queue: usize, + lc_bootstrap_queue: usize, + lc_rpc_optimistic_update_queue: usize, + lc_rpc_finality_update_queue: usize, + lc_gossip_finality_update_queue: usize, + lc_gossip_optimistic_update_queue: usize, + lc_update_range_queue: usize, + api_request_p0_queue: usize, + api_request_p1_queue: usize, +} + +impl BeaconProcessorQueueLengths { + pub fn from_state( + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { + let active_validator_count = + match state.get_cached_active_validator_indices(RelativeEpoch::Current) { + Ok(indices) => indices.len(), + Err(_) => state + .get_active_validator_indices(state.current_epoch(), spec) + .map_err(|e| format!("Error computing active indices: {:?}", e))? + .len(), + }; + let active_validator_count = + (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100; + let slots_per_epoch = E::slots_per_epoch() as usize; + + Ok(Self { + aggregate_queue: 4096, + unknown_block_aggregate_queue: 1024, + // Capacity for a full slot's worth of attestations if subscribed to all subnets + attestation_queue: std::cmp::max( + active_validator_count / slots_per_epoch, + MIN_QUEUE_LEN, + ), + // Capacity for a full slot's worth of attestations if subscribed to all subnets + unknown_block_attestation_queue: std::cmp::max( + active_validator_count / slots_per_epoch, + MIN_QUEUE_LEN, + ), + sync_message_queue: 2048, + sync_contribution_queue: 1024, + gossip_voluntary_exit_queue: 4096, + gossip_proposer_slashing_queue: 4096, + gossip_attester_slashing_queue: 4096, + unknown_block_sampling_request_queue: 16384, + unknown_light_client_update_queue: 128, + rpc_block_queue: 1024, + rpc_blob_queue: 1024, + // TODO(das): Placeholder values + rpc_custody_column_queue: 1000, + rpc_verify_data_column_queue: 1000, + sampling_result_queue: 1000, + column_reconstruction_queue: 64, + chain_segment_queue: 64, + backfill_chain_segment: 64, + gossip_block_queue: 1024, + gossip_blob_queue: 1024, + gossip_data_column_queue: 1024, + delayed_block_queue: 1024, + status_queue: 1024, + bbrange_queue: 1024, + bbroots_queue: 1024, + blbroots_queue: 1024, + blbrange_queue: 1024, + // TODO(das): pick proper values + dcbroots_queue: 1024, + dcbrange_queue: 1024, + gossip_bls_to_execution_change_queue: 16384, + lc_gossip_finality_update_queue: 1024, + lc_gossip_optimistic_update_queue: 1024, + lc_bootstrap_queue: 1024, + lc_rpc_optimistic_update_queue: 512, + lc_rpc_finality_update_queue: 512, + lc_update_range_queue: 512, + api_request_p0_queue: 1024, + api_request_p1_queue: 1024, + }) + } +} + +pub struct WorkQueues { + pub aggregate_queue: LifoQueue>, + pub aggregate_debounce: TimeLatch, + pub attestation_queue: LifoQueue>, + pub attestation_debounce: TimeLatch, + pub unknown_block_aggregate_queue: LifoQueue>, + pub unknown_block_attestation_queue: LifoQueue>, + pub sync_message_queue: LifoQueue>, + pub sync_contribution_queue: LifoQueue>, + pub gossip_voluntary_exit_queue: FifoQueue>, + pub gossip_proposer_slashing_queue: FifoQueue>, + pub gossip_attester_slashing_queue: FifoQueue>, + pub unknown_light_client_update_queue: FifoQueue>, + pub unknown_block_sampling_request_queue: FifoQueue>, + pub rpc_block_queue: FifoQueue>, + pub rpc_blob_queue: FifoQueue>, + pub rpc_custody_column_queue: FifoQueue>, + pub rpc_verify_data_column_queue: FifoQueue>, + pub sampling_result_queue: FifoQueue>, + pub column_reconstruction_queue: FifoQueue>, + pub chain_segment_queue: FifoQueue>, + pub backfill_chain_segment: FifoQueue>, + pub gossip_block_queue: FifoQueue>, + pub gossip_blob_queue: FifoQueue>, + pub gossip_data_column_queue: FifoQueue>, + pub delayed_block_queue: FifoQueue>, + pub status_queue: FifoQueue>, + pub bbrange_queue: FifoQueue>, + pub bbroots_queue: FifoQueue>, + pub blbroots_queue: FifoQueue>, + pub blbrange_queue: FifoQueue>, + pub dcbroots_queue: FifoQueue>, + pub dcbrange_queue: FifoQueue>, + pub gossip_bls_to_execution_change_queue: FifoQueue>, + pub lc_gossip_finality_update_queue: FifoQueue>, + pub lc_gossip_optimistic_update_queue: FifoQueue>, + pub lc_bootstrap_queue: FifoQueue>, + pub lc_rpc_optimistic_update_queue: FifoQueue>, + pub lc_rpc_finality_update_queue: FifoQueue>, + pub lc_update_range_queue: FifoQueue>, + pub api_request_p0_queue: FifoQueue>, + pub api_request_p1_queue: FifoQueue>, +} + +impl WorkQueues { + pub fn new(queue_lengths: BeaconProcessorQueueLengths) -> Self { + let aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue); + let aggregate_debounce = TimeLatch::default(); + let attestation_queue = LifoQueue::new(queue_lengths.attestation_queue); + let attestation_debounce = TimeLatch::default(); + let unknown_block_aggregate_queue = + LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); + let unknown_block_attestation_queue = + LifoQueue::new(queue_lengths.unknown_block_attestation_queue); + + let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); + let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); + + // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have + // a strong feeling about queue type for exits. + let gossip_voluntary_exit_queue = FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue); + + // Using a FIFO queue for slashing to prevent people from flushing their slashings from the + // queues with lots of junk messages. + let gossip_proposer_slashing_queue = + FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue); + let gossip_attester_slashing_queue = + FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); + + // Using a FIFO queue for light client updates to maintain sequence order. + let unknown_light_client_update_queue = + FifoQueue::new(queue_lengths.unknown_light_client_update_queue); + let unknown_block_sampling_request_queue = + FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); + + // Using a FIFO queue since blocks need to be imported sequentially. + let rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); + let rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); + let rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); + let rpc_verify_data_column_queue = + FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); + let sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); + let column_reconstruction_queue = FifoQueue::new(queue_lengths.column_reconstruction_queue); + let chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); + let backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); + let gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); + let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); + let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); + let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + + let status_queue = FifoQueue::new(queue_lengths.status_queue); + let bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue); + let bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); + let blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); + let blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); + let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); + let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); + + let gossip_bls_to_execution_change_queue = + FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); + + let lc_gossip_optimistic_update_queue = + FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue); + let lc_gossip_finality_update_queue = + FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue); + let lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); + let lc_rpc_optimistic_update_queue = + FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue); + let lc_rpc_finality_update_queue = + FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue); + let lc_update_range_queue: FifoQueue> = + FifoQueue::new(queue_lengths.lc_update_range_queue); + + let api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); + let api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue); + + WorkQueues { + aggregate_queue, + aggregate_debounce, + attestation_queue, + attestation_debounce, + unknown_block_aggregate_queue, + unknown_block_attestation_queue, + sync_message_queue, + sync_contribution_queue, + gossip_voluntary_exit_queue, + gossip_proposer_slashing_queue, + gossip_attester_slashing_queue, + unknown_light_client_update_queue, + unknown_block_sampling_request_queue, + rpc_block_queue, + rpc_blob_queue, + rpc_custody_column_queue, + rpc_verify_data_column_queue, + sampling_result_queue, + chain_segment_queue, + column_reconstruction_queue, + backfill_chain_segment, + gossip_block_queue, + gossip_blob_queue, + gossip_data_column_queue, + delayed_block_queue, + status_queue, + bbrange_queue, + bbroots_queue, + blbroots_queue, + blbrange_queue, + dcbroots_queue, + dcbrange_queue, + gossip_bls_to_execution_change_queue, + lc_gossip_optimistic_update_queue, + lc_gossip_finality_update_queue, + lc_bootstrap_queue, + lc_rpc_optimistic_update_queue, + lc_rpc_finality_update_queue, + lc_update_range_queue, + api_request_p0_queue, + api_request_p1_queue, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec}; + + #[test] + fn min_queue_len() { + // State with no validators. + let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet()); + let genesis_time = 0; + let state = BeaconState::::new(genesis_time, Eth1Data::default(), &spec); + assert_eq!(state.validators().len(), 0); + let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap(); + assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN); + assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN); + } +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 479b4b3192a..955d4540cfc 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -17,8 +17,8 @@ use beacon_chain::{ BeaconChain, BeaconChainTypes, MigratorConfig, ServerSentEventHandler, }; use beacon_chain::{Kzg, LightClientProducerEvent}; +use beacon_processor::{work_queue::BeaconProcessorQueueLengths, BeaconProcessorConfig}; use beacon_processor::{BeaconProcessor, BeaconProcessorChannels}; -use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths}; use environment::RuntimeContext; use eth2::{ types::{BlockId, StateId}, diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index a52df6c863f..0000a3a397a 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -4,7 +4,8 @@ use beacon_chain::{ BeaconChain, BeaconChainTypes, }; use beacon_processor::{ - BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths, + work_queue::BeaconProcessorQueueLengths, BeaconProcessor, BeaconProcessorChannels, + BeaconProcessorConfig, }; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 109c361ebe8..78238ed8da3 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -17,6 +17,7 @@ use beacon_chain::test_utils::{ EphemeralHarnessType, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; +use beacon_processor::work_queue::BeaconProcessorQueueLengths; use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools;