Skip to content

Commit 299fd87

Browse files
authored
Gossip: Add dynamic stake weighting based on fraction of unstaked nodes - Fixed Point Math (#7098)
* add dynamic weighting based on fraction of unstaked nodes. fixed point math * Add setup for account control * address comments - add explicit types, bundle config into Dynamic, refactor, etc * update to vanity key: gosW... (gossip weight) * agave-unstable-api implementation * refactor add apply_cfg tests * refactor and address comments * update interpolate. change lpf * fix deser bug. dont forget to rm logs * update WeightingConfig to match record program * address comments and make default setting static for PushActiveSet
1 parent f01c7cb commit 299fd87

File tree

13 files changed

+1163
-39
lines changed

13 files changed

+1163
-39
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ members = [
5151
"ledger-tool",
5252
"local-cluster",
5353
"log-analyzer",
54+
"low-pass-filter",
5455
"measure",
5556
"memory-management",
5657
"merkle-tree",
@@ -452,6 +453,7 @@ solana-loader-v4-interface = "2.2.1"
452453
solana-loader-v4-program = { path = "programs/loader-v4", version = "=3.0.0" }
453454
solana-local-cluster = { path = "local-cluster", version = "=3.0.0" }
454455
solana-logger = "3.0.0"
456+
solana-low-pass-filter = { path = "low-pass-filter", version = "=3.0.0" }
455457
solana-measure = { path = "measure", version = "=3.0.0" }
456458
solana-merkle-tree = { path = "merkle-tree", version = "=3.0.0" }
457459
solana-message = "2.4.0"

gossip/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ frozen-abi = [
3333
"solana-vote/frozen-abi",
3434
"solana-vote-program/frozen-abi",
3535
]
36+
agave-unstable-api = ["solana-low-pass-filter/agave-unstable-api"]
3637

3738
[dependencies]
3839
agave-feature-set = { workspace = true }
@@ -56,6 +57,7 @@ serde-big-array = { workspace = true }
5657
serde_bytes = { workspace = true }
5758
serde_derive = { workspace = true }
5859
siphasher = { workspace = true }
60+
solana-account = { workspace = true }
5961
solana-bloom = { workspace = true }
6062
solana-clap-utils = { workspace = true }
6163
solana-client = { workspace = true }
@@ -73,6 +75,7 @@ solana-hash = "=2.3.0"
7375
solana-keypair = "=2.2.1"
7476
solana-ledger = { workspace = true, features = ["agave-unstable-api"] }
7577
solana-logger = "=3.0.0"
78+
solana-low-pass-filter = { workspace = true, features = ["agave-unstable-api"] }
7679
solana-measure = { workspace = true }
7780
solana-metrics = { workspace = true }
7881
solana-native-token = "=2.2.2"

gossip/src/cluster_info.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use {
6262
},
6363
solana_pubkey::Pubkey,
6464
solana_rayon_threadlimit::get_thread_count,
65-
solana_runtime::bank_forks::BankForks,
65+
solana_runtime::{bank::Bank, bank_forks::BankForks},
6666
solana_sanitize::Sanitize,
6767
solana_signature::Signature,
6868
solana_signer::Signer,
@@ -130,6 +130,8 @@ pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
130130
pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
131131
// Limit number of unique pubkeys in the crds table.
132132
pub(crate) const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 8192;
133+
// Interval between push active set refreshes.
134+
pub const REFRESH_PUSH_ACTIVE_SET_INTERVAL_MS: u64 = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2;
133135

134136
// Must have at least one socket to monitor the TVU port
135137
pub const MINIMUM_NUM_TVU_RECEIVE_SOCKETS: NonZeroUsize = NonZeroUsize::new(1).unwrap();
@@ -214,6 +216,7 @@ impl ClusterInfo {
214216
stakes: &HashMap<Pubkey, u64>,
215217
gossip_validators: Option<&HashSet<Pubkey>>,
216218
sender: &impl ChannelSend<PacketBatch>,
219+
maybe_bank_ref: Option<&Bank>,
217220
) {
218221
let shred_version = self.my_contact_info.read().unwrap().shred_version();
219222
let self_keypair: Arc<Keypair> = self.keypair().clone();
@@ -226,6 +229,7 @@ impl ClusterInfo {
226229
&self.ping_cache,
227230
&mut pings,
228231
&self.socket_addr_space,
232+
maybe_bank_ref,
229233
);
230234
let pings = pings
231235
.into_iter()
@@ -1448,7 +1452,7 @@ impl ClusterInfo {
14481452
.thread_name(|i| format!("solGossipRun{i:02}"))
14491453
.build()
14501454
.unwrap();
1451-
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
1455+
let mut epoch_specs = bank_forks.clone().map(EpochSpecs::from);
14521456
Builder::new()
14531457
.name("solGossip".to_string())
14541458
.spawn(move || {
@@ -1504,13 +1508,19 @@ impl ClusterInfo {
15041508
entrypoints_processed = entrypoints_processed || self.process_entrypoints();
15051509
//TODO: possibly tune this parameter
15061510
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
1507-
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
1511+
if start - last_push > REFRESH_PUSH_ACTIVE_SET_INTERVAL_MS {
1512+
let maybe_bank = bank_forks
1513+
.as_ref()
1514+
.and_then(|bf| bf.read().ok())
1515+
.map(|forks| forks.root_bank());
1516+
let maybe_bank_ref = maybe_bank.as_deref();
15081517
self.refresh_my_gossip_contact_info();
15091518
self.refresh_push_active_set(
15101519
&recycler,
15111520
&stakes,
15121521
gossip_validators.as_ref(),
15131522
&sender,
1523+
maybe_bank_ref,
15141524
);
15151525
last_push = timestamp();
15161526
}
@@ -2831,6 +2841,7 @@ mod tests {
28312841
&cluster_info.ping_cache,
28322842
&mut Vec::new(), // pings
28332843
&SocketAddrSpace::Unspecified,
2844+
None,
28342845
);
28352846
let mut reqs = cluster_info.generate_new_gossip_requests(
28362847
&thread_pool,
@@ -2972,6 +2983,7 @@ mod tests {
29722983
&cluster_info.ping_cache,
29732984
&mut Vec::new(), // pings
29742985
&SocketAddrSpace::Unspecified,
2986+
None,
29752987
);
29762988
//check that all types of gossip messages are signed correctly
29772989
cluster_info.flush_push_queue();

gossip/src/crds_gossip.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use {
2727
solana_keypair::Keypair,
2828
solana_ledger::shred::Shred,
2929
solana_pubkey::Pubkey,
30+
solana_runtime::bank::Bank,
3031
solana_signer::Signer,
3132
solana_streamer::socket::SocketAddrSpace,
3233
solana_time_utils::timestamp,
@@ -186,6 +187,7 @@ impl CrdsGossip {
186187
ping_cache: &Mutex<PingCache>,
187188
pings: &mut Vec<(SocketAddr, Ping)>,
188189
socket_addr_space: &SocketAddrSpace,
190+
maybe_bank_ref: Option<&Bank>,
189191
) {
190192
self.push.refresh_push_active_set(
191193
&self.crds,
@@ -196,6 +198,7 @@ impl CrdsGossip {
196198
ping_cache,
197199
pings,
198200
socket_addr_space,
201+
maybe_bank_ref,
199202
)
200203
}
201204

@@ -445,6 +448,7 @@ mod test {
445448
&ping_cache,
446449
&mut Vec::new(), // pings
447450
&SocketAddrSpace::Unspecified,
451+
None,
448452
);
449453
let now = timestamp();
450454
//incorrect dest

gossip/src/crds_gossip_push.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ use {
2121
protocol::{Ping, PingCache},
2222
push_active_set::PushActiveSet,
2323
received_cache::ReceivedCache,
24+
stake_weighting_config::{get_gossip_config_from_account, WeightingConfig},
2425
},
2526
itertools::Itertools,
2627
solana_keypair::Keypair,
2728
solana_pubkey::Pubkey,
29+
solana_runtime::bank::Bank,
2830
solana_signer::Signer,
2931
solana_streamer::socket::SocketAddrSpace,
3032
solana_time_utils::timestamp,
@@ -49,6 +51,7 @@ const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
4951
const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
5052
const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
5153
const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3;
54+
const CONFIG_REFRESH_INTERVAL_MS: u64 = 60_000;
5255

5356
pub struct CrdsGossipPush {
5457
/// Active set of validators for push
@@ -65,12 +68,13 @@ pub struct CrdsGossipPush {
6568
pub num_total: AtomicUsize,
6669
pub num_old: AtomicUsize,
6770
pub num_pushes: AtomicUsize,
71+
last_cfg_poll_ms: Mutex<u64>,
6872
}
6973

7074
impl Default for CrdsGossipPush {
7175
fn default() -> Self {
7276
Self {
73-
active_set: RwLock::default(),
77+
active_set: RwLock::new(PushActiveSet::new_static()),
7478
crds_cursor: Mutex::default(),
7579
received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
7680
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
@@ -79,6 +83,7 @@ impl Default for CrdsGossipPush {
7983
num_total: AtomicUsize::default(),
8084
num_old: AtomicUsize::default(),
8185
num_pushes: AtomicUsize::default(),
86+
last_cfg_poll_ms: Mutex::new(0),
8287
}
8388
}
8489
}
@@ -238,6 +243,22 @@ impl CrdsGossipPush {
238243
active_set.prune(self_pubkey, peer, origins, stakes);
239244
}
240245

246+
fn maybe_refresh_weighting_config(
247+
&self,
248+
maybe_bank_ref: Option<&Bank>,
249+
now_ms: u64,
250+
) -> Option<WeightingConfig> {
251+
let bank = maybe_bank_ref?;
252+
{
253+
let mut last = self.last_cfg_poll_ms.lock().unwrap();
254+
if now_ms.saturating_sub(*last) < CONFIG_REFRESH_INTERVAL_MS {
255+
return None;
256+
}
257+
*last = now_ms;
258+
}
259+
get_gossip_config_from_account(bank)
260+
}
261+
241262
/// Refresh the push active set.
242263
#[allow(clippy::too_many_arguments)]
243264
pub(crate) fn refresh_push_active_set(
@@ -250,6 +271,7 @@ impl CrdsGossipPush {
250271
ping_cache: &Mutex<PingCache>,
251272
pings: &mut Vec<(SocketAddr, Ping)>,
252273
socket_addr_space: &SocketAddrSpace,
274+
maybe_bank_ref: Option<&Bank>,
253275
) {
254276
let mut rng = rand::thread_rng();
255277
// Active and valid gossip nodes with matching shred-version.
@@ -280,13 +302,18 @@ impl CrdsGossipPush {
280302
return;
281303
}
282304
let cluster_size = crds.read().unwrap().num_pubkeys().max(stakes.len());
305+
let maybe_cfg = self.maybe_refresh_weighting_config(maybe_bank_ref, timestamp());
283306
let mut active_set = self.active_set.write().unwrap();
307+
if let Some(cfg) = maybe_cfg {
308+
active_set.apply_cfg(&cfg);
309+
}
284310
active_set.rotate(
285311
&mut rng,
286312
CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE,
287313
cluster_size,
288314
&nodes,
289315
stakes,
316+
&self_keypair.pubkey(),
290317
)
291318
}
292319
}
@@ -447,6 +474,7 @@ mod tests {
447474
&ping_cache,
448475
&mut Vec::new(), // pings
449476
&SocketAddrSpace::Unspecified,
477+
None,
450478
);
451479

452480
let new_msg = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
@@ -514,6 +542,7 @@ mod tests {
514542
&ping_cache,
515543
&mut Vec::new(),
516544
&SocketAddrSpace::Unspecified,
545+
None,
517546
);
518547

519548
// push 3's contact info to 1 and 2 and 3
@@ -557,6 +586,7 @@ mod tests {
557586
&ping_cache,
558587
&mut Vec::new(), // pings
559588
&SocketAddrSpace::Unspecified,
589+
None,
560590
);
561591

562592
let new_msg = CrdsValue::new_unsigned(CrdsData::from(ContactInfo::new_localhost(
@@ -605,6 +635,7 @@ mod tests {
605635
&ping_cache,
606636
&mut Vec::new(), // pings
607637
&SocketAddrSpace::Unspecified,
638+
None,
608639
);
609640

610641
let mut ci = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);

gossip/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod protocol;
4141
mod push_active_set;
4242
mod received_cache;
4343
pub mod restart_crds_values;
44+
pub mod stake_weighting_config;
4445
pub mod weighted_shuffle;
4546

4647
#[macro_use]

0 commit comments

Comments
 (0)