
Commit c8d82f0

Deprecate pruning checkpoint
1 parent bff2af2 commit c8d82f0

This commit stops persisting a pruning checkpoint: `prune_hot_db` now derives its lower bound from the minimum slot of the newly finalized state and block summaries, the `PruningCheckpoint` store item and its load/store helpers are removed, and `PRUNING_CHECKPOINT_KEY` is left commented out as a deprecated key.

5 files changed: +53 −100 lines


beacon_node/beacon_chain/src/builder.rs (2 additions, 9 deletions)

@@ -38,8 +38,8 @@ use std::time::Duration;
 use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
 use task_executor::{ShutdownReason, TaskExecutor};
 use types::{
-    BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec,
-    FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot,
+    BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Epoch, EthSpec, FixedBytesExtended,
+    Hash256, Signature, SignedBeaconBlock, Slot,
 };

 /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -602,13 +602,6 @@ where
                 .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
         );

-        // Store pruning checkpoint to prevent attempting to prune before the anchor state.
-        self.pending_io_batch
-            .push(store.pruning_checkpoint_store_op(Checkpoint {
-                root: weak_subj_block_root,
-                epoch: weak_subj_state.slot().epoch(E::slots_per_epoch()),
-            }));
-
         let snapshot = BeaconSnapshot {
             beacon_block_root: weak_subj_block_root,
             beacon_block: Arc::new(weak_subj_block),

beacon_node/beacon_chain/src/migrate.rs (44 additions, 42 deletions)

@@ -15,9 +15,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use store::hot_cold_store::{migrate_database, HotColdDBError};
 use store::{DBColumn, Error, HotStateSummary, ItemStore, StoreItem, StoreOp};
 pub use store::{HotColdDB, MemoryStore};
-use types::{
-    BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, FixedBytesExtended, Hash256, Slot,
-};
+use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot};

 /// Compact at least this frequently, finalization permitting (7 days).
 const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
@@ -89,7 +87,7 @@ pub struct PrevMigration {
 pub enum PruningOutcome {
     /// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`.
     Successful {
-        old_finalized_checkpoint: Checkpoint,
+        old_finalized_checkpoint_epoch: Epoch,
     },
     /// The run was aborted because the new finalized checkpoint is older than the previous one.
     OutOfOrderFinalization {
@@ -119,6 +117,8 @@ pub enum PruningError {
     MissingSummaryForFinalizedCheckpoint(Hash256),
     MissingBlindedBlock(Hash256),
     SummariesDagError(summaries_dag::Error),
+    EmptyFinalizedStates,
+    EmptyFinalizedBlocks,
 }

 /// Message sent to the migration thread containing the information it needs to run.
@@ -356,7 +356,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             }
         };

-        let old_finalized_checkpoint = match Self::prune_hot_db(
+        let old_finalized_checkpoint_epoch = match Self::prune_hot_db(
             db.clone(),
             finalized_state_root.into(),
             &finalized_state,
@@ -365,8 +365,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             log,
         ) {
             Ok(PruningOutcome::Successful {
-                old_finalized_checkpoint,
-            }) => old_finalized_checkpoint,
+                old_finalized_checkpoint_epoch,
+            }) => old_finalized_checkpoint_epoch,
             Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation) => {
                 warn!(
                     log,
@@ -397,7 +397,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         // Finally, compact the database so that new free space is properly reclaimed.
         if let Err(e) = Self::run_compaction(
             db,
-            old_finalized_checkpoint.epoch,
+            old_finalized_checkpoint_epoch,
             notif.finalized_checkpoint.epoch,
             log,
         ) {
@@ -475,17 +475,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         genesis_block_root: Hash256,
         log: &Logger,
     ) -> Result<PruningOutcome, BeaconChainError> {
-        let old_finalized_checkpoint =
-            store
-                .load_pruning_checkpoint()?
-                .unwrap_or_else(|| Checkpoint {
-                    epoch: Epoch::new(0),
-                    root: Hash256::zero(),
-                });
-
-        let old_finalized_slot = old_finalized_checkpoint
-            .epoch
-            .start_slot(E::slots_per_epoch());
         let new_finalized_slot = new_finalized_checkpoint
             .epoch
             .start_slot(E::slots_per_epoch());
@@ -500,21 +489,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
                 .into());
         }

-        // The new finalized state must be newer than the previous finalized state.
-        // I think this can happen sometimes currently due to `fork_choice` running in parallel
-        // with itself and sending us notifications out of order.
-        if old_finalized_slot > new_finalized_slot {
-            return Ok(PruningOutcome::OutOfOrderFinalization {
-                old_finalized_checkpoint,
-                new_finalized_checkpoint,
-            });
-        }
+        // TODO(hdiff): if we remove the check of `old_finalized_slot > new_finalized_slot` can we
+        // ensure that a single pruning operation is running at once? If a pruning run is triggered
+        // with an old finalized checkpoint it can derive a stale hdiff set of slots and delete
+        // future ones that are necessary breaking the DB.

         debug!(
             log,
             "Starting database pruning";
-            "old_finalized_checkpoint" => ?old_finalized_checkpoint,
             "new_finalized_checkpoint" => ?new_finalized_checkpoint,
+            "new_finalized_state_hash" => ?new_finalized_state_hash,
         );

         let (state_summaries_dag, block_summaries_dag) = {
@@ -580,9 +564,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         );

         // Note: ancestors_of includes the finalized state root
-        let newly_finalized_state_roots = state_summaries_dag
+        let newly_finalized_state_summaries = state_summaries_dag
             .ancestors_of(new_finalized_state_hash)
             .map_err(PruningError::SummariesDagError)?;
+        let newly_finalized_state_roots = newly_finalized_state_summaries
+            .iter()
+            .map(|(root, _)| *root)
+            .collect::<HashSet<Hash256>>();
+        let newly_finalized_states_min_slot = *newly_finalized_state_summaries
+            .iter()
+            .map(|(_, slot)| slot)
+            .min()
+            .ok_or(PruningError::EmptyFinalizedStates)?;

         // Note: ancestors_of includes the finalized block
         let newly_finalized_blocks = block_summaries_dag
@@ -592,6 +585,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             .iter()
             .map(|(root, _)| *root)
             .collect::<HashSet<Hash256>>();
+        let newly_finalized_blocks_min_slot = *newly_finalized_blocks
+            .iter()
+            .map(|(_, slot)| slot)
+            .min()
+            .ok_or(PruningError::EmptyFinalizedBlocks)?;

         // Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
         // work.
@@ -622,11 +620,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
                 // In the diagram below, `o` are diffs by slot that we must keep. In the prior
                 // finalized section there's only one chain so we preserve them unconditionally.
                 // For the newly finalized chain, we check which of is canonical and only keep
-                // those.
+                // those. Slots below `min_finalized_state_slot` we don't have canonical
+                // information so we assume they are part of the finalized pruned chain.
                 //
                 //               /-----o----
                 // o-------o------/-------o----
-                if slot < old_finalized_slot
+                if slot < newly_finalized_states_min_slot
                     || newly_finalized_state_roots.contains(&state_root)
                 {
                     // Track kept summaries to debug hdiff inconsistencies with "Extra pruning information"
@@ -660,9 +659,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
                 } else if newly_finalized_block_roots.contains(&block_root) {
                     // Keep recently finalized blocks
                     false
-                } else if slot <= old_finalized_slot {
-                    // Keep blocks that are prior to the last finalized slot.
-                    // newly_finalized_block_roots only contains blocks over `old_finalized_slot`.
+                } else if slot < newly_finalized_blocks_min_slot
+                    || newly_finalized_block_roots.contains(&block_root)
+                {
+                    // Keep recently finalized blocks that we know are canonical. Blocks with slots <
+                    // that `newly_finalized_blocks_min_slot` we don't have canonical information so we
+                    // assume they are part of the finalized pruned chain
+                    //
                     // Pruning those risks breaking the DB by deleting canonical blocks once the HDiff
                     // grid advances. If the pruning routine is correct this condition should never hit.
                     false
@@ -679,10 +682,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         debug!(
             log,
             "Extra pruning information";
-            "old_finalized_checkpoint" => ?old_finalized_checkpoint,
             "new_finalized_checkpoint" => ?new_finalized_checkpoint,
             "newly_finalized_blocks" => newly_finalized_blocks.len(),
+            "newly_finalized_blocks_min_slot" => newly_finalized_blocks_min_slot,
             "newly_finalized_state_roots" => newly_finalized_state_roots.len(),
+            "newly_finalized_states_min_slot" => newly_finalized_states_min_slot,
             "required_finalized_diff_state_slots" => ?required_finalized_diff_state_slots,
             "kept_summaries_for_hdiff" => ?kept_summaries_for_hdiff,
             "state_summaries_count" => state_summaries_dag.summaries_count(),
@@ -719,11 +723,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY),
         ));

-        // Persist the new finalized checkpoint as the pruning checkpoint.
-        batch.push(StoreOp::KeyValueOp(
-            store.pruning_checkpoint_store_op(new_finalized_checkpoint),
-        ));
-
         // Prune sync committee branches of non-checkpoint canonical finalized blocks
         Self::prune_non_checkpoint_sync_committee_branches(&newly_finalized_blocks, &mut batch);
         // Prune all payloads of the canonical finalized blocks
@@ -736,7 +735,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         debug!(log, "Database pruning complete");

         Ok(PruningOutcome::Successful {
-            old_finalized_checkpoint,
+            // TODO(hdiff): approximation of the previous finalized checkpoint. Only used in the
+            // compaction to compute time, can we use something else?
+            old_finalized_checkpoint_epoch: newly_finalized_blocks_min_slot
+                .epoch(E::slots_per_epoch()),
         })
     }

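The pruning lower bound that was previously loaded from the persisted checkpoint is now derived from the `(root, slot)` pairs returned by `ancestors_of`. Below is a minimal, self-contained sketch of that derivation and of the resulting keep/prune test; the types and the helper name are stand-ins for illustration, not the real Lighthouse `Hash256`/`Slot` or `prune_hot_db` internals:

// Minimal sketch (not the Lighthouse types): derive the pruning lower bound from the
// `(root, slot)` pairs returned by `ancestors_of` instead of a persisted checkpoint.
use std::collections::HashSet;

type Hash256 = [u8; 32]; // stand-in
type Slot = u64; // stand-in

#[derive(Debug)]
enum PruningError {
    EmptyFinalizedStates,
}

/// Hypothetical helper: split the newly finalized summaries into the set of roots to keep
/// and the minimum slot below which canonicality is unknown.
fn finalized_roots_and_min_slot(
    summaries: &[(Hash256, Slot)],
) -> Result<(HashSet<Hash256>, Slot), PruningError> {
    let roots: HashSet<Hash256> = summaries.iter().map(|(root, _)| *root).collect();
    let min_slot = summaries
        .iter()
        .map(|(_, slot)| *slot)
        .min()
        .ok_or(PruningError::EmptyFinalizedStates)?;
    Ok((roots, min_slot))
}

fn main() -> Result<(), PruningError> {
    // Pretend `ancestors_of(new_finalized_state_hash)` returned these three summaries.
    let summaries = vec![([1u8; 32], 96), ([2u8; 32], 95), ([3u8; 32], 94)];
    let (roots, min_slot) = finalized_roots_and_min_slot(&summaries)?;
    // A summary is kept if it is below the minimum newly finalized slot (assumed part of
    // the already-pruned finalized chain) or if its root is newly finalized, mirroring the
    // condition in `prune_hot_db`.
    let keep = |root: &Hash256, slot: Slot| slot < min_slot || roots.contains(root);
    assert!(keep(&[2u8; 32], 95)); // canonical newly finalized state
    assert!(keep(&[9u8; 32], 10)); // below the minimum slot: assumed canonical
    assert!(!keep(&[9u8; 32], 95)); // non-canonical fork state in the newly finalized range
    println!("min_slot = {min_slot}, keeping {} newly finalized roots", roots.len());
    Ok(())
}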
beacon_node/beacon_chain/src/summaries_dag.rs (2 additions, 2 deletions)

@@ -178,7 +178,7 @@ impl StateSummariesDAG {

     /// Returns all ancestors of `state_root` INCLUDING `state_root` until the next parent is not
     /// known.
-    pub fn ancestors_of(&self, mut state_root: Hash256) -> Result<Vec<Hash256>, Error> {
+    pub fn ancestors_of(&self, mut state_root: Hash256) -> Result<Vec<(Hash256, Slot)>, Error> {
         // Sanity check that the first summary exists
         if !self.state_summaries_by_state_root.contains_key(&state_root) {
             return Err(Error::MissingStateSummary(state_root));
@@ -187,7 +187,7 @@
         let mut ancestors = vec![];
         loop {
             if let Some(summary) = self.state_summaries_by_state_root.get(&state_root) {
-                ancestors.push(state_root);
+                ancestors.push((state_root, summary.slot));
                 state_root = summary.previous_state_root
             } else {
                 return Ok(ancestors);

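For reference, a self-contained sketch of the ancestor walk that `ancestors_of` performs after this change, yielding `(state_root, slot)` pairs. The `Summary` struct, the integer aliases, and the `String` error here are stand-ins, not the real `StateSummariesDAG` internals:

// Self-contained sketch of the ancestor walk, now yielding `(state_root, slot)` pairs.
use std::collections::HashMap;

type Hash256 = u64; // stand-in
type Slot = u64; // stand-in

struct Summary {
    slot: Slot,
    previous_state_root: Hash256,
}

fn ancestors_of(
    summaries: &HashMap<Hash256, Summary>,
    mut state_root: Hash256,
) -> Result<Vec<(Hash256, Slot)>, String> {
    // Sanity check that the first summary exists.
    if !summaries.contains_key(&state_root) {
        return Err(format!("missing state summary {state_root}"));
    }
    let mut ancestors = vec![];
    // Follow `previous_state_root` links until a summary is unknown; the starting
    // root is included, matching the doc comment on the real method.
    while let Some(summary) = summaries.get(&state_root) {
        ancestors.push((state_root, summary.slot));
        state_root = summary.previous_state_root;
    }
    Ok(ancestors)
}

fn main() {
    let mut dag = HashMap::new();
    dag.insert(30, Summary { slot: 3, previous_state_root: 20 });
    dag.insert(20, Summary { slot: 2, previous_state_root: 10 });
    dag.insert(10, Summary { slot: 1, previous_state_root: 0 }); // parent 0 is unknown
    let ancestors = ancestors_of(&dag, 30).unwrap();
    assert_eq!(ancestors, vec![(30, 3), (20, 2), (10, 1)]);
}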
beacon_node/store/src/hot_cold_store.rs (2 additions, 21 deletions)

@@ -6,10 +6,10 @@ use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
 use crate::leveldb_store::{BytesKey, LevelDB};
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
+    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, SchemaVersion,
     ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY,
     COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY,
-    PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
+    SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
 };
 use crate::state_cache::{PutStateOutcome, StateCache};
 use crate::{
@@ -2694,25 +2694,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.config.compact_on_prune
     }

-    /// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
-    pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
-        Ok(self
-            .hot_db
-            .get(&PRUNING_CHECKPOINT_KEY)?
-            .map(|pc: PruningCheckpoint| pc.checkpoint))
-    }
-
-    /// Store the checkpoint to begin pruning from (the "old finalized checkpoint").
-    pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> {
-        self.hot_db
-            .do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)])
-    }
-
-    /// Create a staged store for the pruning checkpoint.
-    pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
-        PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
-    }
-
     /// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
     pub fn load_compaction_timestamp(&self) -> Result<Option<Duration>, Error> {
         Ok(self

beacon_node/store/src/metadata.rs (3 additions, 26 deletions)

@@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem};
 use serde::{Deserialize, Serialize};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
-use types::{Checkpoint, Hash256, Slot};
+use types::{Hash256, Slot};

 pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(23);

@@ -12,7 +12,8 @@ pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(23);
 pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
 pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
 pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
-pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
+// DEPRECATED
+// pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
 pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
 pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
 pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
@@ -65,30 +66,6 @@ impl StoreItem for SchemaVersion {
     }
 }

-/// The checkpoint used for pruning the database.
-///
-/// Updated whenever pruning is successful.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct PruningCheckpoint {
-    pub checkpoint: Checkpoint,
-}
-
-impl StoreItem for PruningCheckpoint {
-    fn db_column() -> DBColumn {
-        DBColumn::BeaconMeta
-    }
-
-    fn as_store_bytes(&self) -> Vec<u8> {
-        self.checkpoint.as_ssz_bytes()
-    }
-
-    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
-        Ok(PruningCheckpoint {
-            checkpoint: Checkpoint::from_ssz_bytes(bytes)?,
-        })
-    }
-}
-
 /// The last time the database was compacted.
 pub struct CompactionTimestamp(pub u64);

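Taken together, the epoch that compaction used to read from the stored pruning checkpoint now travels out of `prune_hot_db` inside `PruningOutcome::Successful`. A rough sketch of that flow; all types and helpers other than the field name `old_finalized_checkpoint_epoch` are illustrative stand-ins, not the Lighthouse API:

// Rough sketch only: stand-in types, not the Lighthouse API.
type Epoch = u64;
type Slot = u64;

const SLOTS_PER_EPOCH: Slot = 32;

#[allow(dead_code)] // `Deferred` is never constructed in this sketch
enum PruningOutcome {
    Successful { old_finalized_checkpoint_epoch: Epoch },
    Deferred,
}

// Stand-in for `prune_hot_db`: report the epoch of the lowest newly finalized block slot,
// which approximates the previous finalized checkpoint.
fn prune_hot_db(newly_finalized_blocks_min_slot: Slot) -> PruningOutcome {
    PruningOutcome::Successful {
        old_finalized_checkpoint_epoch: newly_finalized_blocks_min_slot / SLOTS_PER_EPOCH,
    }
}

// Stand-in for `run_compaction`: only the two epochs matter for deciding whether to compact.
fn run_compaction(old_epoch: Epoch, new_epoch: Epoch) {
    println!("compacting from epoch {old_epoch} to epoch {new_epoch}");
}

fn main() {
    let new_finalized_epoch: Epoch = 4;
    match prune_hot_db(65) {
        PruningOutcome::Successful { old_finalized_checkpoint_epoch } => {
            // 65 / 32 = epoch 2, an approximation of the previous finalized checkpoint.
            run_compaction(old_finalized_checkpoint_epoch, new_finalized_epoch);
        }
        PruningOutcome::Deferred => {}
    }
}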