@@ -121,7 +121,7 @@ use store::{
     KeyValueStoreOp, StoreItem, StoreOp,
 };
 use task_executor::{ShutdownReason, TaskExecutor};
-use tokio::sync::mpsc::Receiver;
+use tokio::sync::oneshot;
 use tokio_stream::Stream;
 use tree_hash::TreeHash;
 use types::blob_sidecar::FixedBlobSidecarList;
@@ -3088,7 +3088,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         slot: Slot,
         block_root: Hash256,
         blobs: FixedBlobSidecarList<T::EthSpec>,
-        data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
+        data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
     ) -> Result<AvailabilityProcessingStatus, BlockError> {
         // If this block has already been imported to forkchoice it must have been available, so
         // we don't need to process its blobs again.
@@ -3216,7 +3216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         };

         let r = self
-            .process_availability(slot, availability, None, || Ok(()))
+            .process_availability(slot, availability, || Ok(()))
             .await;
         self.remove_notified(&block_root, r)
             .map(|availability_processing_status| {
@@ -3344,7 +3344,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

         match executed_block {
             ExecutedBlock::Available(block) => {
-                self.import_available_block(Box::new(block), None).await
+                self.import_available_block(Box::new(block)).await
             }
             ExecutedBlock::AvailabilityPending(block) => {
                 self.check_block_availability_and_import(block).await
@@ -3476,7 +3476,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let availability = self
             .data_availability_checker
             .put_pending_executed_block(block)?;
-        self.process_availability(slot, availability, None, || Ok(()))
+        self.process_availability(slot, availability, || Ok(()))
             .await
     }

@@ -3492,7 +3492,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }
         let availability = self.data_availability_checker.put_gossip_blob(blob)?;

-        self.process_availability(slot, availability, None, || Ok(()))
+        self.process_availability(slot, availability, || Ok(()))
             .await
     }

@@ -3515,7 +3515,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .data_availability_checker
             .put_gossip_data_columns(block_root, data_columns)?;

-        self.process_availability(slot, availability, None, publish_fn)
+        self.process_availability(slot, availability, publish_fn)
             .await
     }

@@ -3559,7 +3559,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .data_availability_checker
             .put_rpc_blobs(block_root, blobs)?;

-        self.process_availability(slot, availability, None, || Ok(()))
+        self.process_availability(slot, availability, || Ok(()))
             .await
     }

@@ -3568,14 +3568,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         slot: Slot,
         block_root: Hash256,
         blobs: FixedBlobSidecarList<T::EthSpec>,
-        data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
+        data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
     ) -> Result<AvailabilityProcessingStatus, BlockError> {
         self.check_blobs_for_slashability(block_root, &blobs)?;
-        let availability = self
-            .data_availability_checker
-            .put_engine_blobs(block_root, blobs)?;
+        let availability =
+            self.data_availability_checker
+                .put_engine_blobs(block_root, blobs, data_column_recv)?;

-        self.process_availability(slot, availability, data_column_recv, || Ok(()))
+        self.process_availability(slot, availability, || Ok(()))
             .await
     }

@@ -3615,7 +3615,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .data_availability_checker
             .put_rpc_custody_columns(block_root, custody_columns)?;

-        self.process_availability(slot, availability, None, || Ok(()))
+        self.process_availability(slot, availability, || Ok(()))
             .await
     }

@@ -3627,14 +3627,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         self: &Arc<Self>,
         slot: Slot,
         availability: Availability<T::EthSpec>,
-        recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
         publish_fn: impl FnOnce() -> Result<(), BlockError>,
     ) -> Result<AvailabilityProcessingStatus, BlockError> {
         match availability {
             Availability::Available(block) => {
                 publish_fn()?;
                 // Block is fully available, import into fork choice
-                self.import_available_block(block, recv).await
+                self.import_available_block(block).await
             }
             Availability::MissingComponents(block_root) => Ok(
                 AvailabilityProcessingStatus::MissingComponents(slot, block_root),
@@ -3645,7 +3644,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     pub async fn import_available_block(
         self: &Arc<Self>,
         block: Box<AvailableExecutedBlock<T::EthSpec>>,
-        data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
     ) -> Result<AvailabilityProcessingStatus, BlockError> {
         let AvailableExecutedBlock {
             block,
@@ -3660,6 +3658,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             parent_eth1_finalization_data,
             confirmed_state_roots,
             consensus_context,
+            data_column_recv,
         } = import_data;

         // Record the time at which this block's blobs became available.
@@ -3726,7 +3725,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
         parent_eth1_finalization_data: Eth1FinalizationData,
         mut consensus_context: ConsensusContext<T::EthSpec>,
-        data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
+        data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
     ) -> Result<Hash256, BlockError> {
         // ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
         // Everything in this initial section is on the hot path between processing the block and
@@ -3894,44 +3893,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // end up with blocks in fork choice that are missing from disk.
         // See https://github.com/sigp/lighthouse/issues/2028
         let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
-        // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
-        // custody columns: https://github.com/sigp/lighthouse/issues/6465
-        let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
-        // if block is made available via blobs, dropped the data columns.
-        let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);
-
-        let data_columns = match (data_columns, data_column_recv) {
-            // If the block was made available via custody columns received from gossip / rpc, use them
-            // since we already have them.
-            (Some(columns), _) => Some(columns),
-            // Otherwise, it means blobs were likely available via fetching from EL, in this case we
-            // wait for the data columns to be computed (blocking).
-            (None, Some(mut data_column_recv)) => {
-                let _column_recv_timer =
-                    metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
-                // Unable to receive data columns from sender, sender is either dropped or
-                // failed to compute data columns from blobs. We restore fork choice here and
-                // return to avoid inconsistency in database.
-                if let Some(columns) = data_column_recv.blocking_recv() {
-                    Some(columns)
-                } else {
-                    let err_msg = "Did not receive data columns from sender";
-                    error!(
-                        self.log,
-                        "Failed to store data columns into the database";
-                        "msg" => "Restoring fork choice from disk",
-                        "error" => err_msg,
-                    );
-                    return Err(self
-                        .handle_import_block_db_write_error(fork_choice)
-                        .err()
-                        .unwrap_or(BlockError::InternalError(err_msg.to_string())));
-                }
+
+        match self.get_blobs_or_columns_store_op(
+            block_root,
+            signed_block.epoch(),
+            blobs,
+            data_columns,
+            data_column_recv,
+        ) {
+            Ok(Some(blobs_or_columns_store_op)) => {
+                ops.push(blobs_or_columns_store_op);
             }
-            // No data columns present and compute data columns task was not spawned.
-            // Could either be no blobs in the block or before PeerDAS activation.
-            (None, None) => None,
-        };
+            Ok(None) => {}
+            Err(e) => {
+                error!(
+                    self.log,
+                    "Failed to store data columns into the database";
+                    "msg" => "Restoring fork choice from disk",
+                    "error" => &e,
+                    "block_root" => ?block_root
+                );
+                return Err(self
+                    .handle_import_block_db_write_error(fork_choice)
+                    .err()
+                    .unwrap_or(BlockError::InternalError(e)));
+            }
+        }

         let block = signed_block.message();
         let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
@@ -3943,30 +3930,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
         ops.push(StoreOp::PutState(block.state_root(), &state));

-        if let Some(blobs) = blobs {
-            if !blobs.is_empty() {
-                debug!(
-                    self.log, "Writing blobs to store";
-                    "block_root" => %block_root,
-                    "count" => blobs.len(),
-                );
-                ops.push(StoreOp::PutBlobs(block_root, blobs));
-            }
-        }
-
-        if let Some(data_columns) = data_columns {
-            // TODO(das): `available_block includes all sampled columns, but we only need to store
-            // custody columns. To be clarified in spec.
-            if !data_columns.is_empty() {
-                debug!(
-                    self.log, "Writing data_columns to store";
-                    "block_root" => %block_root,
-                    "count" => data_columns.len(),
-                );
-                ops.push(StoreOp::PutDataColumns(block_root, data_columns));
-            }
-        }
-
         let txn_lock = self.store.hot_db.begin_rw_transaction();

         if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
@@ -7184,6 +7147,68 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
         }
     }
+
+    fn get_blobs_or_columns_store_op(
+        &self,
+        block_root: Hash256,
+        block_epoch: Epoch,
+        blobs: Option<BlobSidecarList<T::EthSpec>>,
+        data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
+        data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
+    ) -> Result<Option<StoreOp<T::EthSpec>>, String> {
+        if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
+            // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
+            // custody columns: https://github.com/sigp/lighthouse/issues/6465
+            let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
+
+            let custody_columns_available = data_columns
+                .as_ref()
+                .as_ref()
+                .is_some_and(|columns| columns.len() == custody_columns_count);
+
+            let data_columns_to_persist = if custody_columns_available {
+                // If the block was made available via custody columns received from gossip / rpc, use them
+                // since we already have them.
+                data_columns
+            } else if let Some(data_column_recv) = data_column_recv {
+                // Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
+                let _column_recv_timer =
+                    metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
+                // Unable to receive data columns from sender, sender is either dropped or
+                // failed to compute data columns from blobs. We restore fork choice here and
+                // return to avoid inconsistency in database.
+                let computed_data_columns = data_column_recv
+                    .blocking_recv()
+                    .map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
+                Some(computed_data_columns)
+            } else {
+                // No blobs in the block.
+                None
+            };
+
+            if let Some(data_columns) = data_columns_to_persist {
+                if !data_columns.is_empty() {
+                    debug!(
+                        self.log, "Writing data_columns to store";
+                        "block_root" => %block_root,
+                        "count" => data_columns.len(),
+                    );
+                    return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
+                }
+            }
+        } else if let Some(blobs) = blobs {
+            if !blobs.is_empty() {
+                debug!(
+                    self.log, "Writing blobs to store";
+                    "block_root" => %block_root,
+                    "count" => blobs.len(),
+                );
+                return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
+            }
+        }
+
+        Ok(None)
+    }
 }

 impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
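Note on the hand-off shown above: the diff swaps the mpsc::Receiver that previously carried the EL-computed data columns for a tokio::sync::oneshot channel, moves the receiver into the block's import_data, and has get_blobs_or_columns_store_op block on it only when the custody columns did not already arrive via gossip/RPC. The sketch below is not Lighthouse code; it only illustrates that oneshot pattern, with a hypothetical compute_columns_from_blobs placeholder standing in for the real column computation.

// Minimal sketch of the oneshot hand-off, assuming a tokio dependency with the
// "sync" feature. `oneshot::channel`, `Sender::send` and `Receiver::blocking_recv`
// are real tokio APIs; everything else here is an illustrative placeholder.
use tokio::sync::oneshot;

// Hypothetical stand-in for deriving data columns from engine blobs.
fn compute_columns_from_blobs(blobs: Vec<u8>) -> Vec<Vec<u8>> {
    blobs.into_iter().map(|b| vec![b]).collect()
}

fn main() {
    let (tx, rx) = oneshot::channel::<Vec<Vec<u8>>>();

    // Producer: in Lighthouse this role is played by the task that computes
    // columns after fetching blobs from the EL. Dropping `tx` without sending
    // signals failure to the consumer.
    std::thread::spawn(move || {
        let columns = compute_columns_from_blobs(vec![1, 2, 3]);
        // `send` consumes the sender; Err here only means the receiver is gone.
        let _ = tx.send(columns);
    });

    // Consumer: `blocking_recv` consumes the receiver and returns Err(RecvError)
    // if the sender was dropped before sending, which the new code maps to a
    // String error and a fork-choice restore.
    match rx.blocking_recv() {
        Ok(columns) => println!("received {} data columns", columns.len()),
        Err(e) => eprintln!("did not receive data columns from sender: {e:?}"),
    }
}

Compared with the old mpsc::Receiver, oneshot matches the deliver-exactly-once semantics of this hand-off, and a dropped sender surfaces as an explicit RecvError rather than a bare None from a closed queue.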