Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit b4eb815

Browse files
authored
Snapshot hash gossip changes (#8358) (#8359)
automerge
1 parent 489fd30 commit b4eb815

File tree

7 files changed

+105
-7
lines changed

7 files changed

+105
-7
lines changed

core/src/cluster_info.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{
2121
crds_gossip::CrdsGossip,
2222
crds_gossip_error::CrdsGossipError,
2323
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
24-
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote},
24+
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, SnapshotHash, Vote},
2525
packet::{Packet, PACKET_DATA_SIZE},
2626
result::{Error, Result},
2727
sendmmsg::{multicast, send_mmsg},
@@ -43,6 +43,7 @@ use solana_net_utils::{
4343
};
4444
use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler};
4545
use solana_rayon_threadlimit::get_thread_count;
46+
use solana_sdk::hash::Hash;
4647
use solana_sdk::{
4748
clock::{Slot, DEFAULT_MS_PER_SLOT},
4849
pubkey::Pubkey,
@@ -436,6 +437,16 @@ impl ClusterInfo {
436437
.process_push_message(&self.id(), vec![entry], now);
437438
}
438439

440+
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
441+
let now = timestamp();
442+
let entry = CrdsValue::new_signed(
443+
CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)),
444+
&self.keypair,
445+
);
446+
self.gossip
447+
.process_push_message(&self.id(), vec![entry], now);
448+
}
449+
439450
pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) {
440451
let now = timestamp();
441452
let vote = Vote::new(&self.id(), vote, now);
@@ -476,6 +487,23 @@ impl ClusterInfo {
476487
(txs, max_ts)
477488
}
478489

490+
/// Collects the snapshot hash each gossip peer advertises for `slot`.
///
/// Walks every value in the CRDS table, keeps only `SnapshotHash` records,
/// and for each record that lists `slot` yields `(publisher pubkey, hash)`.
/// When a record lists `slot` more than once, only the first entry is used.
/// Records that do not mention `slot` contribute nothing.
pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> {
    self.gossip
        .crds
        .table
        .values()
        // `snapshot_hash()` already returns `Option<&SnapshotHash>`; no extra map needed.
        .filter_map(|versioned| versioned.value.snapshot_hash())
        .filter_map(|snapshot| {
            snapshot
                .hashes
                .iter()
                .find(|(table_slot, _)| *table_slot == slot)
                .map(|(_, hash)| (snapshot.from, *hash))
        })
        .collect()
}
506+
479507
pub fn get_epoch_state_for_node(
480508
&self,
481509
pubkey: &Pubkey,

core/src/crds_value.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::contact_info::ContactInfo;
22
use bincode::{serialize, serialized_size};
33
use solana_sdk::{
44
clock::Slot,
5+
hash::Hash,
56
pubkey::Pubkey,
67
signature::{Keypair, Signable, Signature},
78
transaction::Transaction,
@@ -61,6 +62,7 @@ pub enum CrdsData {
6162
ContactInfo(ContactInfo),
6263
Vote(VoteIndex, Vote),
6364
EpochSlots(EpochSlotIndex, EpochSlots),
65+
SnapshotHash(SnapshotHash),
6466
}
6567

6668
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@@ -83,6 +85,23 @@ pub struct EpochIncompleteSlots {
8385
pub compressed_list: Vec<u8>,
8486
}
8587

88+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
89+
pub struct SnapshotHash {
90+
pub from: Pubkey,
91+
pub hashes: Vec<(Slot, Hash)>,
92+
pub wallclock: u64,
93+
}
94+
95+
impl SnapshotHash {
96+
pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>, wallclock: u64) -> Self {
97+
Self {
98+
from,
99+
hashes,
100+
wallclock,
101+
}
102+
}
103+
}
104+
86105
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
87106
pub struct EpochSlots {
88107
pub from: Pubkey,
@@ -137,6 +156,7 @@ pub enum CrdsValueLabel {
137156
ContactInfo(Pubkey),
138157
Vote(VoteIndex, Pubkey),
139158
EpochSlots(Pubkey),
159+
SnapshotHash(Pubkey),
140160
}
141161

142162
impl fmt::Display for CrdsValueLabel {
@@ -145,6 +165,7 @@ impl fmt::Display for CrdsValueLabel {
145165
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
146166
CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()),
147167
CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()),
168+
CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()),
148169
}
149170
}
150171
}
@@ -155,6 +176,7 @@ impl CrdsValueLabel {
155176
CrdsValueLabel::ContactInfo(p) => *p,
156177
CrdsValueLabel::Vote(_, p) => *p,
157178
CrdsValueLabel::EpochSlots(p) => *p,
179+
CrdsValueLabel::SnapshotHash(p) => *p,
158180
}
159181
}
160182
}
@@ -180,20 +202,23 @@ impl CrdsValue {
180202
CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
181203
CrdsData::Vote(_, vote) => vote.wallclock,
182204
CrdsData::EpochSlots(_, vote) => vote.wallclock,
205+
CrdsData::SnapshotHash(hash) => hash.wallclock,
183206
}
184207
}
185208
pub fn pubkey(&self) -> Pubkey {
186209
match &self.data {
187210
CrdsData::ContactInfo(contact_info) => contact_info.id,
188211
CrdsData::Vote(_, vote) => vote.from,
189212
CrdsData::EpochSlots(_, slots) => slots.from,
213+
CrdsData::SnapshotHash(hash) => hash.from,
190214
}
191215
}
192216
pub fn label(&self) -> CrdsValueLabel {
193217
match &self.data {
194218
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
195219
CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()),
196220
CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()),
221+
CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()),
197222
}
198223
}
199224
pub fn contact_info(&self) -> Option<&ContactInfo> {
@@ -222,11 +247,20 @@ impl CrdsValue {
222247
_ => None,
223248
}
224249
}
250+
251+
pub fn snapshot_hash(&self) -> Option<&SnapshotHash> {
252+
match &self.data {
253+
CrdsData::SnapshotHash(slots) => Some(slots),
254+
_ => None,
255+
}
256+
}
257+
225258
/// Return all the possible labels for a record identified by Pubkey.
226259
pub fn record_labels(key: &Pubkey) -> Vec<CrdsValueLabel> {
227260
let mut labels = vec![
228261
CrdsValueLabel::ContactInfo(*key),
229262
CrdsValueLabel::EpochSlots(*key),
263+
CrdsValueLabel::SnapshotHash(*key),
230264
];
231265
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
232266
labels
@@ -276,13 +310,14 @@ mod test {
276310

277311
#[test]
278312
fn test_labels() {
279-
let mut hits = [false; 2 + MAX_VOTES as usize];
313+
let mut hits = [false; 3 + MAX_VOTES as usize];
280314
// this method should cover all the possible labels
281315
for v in &CrdsValue::record_labels(&Pubkey::default()) {
282316
match v {
283317
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
284318
CrdsValueLabel::EpochSlots(_) => hits[1] = true,
285-
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 2] = true,
319+
CrdsValueLabel::SnapshotHash(_) => hits[2] = true,
320+
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true,
286321
}
287322
}
288323
assert!(hits.iter().all(|x| *x));

core/src/snapshot_packager_service.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
use crate::cluster_info::ClusterInfo;
12
use solana_ledger::{
23
snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
34
};
45
use std::{
56
sync::{
67
atomic::{AtomicBool, Ordering},
78
mpsc::RecvTimeoutError,
8-
Arc,
9+
Arc, RwLock,
910
},
1011
thread::{self, Builder, JoinHandle},
1112
time::Duration,
@@ -15,25 +16,42 @@ pub struct SnapshotPackagerService {
1516
t_snapshot_packager: JoinHandle<()>,
1617
}
1718

19+
/// Upper bound on the number of `(slot, hash)` pairs handed to
/// `ClusterInfo::push_snapshot_hashes` in one call; when the collected list
/// is longer, entries are removed from the front until it fits.
const MAX_SNAPSHOT_HASHES: usize = 24;
20+
1821
impl SnapshotPackagerService {
19-
pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc<AtomicBool>) -> Self {
22+
pub fn new(
23+
snapshot_package_receiver: SnapshotPackageReceiver,
24+
exit: &Arc<AtomicBool>,
25+
cluster_info: &Arc<RwLock<ClusterInfo>>,
26+
) -> Self {
2027
let exit = exit.clone();
28+
let cluster_info = cluster_info.clone();
2129
let t_snapshot_packager = Builder::new()
2230
.name("solana-snapshot-packager".to_string())
2331
.spawn(move || loop {
32+
let mut hashes = vec![];
2433
if exit.load(Ordering::Relaxed) {
2534
break;
2635
}
2736

2837
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
2938
Ok(mut snapshot_package) => {
39+
hashes.push((snapshot_package.root, snapshot_package.hash));
3040
// Only package the latest
3141
while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() {
3242
snapshot_package = new_snapshot_package;
43+
hashes.push((snapshot_package.root, snapshot_package.hash));
3344
}
3445
if let Err(err) = archive_snapshot_package(&snapshot_package) {
3546
warn!("Failed to create snapshot archive: {}", err);
3647
}
48+
while hashes.len() > MAX_SNAPSHOT_HASHES {
49+
hashes.remove(0);
50+
}
51+
cluster_info
52+
.write()
53+
.unwrap()
54+
.push_snapshot_hashes(hashes.clone());
3755
}
3856
Err(RecvTimeoutError::Disconnected) => break,
3957
Err(RecvTimeoutError::Timeout) => (),
@@ -61,6 +79,7 @@ mod tests {
6179
use solana_runtime::{
6280
accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
6381
};
82+
use solana_sdk::hash::Hash;
6483
use std::{
6584
fs::{self, remove_dir_all, OpenOptions},
6685
io::Write,
@@ -139,6 +158,7 @@ mod tests {
139158
link_snapshots_dir,
140159
storage_entries.clone(),
141160
output_tar_path.clone(),
161+
Hash::default(),
142162
);
143163

144164
// Make tarball from packageable snapshot

core/src/tvu.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ impl Tvu {
153153
if snapshot_config.is_some() {
154154
// Start a snapshot packaging service
155155
let (sender, receiver) = channel();
156-
let snapshot_packager_service = SnapshotPackagerService::new(receiver, exit);
156+
let snapshot_packager_service =
157+
SnapshotPackagerService::new(receiver, exit, &cluster_info.clone());
157158
(Some(snapshot_packager_service), Some(sender))
158159
} else {
159160
(None, None)

core/tests/bank_forks.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ mod tests {
55
use bincode::serialize_into;
66
use fs_extra::dir::CopyOptions;
77
use itertools::Itertools;
8+
use solana_core::cluster_info::ClusterInfo;
9+
use solana_core::contact_info::ContactInfo;
810
use solana_core::{
911
genesis_utils::{create_genesis_config, GenesisConfigInfo},
1012
snapshot_packager_service::SnapshotPackagerService,
@@ -24,6 +26,7 @@ mod tests {
2426
signature::{Keypair, KeypairUtil},
2527
system_transaction,
2628
};
29+
use std::sync::RwLock;
2730
use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc};
2831
use tempfile::TempDir;
2932

@@ -296,7 +299,13 @@ mod tests {
296299
// correctly construct the earlier snapshots because the SnapshotPackage's on the
297300
// channel hold hard links to these deleted snapshots. We verify this is the case below.
298301
let exit = Arc::new(AtomicBool::new(false));
299-
let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit);
302+
303+
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
304+
ContactInfo::default(),
305+
)));
306+
307+
let snapshot_packager_service =
308+
SnapshotPackagerService::new(receiver, &exit, &cluster_info);
300309

301310
// Close the channel so that the package service will exit after reading all the
302311
// packages off the channel

ledger/src/snapshot_package.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta};
22
use solana_sdk::clock::Slot;
3+
use solana_sdk::hash::Hash;
34
use std::{
45
path::PathBuf,
56
sync::{
@@ -20,6 +21,7 @@ pub struct SnapshotPackage {
2021
pub snapshot_links: TempDir,
2122
pub storage_entries: Vec<Arc<AccountStorageEntry>>,
2223
pub tar_output_file: PathBuf,
24+
pub hash: Hash,
2325
}
2426

2527
impl SnapshotPackage {
@@ -29,13 +31,15 @@ impl SnapshotPackage {
2931
snapshot_links: TempDir,
3032
storage_entries: Vec<Arc<AccountStorageEntry>>,
3133
tar_output_file: PathBuf,
34+
hash: Hash,
3235
) -> Self {
3336
Self {
3437
root,
3538
slot_deltas,
3639
snapshot_links,
3740
storage_entries,
3841
tar_output_file,
42+
hash,
3943
}
4044
}
4145
}

ledger/src/snapshot_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
108108
snapshot_hard_links_dir,
109109
account_storage_entries,
110110
snapshot_package_output_file.as_ref().to_path_buf(),
111+
bank.hash(),
111112
);
112113

113114
Ok(package)

0 commit comments

Comments
 (0)