
Commit f4a5fa8

sergey-shilov authored and ashcherbakov committed

INDY-1112: change primaries election procedure for backup instances. (#539)
* INDY-1112: change primaries election procedure for backup instances. Now primaries for backup instances are chosen in a round-robin manner, always starting from the primary of the master instance. If the next node is already a primary for some instance, it is skipped, so the first non-primary node is chosen as primary for the current instance. This approach avoids electing replicas of the same node as primaries for different instances. The election procedure of the primary for the master instance is not changed. Signed-off-by: Sergey Shilov <[email protected]>

* Add some stylistic fixes, add comments. Signed-off-by: Sergey Shilov <[email protected]>

* Add test for primary demotion and promotion. Signed-off-by: Sergey Shilov <[email protected]>

* Add test for primaries selection routines. Signed-off-by: Sergey Shilov <[email protected]>

* Re-factor primary selector tests. Signed-off-by: Sergey Shilov <[email protected]>

* Remove adding of the node itself to nodeReg during initialisation of txnPoolManager; now a node adds itself to nodeReg during catch-up of the pool ledger. Signed-off-by: Sergey Shilov <[email protected]>

* Fix test. Signed-off-by: Sergey Shilov <[email protected]>

* Update node's HA in the node registry on pool ledger catch-up reply. Signed-off-by: Sergey Shilov <[email protected]>

* Add separate checks for IP/port to be updated for HA and cliHA. Signed-off-by: Sergey Shilov <[email protected]>
1 parent b5f63b3 · commit f4a5fa8

10 files changed, +401 -66 lines
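The core of the change is the selection rule described in the commit message. A minimal sketch of that rule in plain Python (the `node_names`, `master_primary_rank`, and `primaries` arguments are hypothetical stand-ins for plenum's node registry and replica state, not the actual API):

```python
def next_backup_primary(node_names, master_primary_rank, primaries):
    """Walk the node list round-robin, starting right after the master
    primary, and skip nodes that are already primaries elsewhere."""
    total = len(node_names)
    rank = (master_primary_rank + 1) % total
    while node_names[rank] in primaries:
        rank = (rank + 1) % total
    return node_names[rank]

# Example: 4 nodes, master primary is Beta (rank 1).
nodes = ["Alpha", "Beta", "Gamma", "Delta"]
chosen = {"Beta"}                  # the master instance already has a primary
for instance_id in (1, 2):         # two backup instances
    name = next_backup_primary(nodes, 1, chosen)
    chosen.add(name)
    print(instance_id, name)       # prints: 1 Gamma, then 2 Delta
```

With one protocol instance per node, as here, every instance ends up with a distinct node as its primary, which is exactly the duplicate-primary situation the commit sets out to avoid.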

plenum/common/stack_manager.py

Lines changed: 38 additions & 6 deletions

@@ -170,14 +170,46 @@ def connectNewRemote(self, txn, remoteName, nodeOrClientObj,
         nodeOrClientObj.nodestack.maintainConnections(force=True)

     def stackHaChanged(self, txn, remoteName, nodeOrClientObj):
-        nodeHa = (txn[DATA][NODE_IP], txn[DATA][NODE_PORT])
-        cliHa = (txn[DATA][CLIENT_IP], txn[DATA][CLIENT_PORT])
+        nodeHa = None
+        cliHa = None
+        if self.isNode:
+            node_ha_changed = False
+            (ip, port) = nodeOrClientObj.nodeReg[remoteName]
+            if NODE_IP in txn[DATA] and ip != txn[DATA][NODE_IP]:
+                ip = txn[DATA][NODE_IP]
+                node_ha_changed = True
+
+            if NODE_PORT in txn[DATA] and port != txn[DATA][NODE_PORT]:
+                port = txn[DATA][NODE_PORT]
+                node_ha_changed = True
+
+            if node_ha_changed:
+                nodeHa = (ip, port)
+
+        cli_ha_changed = False
+        (ip, port) = nodeOrClientObj.cliNodeReg[remoteName + CLIENT_STACK_SUFFIX] \
+            if self.isNode \
+            else nodeOrClientObj.nodeReg[remoteName]
+
+        if CLIENT_IP in txn[DATA] and ip != txn[DATA][CLIENT_IP]:
+            ip = txn[DATA][CLIENT_IP]
+            cli_ha_changed = True
+
+        if CLIENT_PORT in txn[DATA] and port != txn[DATA][CLIENT_PORT]:
+            port = txn[DATA][CLIENT_PORT]
+            cli_ha_changed = True
+
+        if cli_ha_changed:
+            cliHa = (ip, port)
+
         rid = self.removeRemote(nodeOrClientObj.nodestack, remoteName)
         if self.isNode:
-            nodeOrClientObj.nodeReg[remoteName] = HA(*nodeHa)
-            nodeOrClientObj.cliNodeReg[remoteName +
-                                       CLIENT_STACK_SUFFIX] = HA(*cliHa)
-        else:
+            if nodeHa:
+                nodeOrClientObj.nodeReg[remoteName] = HA(*nodeHa)
+            if cliHa:
+                nodeOrClientObj.cliNodeReg[remoteName +
+                                           CLIENT_STACK_SUFFIX] = HA(*cliHa)
+        elif cliHa:
             nodeOrClientObj.nodeReg[remoteName] = HA(*cliHa)

         # Attempt connection at the new HA
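Both the node and the client branches above repeat one pattern: a field overwrites the current (ip, port) only if it is present in txn[DATA] and differs, and the registry entry is rebuilt only if something actually changed. A condensed sketch of that pattern (standalone Python; the lowercase key names are made-up placeholders for the NODE_IP/NODE_PORT-style constants):

```python
def merged_ha(current, data, ip_key, port_key):
    """Return a new (ip, port) if `data` changes either field, else None."""
    ip, port = current
    changed = False
    if ip_key in data and ip != data[ip_key]:
        ip = data[ip_key]
        changed = True
    if port_key in data and port != data[port_key]:
        port = data[port_key]
        changed = True
    return (ip, port) if changed else None

# Only the port is present in the update, so the old IP is kept.
print(merged_ha(("10.0.0.1", 9701), {"node_port": 9711}, "node_ip", "node_port"))
# -> ('10.0.0.1', 9711)
```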

plenum/server/node.py

Lines changed: 34 additions & 5 deletions

@@ -882,6 +882,9 @@ def rank(self) -> Optional[int]:
     def get_name_by_rank(self, rank, nodeReg=None):
         return self.poolManager.get_name_by_rank(rank, nodeReg=nodeReg)

+    def get_rank_by_name(self, name, nodeReg=None):
+        return self.poolManager.get_rank_by_name(name, nodeReg=nodeReg)
+
     def newViewChanger(self):
         if self.view_changer:
             return self.view_changer
@@ -2354,15 +2357,41 @@ def lost_master_primary(self):
         self._schedule_view_change()

     def select_primaries(self, nodeReg: Dict[str, HA]=None):
+        primaries = set()
+        primary_rank = None
+        '''
+        Build a set of names of primaries, it is needed to avoid
+        duplicates of primary nodes for different replicas.
+        '''
+        for instance_id, replica in enumerate(self.replicas):
+            if replica.primaryName is not None:
+                name = replica.primaryName.split(":", 1)[0]
+                primaries.add(name)
+                '''
+                Remember the rank of primary of master instance, it is needed
+                for calculation of primaries for backup instances.
+                '''
+                if instance_id == 0:
+                    primary_rank = self.get_rank_by_name(name, nodeReg)
+
         for instance_id, replica in enumerate(self.replicas):
             if replica.primaryName is not None:
                 logger.debug('{} already has a primary'.format(replica))
                 continue
-            new_primary_name = self.elector.next_primary_replica_name(
-                instance_id, nodeReg=nodeReg)
+            if instance_id == 0:
+                new_primary_name, new_primary_instance_name =\
+                    self.elector.next_primary_replica_name_for_master(nodeReg=nodeReg)
+                primary_rank = self.get_rank_by_name(
+                    new_primary_name, nodeReg)
+            else:
+                assert primary_rank is not None
+                new_primary_name, new_primary_instance_name =\
+                    self.elector.next_primary_replica_name_for_backup(
+                        instance_id, primary_rank, primaries, nodeReg=nodeReg)
+            primaries.add(new_primary_name)
             logger.display("{}{} selected primary {} for instance {} (view {})"
                            .format(PRIMARY_SELECTION_PREFIX, replica,
-                                   new_primary_name, instance_id, self.viewNo),
+                                   new_primary_instance_name, instance_id, self.viewNo),
                            extra={"cli": "ANNOUNCE",
                                   "tags": ["node-election"]})
             if instance_id == 0:
@@ -2372,7 +2401,7 @@ def select_primaries(self, nodeReg: Dict[str, HA]=None):
                 # participating.
                 self.start_participating()

-            replica.primaryChanged(new_primary_name)
+            replica.primaryChanged(new_primary_instance_name)
             self.primary_selected(instance_id)

             logger.display("{}{} declares view change {} as completed for "
@@ -2383,7 +2412,7 @@ def select_primaries(self, nodeReg: Dict[str, HA]=None):
                                    replica,
                                    self.viewNo,
                                    instance_id,
-                                   new_primary_name,
+                                   new_primary_instance_name,
                                    self.ledger_summary),
                            extra={"cli": "ANNOUNCE",
                                   "tags": ["node-election"]})

plenum/server/pool_manager.py

Lines changed: 44 additions & 6 deletions

@@ -177,7 +177,6 @@ def getStackParamsAndNodeReg(self, name, keys_dir, nodeRegistry=None,
                       ha=HA('0.0.0.0', ha[1]),
                       main=True,
                       auth_mode=AuthMode.RESTRICTED.value)
-        nodeReg[name] = HA(*ha)

         cliname = cliname or (name + CLIENT_STACK_SUFFIX)
         if not cliha:
@@ -186,7 +185,6 @@ def getStackParamsAndNodeReg(self, name, keys_dir, nodeRegistry=None,
                       ha=HA('0.0.0.0', cliha[1]),
                       main=True,
                       auth_mode=AuthMode.ALLOW_ANY.value)
-        cliNodeReg[cliname] = HA(*cliha)

         if keys_dir:
             nstack['basedirpath'] = keys_dir
@@ -214,7 +212,7 @@ def executePoolTxnBatch(self, ppTime, reqs, stateRoot, txnRoot) -> List:
         return committedTxns

     def onPoolMembershipChange(self, txn):
-        if txn[TXN_TYPE] == NODE:
+        if txn[TXN_TYPE] == NODE and DATA in txn:
             nodeName = txn[DATA][ALIAS]
             nodeNym = txn[TARGET_NYM]

@@ -253,10 +251,15 @@ def _updateNode(txn):
     def addNewNodeAndConnect(self, txn):
         nodeName = txn[DATA][ALIAS]
         if nodeName == self.name:
-            logger.debug("{} not adding itself to node registry".
+            logger.debug("{} adding itself to node registry".
                          format(self.name))
-            return
-        self.connectNewRemote(txn, nodeName, self.node)
+            self.node.nodeReg[nodeName] = HA(txn[DATA][NODE_IP],
+                                             txn[DATA][NODE_PORT])
+            self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = \
+                HA(txn[DATA][CLIENT_IP],
+                   txn[DATA][CLIENT_PORT])
+        else:
+            self.connectNewRemote(txn, nodeName, self.node, nodeName != self.name)
         self.node.nodeJoined(txn)

     def node_about_to_be_disconnected(self, nodeName):
@@ -269,6 +272,33 @@ def nodeHaChanged(self, txn):
         # TODO: Check if new HA is same as old HA and only update if
         # new HA is different.
         if nodeName == self.name:
+            # Update itself in node registry if needed
+            ha_changed = False
+            (ip, port) = self.node.nodeReg[nodeName]
+            if NODE_IP in txn[DATA] and ip != txn[DATA][NODE_IP]:
+                ip = txn[DATA][NODE_IP]
+                ha_changed = True
+
+            if NODE_PORT in txn[DATA] and port != txn[DATA][NODE_PORT]:
+                port = txn[DATA][NODE_PORT]
+                ha_changed = True
+
+            if ha_changed:
+                self.node.nodeReg[nodeName] = HA(ip, port)
+
+            ha_changed = False
+            (ip, port) = self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX]
+            if CLIENT_IP in txn[DATA] and ip != txn[DATA][CLIENT_IP]:
+                ip = txn[DATA][CLIENT_IP]
+                ha_changed = True
+
+            if CLIENT_PORT in txn[DATA] and port != txn[DATA][CLIENT_PORT]:
+                port = txn[DATA][CLIENT_PORT]
+                ha_changed = True
+
+            if ha_changed:
+                self.node.cliNodeReg[nodeName + CLIENT_STACK_SUFFIX] = HA(ip, port)
+
             self.node.nodestack.onHostAddressChanged()
             self.node.clientstack.onHostAddressChanged()
         else:
@@ -411,6 +441,11 @@ def get_rank_of(self, node_id, nodeReg=None) -> Optional[int]:
             return None
         return self._get_rank(node_id, self.node_ids_ordered_by_rank(nodeReg))

+    def get_rank_by_name(self, name, nodeReg=None) -> Optional[int]:
+        for nym, nm in self._ordered_node_ids.items():
+            if name == nm:
+                return self.get_rank_of(nym, nodeReg)
+
     def get_name_by_rank(self, rank, nodeReg=None) -> Optional[str]:
         try:
             nym = self.node_ids_ordered_by_rank(nodeReg)[rank]
@@ -532,6 +567,9 @@ def get_rank_of(self, node_id, nodeReg=None) -> Optional[int]:
         # TODO node_id here has got another meaning
         return self._get_rank(node_id, self.node_names_ordered_by_rank(nodeReg))

+    def get_rank_by_name(self, name, nodeReg=None) -> Optional[int]:
+        return self.get_rank_of(name, nodeReg)
+
     def get_name_by_rank(self, rank, nodeReg=None) -> Optional[str]:
         try:
             return self.node_names_ordered_by_rank(nodeReg)[rank]
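The two get_rank_by_name implementations differ because the ledger-backed pool manager ranks nodes by nym, so a name must first be mapped to its nym via _ordered_node_ids, while the registry-backed manager ranks names directly. A toy illustration of the ledger-backed variant (the dict below is a made-up stand-in for _ordered_node_ids, and rank is simplified to position in that mapping):

```python
ordered_node_ids = {"nym-a": "Alpha", "nym-b": "Beta", "nym-g": "Gamma"}

def rank_of_nym(nym):
    # Simplified: rank == position in the ordered nym -> name mapping.
    return list(ordered_node_ids).index(nym)

def get_rank_by_name(name):
    for nym, nm in ordered_node_ids.items():
        if name == nm:
            return rank_of_nym(nym)
    return None  # unknown names yield None, matching the Optional[int] hint

print(get_rank_by_name("Beta"))  # -> 1
print(get_rank_by_name("Zeta"))  # -> None
```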

plenum/server/primary_selector.py

Lines changed: 34 additions & 14 deletions

@@ -38,30 +38,50 @@ def _is_master_instance(self, instance_id):
         # Instance 0 is always master
         return instance_id == 0

-    def _get_primary_id(self, view_no, instance_id, total_nodes):
-        return (view_no + instance_id) % total_nodes
+    def _get_master_primary_id(self, view_no, total_nodes):
+        return view_no % total_nodes

-    def next_primary_node_name(self, instance_id, nodeReg=None):
+    def _next_primary_node_name_for_master(self, nodeReg=None):
         if nodeReg is None:
             nodeReg = self.node.nodeReg
-        rank = self._get_primary_id(self.viewNo, instance_id, len(nodeReg))
+        rank = self._get_master_primary_id(self.viewNo, len(nodeReg))
         name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)

-        logger.trace("{} selected {} as next primary node for instId {}, "
+        assert name, "{} failed to get next primary node name for master instance".format(self)
+        logger.trace("{} selected {} as next primary node for master instance, "
                      "viewNo {} with rank {}, nodeReg {}".format(
-                         self, name, instance_id, self.viewNo, rank, nodeReg))
-        assert name, "{} failed to get next primary node name".format(self)
-
+                         self, name, self.viewNo, rank, nodeReg))
         return name

-    def next_primary_replica_name(self, instance_id, nodeReg=None):
+    def next_primary_replica_name_for_master(self, nodeReg=None):
+        """
+        Returns name and corresponding instance name of the next node which
+        is supposed to be a new Primary. In fact it is not round-robin on
+        this abstraction layer as currently the primary of master instance is
+        pointed directly depending on view number, instance id and total
+        number of nodes.
+        But since the view number is incremented by 1 before primary selection
+        then current approach may be treated as round robin.
+        """
+        name = self._next_primary_node_name_for_master(nodeReg)
+        return name, Replica.generateName(nodeName=name, instId=0)
+
+    def next_primary_replica_name_for_backup(self, instance_id, master_primary_rank,
+                                             primaries, nodeReg=None):
         """
-        Returns name of the next node which is supposed to be a new Primary
-        in round-robin fashion
+        Returns name and corresponding instance name of the next node which
+        is supposed to be a new Primary for backup instance in round-robin
+        fashion starting from primary of master instance.
         """
-        return Replica.generateName(
-            nodeName=self.next_primary_node_name(instance_id, nodeReg=nodeReg),
-            instId=instance_id)
+        if nodeReg is None:
+            nodeReg = self.node.nodeReg
+        total_nodes = len(nodeReg)
+        rank = (master_primary_rank + 1) % total_nodes
+        name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
+        while name in primaries:
+            rank = (rank + 1) % total_nodes
+            name = self.node.get_name_by_rank(rank, nodeReg=nodeReg)
+        return name, Replica.generateName(nodeName=name, instId=instance_id)

     # overridden method of PrimaryDecider
     def start_election_for_instance(self, instance_id):
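The modulo arithmetic in next_primary_replica_name_for_backup gives the walk its wrap-around behaviour. A quick check with made-up data: 4 nodes ranked 0-3, the master primary at rank 3, and rank 0 already occupied by another primary:

```python
names_by_rank = ["Alpha", "Beta", "Gamma", "Delta"]  # stand-in for nodeReg order
primaries = {"Delta", "Alpha"}   # Delta is master primary, Alpha already chosen
total_nodes = len(names_by_rank)

rank = (3 + 1) % total_nodes     # start just after the master primary (rank 3)
name = names_by_rank[rank]
while name in primaries:         # skip nodes that are already primaries
    rank = (rank + 1) % total_nodes
    name = names_by_rank[rank]
print(name)  # -> 'Beta': the search wrapped to rank 0 and skipped Alpha
```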

plenum/server/view_change/view_changer.py

Lines changed: 3 additions & 3 deletions

@@ -144,7 +144,7 @@ def _is_propagated_view_change_completed(self):
     @property
     def has_view_change_from_primary(self) -> bool:
         if not self._has_view_change_from_primary:
-            next_primary_name = self.node.elector.next_primary_node_name(0)
+            next_primary_name = self.node.elector._next_primary_node_name_for_master()

             if next_primary_name not in self._view_change_done:
                 logger.debug(
@@ -579,7 +579,7 @@ def _verify_primary(self, new_primary, ledger_info):
         This method is called when sufficient number of ViewChangeDone
         received and makes steps to switch to the new primary
         """
-        expected_primary = self.node.elector.next_primary_node_name(0)
+        expected_primary = self.node.elector._next_primary_node_name_for_master()
         if new_primary != expected_primary:
             logger.error("{}{} expected next primary to be {}, but majority "
                          "declared {} instead for view {}"
@@ -595,7 +595,7 @@ def _send_view_change_done_message(self):
         """
         Sends ViewChangeDone message to other protocol participants
         """
-        new_primary_name = self.node.elector.next_primary_node_name(0)
+        new_primary_name = self.node.elector._next_primary_node_name_for_master()
         ledger_summary = self.node.ledger_summary
         message = ViewChangeDone(self.view_no,
                                  new_primary_name,

plenum/test/pool_transactions/helper.py

Lines changed: 7 additions & 5 deletions

@@ -144,11 +144,13 @@ def start_not_added_node(looper,
                         tdir, randomString(32).encode(),
                         (nodeIp, nodePort), (clientIp, clientPort),
                         tconf, True, allPluginsPath, TestNode)
-    return sigseed, bls_key, new_node
+    return sigseed, bls_key, new_node, (nodeIp, nodePort), (clientIp, clientPort)


 def add_started_node(looper,
                      new_node,
+                     node_ha,
+                     client_ha,
                      txnPoolNodeSet,
                      client_tdir,
                      stewardClient, stewardWallet,
@@ -165,10 +167,10 @@ def add_started_node(looper,
                         clientClass=TestClient)
     node_name = new_node.name
     send_new_node_txn(sigseed,
-                      new_node.poolManager.nodeReg[node_name][0],
-                      new_node.poolManager.nodeReg[node_name][1],
-                      new_node.poolManager.cliNodeReg[node_name + "C"][0],
-                      new_node.poolManager.cliNodeReg[node_name + "C"][1],
+                      node_ha[0],
+                      node_ha[1],
+                      client_ha[0],
+                      client_ha[1],
                       bls_key,
                       node_name,
                       newSteward, newStewardWallet)
