Skip to content

Commit 94d9d4a

Browse files
authored
Reduce memory usage in validator (#379)
1 parent 84db1e4 commit 94d9d4a

File tree

7 files changed

+100
-33
lines changed

7 files changed

+100
-33
lines changed

anchor/client/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -416,14 +416,15 @@ impl Client {
416416
));
417417
duties_tracker.clone().start(executor.clone());
418418

419-
let message_validator = Arc::new(Validator::new(
419+
let message_validator = Validator::new(
420420
database.watch(),
421421
E::slots_per_epoch(),
422422
spec.epochs_per_sync_committee_period.as_u64(),
423423
E::sync_committee_size(),
424424
duties_tracker.clone(),
425425
slot_clock.clone(),
426-
));
426+
&executor,
427+
);
427428

428429
let message_sender: Arc<dyn MessageSender> = if config.impostor.is_none() {
429430
Arc::new(NetworkMessageSender::new(

anchor/fuzz/fuzz_targets/setup.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,20 @@ pub fn setup_test_message_validator() -> Arc<Validator<ManualSlotClock, MockDuti
116116

117117
let duties_provider = MockDutiesProvider {};
118118

119-
Arc::new(Validator::new(
119+
let handle = tokio::runtime::Handle::current();
120+
let (_signal, exit) = async_channel::bounded(1);
121+
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
122+
let executor = TaskExecutor::new(handle, exit, shutdown_tx, "test_executor".into());
123+
124+
Validator::new(
120125
db.watch(),
121126
32,
122127
256,
123128
512,
124129
duties_provider.into(),
125130
slot_clock.clone(),
126-
))
131+
&executor,
132+
)
127133
}
128134

129135
// Sets up a real NetworkMessageReceiver for fuzzing
@@ -138,7 +144,7 @@ pub fn setup_test_message_receiver()
138144
max_workers: 2,
139145
queue_size: Default::default(),
140146
};
141-
let processor_senders = processor::spawn(processor_config, executor);
147+
let processor_senders = processor::spawn(processor_config, executor.clone());
142148

143149
let slot_clock = ManualSlotClock::new(
144150
types::Slot::new(0),
@@ -161,14 +167,15 @@ pub fn setup_test_message_receiver()
161167

162168
let duties_provider = MockDutiesProvider {};
163169

164-
let message_validator = Arc::new(Validator::new(
170+
let message_validator = Validator::new(
165171
db.watch(),
166172
32,
167173
256,
168174
512,
169175
duties_provider.into(),
170176
slot_clock.clone(),
171-
));
177+
&executor,
178+
);
172179

173180
let network_message_sender: Arc<dyn MessageSender> = Arc::new(
174181
NetworkMessageSender::new(

anchor/message_sender/src/network.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> MessageSender for Arc<NetworkMes
9494
}
9595
}
9696

97-
impl<S: SlotClock, D: DutiesProvider> NetworkMessageSender<S, D> {
97+
impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageSender<S, D> {
9898
pub fn new(
9999
processor: processor::Senders,
100100
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,

anchor/message_validator/src/consensus_message.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,13 @@ pub(crate) fn validate_qbft_logic(
221221
}
222222

223223
if consensus_message.round == signer_state.round {
224-
// Rule: Peer must not send two proposals with different data
224+
// Rule: Peer must not send two proposals with different data.
225+
// We separately verify that the root in the message matches the data.
225226
if !signed_ssv_message.full_data().is_empty()
226227
&& signer_state
227-
.proposal_data
228+
.proposal_hash
228229
.as_ref()
229-
.is_some_and(|data| data != signed_ssv_message.full_data())
230+
.is_some_and(|hash| hash != consensus_message.root)
230231
{
231232
return Err(ValidationFailure::DifferentProposalData);
232233
}

anchor/message_validator/src/duty_state.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,17 @@ impl DutyState {
104104

105105
Ok(())
106106
}
107+
108+
/// Returns true if all operators within the map have a `max_slot` lower than `now -
109+
/// stored_slot_count`. This indicates that there has been no relevant activity for this duty
110+
/// recently and no relevant information is lost if this is dropped.
111+
pub(crate) fn outdated(&self, current_slot: Slot) -> bool {
112+
let earliest_relevant_slot =
113+
current_slot.saturating_sub(Slot::from(self.stored_slot_count));
114+
self.operators
115+
.values()
116+
.all(|operator_state| operator_state.max_slot < earliest_relevant_slot)
117+
}
107118
}
108119

109120
/// Tracks the state for a specific operator across multiple slots.
@@ -262,8 +273,8 @@ pub(crate) struct SignerState {
262273
pub(crate) round: u64,
263274
/// Records the count of each type of consensus message encountered.
264275
pub(crate) message_counts: MessageCounts,
265-
/// Optionally holds proposal-related data if a proposal message was received.
266-
pub(crate) proposal_data: Option<Vec<u8>>,
276+
/// Holds the hash of the proposal data, if a proposal was received.
277+
pub(crate) proposal_hash: Option<[u8; 32]>,
267278
/// A set of CommitteeIds indicating which committees have already been seen.
268279
seen_signers: HashSet<CommitteeId>,
269280
}
@@ -275,7 +286,7 @@ impl SignerState {
275286
slot,
276287
round,
277288
message_counts: MessageCounts::default(),
278-
proposal_data: None,
289+
proposal_hash: None,
279290
seen_signers: HashSet::new(),
280291
}
281292
}
@@ -289,14 +300,15 @@ impl SignerState {
289300

290301
/// Updates the SignerState with a new consensus message.
291302
///
292-
/// - If the message is a proposal (and contains full data), it stores the proposal data.
303+
/// - If the message is a proposal (and contains full data), it stores the hashed data.
293304
/// - If multiple operator IDs are present, it records the committee as seen.
294305
/// - Updates the message counts based on the message type.
295306
fn update(&mut self, signed_ssv_message: &SignedSSVMessage, consensus_message: &QbftMessage) {
296307
if !signed_ssv_message.full_data().is_empty()
297308
&& consensus_message.qbft_message_type == QbftMessageType::Proposal
298309
{
299-
self.proposal_data = Some(Vec::from(signed_ssv_message.full_data()));
310+
// We verified that the proposal data matches the root.
311+
self.proposal_hash = Some(*consensus_message.root);
300312
}
301313

302314
if signed_ssv_message.operator_ids().len() > 1 {
@@ -316,18 +328,22 @@ mod tests {
316328
use ssv_types::{OperatorId, Slot, consensus::QbftMessageType, msgid::Role};
317329

318330
use super::*;
319-
use crate::tests::{QbftMessageBuilder, create_signed_consensus_message};
331+
use crate::{
332+
hash_data,
333+
tests::{QbftMessageBuilder, create_signed_consensus_message},
334+
};
320335

321336
#[test]
322337
fn test_duty_state_update() {
323338
let mut duty_state = DutyState::new(10);
324339

325-
let qbft_message =
340+
let mut qbft_message =
326341
QbftMessageBuilder::new(Role::Committee, QbftMessageType::Proposal).build();
327342

328343
let operator_id = OperatorId(1);
329344

330345
let full_data = vec![1, 2, 3];
346+
*qbft_message.root = hash_data(&full_data);
331347
let signed_ssv_message = create_signed_consensus_message(
332348
qbft_message.clone(),
333349
vec![operator_id],
@@ -346,9 +362,9 @@ mod tests {
346362
if let Some(signer_state) = operator_state.get_signer_state(&slot) {
347363
// Verify that the proposal hash was correctly stored
348364
assert_eq!(
349-
&signer_state.proposal_data,
350-
&Some(full_data),
351-
"Proposal data should match the signed message data"
365+
signer_state.proposal_hash,
366+
Some(hash_data(&full_data)),
367+
"Proposal data should match the hashed full data"
352368
);
353369

354370
// Verify message counts were updated

anchor/message_validator/src/lib.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use ssv_types::{
2929
partial_sig::PartialSignatureMessages,
3030
};
3131
use ssz::{Decode, DecodeError, Encode};
32-
use tokio::sync::watch::Receiver;
32+
use task_executor::TaskExecutor;
33+
use tokio::{sync::watch::Receiver, time::sleep};
3334
use tracing::{error, trace};
3435
use types::{Epoch, Slot};
3536

@@ -39,6 +40,8 @@ use crate::{
3940
partial_signature::validate_partial_signature_message,
4041
};
4142

43+
const VALIDATOR_CLEANER_NAME: &str = "validator_cleaner";
44+
4245
pub(crate) const FIRST_ROUND: u64 = 1;
4346

4447
#[derive(Debug)]
@@ -271,24 +274,29 @@ pub struct Validator<S: SlotClock, D: DutiesProvider> {
271274
slot_clock: S,
272275
}
273276

274-
impl<S: SlotClock, D: DutiesProvider> Validator<S, D> {
277+
impl<S: SlotClock + 'static, D: DutiesProvider> Validator<S, D> {
275278
pub fn new(
276279
network_state_rx: Receiver<NetworkState>,
277280
slots_per_epoch: u64,
278281
epochs_per_sync_committee_period: u64,
279282
sync_committee_size: usize,
280283
duties_provider: Arc<D>,
281284
slot_clock: S,
282-
) -> Self {
283-
Self {
285+
task_executor: &TaskExecutor,
286+
) -> Arc<Self> {
287+
let validator = Arc::new(Self {
284288
network_state_rx,
285289
duty_state_map: DashMap::new(),
286290
slots_per_epoch,
287291
epochs_per_sync_committee_period,
288292
sync_committee_size,
289293
duties_provider,
290294
slot_clock,
291-
}
295+
});
296+
297+
task_executor.spawn(Arc::clone(&validator).cleaner(), VALIDATOR_CLEANER_NAME);
298+
299+
validator
292300
}
293301

294302
pub fn validate(&self, message_data: &[u8]) -> ValidationResult {
@@ -381,6 +389,40 @@ impl<S: SlotClock, D: DutiesProvider> Validator<S, D> {
381389
DutyState::new(stored_slot_count as usize)
382390
})
383391
}
392+
393+
async fn cleaner(self: Arc<Self>) {
394+
let slot_clock = self.slot_clock.clone();
395+
let slots_per_epoch = self.slots_per_epoch;
396+
397+
// Use a weak reference to exit when the other `Arc` are dropped.
398+
let weak_self = Arc::downgrade(&self);
399+
loop {
400+
// Try to get the time to the next slot.
401+
let Some(until_next_epoch) = slot_clock.duration_to_next_epoch(slots_per_epoch) else {
402+
sleep(slot_clock.slot_duration()).await;
403+
continue;
404+
};
405+
406+
// Wait until 5/6ths into the slot. Then, all proposal and attestation duties should be
407+
// done, so we can lock the map without risking message delays for time-critical
408+
// messages.
409+
let sleep_for = until_next_epoch + slot_clock.slot_duration() * 5 / 6;
410+
sleep(sleep_for).await;
411+
412+
let Some(validator) = weak_self.upgrade() else {
413+
// No validator to clean anymore, exit.
414+
break;
415+
};
416+
let Some(now) = slot_clock.now() else {
417+
// Very weird, let's try again later.
418+
continue;
419+
};
420+
421+
validator
422+
.duty_state_map
423+
.retain(|_, duty_state| !duty_state.outdated(now));
424+
}
425+
}
384426
}
385427

386428
fn validate_ssv_message(

anchor/message_validator/src/message_counts.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ use ssv_types::{
66

77
use crate::ValidationFailure;
88

9-
const MAX_MESSAGES_PER_ROUND: u64 = 1;
9+
const MAX_MESSAGES_PER_ROUND: u8 = 1;
1010

1111
/// MessageCounts tracks different types of message counts per slot
1212
#[derive(Debug, Clone, Copy, Default)]
1313
pub(crate) struct MessageCounts {
14-
pub(crate) pre_consensus: u64,
15-
pub(crate) proposal: u64,
16-
pub(crate) prepare: u64,
17-
pub(crate) commit: u64,
18-
pub(crate) round_change: u64,
19-
pub(crate) post_consensus: u64,
14+
pub(crate) pre_consensus: u8,
15+
pub(crate) proposal: u8,
16+
pub(crate) prepare: u8,
17+
pub(crate) commit: u8,
18+
pub(crate) round_change: u8,
19+
pub(crate) post_consensus: u8,
2020
}
2121

2222
impl MessageCounts {

0 commit comments

Comments
 (0)