Skip to content

Commit 83937fe

Browse files
authored
Merge pull request #811 from skhoroshavin/indy-1199
INDY-1199: Implement inconsistent 3PC state detection
2 parents fdf7323 + ced1050 commit 83937fe

18 files changed

+267
-16
lines changed

plenum/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,5 @@
310310
# 1 for recorder
311311
# 2 during replay
312312
STACK_COMPANION = 0
313+
314+
ENABLE_INCONSISTENCY_WATCHER_NETWORK = False
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from typing import Callable, Iterable
2+
from plenum.server.quorums import Quorums
3+
4+
5+
class NetworkInconsistencyWatcher:
6+
def __init__(self, cb: Callable):
7+
self.callback = cb
8+
self._nodes = set()
9+
self._connected = set()
10+
self._quorums = Quorums(0)
11+
self._reached_consensus = False
12+
13+
def connect(self, name: str):
14+
self._connected.add(name)
15+
if self._quorums.strong.is_reached(len(self._connected)):
16+
self._reached_consensus = True
17+
18+
def disconnect(self, name: str):
19+
self._connected.discard(name)
20+
if self._reached_consensus and not self._quorums.weak.is_reached(len(self._connected)):
21+
self._reached_consensus = False
22+
self.callback()
23+
24+
@property
25+
def nodes(self):
26+
return self._nodes
27+
28+
def set_nodes(self, nodes: Iterable[str]):
29+
self._nodes = set(nodes)
30+
self._quorums = Quorums(len(self._nodes))
31+
32+
def _has_consensus(self):
33+
return self._quorums.weak.is_reached(len(self._connected))

