diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index 33ae567eb3f..de90e222543 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -216,6 +216,12 @@ impl SubnetService { || self.permanent_attestation_subscriptions.contains(subnet) } + /// Returns whether we are subscribed to a permanent subnet for testing purposes. + #[cfg(test)] + pub(crate) fn is_subscribed_permanent(&self, subnet: &Subnet) -> bool { + self.permanent_attestation_subscriptions.contains(subnet) + } + /// Processes a list of validator subscriptions. /// /// This is fundamentally called form the HTTP API when a validator requests duties from us @@ -629,9 +635,10 @@ impl Stream for SubnetService { // expire subscription. match self.scheduled_subscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok(exact_subnet))) => { - let ExactSubnet { subnet, .. } = exact_subnet; - let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default(); - if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) { + let ExactSubnet { subnet, slot } = exact_subnet; + // Set the `end_slot` for the subscription to be `duty.slot + 1` so that we unsubscribe + // only at the end of the duty slot. + if let Err(e) = self.subscribe_to_subnet_immediately(subnet, slot + 1) { debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e); } self.waker diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 7283b4af314..0f3343df638 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -7,9 +7,6 @@ use beacon_chain::{ }; use genesis::{generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use lighthouse_network::NetworkConfig; -use logging::test_logger; -use slog::{o, Drain, Logger}; -use sloggers::{null::NullLoggerBuilder, Build}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::sync::{Arc, LazyLock}; use std::time::{Duration, SystemTime}; @@ -21,10 +18,6 @@ use types::{ SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, }; -// Set to enable/disable logging -// const TEST_LOG_LEVEL: Option = Some(slog::Level::Debug); -const TEST_LOG_LEVEL: Option = None; - const SLOT_DURATION_MILLIS: u64 = 400; type TestBeaconChainType = Witness< @@ -46,7 +39,7 @@ impl TestBeaconChain { let keypairs = generate_deterministic_keypairs(1); - let log = get_logger(TEST_LOG_LEVEL); + let log = logging::test_logger(); let store = HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap(); @@ -98,28 +91,10 @@ pub fn recent_genesis_time() -> u64 { .as_secs() } -fn get_logger(log_level: Option) -> Logger { - if let Some(level) = log_level { - let drain = { - let decorator = slog_term::TermDecorator::new().build(); - let decorator = - logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).chan_size(2048).build(); - drain.filter_level(level) - }; - - Logger::root(drain.fuse(), o!()) - } else { - let builder = NullLoggerBuilder; - builder.build().expect("should build logger") - } -} - static CHAIN: LazyLock = LazyLock::new(TestBeaconChain::new_with_system_clock); fn get_subnet_service() -> SubnetService { - let log = test_logger(); + let log = logging::test_logger(); let config = NetworkConfig::default(); let beacon_chain = CHAIN.chain.clone(); @@ -501,8 +476,6 @@ mod test { let committee_count = 1; // Makes 3 validator subscriptions to the same subnet but at different slots. - // There should be just 1 unsubscription event for each of the later slots subscriptions - // (subscription_slot2 and subscription_slot3). let subscription_slot1 = 0; let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; let subscription_slot3 = subscription_slot2 * 2; @@ -585,7 +558,7 @@ mod test { let expected_unsubscription = SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1)); - if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { + if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) { assert_eq!(expected_subscription, events[0]); assert_eq!(expected_unsubscription, events[2]); } @@ -607,9 +580,18 @@ mod test { assert_eq!(no_events, []); - let second_subscribe_event = get_events(&mut subnet_service, None, 2).await; + let subscription_end_slot = current_slot + subscription_slot2 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete + let wait_slots = subnet_service + .beacon_chain + .slot_clock + .duration_to_slot(subscription_end_slot) + .unwrap() + .as_millis() as u64 + / SLOT_DURATION_MILLIS; + + let second_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await; // If the permanent and short lived subnets are different, we should get an unsubscription event. - if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { + if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) { assert_eq!( [ expected_subscription.clone(), @@ -633,9 +615,18 @@ mod test { assert_eq!(no_events, []); - let third_subscribe_event = get_events(&mut subnet_service, None, 2).await; + let subscription_end_slot = current_slot + subscription_slot3 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete + let wait_slots = subnet_service + .beacon_chain + .slot_clock + .duration_to_slot(subscription_end_slot) + .unwrap() + .as_millis() as u64 + / SLOT_DURATION_MILLIS; + + let third_subscribe_event = get_events(&mut subnet_service, None, wait_slots as u32).await; - if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { + if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) { assert_eq!( [expected_subscription, expected_unsubscription], third_subscribe_event[..]