@@ -66,6 +66,7 @@ pub struct ExactSubnet {
66
66
}
67
67
68
68
/// The enum used to group all kinds of validator subscriptions
69
+ #[ derive( Debug , Clone , PartialEq ) ]
69
70
pub enum Subscription {
70
71
Attestation ( ValidatorSubscription ) ,
71
72
SyncCommittee ( SyncCommitteeSubscription ) ,
@@ -198,31 +199,13 @@ impl<T: BeaconChainTypes> SubnetService<T> {
198
199
199
200
/// Return count of all currently subscribed subnets (long-lived **and** short-lived).
200
201
#[ cfg( test) ]
201
- pub fn subscription_count ( & self ) -> usize {
202
- if self . subscribe_all_subnets {
203
- self . beacon_chain . spec . attestation_subnet_count as usize
204
- } else {
205
- self . subscriptions
206
- . iter ( )
207
- . filter ( |subnet| matches ! ( subnet, Subnet :: Attestation ( _) ) )
208
- . collect :: < HashSet < _ > > ( )
209
- . len ( )
210
- }
202
+ pub fn subscriptions ( & self ) -> impl Iterator < Item = & Subnet > {
203
+ self . subscriptions . iter ( )
211
204
}
212
205
213
- /// Return count of all currently subscribed sync committee subnets.
214
206
#[ cfg( test) ]
215
- pub fn sync_committee_subscription_count ( & self ) -> usize {
216
- use types:: consts:: altair:: SYNC_COMMITTEE_SUBNET_COUNT ;
217
- if self . subscribe_all_subnets {
218
- SYNC_COMMITTEE_SUBNET_COUNT as usize
219
- } else {
220
- self . subscriptions
221
- . iter ( )
222
- . filter ( |subnet| matches ! ( subnet, Subnet :: SyncCommittee ( _) ) )
223
- . collect :: < HashSet < _ > > ( )
224
- . len ( )
225
- }
207
+ pub fn permanent_subscriptions ( & self ) -> impl Iterator < Item = & Subnet > {
208
+ self . permanent_attestation_subscriptions . iter ( )
226
209
}
227
210
228
211
/// Returns whether we are subscribed to a subnet for testing purposes.
@@ -326,23 +309,17 @@ impl<T: BeaconChainTypes> SubnetService<T> {
326
309
}
327
310
} ;
328
311
329
- let spec = & self . beacon_chain . spec ;
330
- let sync_committee_duration_in_slots = spec
331
- . epochs_per_sync_committee_period
332
- . as_u64 ( )
333
- . saturating_mul ( T :: EthSpec :: slots_per_epoch ( ) ) ;
334
-
335
312
for subnet_id in subnet_ids {
336
313
let subnet = Subnet :: SyncCommittee ( subnet_id) ;
337
- let slot_when_required = subscription
314
+ let slot_required_until = subscription
338
315
. until_epoch
339
316
. start_slot ( T :: EthSpec :: slots_per_epoch ( ) ) ;
340
- subnets_to_discover. insert ( subnet. clone ( ) , slot_when_required ) ;
317
+ subnets_to_discover. insert ( subnet, slot_required_until ) ;
341
318
342
- let Some ( duration_to_unsubscribe) =
343
- self . beacon_chain . slot_clock . duration_to_slot (
344
- slot_when_required + sync_committee_duration_in_slots ,
345
- )
319
+ let Some ( duration_to_unsubscribe) = self
320
+ . beacon_chain
321
+ . slot_clock
322
+ . duration_to_slot ( slot_required_until )
346
323
else {
347
324
warn ! ( self . log, "Subscription to sync subnet error" ; "error" => "Unable to determine duration to unsubscription slot" , "validator_index" => subscription. validator_index) ;
348
325
continue ;
@@ -359,12 +336,15 @@ impl<T: BeaconChainTypes> SubnetService<T> {
359
336
"Sync committee subscription is past expiration" ;
360
337
"subnet" => ?subnet,
361
338
"current_slot" => ?current_slot,
362
- "unsubscribe_slot" => ?slot_when_required + sync_committee_duration_in_slots,
363
-
364
- ) ;
339
+ "unsubscribe_slot" => ?slot_required_until, ) ;
340
+ continue ;
365
341
}
366
342
367
- self . subscribe_to_sync_subnet ( subnet, duration_to_unsubscribe) ;
343
+ self . subscribe_to_sync_subnet (
344
+ subnet,
345
+ duration_to_unsubscribe,
346
+ slot_required_until,
347
+ ) ;
368
348
}
369
349
}
370
350
}
@@ -516,7 +496,12 @@ impl<T: BeaconChainTypes> SubnetService<T> {
516
496
}
517
497
518
498
/// Adds a subscription event and an associated unsubscription event if required.
519
- fn subscribe_to_sync_subnet ( & mut self , subnet : Subnet , duration_to_unsubscribe : Duration ) {
499
+ fn subscribe_to_sync_subnet (
500
+ & mut self ,
501
+ subnet : Subnet ,
502
+ duration_to_unsubscribe : Duration ,
503
+ slot_required_until : Slot ,
504
+ ) {
520
505
// Return if we have subscribed to all subnets
521
506
if self . subscribe_all_subnets {
522
507
return ;
@@ -540,7 +525,7 @@ impl<T: BeaconChainTypes> SubnetService<T> {
540
525
self . subscriptions
541
526
. insert_at ( subnet, duration_to_unsubscribe) ;
542
527
// We are not currently subscribed and have no waiting subscription, create one
543
- debug ! ( self . log, "Subscribing to subnet" ; "subnet" => ?subnet, "until" => ?duration_to_unsubscribe ) ;
528
+ debug ! ( self . log, "Subscribing to subnet" ; "subnet" => ?subnet, "until" => ?slot_required_until ) ;
544
529
self . events
545
530
. push_back ( SubnetServiceMessage :: Subscribe ( subnet) ) ;
546
531
0 commit comments