Skip to content

Commit 5f4d26b

Browse files
twoethsnflaig
andauthored
fix: aggregated attestation pool for electra (#7656)
**Motivation** - we have 2 bugs for electra logic: - one logic in phase0 cannot be applied to electra - wrong for loop place - one bug for all forks: incorrect dependent root used when checking attestation data, see #7651 **Description** extract fixes from #7646 - add more metrics for the pool - remove this logic for electra: "after 2 slots, there are a good chance that we have 2 * MAX_ATTESTATIONS_ELECTRA attestations and break the for loop early" - correct the for loop place to limit attestation consolidations - fix `isValidShuffling()` for #7651 - unit tests are done in #7646 , since logic changes there I cannot bring them here --------- Co-authored-by: Tuyen Nguyen <[email protected]> Co-authored-by: Nico Flaig <[email protected]>
1 parent bf6f3c9 commit 5f4d26b

File tree

4 files changed

+115
-63
lines changed

4 files changed

+115
-63
lines changed

packages/beacon-node/src/chain/chain.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ export class BeaconChain implements IBeaconChain {
239239
preAggregateCutOffTime,
240240
this.opts?.preaggregateSlotDistance
241241
);
242-
this.aggregatedAttestationPool = new AggregatedAttestationPool(this.config);
242+
this.aggregatedAttestationPool = new AggregatedAttestationPool(this.config, metrics);
243243
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(
244244
clock,
245245
preAggregateCutOffTime,
@@ -373,7 +373,7 @@ export class BeaconChain implements IBeaconChain {
373373
}
374374

375375
if (metrics) {
376-
metrics.opPool.aggregatedAttestationPoolSize.addCollect(() => this.onScrapeMetrics(metrics));
376+
metrics.opPool.attestationPoolSize.addCollect(() => this.onScrapeMetrics(metrics));
377377
}
378378

379379
// Event handlers. emitter is created internally and dropped on close(). Not need to .removeListener()
@@ -1076,9 +1076,7 @@ export class BeaconChain implements IBeaconChain {
10761076
}
10771077

10781078
private onScrapeMetrics(metrics: Metrics): void {
1079-
const {attestationCount, attestationDataCount} = this.aggregatedAttestationPool.getAttestationCount();
1080-
metrics.opPool.aggregatedAttestationPoolSize.set(attestationCount);
1081-
metrics.opPool.aggregatedAttestationPoolUniqueData.set(attestationDataCount);
1079+
// aggregatedAttestationPool tracks metrics on its own
10821080
metrics.opPool.attestationPoolSize.set(this.attestationPool.getAttestationCount());
10831081
metrics.opPool.attesterSlashingPoolSize.set(this.opPool.attesterSlashingsSize);
10841082
metrics.opPool.proposerSlashingPoolSize.set(this.opPool.proposerSlashingsSize);

packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts

Lines changed: 82 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ import {
3434
ssz,
3535
} from "@lodestar/types";
3636
import {assert, MapDef, toRootHex} from "@lodestar/utils";
37+
import {Metrics} from "../../metrics/metrics.js";
3738
import {IntersectResult, intersectUint8Arrays} from "../../util/bitArray.js";
39+
import {getShufflingDependentRoot} from "../../util/dependentRoot.js";
3840
import {InsertOutcome} from "./types.js";
3941
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
4042

@@ -104,21 +106,11 @@ export class AggregatedAttestationPool {
104106
>(() => new Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>());
105107
private lowestPermissibleSlot = 0;
106108

107-
constructor(private readonly config: ChainForkConfig) {}
108-
109-
/** For metrics to track size of the pool */
110-
getAttestationCount(): {attestationCount: number; attestationDataCount: number} {
111-
let attestationCount = 0;
112-
let attestationDataCount = 0;
113-
for (const attestationGroupByIndexByDataHex of this.attestationGroupByIndexByDataHexBySlot.values()) {
114-
for (const attestationGroupByIndex of attestationGroupByIndexByDataHex.values()) {
115-
attestationDataCount += attestationGroupByIndex.size;
116-
for (const attestationGroup of attestationGroupByIndex.values()) {
117-
attestationCount += attestationGroup.getAttestationCount();
118-
}
119-
}
120-
}
121-
return {attestationCount, attestationDataCount};
109+
constructor(
110+
private readonly config: ChainForkConfig,
111+
private readonly metrics: Metrics | null = null
112+
) {
113+
metrics?.opPool.aggregatedAttestationPool.attDataPerSlot.addCollect(() => this.onScrapeMetrics(metrics));
122114
}
123115

124116
add(
@@ -317,10 +309,7 @@ export class AggregatedAttestationPool {
317309
const slots = Array.from(this.attestationGroupByIndexByDataHexBySlot.keys()).sort((a, b) => b - a);
318310
// Track score of each `AttestationsConsolidation`
319311
const consolidations = new Map<AttestationsConsolidation, number>();
320-
let minScore = Number.MAX_SAFE_INTEGER;
321-
let slotCount = 0;
322312
slot: for (const slot of slots) {
323-
slotCount++;
324313
const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHexBySlot.get(slot);
325314
// should not happen
326315
if (!attestationGroupByIndexByDataHash) {
@@ -338,34 +327,30 @@ export class AggregatedAttestationPool {
338327
}
339328

340329
const slotDelta = stateSlot - slot;
341-
// CommitteeIndex 0 1 2 ... Consolidation
330+
// CommitteeIndex 0 1 2 ... Consolidationn (sameAttDataCons)
342331
// Attestations att00 --- att10 --- att20 --- 0 (att 00 10 20)
343332
// att01 --- - --- att21 --- 1 (att 01 __ 21)
344333
// - --- - --- att22 --- 2 (att __ __ 22)
345334
for (const attestationGroupByIndex of attestationGroupByIndexByDataHash.values()) {
346335
// sameAttDataCons could be up to MAX_ATTESTATIONS_PER_GROUP_ELECTRA
347336
const sameAttDataCons: AttestationsConsolidation[] = [];
337+
const allAttestationGroups = Array.from(attestationGroupByIndex.values());
338+
if (allAttestationGroups.length === 0) {
339+
continue;
340+
}
341+
342+
if (!validateAttestationDataFn(allAttestationGroups[0].data)) {
343+
continue;
344+
}
345+
348346
for (const [committeeIndex, attestationGroup] of attestationGroupByIndex.entries()) {
349347
const notSeenAttestingIndices = notSeenValidatorsFn(epoch, slot, committeeIndex);
350348
if (notSeenAttestingIndices === null || notSeenAttestingIndices.size === 0) {
351349
continue;
352350
}
353351

354-
if (
355-
slotCount > 2 &&
356-
consolidations.size >= MAX_ATTESTATIONS_ELECTRA &&
357-
notSeenAttestingIndices.size / slotDelta < minScore
358-
) {
359-
// after 2 slots, there are a good chance that we have 2 * MAX_ATTESTATIONS_ELECTRA attestations and break the for loop early
360-
// if not, we may have to scan all slots in the pool
361-
// if we have enough attestations and the max possible score is lower than scores of `attestationsByScore`, we should skip
362-
// otherwise it takes time to check attestation, add it and remove it later after the sort by score
363-
continue;
364-
}
365-
366-
if (!validateAttestationDataFn(attestationGroup.data)) {
367-
continue;
368-
}
352+
// cannot apply this optimization like pre-electra because consolidation needs to be done across committees:
353+
// "after 2 slots, there are a good chance that we have 2 * MAX_ATTESTATIONS_ELECTRA attestations and break the for loop early"
369354

370355
// TODO: Is it necessary to validateAttestation for:
371356
// - Attestation committee index not within current committee count
@@ -387,18 +372,16 @@ export class AggregatedAttestationPool {
387372
sameAttDataCons[i].byCommittee.set(committeeIndex, attestationNonParticipation);
388373
sameAttDataCons[i].totalNotSeenCount += attestationNonParticipation.notSeenAttesterCount;
389374
}
390-
for (const consolidation of sameAttDataCons) {
391-
const score = consolidation.totalNotSeenCount / slotDelta;
392-
if (score < minScore) {
393-
minScore = score;
394-
}
375+
} // all committees are processed
395376

396-
consolidations.set(consolidation, score);
377+
// after all committees are processed, we have a list of sameAttDataCons
378+
for (const consolidation of sameAttDataCons) {
379+
const score = consolidation.totalNotSeenCount / slotDelta;
380+
consolidations.set(consolidation, score);
397381

398-
// Stop accumulating attestations there are enough that may have good scoring
399-
if (consolidations.size >= MAX_ATTESTATIONS_ELECTRA * 2) {
400-
break slot;
401-
}
382+
// Stop accumulating attestations there are enough that may have good scoring
383+
if (consolidations.size >= MAX_ATTESTATIONS_ELECTRA * 2) {
384+
break slot;
402385
}
403386
}
404387
}
@@ -440,6 +423,53 @@ export class AggregatedAttestationPool {
440423
}
441424
return attestations;
442425
}
426+
427+
private onScrapeMetrics({opPool}: Metrics): void {
428+
const metrics = opPool.aggregatedAttestationPool;
429+
const allSlots = Array.from(this.attestationGroupByIndexByDataHexBySlot.keys());
430+
431+
// always record the previous slot because the current slot may not be finished yet, we may receive more attestations
432+
if (allSlots.length > 1) {
433+
// last item is current slot, we want the previous one
434+
const previousSlot = allSlots.at(-2);
435+
if (previousSlot == null) {
436+
// only happen right after we start the node
437+
return;
438+
}
439+
440+
const groupByIndexByDataHex = this.attestationGroupByIndexByDataHexBySlot.get(previousSlot);
441+
if (groupByIndexByDataHex != null) {
442+
metrics.attDataPerSlot.set(groupByIndexByDataHex.size);
443+
444+
let maxAttestations = 0;
445+
let committeeCount = 0;
446+
for (const groupByIndex of groupByIndexByDataHex.values()) {
447+
for (const group of groupByIndex.values()) {
448+
const attestationCount = group.getAttestationCount();
449+
maxAttestations = Math.max(maxAttestations, attestationCount);
450+
metrics.attestationsPerCommittee.observe(attestationCount);
451+
committeeCount += 1;
452+
}
453+
}
454+
metrics.maxAttestationsPerCommittee.set(maxAttestations);
455+
metrics.committeesPerSlot.set(committeeCount);
456+
}
457+
}
458+
459+
let attestationCount = 0;
460+
let attestationDataCount = 0;
461+
for (const attestationGroupByIndexByDataHex of this.attestationGroupByIndexByDataHexBySlot.values()) {
462+
for (const attestationGroupByIndex of attestationGroupByIndexByDataHex.values()) {
463+
attestationDataCount += attestationGroupByIndex.size;
464+
for (const attestationGroup of attestationGroupByIndex.values()) {
465+
attestationCount += attestationGroup.getAttestationCount();
466+
}
467+
}
468+
}
469+
470+
metrics.poolSize.set(attestationCount);
471+
metrics.poolUniqueData.set(attestationDataCount);
472+
}
443473
}
444474

445475
interface AttestationWithIndex {
@@ -815,7 +845,14 @@ function isValidShuffling(
815845

816846
let attestationDependentRoot: string;
817847
try {
818-
attestationDependentRoot = forkChoice.getDependentRoot(beaconBlock, EpochDifference.previous);
848+
// should not use forkChoice.getDependentRoot directly, see https://github.com/ChainSafe/lodestar/issues/7651
849+
// attestationDependentRoot = forkChoice.getDependentRoot(beaconBlock, EpochDifference.previous);
850+
attestationDependentRoot = getShufflingDependentRoot(
851+
forkChoice,
852+
targetEpoch,
853+
computeEpochAtSlot(beaconBlock.slot),
854+
beaconBlock
855+
);
819856
} catch (_) {
820857
// getDependent root may throw error if the dependent root of attestation data is prior to finalized slot
821858
// ignore this attestation data in that case since we're not sure it's compatible to the state

packages/beacon-node/src/metrics/metrics/lodestar.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -862,17 +862,34 @@ export function createLodestarMetrics(
862862
},
863863

864864
opPool: {
865-
// Note: Current opPool metrics only track current size.
866-
// I don't believe tracking total add() count is relevant since that can be seen with gossip ACCEPTs
867-
aggregatedAttestationPoolSize: register.gauge({
868-
name: "lodestar_oppool_aggregated_attestation_pool_size",
869-
help: "Current size of the AggregatedAttestationPool = total attestations",
870-
}),
871-
/** This metric helps view how many overlapping attestations we keep per data on average */
872-
aggregatedAttestationPoolUniqueData: register.gauge({
873-
name: "lodestar_oppool_aggregated_attestation_pool_unique_data_count",
874-
help: "Current size of the AggregatedAttestationPool = total attestations unique by data",
875-
}),
865+
aggregatedAttestationPool: {
866+
poolSize: register.gauge({
867+
name: "lodestar_oppool_aggregated_attestation_pool_size",
868+
help: "Current size of the AggregatedAttestationPool = total attestations",
869+
}),
870+
poolUniqueData: register.gauge({
871+
name: "lodestar_oppool_aggregated_attestation_pool_unique_data_count",
872+
help: "Current size of the AggregatedAttestationPool = total attestations unique by data",
873+
}),
874+
attDataPerSlot: register.gauge({
875+
name: "lodestar_oppool_aggregated_attestation_pool_attestation_data_per_slot_total",
876+
help: "Total number of attestation data per slot in AggregatedAttestationPool",
877+
}),
878+
committeesPerSlot: register.gauge({
879+
name: "lodestar_oppool_aggregated_attestation_pool_committees_per_slot_total",
880+
help: "Total number of committees per slot in AggregatedAttestationPool",
881+
}),
882+
// max number of attestations per committee will become number of consolidations
883+
maxAttestationsPerCommittee: register.gauge({
884+
name: "lodestar_oppool_aggregated_attestation_pool_max_attestations_per_committee",
885+
help: "Max number of attestations per committee in AggregatedAttestationPool",
886+
}),
887+
attestationsPerCommittee: register.histogram({
888+
name: "lodestar_oppool_aggregated_attestation_pool_attestations_per_committee",
889+
help: "Number of attestations per committee in AggregatedAttestationPool",
890+
buckets: [0, 2, 4, 8],
891+
}),
892+
},
876893
attestationPoolSize: register.gauge({
877894
name: "lodestar_oppool_attestation_pool_size",
878895
help: "Current size of the AttestationPool = total attestations unique by data and slot",

packages/beacon-node/test/unit/chain/opPools/aggregatedAttestationPool.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ describe("AggregatedAttestationPool - Altair", () => {
126126
aggregationBits.getTrueBitIndexes().length,
127127
committee
128128
);
129-
forkchoiceStub.getBlockHex.mockReturnValue(generateProtoBlock());
129+
forkchoiceStub.getBlockHex.mockReturnValue(generateProtoBlock({slot: attestation.data.slot}));
130130
forkchoiceStub.getDependentRoot.mockReturnValue(ZERO_HASH_HEX);
131131
if (isReturned) {
132132
expect(pool.getAttestationsForBlock(fork, forkchoiceStub, altairState).length).toBeGreaterThan(0);
@@ -152,7 +152,7 @@ describe("AggregatedAttestationPool - Altair", () => {
152152
// all attesters are not seen
153153
const attestingIndices = [2, 3];
154154
pool.add(attestation, attDataRootHex, attestingIndices.length, committee);
155-
forkchoiceStub.getBlockHex.mockReturnValue(generateProtoBlock());
155+
forkchoiceStub.getBlockHex.mockReturnValue(generateProtoBlock({slot: attestation.data.slot}));
156156
forkchoiceStub.getDependentRoot.mockReturnValue("0xWeird");
157157
expect(pool.getAttestationsForBlock(fork, forkchoiceStub, altairState)).toEqual([]);
158158
// "forkchoice should be called to check pivot block"

0 commit comments

Comments
 (0)