Skip to content

INDY-1879: faster way for pool transactions ordering #1010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions plenum/common/stack_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,25 +282,6 @@ def nodeExistsInLedger(self, nym):
def nodeIds(self) -> set:
return {get_payload_data(txn)[TARGET_NYM] for _, txn in self.ledger.getAllTxn()}

def getNodeInfoFromLedger(self, nym, excludeLast=True):
# Returns the info of the node from the ledger with transaction
# sequence numbers that added or updated the info excluding the last
# update transaction. The reason for ignoring last transactions is that
# it is used after update to the ledger has already been made
txns = []
nodeTxnSeqNos = []
for seqNo, txn in self.ledger.getAllTxn():
txn_data = get_payload_data(txn)
if get_type(txn) == NODE and txn_data[TARGET_NYM] == nym:
txns.append(txn)
nodeTxnSeqNos.append(seqNo)
info = {}
if len(txns) > 1 and excludeLast:
txns = txns[:-1]
for txn in txns:
self.updateNodeTxns(info, get_payload_data(txn))
return nodeTxnSeqNos, info

def getNodesServices(self):
# Returns services for each node
srvs = dict()
Expand Down
72 changes: 38 additions & 34 deletions plenum/server/pool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
NODE_IP, NODE_PORT, CLIENT_IP, CLIENT_PORT, VERKEY, SERVICES, \
VALIDATOR, CLIENT_STACK_SUFFIX, POOL_LEDGER_ID, DOMAIN_LEDGER_ID, BLS_KEY
from plenum.common.stack_manager import TxnStackManager
from plenum.common.txn_util import get_type, get_payload_data
from plenum.common.txn_util import get_type, get_payload_data, get_seq_no
from plenum.persistence.util import pop_merkle_info
from plenum.server.pool_req_handler import PoolRequestHandler

Expand Down Expand Up @@ -214,6 +214,8 @@ def executePoolTxnBatch(self, ppTime, reqs, stateRoot, txnRoot) -> List:
return committedTxns

def onPoolMembershipChange(self, txn):
# `onPoolMembershipChange` method can be called only after txn added to ledger

if get_type(txn) != NODE:
return

Expand All @@ -224,7 +226,7 @@ def onPoolMembershipChange(self, txn):
nodeName = txn_data[DATA][ALIAS]
nodeNym = txn_data[TARGET_NYM]

self._set_node_order(nodeNym, nodeName)
self._set_node_ids_in_cash(nodeNym, nodeName)

def _updateNode(txn_data):
if SERVICES in txn_data[DATA]:
Expand All @@ -238,20 +240,17 @@ def _updateNode(txn_data):
if BLS_KEY in txn_data[DATA]:
self.node_blskey_changed(txn_data)

seqNos, info = self.getNodeInfoFromLedger(nodeNym)

# `onPoolMembershipChange` method can be called only after txn added to ledger
if len(seqNos) == 0:
raise LogicError("There are no txns in ledger for nym {}".format(nodeNym))

# If there is only one transaction has been made to this nym,
# that means, that this is a new node transaction
if len(seqNos) == 1:
# If nodeNym is never added in self._ordered_node_services,
# nodeNym is never added in ledger
if nodeNym not in self._ordered_node_services:
txn_data[DATA].setdefault(SERVICES, [])
if VALIDATOR in txn_data[DATA].get(SERVICES, []):
self.addNewNodeAndConnect(txn_data)
else:
_updateNode(txn_data)

self._set_node_services_in_cash(nodeNym, txn_data[DATA].get(SERVICES, None))

def addNewNodeAndConnect(self, txn_data):
nodeName = txn_data[DATA][ALIAS]
if nodeName == self.name:
Expand Down Expand Up @@ -334,25 +333,27 @@ def nodeKeysChanged(self, txn_data):

def nodeServicesChanged(self, txn_data):
nodeNym = txn_data[TARGET_NYM]
_, nodeInfo = self.getNodeInfoFromLedger(nodeNym)
nodeName = nodeInfo[DATA][ALIAS]
oldServices = set(nodeInfo[DATA].get(SERVICES, []))
nodeName = self.getNodeName(nodeNym)
oldServices = set(self._ordered_node_services.get(nodeNym, []))
newServices = set(txn_data[DATA].get(SERVICES, []))
if oldServices == newServices:
logger.info("Node {} not changing {} since it is same as existing".format(nodeNym, SERVICES))
return
else:
if VALIDATOR in newServices.difference(oldServices):
# If validator service is enabled
self.node.nodeReg[nodeName] = HA(nodeInfo[DATA][NODE_IP],
nodeInfo[DATA][NODE_PORT])
self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(
nodeInfo[DATA][CLIENT_IP], nodeInfo[DATA][CLIENT_PORT])
self.updateNodeTxns(nodeInfo, txn_data)
node_info = self.reqHandler.getNodeData(nodeNym)
self.node.nodeReg[nodeName] = HA(node_info[NODE_IP],
node_info[NODE_PORT])
self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(node_info[CLIENT_IP],
node_info[CLIENT_PORT])

