Skip to content

Fix subnet unsubscription time #6890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions beacon_node/network/src/subnet_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ impl<T: BeaconChainTypes> SubnetService<T> {
|| self.permanent_attestation_subscriptions.contains(subnet)
}

/// Returns whether we are subscribed to a permanent subnet for testing purposes.
#[cfg(test)]
pub(crate) fn is_subscribed_permanent(&self, subnet: &Subnet) -> bool {
self.permanent_attestation_subscriptions.contains(subnet)
}

/// Processes a list of validator subscriptions.
///
/// This is fundamentally called form the HTTP API when a validator requests duties from us
Expand Down Expand Up @@ -629,9 +635,10 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
// expire subscription.
match self.scheduled_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => {
let ExactSubnet { subnet, .. } = exact_subnet;
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
let ExactSubnet { subnet, slot } = exact_subnet;
// Set the `end_slot` for the subscription to be `duty.slot + 1` so that we unsubscribe
// only at the end of the duty slot.
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, slot + 1) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add some comments referencing the issue and the reason for slot + 1 in the code. Useful if we ever refactor this and git blame breaks

debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);
}
self.waker
Expand Down
59 changes: 25 additions & 34 deletions beacon_node/network/src/subnet_service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ use beacon_chain::{
};
use genesis::{generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::NetworkConfig;
use logging::test_logger;
use slog::{o, Drain, Logger};
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::sync::{Arc, LazyLock};
use std::time::{Duration, SystemTime};
Expand All @@ -21,10 +18,6 @@ use types::{
SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription,
};

// Set to enable/disable logging
// const TEST_LOG_LEVEL: Option<slog::Level> = Some(slog::Level::Debug);
const TEST_LOG_LEVEL: Option<slog::Level> = None;

const SLOT_DURATION_MILLIS: u64 = 400;

type TestBeaconChainType = Witness<
Expand All @@ -46,7 +39,7 @@ impl TestBeaconChain {

let keypairs = generate_deterministic_keypairs(1);

let log = get_logger(TEST_LOG_LEVEL);
let log = logging::test_logger();
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap();

Expand Down Expand Up @@ -98,28 +91,10 @@ pub fn recent_genesis_time() -> u64 {
.as_secs()
}

fn get_logger(log_level: Option<slog::Level>) -> Logger {
if let Some(level) = log_level {
let drain = {
let decorator = slog_term::TermDecorator::new().build();
let decorator =
logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH);
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).chan_size(2048).build();
drain.filter_level(level)
};

Logger::root(drain.fuse(), o!())
} else {
let builder = NullLoggerBuilder;
builder.build().expect("should build logger")
}
}

static CHAIN: LazyLock<TestBeaconChain> = LazyLock::new(TestBeaconChain::new_with_system_clock);

fn get_subnet_service() -> SubnetService<TestBeaconChainType> {
let log = test_logger();
let log = logging::test_logger();
let config = NetworkConfig::default();

let beacon_chain = CHAIN.chain.clone();
Expand Down Expand Up @@ -501,8 +476,6 @@ mod test {
let committee_count = 1;

// Makes 3 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for each of the later slots subscriptions
// (subscription_slot2 and subscription_slot3).
let subscription_slot1 = 0;
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let subscription_slot3 = subscription_slot2 * 2;
Expand Down Expand Up @@ -585,7 +558,7 @@ mod test {
let expected_unsubscription =
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));

if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
assert_eq!(expected_subscription, events[0]);
assert_eq!(expected_unsubscription, events[2]);
}
Expand All @@ -607,9 +580,18 @@ mod test {

assert_eq!(no_events, []);

let second_subscribe_event = get_events(&mut subnet_service, None, 2).await;
let subscription_end_slot = current_slot + subscription_slot2 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete
let wait_slots = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_end_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;

let second_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await;
// If the permanent and short lived subnets are different, we should get an unsubscription event.
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[
expected_subscription.clone(),
Expand All @@ -633,9 +615,18 @@ mod test {

assert_eq!(no_events, []);

let third_subscribe_event = get_events(&mut subnet_service, None, 2).await;
let subscription_end_slot = current_slot + subscription_slot3 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete
let wait_slots = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_end_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;

let third_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await;

if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[expected_subscription, expected_unsubscription],
third_subscribe_event[..]
Expand Down