Skip to content

Tree states archive - review comments and metrics #6386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub fn migrate_schema<T: BeaconChainTypes>(
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
// This migration needs to sync data between hot and cold DBs. The schema version is
// bumped inside the upgrade_to_v22 fn
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
// Anything else is an error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
db.store_cold_state(&genesis_state_root, &genesis_state, &mut cold_ops)?;
}

// Write the block roots in the new format. Similar to above, we do this separately from
// deleting the old format block roots so that this is crash safe.
// Write the block roots in the new format in a new column. Similar to above, we do this
// separately from deleting the old format block roots so that this is crash safe.
let oldest_block_slot = old_anchor
.as_ref()
.map_or(Slot::new(0), |a| a.oldest_block_slot);
rewrite_block_roots::<T>(
write_new_schema_block_roots::<T>(
&db,
genesis_block_root,
oldest_block_slot,
Expand Down Expand Up @@ -158,7 +158,7 @@ pub fn delete_old_schema_freezer_data<T: BeaconChainTypes>(
Ok(())
}

pub fn rewrite_block_roots<T: BeaconChainTypes>(
pub fn write_new_schema_block_roots<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
genesis_block_root: Hash256,
oldest_block_slot: Slot,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl StoreConfig {
anchor: Option<&AnchorInfo>,
) -> Result<(), StoreConfigError> {
// Allow changing the hierarchy exponents if no historic states are stored.
        // `anchor == None` implies a full archive node, and thus that all historic states are stored.
let no_historic_states_stored =
anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot));
let hierarchy_config_changed =
Expand Down
56 changes: 37 additions & 19 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
state_cache: Mutex<StateCache<E>>,
/// Cache of hierarchical diff buffers.
///
/// This cache is never pruned. It is only populated in response to historical queries from the
/// HTTP API.
hdiff_buffer_cache: Mutex<LruCache<Slot, HDiffBuffer>>,
/// Chain spec.
pub(crate) spec: ChainSpec,
Expand Down Expand Up @@ -461,6 +464,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_BYTE_SIZE,
hdiff_buffer_cache_byte_size as i64,
);

if let Some(anchor_info) = self.get_anchor_info() {
metrics::set_gauge(
&metrics::STORE_BEACON_ANCHOR_SLOT,
anchor_info.anchor_slot.as_u64() as i64,
);
metrics::set_gauge(
&metrics::STORE_BEACON_OLDEST_BLOCK_SLOT,
anchor_info.oldest_block_slot.as_u64() as i64,
);
metrics::set_gauge(
&metrics::STORE_BEACON_STATE_LOWER_LIMIT,
anchor_info.state_lower_limit.as_u64() as i64,
);
}
}

/// Store a block and update the LRU cache.
Expand Down Expand Up @@ -1456,6 +1474,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
"from_slot" => from,
"slot" => state.slot(),
);
                // The state summary has already been persisted above; nothing else needs to be written.
}
StorageStrategy::Snapshot => {
debug!(
Expand Down Expand Up @@ -1539,7 +1558,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Load diff base state bytes.
let (_, base_buffer) = self.load_hdiff_buffer_for_slot(from_slot, 0)?;
let (_, base_buffer) = {
let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_LOAD_FOR_STORE_TIME);
self.load_hdiff_buffer_for_slot(from_slot)?
};
let target_buffer = HDiffBuffer::from_state(state.clone());
let diff = {
let _timer = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_COMPUTE_TIME);
Expand Down Expand Up @@ -1569,7 +1591,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// Will reconstruct the state if it lies between restore points.
pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
let (base_slot, hdiff_buffer) = self.load_hdiff_buffer_for_slot(slot, 0)?;
let (base_slot, hdiff_buffer) = {
let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_LOAD_TIME);
self.load_hdiff_buffer_for_slot(slot)?
};
let base_state = hdiff_buffer.into_state(&self.spec)?;
debug_assert_eq!(base_slot, base_state.slot());

