-
Notifications
You must be signed in to change notification settings - Fork 377
INDY-1879: faster way for pool transactions ordering #1010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ashcherbakov
merged 5 commits into
hyperledger:master
from
Toktar:task-1879-slow-pool-txns
Dec 11, 2018
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
ce33458
INDY-1879: faster way for PoolManager.getNodeName, remove getNodeData
Toktar c8082f9
INDY-1879: add _ordered_node_services to PoolManager and test
Toktar 900a49a
INDY-1879: add test_on_pool_membership_changes
Toktar e155f9d
INDY-1879: pool manager refactoring
Toktar 9de918d
INDY-1879: fix onPoolMembershipChange and add tests
Toktar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,7 @@ | |
NODE_IP, NODE_PORT, CLIENT_IP, CLIENT_PORT, VERKEY, SERVICES, \ | ||
VALIDATOR, CLIENT_STACK_SUFFIX, POOL_LEDGER_ID, DOMAIN_LEDGER_ID, BLS_KEY | ||
from plenum.common.stack_manager import TxnStackManager | ||
from plenum.common.txn_util import get_type, get_payload_data | ||
from plenum.common.txn_util import get_type, get_payload_data, get_seq_no | ||
from plenum.persistence.util import pop_merkle_info | ||
from plenum.server.pool_req_handler import PoolRequestHandler | ||
|
||
|
@@ -214,6 +214,8 @@ def executePoolTxnBatch(self, ppTime, reqs, stateRoot, txnRoot) -> List: | |
return committedTxns | ||
|
||
def onPoolMembershipChange(self, txn): | ||
# `onPoolMembershipChange` method can be called only after txn added to ledger | ||
|
||
if get_type(txn) != NODE: | ||
return | ||
|
||
|
@@ -224,7 +226,7 @@ def onPoolMembershipChange(self, txn): | |
nodeName = txn_data[DATA][ALIAS] | ||
nodeNym = txn_data[TARGET_NYM] | ||
|
||
self._set_node_order(nodeNym, nodeName) | ||
self._set_node_ids_in_cash(nodeNym, nodeName) | ||
|
||
def _updateNode(txn_data): | ||
if SERVICES in txn_data[DATA]: | ||
|
@@ -238,20 +240,17 @@ def _updateNode(txn_data): | |
if BLS_KEY in txn_data[DATA]: | ||
self.node_blskey_changed(txn_data) | ||
|
||
seqNos, info = self.getNodeInfoFromLedger(nodeNym) | ||
|
||
# `onPoolMembershipChange` method can be called only after txn added to ledger | ||
if len(seqNos) == 0: | ||
raise LogicError("There are no txns in ledger for nym {}".format(nodeNym)) | ||
|
||
# If there is only one transaction has been made to this nym, | ||
# that means, that this is a new node transaction | ||
if len(seqNos) == 1: | ||
# If nodeNym is never added in self._ordered_node_services, | ||
# nodeNym is never added in ledger | ||
if nodeNym not in self._ordered_node_services: | ||
txn_data[DATA].setdefault(SERVICES, []) | ||
if VALIDATOR in txn_data[DATA].get(SERVICES, []): | ||
self.addNewNodeAndConnect(txn_data) | ||
else: | ||
_updateNode(txn_data) | ||
|
||
self._set_node_services_in_cash(nodeNym, txn_data[DATA].get(SERVICES, None)) | ||
|
||
def addNewNodeAndConnect(self, txn_data): | ||
nodeName = txn_data[DATA][ALIAS] | ||
if nodeName == self.name: | ||
|
@@ -334,25 +333,27 @@ def nodeKeysChanged(self, txn_data): | |
|
||
def nodeServicesChanged(self, txn_data): | ||
nodeNym = txn_data[TARGET_NYM] | ||
_, nodeInfo = self.getNodeInfoFromLedger(nodeNym) | ||
nodeName = nodeInfo[DATA][ALIAS] | ||
oldServices = set(nodeInfo[DATA].get(SERVICES, [])) | ||
nodeName = self.getNodeName(nodeNym) | ||
oldServices = set(self._ordered_node_services.get(nodeNym, [])) | ||
newServices = set(txn_data[DATA].get(SERVICES, [])) | ||
if oldServices == newServices: | ||
logger.info("Node {} not changing {} since it is same as existing".format(nodeNym, SERVICES)) | ||
return | ||
else: | ||
if VALIDATOR in newServices.difference(oldServices): | ||
# If validator service is enabled | ||
self.node.nodeReg[nodeName] = HA(nodeInfo[DATA][NODE_IP], | ||
nodeInfo[DATA][NODE_PORT]) | ||
self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA( | ||
nodeInfo[DATA][CLIENT_IP], nodeInfo[DATA][CLIENT_PORT]) | ||
self.updateNodeTxns(nodeInfo, txn_data) | ||
node_info = self.reqHandler.getNodeData(nodeNym) | ||
self.node.nodeReg[nodeName] = HA(node_info[NODE_IP], | ||
node_info[NODE_PORT]) | ||
self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(node_info[CLIENT_IP], | ||
node_info[CLIENT_PORT]) | ||
|
||
self.updateNodeTxns({DATA: node_info, }, txn_data) | ||
self.node.nodeJoined(txn_data) | ||
|
||
if self.name != nodeName: | ||
self.connectNewRemote(nodeInfo, nodeName, self.node) | ||
self.connectNewRemote({DATA: node_info, | ||
TARGET_NYM: nodeNym}, nodeName, self.node) | ||
|
||
if VALIDATOR in oldServices.difference(newServices): | ||
# If validator service is disabled | ||
|
@@ -380,8 +381,7 @@ def node_blskey_changed(self, txn_data): | |
|
||
def getNodeName(self, nym): | ||
# Assuming ALIAS does not change | ||
_, nodeTxn = self.getNodeInfoFromLedger(nym) | ||
return nodeTxn[DATA][ALIAS] | ||
return self._ordered_node_ids[nym] | ||
|
||
@property | ||
def merkleRootHash(self) -> str: | ||
|
@@ -391,10 +391,6 @@ def merkleRootHash(self) -> str: | |
def txnSeqNo(self) -> int: | ||
return self.ledger.seqNo | ||
|
||
def getNodeData(self, nym): | ||
_, nodeTxn = self.getNodeInfoFromLedger(nym) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||
return nodeTxn[DATA] | ||
|
||
# Question: Why are `_isIpAddressValid` and `_isPortValid` part of | ||
# pool_manager? | ||
@staticmethod | ||
|
@@ -421,24 +417,32 @@ def id(self): | |
|
||
def _load_nodes_order_from_ledger(self): | ||
self._ordered_node_ids = OrderedDict() | ||
self._ordered_node_services = {} | ||
for _, txn in self.ledger.getAllTxn(): | ||
if get_type(txn) == NODE: | ||
txn_data = get_payload_data(txn) | ||
self._set_node_order(txn_data[TARGET_NYM], txn_data[DATA][ALIAS]) | ||
self._set_node_ids_in_cash(txn_data[TARGET_NYM], | ||
txn_data[DATA][ALIAS]) | ||
self._set_node_services_in_cash(txn_data[TARGET_NYM], | ||
txn_data[DATA].get(SERVICES, None)) | ||
|
||
def _set_node_order(self, nodeNym, nodeName): | ||
curName = self._ordered_node_ids.get(nodeNym) | ||
def _set_node_ids_in_cash(self, node_nym, node_name): | ||
curName = self._ordered_node_ids.get(node_nym) | ||
if curName is None: | ||
self._ordered_node_ids[nodeNym] = nodeName | ||
self._ordered_node_ids[node_nym] = node_name | ||
logger.info("{} sets node {} ({}) order to {}".format( | ||
self.name, nodeName, nodeNym, | ||
len(self._ordered_node_ids[nodeNym]))) | ||
elif curName != nodeName: | ||
self.name, node_name, node_nym, | ||
len(self._ordered_node_ids[node_nym]))) | ||
elif curName != node_name: | ||
msg = "{} is trying to order already ordered node {} ({}) with other alias {}" \ | ||
.format(self.name, curName, nodeNym, nodeName) | ||
.format(self.name, curName, node_nym, node_name) | ||
logger.error(msg) | ||
raise LogicError(msg) | ||
|
||
def _set_node_services_in_cash(self, node_nym, node_services): | ||
if node_services is not None: | ||
self._ordered_node_services[node_nym] = node_services | ||
|
||
def node_ids_ordered_by_rank(self, nodeReg=None) -> List: | ||
if nodeReg is None: | ||
nodeReg = self.nodeReg | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5 changes: 1 addition & 4 deletions
5
plenum/test/pool_transactions/test_add_node_with_invalid_key_proof.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
54 changes: 54 additions & 0 deletions
54
plenum/test/pool_transactions/test_on_pool_membership_changes.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import pytest | ||
|
||
from plenum.common.config_helper import PNodeConfigHelper | ||
from plenum.test.test_node import TestNode | ||
|
||
from plenum.common.txn_util import get_payload_data | ||
|
||
from plenum.common.constants import TARGET_NYM, DATA, SERVICES, VALIDATOR, TXN_PAYLOAD | ||
from plenum.test.testing_utils import FakeSomething | ||
|
||
|
||
@pytest.fixture(scope='function') | ||
def test_node(tdirWithPoolTxns, | ||
tdirWithDomainTxns, | ||
poolTxnNodeNames, | ||
tdirWithNodeKeepInited, | ||
tdir, | ||
tconf, | ||
allPluginsPath): | ||
node_name = poolTxnNodeNames[0] | ||
config_helper = PNodeConfigHelper(node_name, tconf, chroot=tdir) | ||
node = TestNode( | ||
node_name, | ||
config_helper=config_helper, | ||
config=tconf, | ||
pluginPaths=allPluginsPath) | ||
node.view_changer = FakeSomething(view_change_in_progress=False, | ||
view_no=0) | ||
yield node | ||
node.onStopping() | ||
|
||
|
||
def test_on_pool_membership_changes(test_node, pool_node_txns): | ||
def get_all_txn(): | ||
assert False, "ledger.getAllTxn() shouldn't be called in onPoolMembershipChange()" | ||
|
||
pool_manager = test_node.poolManager | ||
pool_manager.ledger.getAllTxn = get_all_txn | ||
|
||
txn = pool_node_txns[0] | ||
node_nym = get_payload_data(txn)[TARGET_NYM] | ||
|
||
txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [] | ||
assert node_nym in pool_manager._ordered_node_services | ||
pool_manager.onPoolMembershipChange(txn) | ||
assert pool_manager._ordered_node_services[node_nym] == [] | ||
|
||
txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [VALIDATOR] | ||
pool_manager.onPoolMembershipChange(txn) | ||
assert pool_manager._ordered_node_services[node_nym] == [VALIDATOR] | ||
|
||
txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [] | ||
pool_manager.onPoolMembershipChange(pool_node_txns[0]) | ||
assert pool_manager._ordered_node_services[node_nym] == [] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment
# Assuming ALIAS does not change
needs to be kept.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.