Skip to content

Commit dcce765

Browse files
committed
Address todos
1 parent bf3d6b6 commit dcce765

File tree

3 files changed

+63
-61
lines changed

3 files changed

+63
-61
lines changed

beacon_node/beacon_chain/src/migrate.rs

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::{mpsc, Arc};
1111
use std::thread;
1212
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1313
use store::hot_cold_store::{migrate_database, HotColdDBError};
14-
use store::iter::StateRootsIterator;
14+
use store::iter::{BlockRootsIterator, StateRootsIterator};
1515
use store::{DBColumn, Error, HotStateSummary, ItemStore, StoreItem, StoreOp};
1616
pub use store::{HotColdDB, MemoryStore};
1717
use types::{
@@ -514,23 +514,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
514514
"new_finalized_epoch" => new_finalized_checkpoint.epoch,
515515
);
516516

517-
let mut hot_db_ops = vec![];
518-
// TODO(hdiff): iterate blocks from finalized block up to old_finalized_slot
519-
let finalized_blocks = vec![];
520-
Self::prune_non_checkpoint_sync_committee_branches(&finalized_blocks, &mut hot_db_ops);
521-
// if store.config.prune_payloads {
522-
Self::prune_finalized_payloads(&finalized_blocks, &mut hot_db_ops);
523-
// }
524-
525517
debug!(
526518
log,
527519
"Extra pruning information";
528520
"old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
529521
"new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
530522
);
531523

532-
/////
533-
534524
let summaries_dag = {
535525
let state_summaries = store
536526
.hot_db
@@ -568,26 +558,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
568558
summaries_dag.descendant_block_roots_of(&new_finalized_checkpoint.root)?;
569559

570560
// TODO(hdiff): Could re-use the summaries dag
571-
let newly_finalized_chain =
572-
std::iter::once(Ok((new_finalized_slot, new_finalized_state_hash)))
573-
.chain(
574-
StateRootsIterator::new(&store, new_finalized_state)
575-
.map(|res| res.map(|(state_root, slot)| (slot, state_root))),
576-
)
561+
let newly_finalized_state_roots =
562+
std::iter::once(Ok((new_finalized_state_hash, new_finalized_slot)))
563+
.chain(StateRootsIterator::new(&store, new_finalized_state))
577564
.take_while(|res| {
578565
res.as_ref()
579-
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
566+
.map_or(true, |(_, slot)| *slot >= old_finalized_slot)
580567
})
581-
.map(|res| res.map(|(_, state_root)| state_root))
582-
.collect::<Result<Vec<Hash256>, _>>()?;
568+
.map(|res| res.map(|(state_root, _)| state_root))
569+
.collect::<Result<HashSet<Hash256>, _>>()?;
570+
571+
// TODO(hdiff): Could re-use the summaries dag
572+
let newly_finalized_blocks = BlockRootsIterator::new(&store, new_finalized_state)
573+
.take_while(|res| {
574+
res.as_ref()
575+
.map_or(true, |(_, slot)| *slot >= old_finalized_slot)
576+
})
577+
.collect::<Result<Vec<(Hash256, Slot)>, _>>()?;
583578

584579
// Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
585580
// work.
586-
// TODO(hdiff): must not delete *all* finalized hdiffs. Instead, keep
587-
// the more recent diff of each layer including the snapshot.
588-
// Implement a routine somewhere to figure out which diffs should be kept
589-
// Given a start slot, and the current finalized state:
590-
// - iterate each hdiff layer and compute the most recent point <= finalized slot
591581
let required_finalized_diff_state_slots =
592582
store.hierarchy_hot.closest_layer_points(new_finalized_slot);
593583

@@ -601,23 +591,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
601591
if block_roots_descendant_of_finalized.contains(&summary.latest_block_root) {
602592
// Keep this state: it is the post state of a viable head, or a state advance from a
603593
// viable head.
604-
} else if newly_finalized_chain.contains(&state_root)
594+
} else if newly_finalized_state_roots.contains(&state_root)
605595
&& required_finalized_diff_state_slots.contains(&slot)
606596
{
607597
// Keep this state and diff as it's necessary for the finalized portion of the
608-
// HDiff links
598+
// HDiff links. `required_finalized_diff_state_slots` tracks the set of slots on
599+
// each diff layer, and checking `newly_finalized_state_roots` ensures we only
600+
// keep those on the finalized canonical chain. Note that there may be lingering
601+
// forks.
609602
} else {
610-
// TODO(hdiff) compute the root of the hdiffs to keep, else prune
611603
abandoned_blocks.insert(SignedBeaconBlockHash::from(summary.latest_block_root));
612604
abandoned_states.insert((slot, BeaconStateHash::from(state_root)));
613605
}
614606
}
615607
}
616608

617-
// TODO(hdiff): Update the headtracker or remove it
618-
619-
/////
620-
621609
let mut batch: Vec<StoreOp<E>> = abandoned_blocks
622610
.into_iter()
623611
.map(Into::into)
@@ -650,11 +638,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
650638
store.pruning_checkpoint_store_op(new_finalized_checkpoint),
651639
));
652640