plenum/server/node.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from common.exceptions import LogicError
1313
from crypto.bls.bls_key_manager import LoadBLSKeyError
14+
from plenum.server.inconsistency_watchers import NetworkInconsistencyWatcher
1415
from state.pruning_state import PruningState
1516
from state.state import State
1617
from storage.helper import initKeyValueStorage, initHashStore, initKeyValueStorageIntKeys
@@ -59,7 +60,7 @@
5960
from plenum.common.signer_simple import SimpleSigner
6061
from plenum.common.stacks import nodeStackClass, clientStackClass
6162
from plenum.common.startable import Status, Mode
62-
from plenum.common.txn_util import idr_from_req_data, get_from, get_req_id, \
63+
from plenum.common.txn_util import idr_from_req_data, get_req_id, \
6364
get_seq_no, get_type, get_payload_data, \
6465
get_txn_time, get_digest
6566
from plenum.common.types import PLUGIN_TYPE_VERIFICATION, \
@@ -246,8 +247,13 @@ def __init__(self,
246247
self.nodeInBox = deque()
247248
self.clientInBox = deque()
248249

250+
# 3PC state consistency watchdog based on network events
251+
self.network_i3pc_watcher = NetworkInconsistencyWatcher(self.on_inconsistent_3pc_state_from_network)
252+
249253
self.setPoolParams()
250254

255+
self.network_i3pc_watcher.connect(self.name)
256+
251257
self.clientBlacklister = SimpleBlacklister(
252258
self.name + CLIENT_BLACKLISTER_SUFFIX) # type: Blacklister
253259

@@ -608,6 +614,14 @@ def on_view_change_complete(self):
608614
for replica in self.replicas:
609615
replica.clear_requests_and_fix_last_ordered()
610616

617+
def on_inconsistent_3pc_state_from_network(self):
618+
if self.config.ENABLE_INCONSISTENCY_WATCHER_NETWORK:
619+
self.on_inconsistent_3pc_state()
620+
621+
def on_inconsistent_3pc_state(self):
622+
logger.warning("There is high probability that current 3PC state is inconsistent,"
623+
"immediate restart is recommended")
624+
611625
def create_replicas(self) -> Replicas:
612626
return Replicas(self, self.monitor, self.config)
613627

@@ -655,6 +669,7 @@ def loadSeqNoDB(self):
655669
def setPoolParams(self):
656670
# TODO should be always called when nodeReg is changed - automate
657671
self.allNodeNames = set(self.nodeReg.keys())
672+
self.network_i3pc_watcher.set_nodes(self.allNodeNames)
658673
self.totalNodes = len(self.allNodeNames)
659674
self.f = getMaxFailures(self.totalNodes)
660675
self.requiredNumberOfInstances = self.f + 1 # per RBFT
@@ -1198,6 +1213,12 @@ def onConnsChanged(self, joined: Set[str], left: Set[str]):
11981213
self.send_current_state_to_lagging_node(node)
11991214
self.send_ledger_status_to_newly_connected_node(node)
12001215

1216+
for node in left:
1217+
self.network_i3pc_watcher.disconnect(node)
1218+
1219+
for node in joined:
1220+
self.network_i3pc_watcher.connect(node)
1221+
12011222
def request_ledger_status_from_nodes(self, ledger_id, nodes=None):
12021223
for node_name in nodes if nodes else self.nodeReg:
12031224
if node_name == self.name:

plenum/server/quorums.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ class Quorums:
1616
def __init__(self, n):
1717
f = getMaxFailures(n)
1818
self.f = f
19+
self.weak = Quorum(f + 1)
20+
self.strong = Quorum(n - f)
1921
self.propagate = Quorum(f + 1)
2022
self.prepare = Quorum(n - f - 1)
2123
self.commit = Quorum(n - f)

plenum/test/restart/helper.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def get_group(nodeSet, group_cnt, include_primary=False):
2020

2121

2222
def restart_nodes(looper, nodeSet, restart_set, tconf, tdir, allPluginsPath,
23-
after_restart_timeout=None, restart_one_by_one=True):
23+
after_restart_timeout=None, start_one_by_one=True, wait_for_elections=True):
2424
for node_to_stop in restart_set:
2525
node_to_stop.cleanupOnStopping = True
2626
node_to_stop.stop()
@@ -42,10 +42,11 @@ def restart_nodes(looper, nodeSet, restart_set, tconf, tdir, allPluginsPath,
4242
idx = nodeSet.index(node_to_restart)
4343
nodeSet[idx] = restarted_node
4444
rest_nodes += [restarted_node]
45-
if restart_one_by_one:
45+
if start_one_by_one:
4646
looper.run(checkNodesConnected(rest_nodes))
4747

48-
if not restart_one_by_one:
48+
if not start_one_by_one:
4949
looper.run(checkNodesConnected(nodeSet))
5050

51-
ensureElectionsDone(looper=looper, nodes=nodeSet)
51+
if wait_for_elections:
52+
ensureElectionsDone(looper=looper, nodes=nodeSet)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import pytest
2+
3+
from plenum.server.inconsistency_watchers import NetworkInconsistencyWatcher
4+
5+
DEFAULT_NODE_SET = {'Alpha', 'Beta', 'Gamma', 'Delta'}
6+
7+
8+
class WatcherCallbackMock:
9+
def __init__(self):
10+
self.call_count = 0
11+
12+
def __call__(self):
13+
self.call_count += 1
14+
15+
16+
@pytest.fixture
17+
def watcher():
18+
cb = WatcherCallbackMock()
19+
watcher = NetworkInconsistencyWatcher(cb)
20+
watcher.set_nodes(DEFAULT_NODE_SET)
21+
return watcher
22+
23+
24+
def _add_node(watcher: NetworkInconsistencyWatcher, node: str):
25+
nodes = watcher.nodes
26+
nodes.add(node)
27+
watcher.set_nodes(nodes)
28+
29+
30+
def _remove_node(watcher: NetworkInconsistencyWatcher, node: str):
31+
nodes = watcher.nodes
32+
nodes.discard(node)
33+
watcher.set_nodes(nodes)
34+
35+
36+
def test_watcher_is_not_triggered_when_created(watcher: NetworkInconsistencyWatcher):
37+
assert watcher.callback.call_count == 0
38+
39+
40+
def test_watcher_is_not_triggered_when_nodes_are_initially_connected(watcher: NetworkInconsistencyWatcher):
41+
watcher.connect('Alpha')
42+
watcher.connect('Beta')
43+
watcher.connect('Gamma')
44+
watcher.connect('Delta')
45+
46+
assert watcher.callback.call_count == 0
47+
48+
49+
def test_watcher_is_not_triggered_when_just_one_node_connects_and_disconnects(watcher: NetworkInconsistencyWatcher):
50+
watcher.connect('Alpha')
51+
watcher.disconnect('Alpha')
52+
53+
assert watcher.callback.call_count == 0
54+
55+
56+
def test_watcher_is_triggered_when_going_below_consensus(watcher: NetworkInconsistencyWatcher):
57+
watcher.connect('Alpha')
58+
watcher.connect('Beta')
59+
watcher.connect('Gamma')
60+
watcher.disconnect('Beta')
61+
watcher.disconnect('Gamma')
62+
63+
assert watcher.callback.call_count == 1
64+
65+
66+
def test_watcher_is_not_triggered_when_going_below_consensus_without_going_above_strong_quorum(watcher: NetworkInconsistencyWatcher):
67+
watcher.connect('Alpha')
68+
watcher.connect('Beta')
69+
watcher.disconnect('Beta')
70+
71+
assert watcher.callback.call_count == 0
72+
73+
74+
def test_watcher_is_not_triggered_when_adding_nodes_while_on_edge_of_consensus(watcher: NetworkInconsistencyWatcher):
75+
watcher.connect('Alpha')
76+
watcher.connect('Beta')
77+
watcher.connect('Gamma')
78+
watcher.disconnect('Gamma')
79+
_add_node(watcher, 'Epsilon')
80+
_add_node(watcher, 'Zeta')
81+
_add_node(watcher, 'Eta')
82+
83+
assert watcher.callback.call_count == 0
84+
85+
86+
def test_watcher_is_not_triggered_when_removing_nodes_below_minimum_count(watcher: NetworkInconsistencyWatcher):
87+
watcher.connect('Alpha')
88+
watcher.connect('Beta')
89+
watcher.connect('Gamma')
90+
watcher.connect('Delta')
91+
_remove_node(watcher, 'Delta')
92+
93+
assert watcher.callback.call_count == 0
94+
95+
96+
def test_watcher_is_not_triggered_when_removing_nodes_and_going_below_consensus(watcher: NetworkInconsistencyWatcher):
97+
_add_node(watcher, 'Theta')
98+
watcher.connect('Alpha')
99+
watcher.connect('Beta')
100+
watcher.connect('Gamma')
101+
watcher.connect('Theta')
102+
watcher.disconnect('Beta')
103+
watcher.disconnect('Gamma')
104+
_remove_node(watcher, 'Theta')
105+
106+
assert watcher.callback.call_count == 0
107+
108+
109+
def test_watcher_is_not_triggered_when_just_three_nodes_connect_and_disconnect_in_7_node_pool(
110+
watcher: NetworkInconsistencyWatcher):
111+
watcher.set_nodes(['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon', 'Zeta', 'Eta'])
112+
watcher.connect('Alpha')
113+
watcher.connect('Beta')
114+
watcher.connect('Gamma')
115+
watcher.disconnect('Alpha')
116+
watcher.disconnect('Beta')
117+
watcher.disconnect('Gamma')
118+
119+
assert watcher.callback.call_count == 0

plenum/test/restart/test_restart_node_4_all.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_np_no_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 4, include_primary=False)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=False)
15+
after_restart_timeout=tm, start_one_by_one=False)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_np_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 4, include_primary=False)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=True)
15+
after_restart_timeout=tm, start_one_by_one=True)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_4_all_wp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 4, include_primary=True)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=False)
15+
after_restart_timeout=tm, start_one_by_one=False)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_4_np.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 4, include_primary=True)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=True)
15+
after_restart_timeout=tm, start_one_by_one=True)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_6_all_np.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_np_no_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 6, include_primary=False)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=False)
15+
after_restart_timeout=tm, start_one_by_one=False)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_6_all_wp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 6, include_primary=True)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=False)
15+
after_restart_timeout=tm, start_one_by_one=False)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_6_np.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ def test_restart_groups_6_of_7_np_tm(looper, txnPoolNodeSet, tconf, tdir,
1414
restart_group = get_group(txnPoolNodeSet, 6, include_primary=False)
1515

1616
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
17-
after_restart_timeout=tm, restart_one_by_one=True)
17+
after_restart_timeout=tm, start_one_by_one=True)
1818
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_6_wp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ def test_restart_groups_6_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
1414
restart_group = get_group(txnPoolNodeSet, 6, include_primary=True)
1515

1616
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
17-
after_restart_timeout=tm, restart_one_by_one=True)
17+
after_restart_timeout=tm, start_one_by_one=True)
1818
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_7.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ def test_restart_groups_7_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
1414
restart_group = get_group(txnPoolNodeSet, 7, include_primary=False)
1515

1616
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
17-
after_restart_timeout=tm, restart_one_by_one=True)
17+
after_restart_timeout=tm, start_one_by_one=True)
1818
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

plenum/test/restart/test_restart_nodes_7_all.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ def test_restart_groups_7_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
1212
restart_group = get_group(txnPoolNodeSet, 7, include_primary=True)
1313

1414
restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
15-
after_restart_timeout=tm, restart_one_by_one=False)
15+
after_restart_timeout=tm, start_one_by_one=False)
1616
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)

0 commit comments

Comments
 (0)