@@ -62,9 +62,9 @@ use task_executor::TaskExecutor;
62
62
use tokio:: sync:: mpsc;
63
63
use tokio:: sync:: mpsc:: error:: TrySendError ;
64
64
use types:: {
65
- Attestation , BeaconState , ChainSpec , Hash256 , RelativeEpoch , SignedAggregateAndProof , SubnetId ,
65
+ Attestation , BeaconState , ChainSpec , EthSpec , Hash256 , RelativeEpoch , SignedAggregateAndProof ,
66
+ SingleAttestation , Slot , SubnetId ,
66
67
} ;
67
- use types:: { EthSpec , Slot } ;
68
68
use work_reprocessing_queue:: {
69
69
spawn_reprocess_scheduler, QueuedAggregate , QueuedLightClientUpdate , QueuedRpcBlock ,
70
70
QueuedUnaggregate , ReadyWork ,
@@ -504,10 +504,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
504
504
505
505
/// Items required to verify a batch of unaggregated gossip attestations.
506
506
#[ derive( Debug ) ]
507
- pub struct GossipAttestationPackage < E : EthSpec > {
507
+ pub struct GossipAttestationPackage < T > {
508
508
pub message_id : MessageId ,
509
509
pub peer_id : PeerId ,
510
- pub attestation : Box < Attestation < E > > ,
510
+ pub attestation : Box < T > ,
511
511
pub subnet_id : SubnetId ,
512
512
pub should_import : bool ,
513
513
pub seen_timestamp : Duration ,
@@ -549,21 +549,32 @@ pub enum BlockingOrAsync {
549
549
Blocking ( BlockingFn ) ,
550
550
Async ( AsyncFn ) ,
551
551
}
552
+ pub type GossipAttestationBatch < E > = Vec < GossipAttestationPackage < Attestation < E > > > ;
552
553
553
554
/// Indicates the type of work to be performed and therefore its priority and
554
555
/// queuing specifics.
555
556
pub enum Work < E : EthSpec > {
556
557
GossipAttestation {
557
- attestation : Box < GossipAttestationPackage < E > > ,
558
- process_individual : Box < dyn FnOnce ( GossipAttestationPackage < E > ) + Send + Sync > ,
559
- process_batch : Box < dyn FnOnce ( Vec < GossipAttestationPackage < E > > ) + Send + Sync > ,
558
+ attestation : Box < GossipAttestationPackage < Attestation < E > > > ,
559
+ process_individual : Box < dyn FnOnce ( GossipAttestationPackage < Attestation < E > > ) + Send + Sync > ,
560
+ process_batch : Box < dyn FnOnce ( GossipAttestationBatch < E > ) + Send + Sync > ,
561
+ } ,
562
+ // Attestation requiring conversion before processing.
563
+ //
564
+ // For now this is a `SingleAttestation`, but eventually we will switch this around so that
565
+ // legacy `Attestation`s are converted and the main processing pipeline operates on
566
+ // `SingleAttestation`s.
567
+ GossipAttestationToConvert {
568
+ attestation : Box < GossipAttestationPackage < SingleAttestation > > ,
569
+ process_individual :
570
+ Box < dyn FnOnce ( GossipAttestationPackage < SingleAttestation > ) + Send + Sync > ,
560
571
} ,
561
572
UnknownBlockAttestation {
562
573
process_fn : BlockingFn ,
563
574
} ,
564
575
GossipAttestationBatch {
565
- attestations : Vec < GossipAttestationPackage < E > > ,
566
- process_batch : Box < dyn FnOnce ( Vec < GossipAttestationPackage < E > > ) + Send + Sync > ,
576
+ attestations : GossipAttestationBatch < E > ,
577
+ process_batch : Box < dyn FnOnce ( GossipAttestationBatch < E > ) + Send + Sync > ,
567
578
} ,
568
579
GossipAggregate {
569
580
aggregate : Box < GossipAggregatePackage < E > > ,
@@ -639,6 +650,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
639
650
#[ strum( serialize_all = "snake_case" ) ]
640
651
pub enum WorkType {
641
652
GossipAttestation ,
653
+ GossipAttestationToConvert ,
642
654
UnknownBlockAttestation ,
643
655
GossipAttestationBatch ,
644
656
GossipAggregate ,
@@ -690,6 +702,7 @@ impl<E: EthSpec> Work<E> {
690
702
fn to_type ( & self ) -> WorkType {
691
703
match self {
692
704
Work :: GossipAttestation { .. } => WorkType :: GossipAttestation ,
705
+ Work :: GossipAttestationToConvert { .. } => WorkType :: GossipAttestationToConvert ,
693
706
Work :: GossipAttestationBatch { .. } => WorkType :: GossipAttestationBatch ,
694
707
Work :: GossipAggregate { .. } => WorkType :: GossipAggregate ,
695
708
Work :: GossipAggregateBatch { .. } => WorkType :: GossipAggregateBatch ,
@@ -849,6 +862,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
849
862
let mut aggregate_queue = LifoQueue :: new ( queue_lengths. aggregate_queue ) ;
850
863
let mut aggregate_debounce = TimeLatch :: default ( ) ;
851
864
let mut attestation_queue = LifoQueue :: new ( queue_lengths. attestation_queue ) ;
865
+ let mut attestation_to_convert_queue = LifoQueue :: new ( queue_lengths. attestation_queue ) ;
852
866
let mut attestation_debounce = TimeLatch :: default ( ) ;
853
867
let mut unknown_block_aggregate_queue =
854
868
LifoQueue :: new ( queue_lengths. unknown_block_aggregate_queue ) ;
@@ -1180,6 +1194,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
1180
1194
None
1181
1195
}
1182
1196
}
1197
+ // Convert any gossip attestations that need to be converted.
1198
+ } else if let Some ( item) = attestation_to_convert_queue. pop ( ) {
1199
+ Some ( item)
1183
1200
// Check sync committee messages after attestations as their rewards are lesser
1184
1201
// and they don't influence fork choice.
1185
1202
} else if let Some ( item) = sync_contribution_queue. pop ( ) {
@@ -1301,6 +1318,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
1301
1318
match work {
1302
1319
_ if can_spawn => self . spawn_worker ( work, idle_tx) ,
1303
1320
Work :: GossipAttestation { .. } => attestation_queue. push ( work) ,
1321
+ Work :: GossipAttestationToConvert { .. } => {
1322
+ attestation_to_convert_queue. push ( work)
1323
+ }
1304
1324
// Attestation batches are formed internally within the
1305
1325
// `BeaconProcessor`, they are not sent from external services.
1306
1326
Work :: GossipAttestationBatch { .. } => crit ! (
@@ -1431,6 +1451,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
1431
1451
if let Some ( modified_queue_id) = modified_queue_id {
1432
1452
let queue_len = match modified_queue_id {
1433
1453
WorkType :: GossipAttestation => attestation_queue. len ( ) ,
1454
+ WorkType :: GossipAttestationToConvert => attestation_to_convert_queue. len ( ) ,
1434
1455
WorkType :: UnknownBlockAttestation => unknown_block_attestation_queue. len ( ) ,
1435
1456
WorkType :: GossipAttestationBatch => 0 , // No queue
1436
1457
WorkType :: GossipAggregate => aggregate_queue. len ( ) ,
@@ -1563,6 +1584,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
1563
1584
} => task_spawner. spawn_blocking ( move || {
1564
1585
process_individual ( * attestation) ;
1565
1586
} ) ,
1587
+ Work :: GossipAttestationToConvert {
1588
+ attestation,
1589
+ process_individual,
1590
+ } => task_spawner. spawn_blocking ( move || {
1591
+ process_individual ( * attestation) ;
1592
+ } ) ,
1566
1593
Work :: GossipAttestationBatch {
1567
1594
attestations,
1568
1595
process_batch,
0 commit comments