diff --git a/plenum/common/stack_manager.py b/plenum/common/stack_manager.py index 40dd66b2ab..8d1d8e21fd 100644 --- a/plenum/common/stack_manager.py +++ b/plenum/common/stack_manager.py @@ -282,25 +282,6 @@ def nodeExistsInLedger(self, nym): def nodeIds(self) -> set: return {get_payload_data(txn)[TARGET_NYM] for _, txn in self.ledger.getAllTxn()} - def getNodeInfoFromLedger(self, nym, excludeLast=True): - # Returns the info of the node from the ledger with transaction - # sequence numbers that added or updated the info excluding the last - # update transaction. The reason for ignoring last transactions is that - # it is used after update to the ledger has already been made - txns = [] - nodeTxnSeqNos = [] - for seqNo, txn in self.ledger.getAllTxn(): - txn_data = get_payload_data(txn) - if get_type(txn) == NODE and txn_data[TARGET_NYM] == nym: - txns.append(txn) - nodeTxnSeqNos.append(seqNo) - info = {} - if len(txns) > 1 and excludeLast: - txns = txns[:-1] - for txn in txns: - self.updateNodeTxns(info, get_payload_data(txn)) - return nodeTxnSeqNos, info - def getNodesServices(self): # Returns services for each node srvs = dict() diff --git a/plenum/server/pool_manager.py b/plenum/server/pool_manager.py index 06f5416fe5..cbc30c8b7c 100644 --- a/plenum/server/pool_manager.py +++ b/plenum/server/pool_manager.py @@ -21,7 +21,7 @@ NODE_IP, NODE_PORT, CLIENT_IP, CLIENT_PORT, VERKEY, SERVICES, \ VALIDATOR, CLIENT_STACK_SUFFIX, POOL_LEDGER_ID, DOMAIN_LEDGER_ID, BLS_KEY from plenum.common.stack_manager import TxnStackManager -from plenum.common.txn_util import get_type, get_payload_data +from plenum.common.txn_util import get_type, get_payload_data, get_seq_no from plenum.persistence.util import pop_merkle_info from plenum.server.pool_req_handler import PoolRequestHandler @@ -214,6 +214,8 @@ def executePoolTxnBatch(self, ppTime, reqs, stateRoot, txnRoot) -> List: return committedTxns def onPoolMembershipChange(self, txn): + # `onPoolMembershipChange` method can be called only after txn added to ledger + if get_type(txn) != NODE: return @@ -224,7 +226,7 @@ def onPoolMembershipChange(self, txn): nodeName = txn_data[DATA][ALIAS] nodeNym = txn_data[TARGET_NYM] - self._set_node_order(nodeNym, nodeName) + self._set_node_ids_in_cash(nodeNym, nodeName) def _updateNode(txn_data): if SERVICES in txn_data[DATA]: @@ -238,20 +240,17 @@ def _updateNode(txn_data): if BLS_KEY in txn_data[DATA]: self.node_blskey_changed(txn_data) - seqNos, info = self.getNodeInfoFromLedger(nodeNym) - - # `onPoolMembershipChange` method can be called only after txn added to ledger - if len(seqNos) == 0: - raise LogicError("There are no txns in ledger for nym {}".format(nodeNym)) - - # If there is only one transaction has been made to this nym, - # that means, that this is a new node transaction - if len(seqNos) == 1: + # If nodeNym is never added in self._ordered_node_services, + # nodeNym is never added in ledger + if nodeNym not in self._ordered_node_services: + txn_data[DATA].setdefault(SERVICES, []) if VALIDATOR in txn_data[DATA].get(SERVICES, []): self.addNewNodeAndConnect(txn_data) else: _updateNode(txn_data) + self._set_node_services_in_cash(nodeNym, txn_data[DATA].get(SERVICES, None)) + def addNewNodeAndConnect(self, txn_data): nodeName = txn_data[DATA][ALIAS] if nodeName == self.name: @@ -334,9 +333,8 @@ def nodeKeysChanged(self, txn_data): def nodeServicesChanged(self, txn_data): nodeNym = txn_data[TARGET_NYM] - _, nodeInfo = self.getNodeInfoFromLedger(nodeNym) - nodeName = nodeInfo[DATA][ALIAS] - oldServices = set(nodeInfo[DATA].get(SERVICES, [])) + nodeName = self.getNodeName(nodeNym) + oldServices = set(self._ordered_node_services.get(nodeNym, [])) newServices = set(txn_data[DATA].get(SERVICES, [])) if oldServices == newServices: logger.info("Node {} not changing {} since it is same as existing".format(nodeNym, SERVICES)) @@ -344,15 +342,18 @@ def nodeServicesChanged(self, txn_data): else: if VALIDATOR in newServices.difference(oldServices): # If validator service is enabled - self.node.nodeReg[nodeName] = HA(nodeInfo[DATA][NODE_IP], - nodeInfo[DATA][NODE_PORT]) - self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA( - nodeInfo[DATA][CLIENT_IP], nodeInfo[DATA][CLIENT_PORT]) - self.updateNodeTxns(nodeInfo, txn_data) + node_info = self.reqHandler.getNodeData(nodeNym) + self.node.nodeReg[nodeName] = HA(node_info[NODE_IP], + node_info[NODE_PORT]) + self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(node_info[CLIENT_IP], + node_info[CLIENT_PORT]) + + self.updateNodeTxns({DATA: node_info, }, txn_data) self.node.nodeJoined(txn_data) if self.name != nodeName: - self.connectNewRemote(nodeInfo, nodeName, self.node) + self.connectNewRemote({DATA: node_info, + TARGET_NYM: nodeNym}, nodeName, self.node) if VALIDATOR in oldServices.difference(newServices): # If validator service is disabled @@ -380,8 +381,7 @@ def node_blskey_changed(self, txn_data): def getNodeName(self, nym): # Assuming ALIAS does not change - _, nodeTxn = self.getNodeInfoFromLedger(nym) - return nodeTxn[DATA][ALIAS] + return self._ordered_node_ids[nym] @property def merkleRootHash(self) -> str: @@ -391,10 +391,6 @@ def merkleRootHash(self) -> str: def txnSeqNo(self) -> int: return self.ledger.seqNo - def getNodeData(self, nym): - _, nodeTxn = self.getNodeInfoFromLedger(nym) - return nodeTxn[DATA] - # Question: Why are `_isIpAddressValid` and `_isPortValid` part of # pool_manager? @staticmethod @@ -421,24 +417,32 @@ def id(self): def _load_nodes_order_from_ledger(self): self._ordered_node_ids = OrderedDict() + self._ordered_node_services = {} for _, txn in self.ledger.getAllTxn(): if get_type(txn) == NODE: txn_data = get_payload_data(txn) - self._set_node_order(txn_data[TARGET_NYM], txn_data[DATA][ALIAS]) + self._set_node_ids_in_cash(txn_data[TARGET_NYM], + txn_data[DATA][ALIAS]) + self._set_node_services_in_cash(txn_data[TARGET_NYM], + txn_data[DATA].get(SERVICES, None)) - def _set_node_order(self, nodeNym, nodeName): - curName = self._ordered_node_ids.get(nodeNym) + def _set_node_ids_in_cash(self, node_nym, node_name): + curName = self._ordered_node_ids.get(node_nym) if curName is None: - self._ordered_node_ids[nodeNym] = nodeName + self._ordered_node_ids[node_nym] = node_name logger.info("{} sets node {} ({}) order to {}".format( - self.name, nodeName, nodeNym, - len(self._ordered_node_ids[nodeNym]))) - elif curName != nodeName: + self.name, node_name, node_nym, + len(self._ordered_node_ids[node_nym]))) + elif curName != node_name: msg = "{} is trying to order already ordered node {} ({}) with other alias {}" \ - .format(self.name, curName, nodeNym, nodeName) + .format(self.name, curName, node_nym, node_name) logger.error(msg) raise LogicError(msg) + def _set_node_services_in_cash(self, node_nym, node_services): + if node_services is not None: + self._ordered_node_services[node_nym] = node_services + def node_ids_ordered_by_rank(self, nodeReg=None) -> List: if nodeReg is None: nodeReg = self.nodeReg diff --git a/plenum/test/pool_transactions/conftest.py b/plenum/test/pool_transactions/conftest.py index 7ace61bc13..0e70eacf9c 100644 --- a/plenum/test/pool_transactions/conftest.py +++ b/plenum/test/pool_transactions/conftest.py @@ -1,5 +1,7 @@ import pytest +from plenum.common.constants import NODE +from plenum.common.txn_util import get_type from plenum.common.util import randomString from plenum.test.test_node import checkNodesConnected, TestNode from plenum.test.pool_transactions.helper import \ @@ -43,3 +45,13 @@ def sdk_node_theta_added(looper, looper.run(checkNodesConnected(txnPoolNodeSet)) sdk_pool_refresh(looper, sdk_pool_handle) return new_steward_wallet, new_node + + +@pytest.fixture() +def pool_node_txns(poolTxnData): + node_txns = [] + for txn in poolTxnData["txns"]: + if get_type(txn) == NODE: + node_txns.append(txn) + return node_txns + diff --git a/plenum/test/pool_transactions/test_add_node_with_invalid_key_proof.py b/plenum/test/pool_transactions/test_add_node_with_invalid_key_proof.py index 7f531b7cb9..b46ce8b677 100644 --- a/plenum/test/pool_transactions/test_add_node_with_invalid_key_proof.py +++ b/plenum/test/pool_transactions/test_add_node_with_invalid_key_proof.py @@ -1,12 +1,9 @@ import pytest -from plenum.common.config_helper import PNodeConfigHelper from plenum.common.exceptions import RequestNackedException from plenum.test.helper import sdk_get_and_check_replies from plenum.common.constants import VALIDATOR from plenum.test.pool_transactions.helper import prepare_new_node_data, \ - prepare_node_request, sdk_sign_and_send_prepared_request, \ - create_and_start_new_node -from plenum.test.test_node import TestNode + prepare_node_request, sdk_sign_and_send_prepared_request def test_add_node_with_invalid_key_proof(looper, diff --git a/plenum/test/pool_transactions/test_nodes_data_changed.py b/plenum/test/pool_transactions/test_nodes_data_changed.py index d31646b182..64e4b64027 100644 --- a/plenum/test/pool_transactions/test_nodes_data_changed.py +++ b/plenum/test/pool_transactions/test_nodes_data_changed.py @@ -9,7 +9,7 @@ from plenum.common.util import randomString, hexToFriendly from plenum.test.pool_transactions.helper import sdk_send_update_node, \ sdk_add_new_steward_and_node, sdk_pool_refresh, \ - update_node_data_and_reconnect + update_node_data_and_reconnect, demote_node from plenum.test.test_node import checkNodesConnected from stp_core.common.log import getlogger @@ -64,7 +64,7 @@ def testNodePortChanged(looper, txnPoolNodeSet, sdk_node_theta_added, tdir, tconf): """ - An running node's port is changed + A running node's port is changed """ new_steward_wallet, new_node = sdk_node_theta_added diff --git a/plenum/test/pool_transactions/test_on_pool_membership_changes.py b/plenum/test/pool_transactions/test_on_pool_membership_changes.py new file mode 100644 index 0000000000..37b60960cc --- /dev/null +++ b/plenum/test/pool_transactions/test_on_pool_membership_changes.py @@ -0,0 +1,54 @@ +import pytest + +from plenum.common.config_helper import PNodeConfigHelper +from plenum.test.test_node import TestNode + +from plenum.common.txn_util import get_payload_data + +from plenum.common.constants import TARGET_NYM, DATA, SERVICES, VALIDATOR, TXN_PAYLOAD +from plenum.test.testing_utils import FakeSomething + + +@pytest.fixture(scope='function') +def test_node(tdirWithPoolTxns, + tdirWithDomainTxns, + poolTxnNodeNames, + tdirWithNodeKeepInited, + tdir, + tconf, + allPluginsPath): + node_name = poolTxnNodeNames[0] + config_helper = PNodeConfigHelper(node_name, tconf, chroot=tdir) + node = TestNode( + node_name, + config_helper=config_helper, + config=tconf, + pluginPaths=allPluginsPath) + node.view_changer = FakeSomething(view_change_in_progress=False, + view_no=0) + yield node + node.onStopping() + + +def test_on_pool_membership_changes(test_node, pool_node_txns): + def get_all_txn(): + assert False, "ledger.getAllTxn() shouldn't be called in onPoolMembershipChange()" + + pool_manager = test_node.poolManager + pool_manager.ledger.getAllTxn = get_all_txn + + txn = pool_node_txns[0] + node_nym = get_payload_data(txn)[TARGET_NYM] + + txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [] + assert node_nym in pool_manager._ordered_node_services + pool_manager.onPoolMembershipChange(txn) + assert pool_manager._ordered_node_services[node_nym] == [] + + txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [VALIDATOR] + pool_manager.onPoolMembershipChange(txn) + assert pool_manager._ordered_node_services[node_nym] == [VALIDATOR] + + txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [] + pool_manager.onPoolMembershipChange(pool_node_txns[0]) + assert pool_manager._ordered_node_services[node_nym] == [] diff --git a/plenum/test/pool_transactions/test_txn_pool_manager.py b/plenum/test/pool_transactions/test_txn_pool_manager.py index 9d4a79fbc0..94526a8bf1 100644 --- a/plenum/test/pool_transactions/test_txn_pool_manager.py +++ b/plenum/test/pool_transactions/test_txn_pool_manager.py @@ -1,4 +1,7 @@ import pytest + +from plenum.common.config_helper import PNodeConfigHelper +from plenum.test.test_node import TestNode from stp_core.loop.eventually import eventually from plenum.common.metrics_collector import MetricsName @@ -7,22 +10,14 @@ from plenum.common.txn_util import get_type, get_payload_data -from plenum.common.constants import TARGET_NYM, NODE, CLIENT_STACK_SUFFIX, DATA, ALIAS, SERVICES +from plenum.common.constants import TARGET_NYM, NODE, \ + CLIENT_STACK_SUFFIX, DATA, ALIAS, SERVICES, VALIDATOR, TXN_PAYLOAD from plenum.test.pool_transactions.helper import demote_node nodeCount = 7 nodes_wth_bls = 0 -@pytest.fixture() -def pool_node_txns(poolTxnData): - node_txns = [] - for txn in poolTxnData["txns"]: - if get_type(txn) == NODE: - node_txns.append(txn) - return node_txns - - def test_twice_demoted_node_dont_write_txns(txnPoolNodeSet, looper, sdk_wallet_stewards, sdk_pool_handle): request_count = 5