
Commit d396e9d

Merge pull request #87 from hyperledger/master
Master
2 parents c2e46f2 + 551d50a commit d396e9d

28 files changed: +710 additions, -196 deletions

plenum/common/ledger_manager.py

Lines changed: 4 additions & 1 deletion
@@ -91,11 +91,14 @@ def request_CPs_if_needed(self, ledgerId):
         if ledgerInfo.consistencyProofsTimer is None:
             return
 
+        proofs = ledgerInfo.recvdConsistencyProofs
+        # no ConsistencyProofs have been received yet
+        if not proofs:
+            return
         logger.debug("{} requesting consistency "
                      "proofs after timeout".format(self))
 
         quorum = Quorums(self.owner.totalNodes)
-        proofs = ledgerInfo.recvdConsistencyProofs
         groupedProofs, null_proofs_count = self._groupConsistencyProofs(proofs)
         if quorum.same_consistency_proof.is_reached(null_proofs_count):
             return
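Aside: the guard above matters because the retry timer can fire before any ConsistencyProof has arrived, in which case there is nothing to group. A minimal standalone sketch of the same early-exit pattern (this LedgerInfo is a simplified stand-in for the plenum class, not its real definition):

    class LedgerInfo:
        def __init__(self):
            self.consistencyProofsTimer = None   # set when a retry timer is armed
            self.recvdConsistencyProofs = {}     # proofs received from other nodes

    def request_cps_if_needed(ledger_info):
        if ledger_info.consistencyProofsTimer is None:
            return  # no timer armed, so this call is stale
        if not ledger_info.recvdConsistencyProofs:
            return  # nothing received yet; grouping would be pointless
        print("requesting consistency proofs after timeout")

    request_cps_if_needed(LedgerInfo())  # returns silently: no timer is armed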

plenum/common/stack_manager.py

Lines changed: 38 additions & 6 deletions
@@ -170,14 +170,46 @@ def connectNewRemote(self, txn, remoteName, nodeOrClientObj,
         nodeOrClientObj.nodestack.maintainConnections(force=True)
 
     def stackHaChanged(self, txn, remoteName, nodeOrClientObj):
-        nodeHa = (txn[DATA][NODE_IP], txn[DATA][NODE_PORT])
-        cliHa = (txn[DATA][CLIENT_IP], txn[DATA][CLIENT_PORT])
+        nodeHa = None
+        cliHa = None
+        if self.isNode:
+            node_ha_changed = False
+            (ip, port) = nodeOrClientObj.nodeReg[remoteName]
+            if NODE_IP in txn[DATA] and ip != txn[DATA][NODE_IP]:
+                ip = txn[DATA][NODE_IP]
+                node_ha_changed = True
+
+            if NODE_PORT in txn[DATA] and port != txn[DATA][NODE_PORT]:
+                port = txn[DATA][NODE_PORT]
+                node_ha_changed = True
+
+            if node_ha_changed:
+                nodeHa = (ip, port)
+
+        cli_ha_changed = False
+        (ip, port) = nodeOrClientObj.cliNodeReg[remoteName + CLIENT_STACK_SUFFIX] \
+            if self.isNode \
+            else nodeOrClientObj.nodeReg[remoteName]
+
+        if CLIENT_IP in txn[DATA] and ip != txn[DATA][CLIENT_IP]:
+            ip = txn[DATA][CLIENT_IP]
+            cli_ha_changed = True
+
+        if CLIENT_PORT in txn[DATA] and port != txn[DATA][CLIENT_PORT]:
+            port = txn[DATA][CLIENT_PORT]
+            cli_ha_changed = True
+
+        if cli_ha_changed:
+            cliHa = (ip, port)
+
         rid = self.removeRemote(nodeOrClientObj.nodestack, remoteName)
         if self.isNode:
-            nodeOrClientObj.nodeReg[remoteName] = HA(*nodeHa)
-            nodeOrClientObj.cliNodeReg[remoteName +
-                                       CLIENT_STACK_SUFFIX] = HA(*cliHa)
-        else:
+            if nodeHa:
+                nodeOrClientObj.nodeReg[remoteName] = HA(*nodeHa)
+            if cliHa:
+                nodeOrClientObj.cliNodeReg[remoteName +
+                                           CLIENT_STACK_SUFFIX] = HA(*cliHa)
+        elif cliHa:
             nodeOrClientObj.nodeReg[remoteName] = HA(*cliHa)
 
         # Attempt connection at the new HA
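The rewritten stackHaChanged only rebuilds an HA when the transaction actually carries a changed IP or port. The compare-and-merge step can be sketched as a small helper; merge_ha and the key names below are illustrative, not part of the plenum API:

    def merge_ha(current, txn_data, ip_key, port_key):
        # Merge optional ip/port fields from txn_data into the (ip, port)
        # pair; return the new pair only if something changed, else None.
        ip, port = current
        changed = False
        if ip_key in txn_data and ip != txn_data[ip_key]:
            ip, changed = txn_data[ip_key], True
        if port_key in txn_data and port != txn_data[port_key]:
            port, changed = txn_data[port_key], True
        return (ip, port) if changed else None

    # Only the port differs, so a fresh HA tuple is returned:
    assert merge_ha(('10.0.0.1', 9701),
                    {'node_ip': '10.0.0.1', 'node_port': 9711},
                    'node_ip', 'node_port') == ('10.0.0.1', 9711)
    # Nothing differs, so None is returned and the registry is untouched:
    assert merge_ha(('10.0.0.1', 9701),
                    {'node_ip': '10.0.0.1', 'node_port': 9701},
                    'node_ip', 'node_port') is None

Returning None for "no change" mirrors how the method above leaves nodeHa/cliHa as None so the registries are only written when an address really moved.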

plenum/server/node.py

Lines changed: 34 additions & 5 deletions
@@ -882,6 +882,9 @@ def rank(self) -> Optional[int]:
     def get_name_by_rank(self, rank, nodeReg=None):
         return self.poolManager.get_name_by_rank(rank, nodeReg=nodeReg)
 
+    def get_rank_by_name(self, name, nodeReg=None):
+        return self.poolManager.get_rank_by_name(name, nodeReg=nodeReg)
+
     def newViewChanger(self):
         if self.view_changer:
             return self.view_changer

@@ -2354,15 +2357,41 @@ def lost_master_primary(self):
         self._schedule_view_change()
 
     def select_primaries(self, nodeReg: Dict[str, HA]=None):
+        primaries = set()
+        primary_rank = None
+        '''
+        Build a set of names of primaries; it is needed to avoid
+        duplicate primary nodes for different replicas.
+        '''
+        for instance_id, replica in enumerate(self.replicas):
+            if replica.primaryName is not None:
+                name = replica.primaryName.split(":", 1)[0]
+                primaries.add(name)
+                '''
+                Remember the rank of the primary of the master instance; it
+                is needed to calculate primaries for backup instances.
+                '''
+                if instance_id == 0:
+                    primary_rank = self.get_rank_by_name(name, nodeReg)
+
         for instance_id, replica in enumerate(self.replicas):
             if replica.primaryName is not None:
                 logger.debug('{} already has a primary'.format(replica))
                 continue
-            new_primary_name = self.elector.next_primary_replica_name(
-                instance_id, nodeReg=nodeReg)
+            if instance_id == 0:
+                new_primary_name, new_primary_instance_name =\
+                    self.elector.next_primary_replica_name_for_master(nodeReg=nodeReg)
+                primary_rank = self.get_rank_by_name(
+                    new_primary_name, nodeReg)
+            else:
+                assert primary_rank is not None
+                new_primary_name, new_primary_instance_name =\
+                    self.elector.next_primary_replica_name_for_backup(
+                        instance_id, primary_rank, primaries, nodeReg=nodeReg)
+            primaries.add(new_primary_name)
             logger.display("{}{} selected primary {} for instance {} (view {})"
                            .format(PRIMARY_SELECTION_PREFIX, replica,
-                                   new_primary_name, instance_id, self.viewNo),
+                                   new_primary_instance_name, instance_id, self.viewNo),
                            extra={"cli": "ANNOUNCE",
                                   "tags": ["node-election"]})
             if instance_id == 0:

@@ -2372,7 +2401,7 @@ def select_primaries(self, nodeReg: Dict[str, HA]=None):
                 # participating.
                 self.start_participating()
 
-            replica.primaryChanged(new_primary_name)
+            replica.primaryChanged(new_primary_instance_name)
             self.primary_selected(instance_id)
 
             logger.display("{}{} declares view change {} as completed for "

@@ -2383,7 +2412,7 @@ def select_primaries(self, nodeReg: Dict[str, HA]=None):
                                    replica,
                                    self.viewNo,
                                    instance_id,
-                                   new_primary_name,
+                                   new_primary_instance_name,
                                    self.ledger_summary),
                            extra={"cli": "ANNOUNCE",
                                   "tags": ["node-election"]})

plenum/server/pool_manager.py

Lines changed: 44 additions & 6 deletions
@@ -177,7 +177,6 @@ def getStackParamsAndNodeReg(self, name, keys_dir, nodeRegistry=None,
                       ha=HA('0.0.0.0', ha[1]),
                       main=True,
                       auth_mode=AuthMode.RESTRICTED.value)
-        nodeReg[name] = HA(*ha)
 
         cliname = cliname or (name + CLIENT_STACK_SUFFIX)
         if not cliha:

@@ -186,7 +185,6 @@ def getStackParamsAndNodeReg(self, name, keys_dir, nodeRegistry=None,
                       ha=HA('0.0.0.0', cliha[1]),
                       main=True,
                       auth_mode=AuthMode.ALLOW_ANY.value)
-        cliNodeReg[cliname] = HA(*cliha)
 
         if keys_dir:
             nstack['basedirpath'] = keys_dir

@@ -214,7 +212,7 @@ def executePoolTxnBatch(self, ppTime, reqs, stateRoot, txnRoot) -> List:
         return committedTxns
 
     def onPoolMembershipChange(self, txn):
-        if txn[TXN_TYPE] == NODE:
+        if txn[TXN_TYPE] == NODE and DATA in txn:
             nodeName = txn[DATA][ALIAS]
             nodeNym = txn[TARGET_NYM]

@@ -253,10 +251,15 @@ def _updateNode(txn):
     def addNewNodeAndConnect(self, txn):
         nodeName = txn[DATA][ALIAS]
         if nodeName == self.name:
-            logger.debug("{} not adding itself to node registry".
+            logger.debug("{} adding itself to node registry".
                          format(self.name))
-            return
-        self.connectNewRemote(txn, nodeName, self.node)
+            self.node.nodeReg[nodeName] = HA(txn[DATA][NODE_IP],
+                                             txn[DATA][NODE_PORT])
+            self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = \
+                HA(txn[DATA][CLIENT_IP],
+                   txn[DATA][CLIENT_PORT])
+        else:
+            self.connectNewRemote(txn, nodeName, self.node, nodeName != self.name)
         self.node.nodeJoined(txn)
 
     def node_about_to_be_disconnected(self, nodeName):

@@ -269,6 +272,33 @@ def nodeHaChanged(self, txn):
         # TODO: Check if new HA is same as old HA and only update if
         # new HA is different.
         if nodeName == self.name:
+            # Update itself in node registry if needed
+            ha_changed = False
+            (ip, port) = self.node.nodeReg[nodeName]
+            if NODE_IP in txn[DATA] and ip != txn[DATA][NODE_IP]:
+                ip = txn[DATA][NODE_IP]
+                ha_changed = True
+
+            if NODE_PORT in txn[DATA] and port != txn[DATA][NODE_PORT]:
+                port = txn[DATA][NODE_PORT]
+                ha_changed = True
+
+            if ha_changed:
+                self.node.nodeReg[nodeName] = HA(ip, port)
+
+            ha_changed = False
+            (ip, port) = self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX]
+            if CLIENT_IP in txn[DATA] and ip != txn[DATA][CLIENT_IP]:
+                ip = txn[DATA][CLIENT_IP]
+                ha_changed = True
+
+            if CLIENT_PORT in txn[DATA] and port != txn[DATA][CLIENT_PORT]:
+                port = txn[DATA][CLIENT_PORT]
+                ha_changed = True
+
+            if ha_changed:
+                self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(ip, port)
+
             self.node.nodestack.onHostAddressChanged()
             self.node.clientstack.onHostAddressChanged()
         else:

@@ -411,6 +441,11 @@ def get_rank_of(self, node_id, nodeReg=None) -> Optional[int]:
             return None
         return self._get_rank(node_id, self.node_ids_ordered_by_rank(nodeReg))
 
+    def get_rank_by_name(self, name, nodeReg=None) -> Optional[int]:
+        for nym, nm in self._ordered_node_ids.items():
+            if name == nm:
+                return self.get_rank_of(nym, nodeReg)
+
     def get_name_by_rank(self, rank, nodeReg=None) -> Optional[str]:
         try:
             nym = self.node_ids_ordered_by_rank(nodeReg)[rank]

@@ -532,6 +567,9 @@ def get_rank_of(self, node_id, nodeReg=None) -> Optional[int]:
         # TODO node_id here has got another meaning
         return self._get_rank(node_id, self.node_names_ordered_by_rank(nodeReg))
 
+    def get_rank_by_name(self, name, nodeReg=None) -> Optional[int]:
+        return self.get_rank_of(name, nodeReg)
+
     def get_name_by_rank(self, rank, nodeReg=None) -> Optional[str]:
         try:
             return self.node_names_ordered_by_rank(nodeReg)[rank]
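get_rank_by_name in the nym-keyed pool manager resolves a node name to its nym and then delegates to get_rank_of, where rank is the position among the nodes still present in the registry. A guess at those semantics in a self-contained sketch (ordered_node_ids and active_names are simplified stand-ins for _ordered_node_ids and nodeReg):

    from collections import OrderedDict

    def get_rank_by_name(name, ordered_node_ids, active_names):
        # nyms of nodes currently in the registry, kept in join order
        ranked = [nym for nym, nm in ordered_node_ids.items()
                  if nm in active_names]
        for nym, nm in ordered_node_ids.items():
            if nm == name:
                return ranked.index(nym) if nym in ranked else None
        return None

    ids = OrderedDict([("nymA", "Alpha"), ("nymB", "Beta"), ("nymG", "Gamma")])
    print(get_rank_by_name("Gamma", ids, {"Alpha", "Gamma"}))  # 1: Beta inactive
    print(get_rank_by_name("Beta", ids, {"Alpha", "Gamma"}))   # None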

plenum/server/primary_selector.py

Lines changed: 34 additions & 14 deletions
@@ -38,30 +38,50 @@ def _is_master_instance(self, instance_id):
         # Instance 0 is always master
         return instance_id == 0
 
-    def _get_primary_id(self, view_no, instance_id, total_nodes):
-        return (view_no + instance_id) % total_nodes
+    def _get_master_primary_id(self, view_no, total_nodes):
+        return view_no % total_nodes
 
-    def next_primary_node_name(self, instance_id, nodeReg=None):
+    def _next_primary_node_name_for_master(self, nodeReg=None):
         if nodeReg is None:
             nodeReg = self.node.nodeReg
-        rank = self._get_primary_id(self.viewNo, instance_id, len(nodeReg))
+        rank = self._get_master_primary_id(self.viewNo, len(nodeReg))
         name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
 
-        logger.trace("{} selected {} as next primary node for instId {}, "
+        assert name, "{} failed to get next primary node name for master instance".format(self)
+        logger.trace("{} selected {} as next primary node for master instance, "
                      "viewNo {} with rank {}, nodeReg {}".format(
-                         self, name, instance_id, self.viewNo, rank, nodeReg))
-        assert name, "{} failed to get next primary node name".format(self)
-
+                         self, name, self.viewNo, rank, nodeReg))
         return name
 
-    def next_primary_replica_name(self, instance_id, nodeReg=None):
+    def next_primary_replica_name_for_master(self, nodeReg=None):
+        """
+        Returns the name and the corresponding instance name of the next
+        node which is supposed to be the new primary. In fact it is not
+        round-robin at this abstraction layer, as currently the primary of
+        the master instance is determined directly by the view number and
+        the total number of nodes.
+        But since the view number is incremented by 1 before primary
+        selection, the current approach may be treated as round-robin.
+        """
+        name = self._next_primary_node_name_for_master(nodeReg)
+        return name, Replica.generateName(nodeName=name, instId=0)
+
+    def next_primary_replica_name_for_backup(self, instance_id, master_primary_rank,
+                                             primaries, nodeReg=None):
         """
-        Returns name of the next node which is supposed to be a new Primary
-        in round-robin fashion
+        Returns the name and the corresponding instance name of the next
+        node which is supposed to be the new primary for a backup instance,
+        chosen in round-robin fashion starting from the primary of the
+        master instance.
         """
-        return Replica.generateName(
-            nodeName=self.next_primary_node_name(instance_id, nodeReg=nodeReg),
-            instId=instance_id)
+        if nodeReg is None:
+            nodeReg = self.node.nodeReg
+        total_nodes = len(nodeReg)
+        rank = (master_primary_rank + 1) % total_nodes
+        name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
+        while name in primaries:
+            rank = (rank + 1) % total_nodes
+            name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
+        return name, Replica.generateName(nodeName=name, instId=instance_id)
 
     # overridden method of PrimaryDecider
     def start_election_for_instance(self, instance_id):
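The backup selection walks the rank order round-robin, starting one step past the master primary and skipping any node already taken. A standalone sketch of that loop (node_names plays the role of the registry ordered by rank; the real code resolves ranks through get_name_by_rank):

    def next_backup_primary(master_primary_rank, primaries, node_names):
        # Assumes fewer primaries than nodes, as the instance count
        # guarantees in a real pool; otherwise this loop would not stop.
        total = len(node_names)
        rank = (master_primary_rank + 1) % total
        name = node_names[rank]
        while name in primaries:
            rank = (rank + 1) % total
            name = node_names[rank]
        return name

    nodes = ["Alpha", "Beta", "Gamma", "Delta"]
    # Master primary is Beta (rank 1); Gamma is taken, so Delta comes next.
    print(next_backup_primary(1, {"Beta", "Gamma"}, nodes))  # Delta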

plenum/server/view_change/view_changer.py

Lines changed: 3 additions & 3 deletions
@@ -144,7 +144,7 @@ def _is_propagated_view_change_completed(self):
     @property
     def has_view_change_from_primary(self) -> bool:
         if not self._has_view_change_from_primary:
-            next_primary_name = self.node.elector.next_primary_node_name(0)
+            next_primary_name = self.node.elector._next_primary_node_name_for_master()
 
             if next_primary_name not in self._view_change_done:
                 logger.debug(

@@ -579,7 +579,7 @@ def _verify_primary(self, new_primary, ledger_info):
         This method is called when sufficient number of ViewChangeDone
         received and makes steps to switch to the new primary
         """
-        expected_primary = self.node.elector.next_primary_node_name(0)
+        expected_primary = self.node.elector._next_primary_node_name_for_master()
         if new_primary != expected_primary:
             logger.error("{}{} expected next primary to be {}, but majority "
                          "declared {} instead for view {}"

@@ -595,7 +595,7 @@ def _send_view_change_done_message(self):
         """
         Sends ViewChangeDone message to other protocol participants
         """
-        new_primary_name = self.node.elector.next_primary_node_name(0)
+        new_primary_name = self.node.elector._next_primary_node_name_for_master()
         ledger_summary = self.node.ledger_summary
         message = ViewChangeDone(self.view_no,
                                  new_primary_name,
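All three call sites rely on the expected master primary being a pure function of the view number, so every honest node computes the same name and can reject a majority claim that disagrees. A toy version of the check performed by _verify_primary above:

    def verify_primary(declared, view_no, node_names):
        # node_names is assumed ordered by rank, as in the pool registry
        expected = node_names[view_no % len(node_names)]
        if declared != expected:
            raise ValueError("expected next primary to be {}, but majority "
                             "declared {} for view {}"
                             .format(expected, declared, view_no))

    verify_primary("Beta", 1, ["Alpha", "Beta", "Gamma", "Delta"])  # passes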

plenum/test/helper.py

Lines changed: 8 additions & 12 deletions
@@ -635,11 +635,11 @@ def checkViewNoForNodes(nodes: Iterable[TestNode], expectedViewNo: int = None):
     for node in nodes:
         logger.debug("{}'s view no is {}".format(node, node.viewNo))
         viewNos.add(node.viewNo)
-    assert len(viewNos) == 1
+    assert len(viewNos) == 1, 'Expected 1, but got {}'.format(len(viewNos))
     vNo, = viewNos
     if expectedViewNo is not None:
-        assert vNo >= expectedViewNo, ','.join(['{} -> Ratio: {}'.format(
-            node.name, node.monitor.masterThroughputRatio()) for node in nodes])
+        assert vNo >= expectedViewNo, \
+            'Expected at least {}, but got {}'.format(expectedViewNo, vNo)
     return vNo

@@ -1023,8 +1023,6 @@ def sdk_get_reply(looper, sdk_req_resp, timeout=None):
     try:
         resp = looper.run(asyncio.wait_for(resp_task, timeout=timeout))
         resp = json.loads(resp)
-    except asyncio.TimeoutError:
-        resp = None
     except IndyError as e:
         resp = e.error_code

@@ -1047,19 +1045,17 @@ def get_res(task, done_list):
             resp = None
         return resp
 
-    done, pend = looper.run(asyncio.wait(resp_tasks, timeout=timeout))
-    if pend:
-        raise AssertionError("{} transactions are still pending. Timeout: {}."
-                             .format(len(pend), timeout))
+    done, pending = looper.run(asyncio.wait(resp_tasks, timeout=timeout))
+    if pending:
+        for task in pending:
+            task.cancel()
+        raise TimeoutError("{} requests timed out".format(len(pending)))
     ret = [(req, get_res(resp, done)) for req, resp in sdk_req_resp]
     return ret
 
 
 def sdk_check_reply(req_res):
     req, res = req_res
-    if res is None:
-        raise AssertionError("Got no confirmed result for request {}"
-                             .format(req))
     if isinstance(res, ErrorCode):
         raise AssertionError("Got an error with code {} for request {}"
                              .format(res, req))
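The updated helpers treat a timeout as an error instead of a None result: asyncio.wait does not raise on timeout but returns the still-pending tasks, so they are cancelled explicitly and a TimeoutError is raised. A self-contained sketch of that pattern (gather_with_timeout and slow are illustrative names, not plenum test helpers):

    import asyncio

    async def gather_with_timeout(coros, timeout):
        tasks = [asyncio.ensure_future(c) for c in coros]
        done, pending = await asyncio.wait(tasks, timeout=timeout)
        if pending:
            for task in pending:
                task.cancel()  # don't leave orphaned tasks running
            raise TimeoutError("{} requests timed out".format(len(pending)))
        return [t.result() for t in done]

    async def slow(n):
        await asyncio.sleep(n)
        return n

    # The 5-second task misses the 1-second deadline and gets cancelled.
    try:
        asyncio.run(gather_with_timeout([slow(0.1), slow(5)], timeout=1))
    except TimeoutError as e:
        print(e)  # 1 requests timed out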

plenum/test/node_catchup/test_config_ledger.py

Lines changed: 0 additions & 2 deletions
@@ -135,7 +135,6 @@ def test_disconnected_node_catchup_config_ledger_txns(looper,
     new_node = newNodeCaughtUp
     disconnect_node_and_ensure_disconnected(
         looper, txnPoolNodeSet, new_node, stopNode=False)
-    looper.removeProdable(new_node)
 
     # Do some config txns; using a fixture as a method, passing some arguments
     # as None as they only make sense for the fixture (pre-requisites)

@@ -144,6 +143,5 @@ def test_disconnected_node_catchup_config_ledger_txns(looper,
     # Make sure new node got out of sync
     waitNodeDataInequality(looper, new_node, *txnPoolNodeSet[:-1])
 
-    looper.add(new_node)
     reconnect_node_and_ensure_connected(looper, txnPoolNodeSet, new_node)
     waitNodeDataEquality(looper, new_node, *txnPoolNodeSet[:-1])
