Commit c6dab4c

fix: manage StakingInfos in restart (#12911)
Signed-off-by: Michael Tinker <[email protected]>
Signed-off-by: Neeharika-Sompalli <[email protected]>
Co-authored-by: Neeharika-Sompalli <[email protected]>
1 parent a98c580 commit c6dab4c

10 files changed, +350 -333 lines changed


hedera-node/hedera-app/src/main/java/com/hedera/node/app/state/HederaLifecyclesImpl.java

Lines changed: 74 additions & 99 deletions
@@ -26,15 +26,14 @@
 import com.hedera.node.app.service.mono.state.merkle.MerkleStakingInfo;
 import com.hedera.node.app.service.mono.state.migration.StateChildIndices;
 import com.hedera.node.app.service.mono.utils.EntityNum;
-import com.hedera.node.app.service.token.ReadableStakingInfoStore;
 import com.hedera.node.app.service.token.TokenService;
-import com.hedera.node.app.spi.state.WritableKVState;
-import com.hedera.node.app.spi.state.WritableKVStateBase;
 import com.hedera.node.app.state.merkle.HederaLifecycles;
 import com.hedera.node.app.state.merkle.MerkleHederaState;
-import com.hedera.node.app.workflows.dispatcher.ReadableStoreFactory;
+import com.hedera.node.app.state.merkle.disk.OnDiskKey;
+import com.hedera.node.app.state.merkle.disk.OnDiskValue;
 import com.swirlds.common.context.PlatformContext;
 import com.swirlds.common.platform.NodeId;
+import com.swirlds.common.threading.manager.AdHocThreadManager;
 import com.swirlds.config.api.Configuration;
 import com.swirlds.merkle.map.MerkleMap;
 import com.swirlds.platform.state.PlatformState;
@@ -44,11 +43,13 @@
 import com.swirlds.platform.system.SoftwareVersion;
 import com.swirlds.platform.system.address.AddressBook;
 import com.swirlds.platform.system.events.Event;
+import com.swirlds.virtualmap.VirtualMap;
+import com.swirlds.virtualmap.VirtualMapMigration;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.util.Arrays;
-import java.util.Objects;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -60,10 +61,42 @@ public class HederaLifecyclesImpl implements HederaLifecycles {
     private static final long LEDGER_TOTAL_TINY_BAR_FLOAT = 5000000000000000000L;
     private static final int NUM_REWARD_HISTORY_STORED_PERIODS = 365;
 
+    private static final BiConsumer<
+                    VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
+                    BiConsumer<EntityNumber, StakingNodeInfo>>
+            WEIGHT_UPDATE_VISITOR = (map, visitor) -> {
+                try {
+                    VirtualMapMigration.extractVirtualMapData(
+                            AdHocThreadManager.getStaticThreadManager(),
+                            map,
+                            pair -> visitor.accept(
+                                    pair.key().getKey(), pair.value().getValue()),
+                            1);
+                } catch (InterruptedException e) {
+                    logger.error("Interrupted while updating weights", e);
+                    throw new IllegalStateException(e);
+                }
+            };
+
     private final Hedera hedera;
+    private final BiConsumer<
+                    VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
+                    BiConsumer<EntityNumber, StakingNodeInfo>>
+            weightUpdateVisitor;
 
     public HederaLifecyclesImpl(@NonNull final Hedera hedera) {
-        this.hedera = Objects.requireNonNull(hedera);
+        this(hedera, WEIGHT_UPDATE_VISITOR);
+    }
+
+    public HederaLifecyclesImpl(
+            @NonNull final Hedera hedera,
+            @NonNull
+                    final BiConsumer<
+                                    VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
+                                    BiConsumer<EntityNumber, StakingNodeInfo>>
+                            weightUpdateVisitor) {
+        this.hedera = requireNonNull(hedera);
+        this.weightUpdateVisitor = requireNonNull(weightUpdateVisitor);
     }
 
     @Override
@@ -92,56 +125,40 @@ public void onUpdateWeight(
             @NonNull final MerkleHederaState state,
             @NonNull final AddressBook configAddressBook,
             @NonNull final PlatformContext context) {
-        final var tokenServiceState = state.getWritableStates(TokenService.NAME);
+        final var isMonoState = state.getChild(StateChildIndices.STAKING_INFO) instanceof MerkleMap;
         // Get all nodeIds added in the config.txt
-        Set<NodeId> configNodeIds = configAddressBook.getNodeIdSet();
-        final var configAddressBookSize = configNodeIds.size();
-        if (!tokenServiceState.isEmpty()) {
-            final var stakingInfoState = tokenServiceState.<EntityNumber, StakingNodeInfo>get(STAKING_INFO_KEY);
-            final var readableStoreFactory = new ReadableStoreFactory(state);
-            final var stakingInfoStore = readableStoreFactory.getStore(ReadableStakingInfoStore.class);
-            final var allNodeIds = stakingInfoStore.getAll();
-            for (final var nodeId : allNodeIds) {
-                final var stakingInfo = requireNonNull(stakingInfoStore.get(nodeId));
-                NodeId id = new NodeId(nodeId);
-                // set weight for the nodes that exist in state and remove from
-                // nodes given in config.txt. This is needed to recognize newly added nodes
-                // It is possible that some nodes are deleted in configAddressBook compared to state
-                // We will set those node sas deleted in EndOfStakingPeriodCalculator
-                if (configNodeIds.contains(id)) {
-                    configAddressBook.updateWeight(id, stakingInfo.weight());
-                    configNodeIds.remove(id);
-                } else {
-                    // We need to validate and mark any node that are removed during upgrade as deleted
-                    // and also set the weight to 0
-                    stakingInfoState.put(
-                            EntityNumber.newBuilder().number(id.id()).build(),
-                            stakingInfo.copyBuilder().deleted(true).weight(0).build());
-                    logger.info(
-                            "Node {} is deleted from configAddressBook during upgrade "
-                                    + "and is marked deleted in state",
-                            id);
-                }
+        final var nodeIdsLeftToUpdate = configAddressBook.getNodeIdSet();
+        if (!isMonoState) {
+            final var stakingInfoIndex = state.findNodeIndex(TokenService.NAME, STAKING_INFO_KEY);
+            if (stakingInfoIndex == -1) {
+                logger.warn("Staking info not found in state, skipping weight update");
+                return;
             }
-            // for any newly added nodes that doesn't exist in state, weight should be set to 0
-            // irrespective of the weight provided in config.txt
-            if (!configNodeIds.isEmpty()) {
-                configNodeIds.forEach(nodeId -> {
-                    configAddressBook.updateWeight(nodeId, 0);
+            @SuppressWarnings("unchecked")
+            final var stakingInfoVMap = (VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>)
+                    state.getChild(stakingInfoIndex);
+            // Since it is much easier to modify the in-state staking info after schemas
+            // are registered with MerkleHederaState, we do that work later in the token
+            // service schema's restart() hook. Here we only update the address book weights
+            // based on the staking info in the state.
+            weightUpdateVisitor.accept(stakingInfoVMap, (node, info) -> {
+                final var nodeId = new NodeId(node.number());
+                // If present in the address book, remove this node id from the
+                // set of node ids left to update and update its weight
+                if (nodeIdsLeftToUpdate.remove(nodeId)) {
+                    configAddressBook.updateWeight(nodeId, info.weight());
+                } else {
                     logger.info(
-                            "Node {} is newly added in configAddressBook during upgrade "
-                                    + "with weight 0 in configAddressBook",
+                            "Node {} is no longer in address book; restart() will ensure its staking info is marked deleted",
                             nodeId);
-                });
-                // update the state with new nodes and set weight to 0
-                addAdditionalNodesToState(
-                        stakingInfoState, configNodeIds, context.getConfiguration(), configAddressBookSize);
-            }
-
-            if (stakingInfoState.isModified()) {
-                ((WritableKVStateBase) stakingInfoState).commit();
-            }
+                }
+            });
+            nodeIdsLeftToUpdate.forEach(nodeId -> {
+                configAddressBook.updateWeight(nodeId, 0);
+                logger.info("Found new node {}; restart() will add its staking info", nodeId);
+            });
         } else {
+            final var configAddressBookSize = nodeIdsLeftToUpdate.size();
            // When doing the first upgrade from 0.48 to 0.49, we will call updateWeight before BBM migration.
            // In this case, we need to update the weight in the stakingInfo map from mono service state.
            logger.info("Token service state is empty, so we are migrating from mono to mod-service with "
@@ -150,9 +167,9 @@ public void onUpdateWeight(
                     (MerkleMap<EntityNum, MerkleStakingInfo>) state.getChild(StateChildIndices.STAKING_INFO));
             stakingInfosMap.forEachNode((nodeNum, stakingInfo) -> {
                 final NodeId nodeId = new NodeId(nodeNum.longValue());
-                if (configNodeIds.contains(nodeId)) {
+                if (nodeIdsLeftToUpdate.contains(nodeId)) {
                     configAddressBook.updateWeight(nodeId, stakingInfo.getWeight());
-                    configNodeIds.remove(nodeId);
+                    nodeIdsLeftToUpdate.remove(nodeId);
                 } else {
                     final var newStakingInfo = stakingInfosMap.getForModify(nodeNum);
                     newStakingInfo.setWeight(0);
@@ -164,8 +181,8 @@ public void onUpdateWeight(
             });
             // for any newly added nodes that doesn't exist in state, weight should be set to 0
             // irrespective of the weight provided in config.txt
-            if (!configNodeIds.isEmpty()) {
-                configNodeIds.forEach(nodeId -> {
+            if (!nodeIdsLeftToUpdate.isEmpty()) {
+                nodeIdsLeftToUpdate.forEach(nodeId -> {
                     configAddressBook.updateWeight(nodeId, 0);
                     logger.info(
                             "Node {} is newly added in configAddressBook during upgrade "
@@ -174,13 +191,14 @@ public void onUpdateWeight(
                 });
                 // update the state with new nodes and set weight to 0
                 addAdditionalNodesToState(
-                        stakingInfosMap, configNodeIds, context.getConfiguration(), configAddressBookSize);
+                        stakingInfosMap, nodeIdsLeftToUpdate, context.getConfiguration(), configAddressBookSize);
             }
         }
     }
 
     /**
      * Add additional nodes to state with weight 0 and update all nodes maxStake to maxStakePerNode
+     *
      * @param stakingInfoMap The state to update
      * @param newNodeIds The new node ids to add
      * @param config The configuration
@@ -220,49 +238,6 @@ private void addAdditionalNodesToState(
         });
     }
 
-    /**
-     * Add additional nodes to state with weight 0 and update all nodes maxStake to maxStakePerNode
-     * @param stakingToState The state to update
-     * @param newNodeIds The new node ids to add
-     * @param config The configuration
-     * @param configAddressBookSize The size of the address book
-     */
-    private void addAdditionalNodesToState(
-            @NonNull final WritableKVState<EntityNumber, StakingNodeInfo> stakingToState,
-            @NonNull final Set<NodeId> newNodeIds,
-            @NonNull final Configuration config,
-            final int configAddressBookSize) {
-        // Since PlatformContext configuration is not available here,
-        // we are using the default values here. (FUTURE) Need to see how to use config here
-        // for ledger.totalTinyBarFloat and staking.rewardHistory.numStoredPeriods
-        final long maxStakePerNode = LEDGER_TOTAL_TINY_BAR_FLOAT / configAddressBookSize;
-
-        // Add new nodes with weight 0
-        for (final var nodeId : newNodeIds) {
-            final var rewardSumHistory = new Long[NUM_REWARD_HISTORY_STORED_PERIODS + 1];
-            Arrays.fill(rewardSumHistory, 0L);
-
-            final var newNodeStakingInfo = StakingNodeInfo.newBuilder()
-                    .nodeNumber(nodeId.id())
-                    .maxStake(maxStakePerNode)
-                    .minStake(0L)
-                    .rewardSumHistory(Arrays.asList(rewardSumHistory))
-                    .weight(0)
-                    .build();
-            stakingToState.put(new EntityNumber(nodeId.id()), newNodeStakingInfo);
-            logger.info("Node {} is added in state with weight 0 and maxStakePerNode {} ", nodeId, maxStakePerNode);
-        }
-        // Update all nodes maxStake to maxStakePerNode
-        stakingToState.keys().forEachRemaining(key -> {
-            final var stakingInfo = stakingToState.get(key);
-            final var copy = requireNonNull(stakingInfo)
-                    .copyBuilder()
-                    .maxStake(maxStakePerNode)
-                    .build();
-            stakingToState.put(key, copy);
-        });
-    }
-
     @Override
     public void onNewRecoveredState(@NonNull final MerkleHederaState recoveredState) {
         hedera.onNewRecoveredState(recoveredState);
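
Note: the second constructor added above exists so the traversal of the on-disk staking-info map can be swapped out. As a rough, test-style sketch (not part of this commit), a unit test could inject a stub visitor that replays hand-built StakingNodeInfo values instead of reading a real VirtualMap; `hedera`, `fakeInfos`, and the later call arguments are assumed/hypothetical fixtures:

    // Fragment of a hypothetical unit test; assumes the same imports as the production class
    // plus java.util.Map, and a mocked `hedera` instance.
    final Map<EntityNumber, StakingNodeInfo> fakeInfos = Map.of(
            new EntityNumber(0L), StakingNodeInfo.newBuilder().nodeNumber(0L).weight(250).build(),
            new EntityNumber(1L), StakingNodeInfo.newBuilder().nodeNumber(1L).weight(250).build());
    // The stub ignores the VirtualMap argument and simply replays the fake entries to the callback
    final BiConsumer<
                    VirtualMap<OnDiskKey<EntityNumber>, OnDiskValue<StakingNodeInfo>>,
                    BiConsumer<EntityNumber, StakingNodeInfo>>
            stubVisitor = (ignoredMap, visitor) -> fakeInfos.forEach(visitor);
    final var subject = new HederaLifecyclesImpl(hedera, stubVisitor);
    // ...call subject.onUpdateWeight(state, configAddressBook, platformContext) and assert on the weights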

hedera-node/hedera-app/src/main/java/com/hedera/node/app/workflows/handle/HandleWorkflow.java

Lines changed: 9 additions & 3 deletions
@@ -334,9 +334,15 @@ private void handleUserTransaction(
         final var feeAccumulator = createFeeAccumulator(stack, configuration, recordBuilder);
 
         final var tokenServiceContext = new TokenContextImpl(
-                configuration, storeMetricsService, stack, recordListBuilder, blockRecordManager, isFirstTransaction);
-        // It's awful that we have to check this every time a transaction is handled, especially since this mostly
-        // applies to non-production cases. Let's find a way to 💥💥 remove this 💥💥
+                configuration,
+                state,
+                storeMetricsService,
+                stack,
+                recordListBuilder,
+                blockRecordManager,
+                isFirstTransaction);
+        // Do any one-time work for the first transaction after genesis;
+        // overhead for all following transactions is effectively zero
         genesisRecordsTimeHook.process(tokenServiceContext);
         try {
             // If this is the first user transaction after midnight, then handle staking updates prior to handling the
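
Any other call site that constructs a TokenContextImpl directly (for example in tests) needs the same extra state argument in the same position. A minimal hypothetical sketch, assuming suitable fixtures exist for each argument:

    final var tokenContext = new TokenContextImpl(
            configuration,       // active Configuration
            state,               // the HederaState the context will read stores from
            storeMetricsService,
            stack,               // SavepointStackImpl
            recordListBuilder,
            blockRecordManager,
            true);               // isFirstTransaction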

hedera-node/hedera-app/src/main/java/com/hedera/node/app/workflows/handle/TokenContextImpl.java

Lines changed: 13 additions & 0 deletions
@@ -21,10 +21,12 @@
 import static java.util.Objects.requireNonNull;
 
 import com.hedera.node.app.records.BlockRecordManager;
+import com.hedera.node.app.service.token.ReadableStakingInfoStore;
 import com.hedera.node.app.service.token.TokenService;
 import com.hedera.node.app.service.token.records.FinalizeContext;
 import com.hedera.node.app.service.token.records.TokenContext;
 import com.hedera.node.app.spi.metrics.StoreMetricsService;
+import com.hedera.node.app.state.HederaState;
 import com.hedera.node.app.workflows.dispatcher.ReadableStoreFactory;
 import com.hedera.node.app.workflows.dispatcher.WritableStoreFactory;
 import com.hedera.node.app.workflows.handle.record.RecordListBuilder;
@@ -33,10 +35,12 @@
 import com.swirlds.config.api.Configuration;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import java.time.Instant;
+import java.util.Set;
 import java.util.function.Consumer;
 
 public class TokenContextImpl implements TokenContext, FinalizeContext {
     private final Configuration configuration;
+    private final HederaState state;
     private final ReadableStoreFactory readableStoreFactory;
     private final WritableStoreFactory writableStoreFactory;
     private final RecordListBuilder recordListBuilder;
@@ -45,11 +49,13 @@ public class TokenContextImpl implements TokenContext, FinalizeContext {
 
     public TokenContextImpl(
             @NonNull final Configuration configuration,
+            @NonNull final HederaState state,
             @NonNull final StoreMetricsService storeMetricsService,
             @NonNull final SavepointStackImpl stack,
             @NonNull final RecordListBuilder recordListBuilder,
             @NonNull final BlockRecordManager blockRecordManager,
             final boolean isFirstTransaction) {
+        this.state = requireNonNull(state, "state must not be null");
         requireNonNull(stack, "stack must not be null");
         this.configuration = requireNonNull(configuration, "configuration must not be null");
         this.recordListBuilder = requireNonNull(recordListBuilder, "recordListBuilder must not be null");
@@ -143,4 +149,11 @@ public boolean isScheduleDispatch() {
     public void markMigrationRecordsStreamed() {
         blockRecordManager.markMigrationRecordsStreamed();
     }
+
+    @Override
+    public Set<Long> knownNodeIds() {
+        return new ReadableStoreFactory(state)
+                .getStore(ReadableStakingInfoStore.class)
+                .getAll();
+    }
 }
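
The new knownNodeIds() override answers "which nodes already have staking info in state?" by building a ReadableStoreFactory over the injected HederaState. A hypothetical caller-side sketch (tokenContext and candidateNodeId are illustrative names, not from this commit):

    // The returned ids are exactly those with a StakingNodeInfo entry in state
    final Set<Long> nodesWithStakingInfo = tokenContext.knownNodeIds();
    if (!nodesWithStakingInfo.contains(candidateNodeId)) {
        // e.g. a restart/upgrade hook could create a new StakingNodeInfo for this node
    }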
