
Master #87


Merged: 3 commits, Mar 5, 2018
5 changes: 4 additions & 1 deletion plenum/common/ledger_manager.py
@@ -91,11 +91,14 @@ def request_CPs_if_needed(self, ledgerId):
if ledgerInfo.consistencyProofsTimer is None:
return

proofs = ledgerInfo.recvdConsistencyProofs
# no ConsistencyProofs have been received
if not proofs:
return
logger.debug("{} requesting consistency "
"proofs after timeout".format(self))

quorum = Quorums(self.owner.totalNodes)
proofs = ledgerInfo.recvdConsistencyProofs
groupedProofs, null_proofs_count = self._groupConsistencyProofs(proofs)
if quorum.same_consistency_proof.is_reached(null_proofs_count):
return
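For reference, a minimal standalone sketch of the guard added above: bail out when no consistency proofs have been received at all, and skip re-requesting when enough nodes sent a null proof. The proofs shape (a dict of node name to proof) and the quorum threshold (n - f with f = (n - 1) // 3) are assumptions for illustration, not the exact Quorums API.

def should_request_consistency_proofs(proofs, total_nodes):
    # Sketch only; `proofs` is assumed to map node name -> proof (None = null proof).
    if not proofs:
        # Nothing has been received yet, so there is nothing to group or re-request.
        return False
    f = (total_nodes - 1) // 3
    same_proof_quorum = total_nodes - f  # assumed n - f threshold
    null_proofs_count = sum(1 for p in proofs.values() if p is None)
    # If a quorum of nodes agree there is nothing to prove, do not re-request.
    return null_proofs_count < same_proof_quorum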
44 changes: 38 additions & 6 deletions plenum/common/stack_manager.py
@@ -170,14 +170,46 @@ def connectNewRemote(self, txn, remoteName, nodeOrClientObj,
nodeOrClientObj.nodestack.maintainConnections(force=True)

def stackHaChanged(self, txn, remoteName, nodeOrClientObj):
nodeHa = (txn[DATA][NODE_IP], txn[DATA][NODE_PORT])
cliHa = (txn[DATA][CLIENT_IP], txn[DATA][CLIENT_PORT])
nodeHa = None
cliHa = None
if self.isNode:
node_ha_changed = False
(ip, port) = nodeOrClientObj.nodeReg[remoteName]
if NODE_IP in txn[DATA] and ip != txn[DATA][NODE_IP]:
ip = txn[DATA][NODE_IP]
node_ha_changed = True

if NODE_PORT in txn[DATA] and port != txn[DATA][NODE_PORT]:
port = txn[DATA][NODE_PORT]
node_ha_changed = True

if node_ha_changed:
nodeHa = (ip, port)

cli_ha_changed = False
(ip, port) = nodeOrClientObj.cliNodeReg[remoteName + CLIENT_STACK_SUFFIX] \
if self.isNode \
else nodeOrClientObj.nodeReg[remoteName]

if CLIENT_IP in txn[DATA] and ip != txn[DATA][CLIENT_IP]:
ip = txn[DATA][CLIENT_IP]
cli_ha_changed = True

if CLIENT_PORT in txn[DATA] and port != txn[DATA][CLIENT_PORT]:
port = txn[DATA][CLIENT_PORT]
cli_ha_changed = True

if cli_ha_changed:
cliHa = (ip, port)

rid = self.removeRemote(nodeOrClientObj.nodestack, remoteName)
if self.isNode:
nodeOrClientObj.nodeReg[remoteName] = HA(*nodeHa)
nodeOrClientObj.cliNodeReg[remoteName +
CLIENT_STACK_SUFFIX] = HA(*cliHa)
else:
if nodeHa:
nodeOrClientObj.nodeReg[remoteName] = HA(*nodeHa)
if cliHa:
nodeOrClientObj.cliNodeReg[remoteName +
CLIENT_STACK_SUFFIX] = HA(*cliHa)
elif cliHa:
nodeOrClientObj.nodeReg[remoteName] = HA(*cliHa)

# Attempt connection at the new HA
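The rewritten stackHaChanged only builds a new HA tuple when the transaction actually changes an IP or a port, leaving nodeHa or cliHa as None otherwise. A small helper capturing that change-detection pattern might look like the sketch below; the helper and its name are illustrative, not part of the PR.

from typing import Optional, Tuple

def updated_ha(current: Tuple[str, int], data: dict,
               ip_key: str, port_key: str) -> Optional[Tuple[str, int]]:
    # Return a new (ip, port) only if `data` changes it, else None.
    ip, port = current
    changed = False
    if ip_key in data and data[ip_key] != ip:
        ip = data[ip_key]
        changed = True
    if port_key in data and data[port_key] != port:
        port = data[port_key]
        changed = True
    return (ip, port) if changed else None

With the DATA keys from the transaction, nodeHa would be computed as updated_ha(nodeOrClientObj.nodeReg[remoteName], txn[DATA], NODE_IP, NODE_PORT), and cliHa analogously with the client keys.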
39 changes: 34 additions & 5 deletions plenum/server/node.py
@@ -882,6 +882,9 @@ def rank(self) -> Optional[int]:
def get_name_by_rank(self, rank, nodeReg=None):
return self.poolManager.get_name_by_rank(rank, nodeReg=nodeReg)

def get_rank_by_name(self, name, nodeReg=None):
return self.poolManager.get_rank_by_name(name, nodeReg=nodeReg)

def newViewChanger(self):
if self.view_changer:
return self.view_changer
@@ -2354,15 +2357,41 @@ def lost_master_primary(self):
self._schedule_view_change()

def select_primaries(self, nodeReg: Dict[str, HA]=None):
primaries = set()
primary_rank = None
'''
Build a set of names of current primaries; it is needed to avoid
selecting the same node as primary for different replicas.
'''
for instance_id, replica in enumerate(self.replicas):
if replica.primaryName is not None:
name = replica.primaryName.split(":", 1)[0]
primaries.add(name)
'''
Remember the rank of the master instance's primary; it is needed
to calculate the primaries for the backup instances.
'''
if instance_id == 0:
primary_rank = self.get_rank_by_name(name, nodeReg)

for instance_id, replica in enumerate(self.replicas):
if replica.primaryName is not None:
logger.debug('{} already has a primary'.format(replica))
continue
new_primary_name = self.elector.next_primary_replica_name(
instance_id, nodeReg=nodeReg)
if instance_id == 0:
new_primary_name, new_primary_instance_name =\
self.elector.next_primary_replica_name_for_master(nodeReg=nodeReg)
primary_rank = self.get_rank_by_name(
new_primary_name, nodeReg)
else:
assert primary_rank is not None
new_primary_name, new_primary_instance_name =\
self.elector.next_primary_replica_name_for_backup(
instance_id, primary_rank, primaries, nodeReg=nodeReg)
primaries.add(new_primary_name)
logger.display("{}{} selected primary {} for instance {} (view {})"
.format(PRIMARY_SELECTION_PREFIX, replica,
new_primary_name, instance_id, self.viewNo),
new_primary_instance_name, instance_id, self.viewNo),
extra={"cli": "ANNOUNCE",
"tags": ["node-election"]})
if instance_id == 0:
@@ -2372,7 +2401,7 @@ def select_primaries(self, nodeReg: Dict[str, HA]=None):
# participating.
self.start_participating()

replica.primaryChanged(new_primary_name)
replica.primaryChanged(new_primary_instance_name)
self.primary_selected(instance_id)

logger.display("{}{} declares view change {} as completed for "
@@ -2383,7 +2412,7 @@ def select_primaries(self, nodeReg: Dict[str, HA]=None):
replica,
self.viewNo,
instance_id,
new_primary_name,
new_primary_instance_name,
self.ledger_summary),
extra={"cli": "ANNOUNCE",
"tags": ["node-election"]})
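The new select_primaries flow is two passes: first collect the names of primaries that replicas already have (plus the rank of the master primary), then pick a primary for every replica that lacks one, using the elector's master and backup helpers so that no node is chosen twice. A condensed, hedged sketch of that flow, with logging, view-change bookkeeping and start_participating() omitted:

def select_primaries_sketch(node, replicas, node_reg):
    primaries = set()
    primary_rank = None

    # Pass 1: remember already assigned primaries and the master primary's rank.
    for inst_id, replica in enumerate(replicas):
        if replica.primaryName is not None:
            name = replica.primaryName.split(":", 1)[0]
            primaries.add(name)
            if inst_id == 0:
                primary_rank = node.get_rank_by_name(name, node_reg)

    # Pass 2: pick a primary for every replica that does not have one yet.
    for inst_id, replica in enumerate(replicas):
        if replica.primaryName is not None:
            continue
        if inst_id == 0:
            name, inst_name = node.elector.next_primary_replica_name_for_master(
                nodeReg=node_reg)
            primary_rank = node.get_rank_by_name(name, node_reg)
        else:
            assert primary_rank is not None
            name, inst_name = node.elector.next_primary_replica_name_for_backup(
                inst_id, primary_rank, primaries, nodeReg=node_reg)
        primaries.add(name)
        replica.primaryChanged(inst_name)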
50 changes: 44 additions & 6 deletions plenum/server/pool_manager.py
@@ -177,7 +177,6 @@ def getStackParamsAndNodeReg(self, name, keys_dir, nodeRegistry=None,
ha=HA('0.0.0.0', ha[1]),
main=True,
auth_mode=AuthMode.RESTRICTED.value)
nodeReg[name] = HA(*ha)

cliname = cliname or (name + CLIENT_STACK_SUFFIX)
if not cliha:
@@ -186,7 +185,6 @@ def getStackParamsAndNodeReg(self, name, keys_dir, nodeRegistry=None,
ha=HA('0.0.0.0', cliha[1]),
main=True,
auth_mode=AuthMode.ALLOW_ANY.value)
cliNodeReg[cliname] = HA(*cliha)

if keys_dir:
nstack['basedirpath'] = keys_dir
@@ -214,7 +212,7 @@ def executePoolTxnBatch(self, ppTime, reqs, stateRoot, txnRoot) -> List:
return committedTxns

def onPoolMembershipChange(self, txn):
if txn[TXN_TYPE] == NODE:
if txn[TXN_TYPE] == NODE and DATA in txn:
nodeName = txn[DATA][ALIAS]
nodeNym = txn[TARGET_NYM]

@@ -253,10 +251,15 @@ def _updateNode(txn):
def addNewNodeAndConnect(self, txn):
nodeName = txn[DATA][ALIAS]
if nodeName == self.name:
logger.debug("{} not adding itself to node registry".
logger.debug("{} adding itself to node registry".
format(self.name))
return
self.connectNewRemote(txn, nodeName, self.node)
self.node.nodeReg[nodeName] = HA(txn[DATA][NODE_IP],
txn[DATA][NODE_PORT])
self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = \
HA(txn[DATA][CLIENT_IP],
txn[DATA][CLIENT_PORT])
else:
self.connectNewRemote(txn, nodeName, self.node, nodeName != self.name)
self.node.nodeJoined(txn)

def node_about_to_be_disconnected(self, nodeName):
@@ -269,6 +272,33 @@ def nodeHaChanged(self, txn):
# TODO: Check if new HA is same as old HA and only update if
# new HA is different.
if nodeName == self.name:
# Update itself in node registry if needed
ha_changed = False
(ip, port) = self.node.nodeReg[nodeName]
if NODE_IP in txn[DATA] and ip != txn[DATA][NODE_IP]:
ip = txn[DATA][NODE_IP]
ha_changed = True

if NODE_PORT in txn[DATA] and port != txn[DATA][NODE_PORT]:
port = txn[DATA][NODE_PORT]
ha_changed = True

if ha_changed:
self.node.nodeReg[nodeName] = HA(ip, port)

ha_changed = False
(ip, port) = self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX]
if CLIENT_IP in txn[DATA] and ip != txn[DATA][CLIENT_IP]:
ip = txn[DATA][CLIENT_IP]
ha_changed = True

if CLIENT_PORT in txn[DATA] and port != txn[DATA][CLIENT_PORT]:
port = txn[DATA][CLIENT_PORT]
ha_changed = True

if ha_changed:
self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(ip, port)

self.node.nodestack.onHostAddressChanged()
self.node.clientstack.onHostAddressChanged()
else:
@@ -411,6 +441,11 @@ def get_rank_of(self, node_id, nodeReg=None) -> Optional[int]:
return None
return self._get_rank(node_id, self.node_ids_ordered_by_rank(nodeReg))

def get_rank_by_name(self, name, nodeReg=None) -> Optional[int]:
for nym, nm in self._ordered_node_ids.items():
if name == nm:
return self.get_rank_of(nym, nodeReg)

def get_name_by_rank(self, rank, nodeReg=None) -> Optional[str]:
try:
nym = self.node_ids_ordered_by_rank(nodeReg)[rank]
@@ -532,6 +567,9 @@ def get_rank_of(self, node_id, nodeReg=None) -> Optional[int]:
# TODO node_id here has got another meaning
return self._get_rank(node_id, self.node_names_ordered_by_rank(nodeReg))

def get_rank_by_name(self, name, nodeReg=None) -> Optional[int]:
return self.get_rank_of(name, nodeReg)

def get_name_by_rank(self, rank, nodeReg=None) -> Optional[str]:
try:
return self.node_names_ordered_by_rank(nodeReg)[rank]
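Both pool-manager variants gain get_rank_by_name. For the registry-based manager the rank is simply the node's position in the rank-ordered name list, while the ledger-based manager first resolves the name to its nym and then reuses get_rank_of. A minimal sketch of the list-index case (function standalone for illustration):

from typing import List, Optional

def get_rank_by_name(name: str, names_ordered_by_rank: List[str]) -> Optional[int]:
    # Rank is the index of the node's name in the rank-ordered list.
    try:
        return names_ordered_by_rank.index(name)
    except ValueError:
        return None

# Example: get_rank_by_name('Beta', ['Alpha', 'Beta', 'Gamma', 'Delta']) == 1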
48 changes: 34 additions & 14 deletions plenum/server/primary_selector.py
@@ -38,30 +38,50 @@ def _is_master_instance(self, instance_id):
# Instance 0 is always master
return instance_id == 0

def _get_primary_id(self, view_no, instance_id, total_nodes):
return (view_no + instance_id) % total_nodes
def _get_master_primary_id(self, view_no, total_nodes):
return view_no % total_nodes

def next_primary_node_name(self, instance_id, nodeReg=None):
def _next_primary_node_name_for_master(self, nodeReg=None):
if nodeReg is None:
nodeReg = self.node.nodeReg
rank = self._get_primary_id(self.viewNo, instance_id, len(nodeReg))
rank = self._get_master_primary_id(self.viewNo, len(nodeReg))
name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)

logger.trace("{} selected {} as next primary node for instId {}, "
assert name, "{} failed to get next primary node name for master instance".format(self)
logger.trace("{} selected {} as next primary node for master instance, "
"viewNo {} with rank {}, nodeReg {}".format(
self, name, instance_id, self.viewNo, rank, nodeReg))
assert name, "{} failed to get next primary node name".format(self)

self, name, self.viewNo, rank, nodeReg))
return name

def next_primary_replica_name(self, instance_id, nodeReg=None):
def next_primary_replica_name_for_master(self, nodeReg=None):
"""
Returns name and corresponding instance name of the next node which
is supposed to be a new Primary. In fact it is not round-robin on
this abstraction layer as currently the primary of master instance is
pointed directly depending on view number, instance id and total
number of nodes.
But since the view number is incremented by 1 before primary selection
then current approach may be treated as round robin.
"""
name = self._next_primary_node_name_for_master(nodeReg)
return name, Replica.generateName(nodeName=name, instId=0)

def next_primary_replica_name_for_backup(self, instance_id, master_primary_rank,
primaries, nodeReg=None):
"""
Returns name of the next node which is supposed to be a new Primary
in round-robin fashion
Returns the name and the corresponding instance name of the next node
that is supposed to become the new primary for a backup instance, chosen
in round-robin fashion starting from the master instance's primary.
"""
return Replica.generateName(
nodeName=self.next_primary_node_name(instance_id, nodeReg=nodeReg),
instId=instance_id)
if nodeReg is None:
nodeReg = self.node.nodeReg
total_nodes = len(nodeReg)
rank = (master_primary_rank + 1) % total_nodes
name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
while name in primaries:
rank = (rank + 1) % total_nodes
name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
return name, Replica.generateName(nodeName=name, instId=instance_id)

# overridden method of PrimaryDecider
def start_election_for_instance(self, instance_id):
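The backup selection walks the rank order starting one position after the master primary and skips any node that is already a primary elsewhere. A standalone sketch over a plain list of node names (the function name and signature are illustrative, not the PR's API):

from typing import List, Set

def next_backup_primary(master_primary_rank: int, primaries: Set[str],
                        node_names_by_rank: List[str]) -> str:
    total = len(node_names_by_rank)
    rank = (master_primary_rank + 1) % total
    name = node_names_by_rank[rank]
    # The caller is expected to run fewer protocol instances than there are
    # nodes; otherwise this loop (like the one in the PR) would not terminate.
    while name in primaries:
        rank = (rank + 1) % total
        name = node_names_by_rank[rank]
    return name

# With nodes ['A', 'B', 'C', 'D'], master primary 'B' (rank 1) and
# primaries == {'B'}: instance 1 gets 'C'; after adding 'C', instance 2 gets 'D'.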
6 changes: 3 additions & 3 deletions plenum/server/view_change/view_changer.py
@@ -144,7 +144,7 @@ def _is_propagated_view_change_completed(self):
@property
def has_view_change_from_primary(self) -> bool:
if not self._has_view_change_from_primary:
next_primary_name = self.node.elector.next_primary_node_name(0)
next_primary_name = self.node.elector._next_primary_node_name_for_master()

if next_primary_name not in self._view_change_done:
logger.debug(
@@ -579,7 +579,7 @@ def _verify_primary(self, new_primary, ledger_info):
This method is called when sufficient number of ViewChangeDone
received and makes steps to switch to the new primary
"""
expected_primary = self.node.elector.next_primary_node_name(0)
expected_primary = self.node.elector._next_primary_node_name_for_master()
if new_primary != expected_primary:
logger.error("{}{} expected next primary to be {}, but majority "
"declared {} instead for view {}"
@@ -595,7 +595,7 @@ def _send_view_change_done_message(self):
"""
Sends ViewChangeDone message to other protocol participants
"""
new_primary_name = self.node.elector.next_primary_node_name(0)
new_primary_name = self.node.elector._next_primary_node_name_for_master()
ledger_summary = self.node.ledger_summary
message = ViewChangeDone(self.view_no,
new_primary_name,
20 changes: 8 additions & 12 deletions plenum/test/helper.py
@@ -635,11 +635,11 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None):
for node in nodes:
logger.debug("{}'s view no is {}".format(node, node.viewNo))
viewNos.add(node.viewNo)
assert len(viewNos) == 1
assert len(viewNos) == 1, 'Expected 1, but got {}'.format(len(viewNos))
vNo, = viewNos
if expectedViewNo is not None:
assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
node.name, node.monitor.masterThroughputRatio()) for node in nodes])
assert vNo >= expectedViewNo, \
'Expected at least {}, but got {}'.format(expectedViewNo, vNo)
return vNo


@@ -1023,8 +1023,6 @@ def sdk_get_reply(looper, sdk_req_resp, timeout=None):
try:
resp = looper.run(asyncio.wait_for(resp_task, timeout=timeout))
resp = json.loads(resp)
except asyncio.TimeoutError:
resp = None
except IndyError as e:
resp = e.error_code

@@ -1047,19 +1045,17 @@ def get_res(task, done_list):
resp = None
return resp

done, pend = looper.run(asyncio.wait(resp_tasks, timeout=timeout))
if pend:
raise AssertionError("{} transactions are still pending. Timeout: {}."
.format(len(pend), timeout))
done, pending = looper.run(asyncio.wait(resp_tasks, timeout=timeout))
if pending:
for task in pending:
task.cancel()
raise TimeoutError("{} requests timed out".format(len(pending)))
ret = [(req, get_res(resp, done)) for req, resp in sdk_req_resp]
return ret


def sdk_check_reply(req_res):
req, res = req_res
if res is None:
raise AssertionError("Got no confirmed result for request {}"
.format(req))
if isinstance(res, ErrorCode):
raise AssertionError("Got an error with code {} for request {}"
.format(res, req))
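sdk_get_replies now cancels whatever tasks did not finish within the timeout and raises TimeoutError instead of an AssertionError. The underlying asyncio pattern, as a hedged standalone sketch (the PR's helper additionally maps each result back to its request via get_res, which is omitted here):

import asyncio

async def gather_with_timeout(tasks, timeout):
    # Wait for all tasks up to `timeout`, cancel whatever did not finish,
    # and raise TimeoutError so the caller sees a clear failure.
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    if pending:
        for task in pending:
            task.cancel()
        raise TimeoutError("{} requests timed out".format(len(pending)))
    # Note: `done` is a set, so result order is not preserved.
    return [task.result() for task in done]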
2 changes: 0 additions & 2 deletions plenum/test/node_catchup/test_config_ledger.py
@@ -135,7 +135,6 @@ def test_disconnected_node_catchup_config_ledger_txns(looper,
new_node = newNodeCaughtUp
disconnect_node_and_ensure_disconnected(
looper, txnPoolNodeSet, new_node, stopNode=False)
looper.removeProdable(new_node)

# Do some config txns; using a fixture as a method, passing some arguments
# as None as they only make sense for the fixture (pre-requisites)
@@ -144,6 +143,5 @@ def test_disconnected_node_catchup_config_ledger_txns(looper,
# Make sure new node got out of sync
waitNodeDataInequality(looper, new_node, *txnPoolNodeSet[:-1])

looper.add(new_node)
reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, new_node)
waitNodeDataEquality(looper, new_node, *txnPoolNodeSet[:-1])