self.updateNodeTxns({DATA: node_info, }, txn_data)
self.node.nodeJoined(txn_data)

if self.name != nodeName:
self.connectNewRemote(nodeInfo, nodeName, self.node)
self.connectNewRemote({DATA: node_info,
TARGET_NYM: nodeNym}, nodeName, self.node)

if VALIDATOR in oldServices.difference(newServices):
# If validator service is disabled
Expand Down Expand Up @@ -380,8 +381,7 @@ def node_blskey_changed(self, txn_data):

def getNodeName(self, nym):
# Assuming ALIAS does not change
_, nodeTxn = self.getNodeInfoFromLedger(nym)
return nodeTxn[DATA][ALIAS]
return self._ordered_node_ids[nym]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment # Assuming ALIAS does not change needs to be kept.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


@property
def merkleRootHash(self) -> str:
Expand All @@ -391,10 +391,6 @@ def merkleRootHash(self) -> str:
def txnSeqNo(self) -> int:
return self.ledger.seqNo

def getNodeData(self, nym):
_, nodeTxn = self.getNodeInfoFromLedger(nym)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove getNodeInfoFromLedger as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

return nodeTxn[DATA]

# Question: Why are `_isIpAddressValid` and `_isPortValid` part of
# pool_manager?
@staticmethod
Expand All @@ -421,24 +417,32 @@ def id(self):

def _load_nodes_order_from_ledger(self):
self._ordered_node_ids = OrderedDict()
self._ordered_node_services = {}
for _, txn in self.ledger.getAllTxn():
if get_type(txn) == NODE:
txn_data = get_payload_data(txn)
self._set_node_order(txn_data[TARGET_NYM], txn_data[DATA][ALIAS])
self._set_node_ids_in_cash(txn_data[TARGET_NYM],
txn_data[DATA][ALIAS])
self._set_node_services_in_cash(txn_data[TARGET_NYM],
txn_data[DATA].get(SERVICES, None))

def _set_node_order(self, nodeNym, nodeName):
curName = self._ordered_node_ids.get(nodeNym)
def _set_node_ids_in_cash(self, node_nym, node_name):
curName = self._ordered_node_ids.get(node_nym)
if curName is None:
self._ordered_node_ids[nodeNym] = nodeName
self._ordered_node_ids[node_nym] = node_name
logger.info("{} sets node {} ({}) order to {}".format(
self.name, nodeName, nodeNym,
len(self._ordered_node_ids[nodeNym])))
elif curName != nodeName:
self.name, node_name, node_nym,
len(self._ordered_node_ids[node_nym])))
elif curName != node_name:
msg = "{} is trying to order already ordered node {} ({}) with other alias {}" \
.format(self.name, curName, nodeNym, nodeName)
.format(self.name, curName, node_nym, node_name)
logger.error(msg)
raise LogicError(msg)

def _set_node_services_in_cash(self, node_nym, node_services):
if node_services is not None:
self._ordered_node_services[node_nym] = node_services

def node_ids_ordered_by_rank(self, nodeReg=None) -> List:
if nodeReg is None:
nodeReg = self.nodeReg
Expand Down
12 changes: 12 additions & 0 deletions plenum/test/pool_transactions/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest

