Skip to content

Commit 21c3679

Browse files
authored
refactor: Revamp startup sequence and check sync status (#372)
We want to be able to listen to the network without an Operator ID, for example to act as an exporter, or to already discover some peers during sync. Also, we want to make sure not to do anything punishable (invalid network messages, signing slashable objects) if we are out of sync.

- Add `OwnOperatorId` to allow managers and network to start eagerly
- Provide watcher for synced status, unifying `historic_finished_notify` and `operational_status`
- Avoid sending messages and starting duties for slashable objects if not synced
- Add sync status logging to notifier
1 parent afb8626 commit 21c3679

File tree

16 files changed

+166
-94
lines changed

16 files changed

+166
-94
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ indexmap = "2.7.0"
117117
libp2p = { version = "0.55", default-features = false }
118118
multiaddr = "0.18.2"
119119
num_cpus = "1"
120+
once_cell = "1.21.3"
120121
openssl = "0.10.72"
121122
parking_lot = "0.12"
122123
pbkdf2 = "0.12.2"

anchor/client/src/lib.rs

Lines changed: 30 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use beacon_node_fallback::{
1818
};
1919
pub use cli::Node;
2020
use config::Config;
21-
use database::NetworkDatabase;
21+
use database::{NetworkDatabase, OwnOperatorId};
2222
use duties_tracker::{duties_tracker::DutiesTracker, voluntary_exit_tracker::VoluntaryExitTracker};
2323
use eth::{
2424
index_sync::start_validator_index_syncer, voluntary_exit_processor::start_exit_processor,
@@ -38,17 +38,15 @@ use sensitive_url::SensitiveUrl;
3838
use signature_collector::SignatureCollectorManager;
3939
use slashing_protection::SlashingDatabase;
4040
use slot_clock::{SlotClock, SystemTimeSlotClock};
41-
use ssv_types::OperatorId;
4241
use subnet_service::{SUBNET_COUNT, SubnetId, start_subnet_service};
4342
use task_executor::TaskExecutor;
4443
use tokio::{
4544
net::TcpListener,
46-
select,
47-
sync::{mpsc, mpsc::unbounded_channel, oneshot, oneshot::Receiver},
45+
sync::{mpsc, mpsc::unbounded_channel},
4846
time::sleep,
4947
};
5048
use tracing::{debug, error, info, warn};
51-
use types::{ChainSpec, EthSpec, Hash256};
49+
use types::{EthSpec, Hash256};
5250
use validator_metrics::set_gauge;
5351
use validator_services::{
5452
attestation_service::AttestationServiceBuilder,
@@ -381,7 +379,6 @@ impl Client {
381379
let voluntary_exit_tracker = Arc::new(VoluntaryExitTracker::new());
382380

383381
// Start syncer
384-
let (historic_finished_tx, historic_finished_rx) = oneshot::channel();
385382
let mut syncer = eth::SsvEventSyncer::new(
386383
database.clone(),
387384
index_sync_tx,
@@ -390,15 +387,14 @@ impl Client {
390387
http_urls: config.execution_nodes,
391388
ws_url: config.execution_nodes_websocket,
392389
network: config.global_config.ssv_network.clone(),
393-
historic_finished_notify: Some(historic_finished_tx),
394390
},
395391
)
396392
.await
397393
.map_err(|e| format!("Unable to create syncer: {e}"))?;
398394

399-
// Access to the operational status of the sync. This can be passed around to condition
400-
// duties based on the current status of the sync
401-
let _operational_status = syncer.operational_status();
395+
// Access to the sync status. This can be passed around to condition duties based on whether
396+
// we are synced.
397+
let is_synced = syncer.is_synced();
402398

403399
executor.spawn(
404400
async move {
@@ -409,10 +405,7 @@ impl Client {
409405
"syncer",
410406
);
411407

412-
// Wait until we have an operator id and historical sync is done
413-
let operator_id = wait_for_operator_id_and_sync(&database, historic_finished_rx, &spec)
414-
.await
415-
.ok_or("Failed waiting for operator id")?;
408+
let operator_id = OwnOperatorId::new(database.watch());
416409

417410
// Network sender/receiver
418411
let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);
@@ -442,9 +435,10 @@ impl Client {
442435
processor_senders.clone(),
443436
network_tx.clone(),
444437
key.clone(),
445-
operator_id,
438+
operator_id.clone(),
446439
Some(message_validator.clone()),
447440
SUBNET_COUNT,
441+
is_synced.clone(),
448442
)?)
449443
} else {
450444
Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT))
@@ -453,7 +447,7 @@ impl Client {
453447
// Create the signature collector
454448
let signature_collector = SignatureCollectorManager::new(
455449
processor_senders.clone(),
456-
operator_id,
450+
operator_id.clone(),
457451
config.global_config.ssv_network.ssv_domain_type.clone(),
458452
message_sender.clone(),
459453
slot_clock.clone(),
@@ -463,7 +457,7 @@ impl Client {
463457
// Create the qbft manager
464458
let qbft_manager = QbftManager::new(
465459
processor_senders.clone(),
466-
operator_id,
460+
operator_id.clone(),
467461
slot_clock.clone(),
468462
message_sender,
469463
config.global_config.ssv_network.ssv_domain_type.clone(),
@@ -528,6 +522,7 @@ impl Client {
528522
config.builder_proposals,
529523
config.builder_boost_factor,
530524
config.prefer_builder_proposals,
525+
is_synced.clone(),
531526
);
532527

533528
start_exit_processor(
@@ -566,6 +561,24 @@ impl Client {
566561
ctx.write().duties_service = Some(duties_service.clone());
567562
}
568563

564+
// Spawn notifier for logging and metrics
565+
spawn_notifier(
566+
duties_service.clone(),
567+
database.watch(),
568+
is_synced.clone(),
569+
executor.clone(),
570+
&spec,
571+
);
572+
573+
// Wait for sync to complete before starting services
574+
info!("Waiting for sync to complete before starting services...");
575+
is_synced
576+
.clone()
577+
.wait_for(|&is_synced| is_synced)
578+
.await
579+
.map_err(|_| "Sync watch channel closed")?;
580+
info!("Sync complete, starting services...");
581+
569582
let mut block_service_builder = BlockServiceBuilder::new()
570583
.slot_clock(slot_clock.clone())
571584
.validator_store(validator_store.clone())
@@ -644,13 +657,6 @@ impl Client {
644657

645658
http_api_shared_state.write().database_state = Some(database.watch());
646659

647-
spawn_notifier(
648-
duties_service.clone(),
649-
database.watch(),
650-
executor.clone(),
651-
&spec,
652-
);
653-
654660
if !config.disable_latency_measurement_service {
655661
start_latency_service(executor.clone(), slot_clock.clone(), beacon_nodes.clone());
656662
}
@@ -831,33 +837,6 @@ async fn poll_whilst_waiting_for_genesis(
831837
}
832838
}
833839

834-
async fn wait_for_operator_id_and_sync(
835-
database: &Arc<NetworkDatabase>,
836-
mut sync_notification: Receiver<()>,
837-
spec: &Arc<ChainSpec>,
838-
) -> Option<OperatorId> {
839-
let sleep_duration = Duration::from_secs(spec.seconds_per_slot);
840-
let mut state = database.watch();
841-
let id = loop {
842-
select! {
843-
result = state.changed() => {
844-
result.ok()?;
845-
if let Some(id) = state.borrow().get_own_id() {
846-
break id;
847-
}
848-
}
849-
_ = sleep(sleep_duration) => info!("Waiting for operator id"),
850-
}
851-
};
852-
info!(id = *id, "Operator found on chain");
853-
loop {
854-
select! {
855-
result = &mut sync_notification => return result.ok().map(|_| id),
856-
_ = sleep(sleep_duration) => info!("Waiting for historical sync to finish"),
857-
}
858-
}
859-
}
860-
861840
pub fn load_pem_certificate<P: AsRef<Path>>(pem_path: P) -> Result<Certificate, String> {
862841
let mut buf = Vec::new();
863842
File::open(&pem_path)

anchor/client/src/notifier.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::duties_service::DutiesService;
2121
pub fn spawn_notifier<E: EthSpec, T: SlotClock + 'static>(
2222
duties_service: Arc<DutiesService<AnchorValidatorStore<T, E>, T>>,
2323
network_state: watch::Receiver<NetworkState>,
24+
synced: watch::Receiver<bool>,
2425
executor: TaskExecutor,
2526
spec: &ChainSpec,
2627
) {
@@ -31,7 +32,7 @@ pub fn spawn_notifier<E: EthSpec, T: SlotClock + 'static>(
3132
if let Some(duration_to_next_slot) = duties_service.slot_clock.duration_to_next_slot() {
3233
// Sleep until the middle of the next slot
3334
sleep(duration_to_next_slot + slot_duration / 2).await;
34-
notify(&duties_service, &network_state).await;
35+
notify(&duties_service, &network_state, &synced).await;
3536
} else {
3637
error!("Failed to read slot clock");
3738
// If we can't read the slot clock, just wait another slot.
@@ -51,6 +52,7 @@ pub fn spawn_notifier<E: EthSpec, T: SlotClock + 'static>(
5152
async fn notify<E: EthSpec, T: SlotClock + 'static>(
5253
duties_service: &DutiesService<AnchorValidatorStore<T, E>, T>,
5354
network_state: &watch::Receiver<NetworkState>,
55+
synced: &watch::Receiver<bool>,
5456
) {
5557
// Scope needed as Rust complains about `state` being held across `await` if using `drop`
5658
let (operator_id, cluster_count) = {
@@ -60,14 +62,18 @@ async fn notify<E: EthSpec, T: SlotClock + 'static>(
6062
(operator_id, cluster_count)
6163
};
6264

63-
if let Some(operator_id) = operator_id {
64-
if duties_service.total_validator_count() > 0 {
65+
let is_synced = *synced.borrow();
66+
67+
match (operator_id, is_synced) {
68+
(None, false) => info!("Syncing"),
69+
(None, true) => info!("Synced, waiting for operator key to appear on chain"),
70+
(Some(operator_id), false) => {
71+
info!(%operator_id, "Operator present on chain, waiting for sync")
72+
}
73+
(Some(operator_id), true) if duties_service.total_validator_count() > 0 => {
6574
info!(%operator_id, cluster_count, "Operator active");
6675
validator_services::notifier_service::notify(duties_service).await;
67-
} else {
68-
info!(%operator_id, "Operator ready, no validators assigned");
6976
}
70-
} else {
71-
error!("No operator ID - the operator might have been removed via the contract");
77+
(Some(operator_id), true) => info!(%operator_id, "Operator ready, no validators assigned"),
7278
}
7379
}

anchor/database/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ authors = ["Sigma Prime <[email protected]>"]
66

77
[dependencies]
88
base64 = { workspace = true }
9+
once_cell = { workspace = true }
910
openssl = { workspace = true }
1011
r2d2 = { workspace = true }
1112
r2d2_sqlite = { workspace = true }

anchor/database/src/lib.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{
55
time::Duration,
66
};
77

8+
use once_cell::sync::OnceCell;
89
use openssl::{pkey::Public, rsa::Rsa};
910
use r2d2_sqlite::SqliteConnectionManager;
1011
use rusqlite::{Transaction, params};
@@ -228,3 +229,54 @@ impl NetworkDatabase {
228229
});
229230
}
230231
}
232+
233+
/// A helper to get the operator ID of the current operator. Caches the ID after successfully
234+
/// retrieving it to avoid locking the state further.
235+
#[derive(Clone)]
236+
pub enum OwnOperatorId {
237+
/// The operator ID was known when the `OwnOperatorId` was created.
238+
Known(OperatorId),
239+
/// The operator ID was not known when the `OwnOperatorId` was created. It will be retrieved
240+
/// from the `receiver` and cached in the `id` on first success.
241+
FromState {
242+
receiver: Receiver<NetworkState>,
243+
/// We use a `OnceCell` (from `once_cell`) so that `get` can be called without a mutable reference.
244+
id: OnceCell<OperatorId>,
245+
},
246+
}
247+
248+
impl OwnOperatorId {
249+
/// Creates the `OwnOperatorId` to either immediately store the operator ID or to recheck it on
250+
/// later `get` calls.
251+
pub fn new(receiver: Receiver<NetworkState>) -> Self {
252+
if let Some(operator_id) = receiver.borrow().get_own_id() {
253+
Self::Known(operator_id)
254+
} else {
255+
Self::FromState {
256+
receiver,
257+
id: OnceCell::new(),
258+
}
259+
}
260+
}
261+
262+
/// Get the operator ID if it is available. Caches the ID internally after the first successful
263+
/// call to avoid locking the state in the future. This is possible because the own Operator ID
264+
/// never changes.
265+
pub fn get(&self) -> Option<OperatorId> {
266+
match self {
267+
Self::Known(id) => Some(*id),
268+
Self::FromState { receiver, id } => {
269+
// Switch to `std`'s OnceLock as soon as `get_or_try_init` is stable
270+
id.get_or_try_init(|| receiver.borrow().get_own_id().ok_or(()))
271+
.ok()
272+
.copied()
273+
}
274+
}
275+
}
276+
}
277+
278+
impl From<OperatorId> for OwnOperatorId {
279+
fn from(operator_id: OperatorId) -> Self {
280+
Self::Known(operator_id)
281+
}
282+
}

0 commit comments

Comments
 (0)