Expand All @@ -1579,7 +1604,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

let blocks = self.load_cold_blocks(base_state.slot() + 1, slot)?;

// Include state root for base state as it is required by block processing.
// Include state root for base state as it is required by block processing to not have to
// hash the state.
let state_root_iter =
self.forwards_state_roots_iterator_until(base_state.slot(), slot, || {
Err(Error::StateShouldNotBeRequired(slot))
Expand All @@ -1602,11 +1628,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

/// Returns `HDiffBuffer` for the specified slot, or `HDiffBuffer` for the `ReplayFrom` slot if
/// the diff for the specified slot is not stored.
fn load_hdiff_buffer_for_slot(
&self,
slot: Slot,
recursion: usize,
) -> Result<(Slot, HDiffBuffer), Error> {
fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result<(Slot, HDiffBuffer), Error> {
if let Some(buffer) = self.hdiff_buffer_cache.lock().get(&slot) {
debug!(
self.log,
Expand All @@ -1619,10 +1641,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
metrics::inc_counter(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_MISS);
}

// Do not time recursive calls into load_hdiff_buffer_for_slot to not double count
let _timer = (recursion == 0)
.then(|| metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_LOAD_TIME));

// Load buffer for the previous state.
// This amount of recursion (<10 levels) should be OK.
let t = std::time::Instant::now();
Expand All @@ -1646,8 +1664,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
// Recursive case.
StorageStrategy::DiffFrom(from) => {
let (_buffer_slot, mut buffer) =
self.load_hdiff_buffer_for_slot(from, recursion + 1)?;
let (_buffer_slot, mut buffer) = self.load_hdiff_buffer_for_slot(from)?;

// Load diff and apply it to buffer.
let diff = self.load_hdiff_for_slot(slot)?;
Expand All @@ -1667,9 +1684,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

Ok((slot, buffer))
}
StorageStrategy::ReplayFrom(from) => {
self.load_hdiff_buffer_for_slot(from, recursion + 1)
}
StorageStrategy::ReplayFrom(from) => self.load_hdiff_buffer_for_slot(from),
}
}

Expand Down Expand Up @@ -2767,6 +2782,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let mut hot_db_ops = vec![];
let mut cold_db_block_ops = vec![];

    // First, iterate state roots in descending slot order until reaching the current split slot.
let state_roots = RootsIterator::new(&store, finalized_state)
.take_while(|result| match result {
Ok((_, _, slot)) => {
Expand All @@ -2777,7 +2793,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
})
.collect::<Result<Vec<_>, _>>()?;

// Iterate states in slot ascending order, as they are stored wrt previous states.
// Then, iterate states in slot ascending order, as they are stored wrt previous states.
for (block_root, state_root, slot) in state_roots.into_iter().rev() {
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
Expand Down Expand Up @@ -2813,7 +2829,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(

let mut cold_db_ops = vec![];

// Only store the cold state if it's on a diff boundary
// Only store the cold state if it's on a diff boundary.
// Calling `store_cold_state_summary` instead of `store_cold_state` for those allows us
// to skip loading many hot states.
if matches!(
store.hierarchy.storage_strategy(slot)?,
StorageStrategy::ReplayFrom(..)
Expand Down
38 changes: 34 additions & 4 deletions beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,47 @@ impl StoreItem for CompactionTimestamp {
/// Database parameters relevant to weak subjectivity sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
pub struct AnchorInfo {
/// The slot at which the anchor state is present and which we cannot revert.
/// The slot at which the anchor state is present and which we cannot revert. Values on start:
/// - Genesis start: 0
/// - Checkpoint sync: Slot of the finalized checkpoint block
///
/// Immutable
pub anchor_slot: Slot,
/// The slot from which historical blocks are available (>=).
/// All blocks with slots greater than or equal to this value are available in the database.
/// Additionally, the genesis block is always available.
///
/// Values on start:
/// - Genesis start: 0
/// - Checkpoint sync: Slot of the finalized checkpoint block
///
/// Progressively decreases during backfill sync until reaching 0.
pub oldest_block_slot: Slot,
/// The block root of the next block that needs to be added to fill in the history.
///
/// Zero if we know all blocks back to genesis.
pub oldest_block_parent: Hash256,
/// The slot from which historical states are available (>=).
/// All states with slots _greater than or equal to_ `min(split.slot, state_upper_limit)` are
/// available in the database. If `state_upper_limit` is higher than `split.slot`, states are
/// not being written to the freezer database.
///
/// Values on start if state reconstruction is enabled:
/// - Genesis start: 0
/// - Checkpoint sync: Slot of the next scheduled snapshot
///
/// Value on start if state reconstruction is disabled:
/// - 2^64 - 1 representing no historic state storage.
///
/// Immutable until state reconstruction completes.
pub state_upper_limit: Slot,
/// The slot before which historical states are available (<=).
/// All states with slots _less than or equal to_ this value are available in the database.
/// The minimum value is 0, indicating that the genesis state is always available.
///
/// Values on start:
/// - Genesis start: 0
/// - Checkpoint sync: 0
///
    /// When full block backfill completes (`oldest_block_slot == 0`), state reconstruction starts,
    /// and this value progressively increases until it reaches `state_upper_limit`.
pub state_lower_limit: Slot,
}

Expand Down
34 changes: 34 additions & 0 deletions beacon_node/store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,27 @@ pub static DISK_DB_DELETE_COUNT: LazyLock<Result<IntCounterVec>> = LazyLock::new
&["col"],
)
});
/*
* Anchor Info
*/
pub static STORE_BEACON_ANCHOR_SLOT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"store_beacon_anchor_slot",
"Current anchor info anchor_slot value",
)
});
pub static STORE_BEACON_OLDEST_BLOCK_SLOT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"store_beacon_oldest_block_slot",
"Current anchor info oldest_block_slot value",
)
});
pub static STORE_BEACON_STATE_LOWER_LIMIT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"store_beacon_state_lower_limit",
"Current anchor info state_lower_limit value",
)
});
/*
* Beacon State
*/
Expand Down Expand Up @@ -227,6 +248,13 @@ pub static STORE_BEACON_HDIFF_BUFFER_LOAD_TIME: LazyLock<Result<Histogram>> = La
"Time taken to load an hdiff buffer",
)
});
pub static STORE_BEACON_HDIFF_BUFFER_LOAD_FOR_STORE_TIME: LazyLock<Result<Histogram>> =
LazyLock::new(|| {
try_create_histogram(
"store_beacon_hdiff_buffer_load_for_store_seconds",
"Time taken to load an hdiff buffer to store another hdiff",
)
});
pub static STORE_BEACON_HDIFF_BUFFER_CACHE_HIT: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
Expand All @@ -247,6 +275,12 @@ pub static STORE_BEACON_REPLAYED_BLOCKS: LazyLock<Result<IntCounter>> = LazyLock
"Total count of replayed blocks",
)
});
pub static STORE_BEACON_RECONSTRUCTION_TIME: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram(
"store_beacon_reconstruction_time_seconds",
"Time taken to run a reconstruct historic states batch",
)
});
pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/store/src/reconstruct.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Implementation of historic state reconstruction (given complete block history).
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
use crate::metrics;
use crate::{Error, ItemStore};
use itertools::{process_results, Itertools};
use slog::{debug, info};
Expand Down Expand Up @@ -38,6 +39,8 @@ where
"start_slot" => anchor.state_lower_limit,
);

let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME);

// Iterate blocks from the state lower limit to the upper limit.
let split = self.get_split_info();
let lower_limit_slot = anchor.state_lower_limit;
Expand Down
Loading