from plenum.common.constants import NODE
from plenum.common.txn_util import get_type
from plenum.common.util import randomString
from plenum.test.test_node import checkNodesConnected, TestNode
from plenum.test.pool_transactions.helper import \
Expand Down Expand Up @@ -43,3 +45,13 @@ def sdk_node_theta_added(looper,
looper.run(checkNodesConnected(txnPoolNodeSet))
sdk_pool_refresh(looper, sdk_pool_handle)
return new_steward_wallet, new_node


@pytest.fixture()
def pool_node_txns(poolTxnData):
node_txns = []
for txn in poolTxnData["txns"]:
if get_type(txn) == NODE:
node_txns.append(txn)
return node_txns

Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import pytest
from plenum.common.config_helper import PNodeConfigHelper
from plenum.common.exceptions import RequestNackedException
from plenum.test.helper import sdk_get_and_check_replies
from plenum.common.constants import VALIDATOR
from plenum.test.pool_transactions.helper import prepare_new_node_data, \
prepare_node_request, sdk_sign_and_send_prepared_request, \
create_and_start_new_node
from plenum.test.test_node import TestNode
prepare_node_request, sdk_sign_and_send_prepared_request


def test_add_node_with_invalid_key_proof(looper,
Expand Down
4 changes: 2 additions & 2 deletions plenum/test/pool_transactions/test_nodes_data_changed.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from plenum.common.util import randomString, hexToFriendly
from plenum.test.pool_transactions.helper import sdk_send_update_node, \
sdk_add_new_steward_and_node, sdk_pool_refresh, \
update_node_data_and_reconnect
update_node_data_and_reconnect, demote_node
from plenum.test.test_node import checkNodesConnected

from stp_core.common.log import getlogger
Expand Down Expand Up @@ -64,7 +64,7 @@ def testNodePortChanged(looper, txnPoolNodeSet,
sdk_node_theta_added,
tdir, tconf):
"""
An running node's port is changed
A running node's port is changed
"""
new_steward_wallet, new_node = sdk_node_theta_added

Expand Down
54 changes: 54 additions & 0 deletions plenum/test/pool_transactions/test_on_pool_membership_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest

from plenum.common.config_helper import PNodeConfigHelper
from plenum.test.test_node import TestNode

from plenum.common.txn_util import get_payload_data

from plenum.common.constants import TARGET_NYM, DATA, SERVICES, VALIDATOR, TXN_PAYLOAD
from plenum.test.testing_utils import FakeSomething


@pytest.fixture(scope='function')
def test_node(tdirWithPoolTxns,
tdirWithDomainTxns,
poolTxnNodeNames,
tdirWithNodeKeepInited,
tdir,
tconf,
allPluginsPath):
node_name = poolTxnNodeNames[0]
config_helper = PNodeConfigHelper(node_name, tconf, chroot=tdir)
node = TestNode(
node_name,
config_helper=config_helper,
config=tconf,
pluginPaths=allPluginsPath)
node.view_changer = FakeSomething(view_change_in_progress=False,
view_no=0)
yield node
node.onStopping()


def test_on_pool_membership_changes(test_node, pool_node_txns):
def get_all_txn():
assert False, "ledger.getAllTxn() shouldn't be called in onPoolMembershipChange()"

pool_manager = test_node.poolManager
pool_manager.ledger.getAllTxn = get_all_txn

txn = pool_node_txns[0]
node_nym = get_payload_data(txn)[TARGET_NYM]

txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = []
assert node_nym in pool_manager._ordered_node_services
pool_manager.onPoolMembershipChange(txn)
assert pool_manager._ordered_node_services[node_nym] == []

txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = [VALIDATOR]
pool_manager.onPoolMembershipChange(txn)
assert pool_manager._ordered_node_services[node_nym] == [VALIDATOR]

txn[TXN_PAYLOAD][DATA][DATA][SERVICES] = []
pool_manager.onPoolMembershipChange(pool_node_txns[0])
assert pool_manager._ordered_node_services[node_nym] == []
15 changes: 5 additions & 10 deletions plenum/test/pool_transactions/test_txn_pool_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import pytest

from plenum.common.config_helper import PNodeConfigHelper
from plenum.test.test_node import TestNode
from stp_core.loop.eventually import eventually

from plenum.common.metrics_collector import MetricsName
Expand All @@ -7,22 +10,14 @@

from plenum.common.txn_util import get_type, get_payload_data

from plenum.common.constants import TARGET_NYM, NODE, CLIENT_STACK_SUFFIX, DATA, ALIAS, SERVICES
from plenum.common.constants import TARGET_NYM, NODE, \
CLIENT_STACK_SUFFIX, DATA, ALIAS, SERVICES, VALIDATOR, TXN_PAYLOAD
from plenum.test.pool_transactions.helper import demote_node

nodeCount = 7
nodes_wth_bls = 0


@pytest.fixture()
def pool_node_txns(poolTxnData):
node_txns = []
for txn in poolTxnData["txns"]:
if get_type(txn) == NODE:
node_txns.append(txn)
return node_txns


def test_twice_demoted_node_dont_write_txns(txnPoolNodeSet,
looper, sdk_wallet_stewards, sdk_pool_handle):
request_count = 5
Expand Down