653-
store.do_atomically_with_block_and_blobs_cache(batch)?;
641+
Self::prune_non_checkpoint_sync_committee_branches(&newly_finalized_blocks, &mut batch);
642+
// TODO(hdiff): access store config here
643+
// if store.config.prune_payloads {
644+
Self::prune_finalized_payloads(&newly_finalized_blocks, &mut batch);
645+
// }
654646

655-
// Do a quick separate pass to delete obsoleted hot states, usually pre-states from the state
656-
// advance which are not canonical due to blocks being applied on top.
657-
store.prune_old_hot_states()?;
647+
store.do_atomically_with_block_and_blobs_cache(batch)?;
658648

659649
debug!(log, "Database pruning complete");
660650

@@ -664,10 +654,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
664654
}
665655

666656
fn prune_finalized_payloads(
667-
finalized_blocks: &[(Slot, Hash256)],
657+
finalized_blocks: &[(Hash256, Slot)],
668658
hot_db_ops: &mut Vec<StoreOp<E>>,
669659
) {
670-
for (_, block_root) in finalized_blocks {
660+
for (block_root, _) in finalized_blocks {
671661
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
672662
// delete the payload for the finalized block itself, but that's OK as we only guarantee
673663
// that payloads are present for slots >= the split slot. The payload fetching code is also
@@ -677,14 +667,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
677667
}
678668

679669
fn prune_non_checkpoint_sync_committee_branches(
680-
finalized_blocks: &[(Slot, Hash256)],
670+
finalized_blocks: &[(Hash256, Slot)],
681671
hot_db_ops: &mut Vec<StoreOp<E>>,
682672
) {
683673
let mut epoch_boundary_blocks = HashSet::new();
684674
let mut non_checkpoint_block_roots = HashSet::new();
685675

686676
// Then, iterate states in slot ascending order, as they are stored wrt previous states.
687-
for (slot, block_root) in finalized_blocks {
677+
for (block_root, slot) in finalized_blocks {
688678
// At a missed slot, `state_root_iter` will return the block root
689679
// from the previous non-missed slot. This ensures that the block root at an
690680
// epoch boundary is always a checkpoint block root. We keep track of block roots

beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,15 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
3131

3232
let split = db.get_split_info();
3333

34-
// TODO(hdiff): sort summaries topologically starting from finalized
34+
let state_summaries_dag = new_dag::<T>(&db)?;
35+
36+
// Sort summaries by slot so we have their ancestor diffs already stored when we store them.
3537
// If the summaries are sorted topologically we can insert them into the DB as if they were a
3638
// new state, re-using existing code. As states are likely to be sequential the diff cache
3739
// should kick in making the migration more efficient. If we just iterate the column of
3840
// summaries we may get a distant state on each iteration.
39-
let state_summaries_dag = new_dag::<T>(&db)?;
40-
41-
// Sort summaries by slot so we have their ancestor diffs already stored when we store them.
4241
let summaries_by_slot = state_summaries_dag.summaries_by_slot_ascending();
4342

44-
// TODO(hdiff): Should prune states associated with summaries that are not descendant of finalized?
45-
46-
// TODO(hdiff): Must create the finalized diff points in the hot DB if the node checkpoint synced long
47-
// ago. Also consider availability of the finalized state from the hot DB.
48-
4943
// Upgrade all hot DB state summaries to the new type:
5044
// - Set all summaries of boundary states as `Snapshot` type
5145
// - Set all others as `Replay` pointing to `epoch_boundary_state_root`

beacon_node/store/src/hot_cold_store.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,18 +1210,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
12101210
);
12111211
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_temp_key));
12121212

1213-
// Always delete diffs with summary. If a diff must be kept beyond finalization,
1214-
// do not issue a DeleteState op at that time.
1215-
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
1216-
DBColumn::BeaconStateHotDiff.into(),
1217-
state_root.as_slice(),
1218-
)));
1219-
1220-
// TODO(hdiff): Review under HDiff
1221-
if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) {
1222-
let state_key =
1223-
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
1224-
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
1213+
if let Some(slot) = slot {
1214+
match self.hierarchy_hot.storage_strategy(slot)? {
1215+
StorageStrategy::Snapshot => {
1216+
// Full state stored in this position
1217+
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
1218+
DBColumn::BeaconState.into(),
1219+
state_root.as_slice(),
1220+
)));
1221+
}
1222+
StorageStrategy::DiffFrom(_) => {
1223+
// Diff stored in this position
1224+
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
1225+
DBColumn::BeaconStateHotDiff.into(),
1226+
state_root.as_slice(),
1227+
)));
1228+
}
1229+
StorageStrategy::ReplayFrom(_) => {
1230+
// Nothing else to delete
1231+
}
1232+
}
1233+
} else {
1234+
// TODO(hdiff): should attempt to delete everything if slot is not available?
1235+
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
1236+
DBColumn::BeaconState.into(),
1237+
state_root.as_slice(),
1238+
)));
1239+
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
1240+
DBColumn::BeaconStateHotDiff.into(),
1241+
state_root.as_slice(),
1242+
)));
12251243
}
12261244
}
12271245

0 commit comments

Comments
 (0)