Skip to content

INDY-1199: Implement inconsistent 3PC state detection #811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 18, 2018
Merged
2 changes: 2 additions & 0 deletions plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,5 @@
# 1 for recorder
# 2 during replay
STACK_COMPANION = 0

# Gates the network-based 3PC inconsistency watcher: the node forwards the
# watcher's callback to on_inconsistent_3pc_state() only when this is True.
ENABLE_INCONSISTENCY_WATCHER_NETWORK = False
33 changes: 33 additions & 0 deletions plenum/server/inconsistency_watchers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Callable, Iterable
from plenum.server.quorums import Quorums


class NetworkInconsistencyWatcher:
    """Track connected nodes and signal a probable loss of consensus.

    The watcher becomes armed once the number of connected nodes reaches the
    strong quorum. While armed, a disconnect that leaves the connected count
    below the weak quorum disarms the watcher and invokes the callback once.
    """

    def __init__(self, cb: Callable):
        """Store the callback *cb* and start with an empty, unarmed state."""
        self._reached_consensus = False
        self._quorums = Quorums(0)
        self._connected = set()
        self._nodes = set()
        self.callback = cb

    def connect(self, name: str):
        """Mark *name* as connected; arm the watcher on strong quorum."""
        self._connected.add(name)
        connected_count = len(self._connected)
        if self._quorums.strong.is_reached(connected_count):
            self._reached_consensus = True

    def disconnect(self, name: str):
        """Mark *name* as disconnected; fire the callback if consensus is lost."""
        self._connected.discard(name)
        # Only an armed watcher may fire, and only once per arming.
        if not self._reached_consensus:
            return
        if self._has_consensus():
            return
        self._reached_consensus = False
        self.callback()

    @property
    def nodes(self):
        """The set of node names last passed to set_nodes()."""
        return self._nodes

    def set_nodes(self, nodes: Iterable[str]):
        """Replace the known node set and recompute quorums for its size."""
        known = set(nodes)
        self._nodes = known
        self._quorums = Quorums(len(known))

    def _has_consensus(self):
        """Return whether the connected count still reaches the weak quorum."""
        connected_count = len(self._connected)
        return self._quorums.weak.is_reached(connected_count)
23 changes: 22 additions & 1 deletion plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from common.exceptions import LogicError
from crypto.bls.bls_key_manager import LoadBLSKeyError
from plenum.server.inconsistency_watchers import NetworkInconsistencyWatcher
from state.pruning_state import PruningState
from state.state import State
from storage.helper import initKeyValueStorage, initHashStore, initKeyValueStorageIntKeys
Expand Down Expand Up @@ -59,7 +60,7 @@
from plenum.common.signer_simple import SimpleSigner
from plenum.common.stacks import nodeStackClass, clientStackClass
from plenum.common.startable import Status, Mode
from plenum.common.txn_util import idr_from_req_data, get_from, get_req_id, \
from plenum.common.txn_util import idr_from_req_data, get_req_id, \
get_seq_no, get_type, get_payload_data, \
get_txn_time, get_digest
from plenum.common.types import PLUGIN_TYPE_VERIFICATION, \
Expand Down Expand Up @@ -246,8 +247,13 @@ def __init__(self,
self.nodeInBox = deque()
self.clientInBox = deque()

# 3PC state consistency watchdog based on network events
self.network_i3pc_watcher = NetworkInconsistencyWatcher(self.on_inconsistent_3pc_state_from_network)

self.setPoolParams()

self.network_i3pc_watcher.connect(self.name)

self.clientBlacklister = SimpleBlacklister(
self.name + CLIENT_BLACKLISTER_SUFFIX) # type: Blacklister

Expand Down Expand Up @@ -607,6 +613,14 @@ def on_view_change_complete(self):
for replica in self.replicas:
replica.clear_requests_and_fix_last_ordered()

def on_inconsistent_3pc_state_from_network(self):
    """Forward a network watcher alarm, but only when enabled in config."""
    if not self.config.ENABLE_INCONSISTENCY_WATCHER_NETWORK:
        return
    self.on_inconsistent_3pc_state()

def on_inconsistent_3pc_state(self):
    """Log a warning that the current 3PC state is probably inconsistent."""
    # Adjacent string literals are joined with no separator, so the space
    # after the comma has to live inside the first literal.
    logger.warning("There is high probability that current 3PC state is inconsistent, "
                   "immediate restart is recommended")

def create_replicas(self) -> Replicas:
    """Build the Replicas object from this node, its monitor and its config."""
    replicas = Replicas(self, self.monitor, self.config)
    return replicas

Expand Down Expand Up @@ -654,6 +668,7 @@ def loadSeqNoDB(self):
def setPoolParams(self):
# TODO should be always called when nodeReg is changed - automate
self.allNodeNames = set(self.nodeReg.keys())
self.network_i3pc_watcher.set_nodes(self.allNodeNames)
self.totalNodes = len(self.allNodeNames)
self.f = getMaxFailures(self.totalNodes)
self.requiredNumberOfInstances = self.f + 1 # per RBFT
Expand Down Expand Up @@ -1197,6 +1212,12 @@ def onConnsChanged(self, joined: Set[str], left: Set[str]):
self.send_current_state_to_lagging_node(node)
self.send_ledger_status_to_newly_connected_node(node)

for node in left:
self.network_i3pc_watcher.disconnect(node)

for node in joined:
self.network_i3pc_watcher.connect(node)

def request_ledger_status_from_nodes(self, ledger_id, nodes=None):
for node_name in nodes if nodes else self.nodeReg:
if node_name == self.name:
Expand Down
2 changes: 2 additions & 0 deletions plenum/server/quorums.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class Quorums:
def __init__(self, n):
f = getMaxFailures(n)
self.f = f
self.weak = Quorum(f + 1)
self.strong = Quorum(n - f)
self.propagate = Quorum(f + 1)
self.prepare = Quorum(n - f - 1)
self.commit = Quorum(n - f)
Expand Down
9 changes: 5 additions & 4 deletions plenum/test/restart/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_group(nodeSet, group_cnt, include_primary=False):


def restart_nodes(looper, nodeSet, restart_set, tconf, tdir, allPluginsPath,
after_restart_timeout=None, restart_one_by_one=True):
after_restart_timeout=None, start_one_by_one=True, wait_for_elections=True):
for node_to_stop in restart_set:
node_to_stop.cleanupOnStopping = True
node_to_stop.stop()
Expand All @@ -42,10 +42,11 @@ def restart_nodes(looper, nodeSet, restart_set, tconf, tdir, allPluginsPath,
idx = nodeSet.index(node_to_restart)
nodeSet[idx] = restarted_node
rest_nodes += [restarted_node]
if restart_one_by_one:
if start_one_by_one:
looper.run(checkNodesConnected(rest_nodes))

if not restart_one_by_one:
if not start_one_by_one:
looper.run(checkNodesConnected(nodeSet))

ensureElectionsDone(looper=looper, nodes=nodeSet)
if wait_for_elections:
ensureElectionsDone(looper=looper, nodes=nodeSet)
119 changes: 119 additions & 0 deletions plenum/test/restart/test_network_inconsistency_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import pytest

from plenum.server.inconsistency_watchers import NetworkInconsistencyWatcher

# Node names that the `watcher` fixture registers via set_nodes().
DEFAULT_NODE_SET = {'Alpha', 'Beta', 'Gamma', 'Delta'}


class WatcherCallbackMock:
    """Callable stand-in for the watcher callback that counts its calls."""

    def __init__(self):
        # Number of times the instance has been invoked so far.
        self.call_count = 0

    def __call__(self):
        self.call_count = self.call_count + 1


@pytest.fixture
def watcher():
    """Provide a watcher with a counting callback and the default node set."""
    instance = NetworkInconsistencyWatcher(WatcherCallbackMock())
    instance.set_nodes(DEFAULT_NODE_SET)
    return instance


def _add_node(watcher: NetworkInconsistencyWatcher, node: str):
    """Add *node* to the watcher's node set and re-apply it via set_nodes()."""
    pool = watcher.nodes
    pool.add(node)
    watcher.set_nodes(pool)


def _remove_node(watcher: NetworkInconsistencyWatcher, node: str):
    """Drop *node* from the watcher's node set and re-apply it via set_nodes()."""
    pool = watcher.nodes
    pool.discard(node)
    watcher.set_nodes(pool)


def test_watcher_is_not_triggered_when_created(watcher: NetworkInconsistencyWatcher):
    """A freshly created watcher must not have invoked its callback."""
    fired = watcher.callback.call_count
    assert fired == 0


def test_watcher_is_not_triggered_when_nodes_are_initially_connected(watcher: NetworkInconsistencyWatcher):
    """Connecting every node of the default set must not fire the callback."""
    for name in ('Alpha', 'Beta', 'Gamma', 'Delta'):
        watcher.connect(name)

    assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_just_one_node_connects_and_disconnects(watcher: NetworkInconsistencyWatcher):
    """A single node connecting and then leaving must not fire the callback."""
    only_node = 'Alpha'
    watcher.connect(only_node)
    watcher.disconnect(only_node)

    assert watcher.callback.call_count == 0


def test_watcher_is_triggered_when_going_below_consensus(watcher: NetworkInconsistencyWatcher):
    """Three nodes connect, then two of them leave: the callback fires once."""
    for name in ('Alpha', 'Beta', 'Gamma'):
        watcher.connect(name)
    for name in ('Beta', 'Gamma'):
        watcher.disconnect(name)

    assert watcher.callback.call_count == 1


def test_watcher_is_not_triggered_when_going_below_consensus_without_going_above_strong_quorum(watcher: NetworkInconsistencyWatcher):
    """Two nodes connect and one leaves: the callback must stay silent."""
    for name in ('Alpha', 'Beta'):
        watcher.connect(name)
    watcher.disconnect('Beta')

    assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_adding_nodes_while_on_edge_of_consensus(watcher: NetworkInconsistencyWatcher):
    """Growing the node set after one of three connected nodes left must not fire."""
    for name in ('Alpha', 'Beta', 'Gamma'):
        watcher.connect(name)
    watcher.disconnect('Gamma')
    for new_node in ('Epsilon', 'Zeta', 'Eta'):
        _add_node(watcher, new_node)

    assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_removing_nodes_below_minimum_count(watcher: NetworkInconsistencyWatcher):
    """Removing a node from the set while all four are connected must not fire."""
    for name in ('Alpha', 'Beta', 'Gamma', 'Delta'):
        watcher.connect(name)
    _remove_node(watcher, 'Delta')

    assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_removing_nodes_and_going_below_consensus(watcher: NetworkInconsistencyWatcher):
    """Add 'Theta', connect four nodes, drop two, then remove 'Theta' from the set.

    The callback is expected not to have been invoked at the end.
    """
    extra_node = 'Theta'
    _add_node(watcher, extra_node)
    for name in ('Alpha', 'Beta', 'Gamma', extra_node):
        watcher.connect(name)
    for name in ('Beta', 'Gamma'):
        watcher.disconnect(name)
    _remove_node(watcher, extra_node)

    assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_just_three_nodes_connect_and_disconnect_in_7_node_pool(
        watcher: NetworkInconsistencyWatcher):
    """With seven nodes registered, three connecting and leaving must not fire."""
    watcher.set_nodes(['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon', 'Zeta', 'Eta'])
    visitors = ('Alpha', 'Beta', 'Gamma')
    for name in visitors:
        watcher.connect(name)
    for name in visitors:
        watcher.disconnect(name)

    assert watcher.callback.call_count == 0
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_node_4_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_np_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_np_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_4_all_wp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_4_np.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_all_np.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_np_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_all_wp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_np.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_np_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_wp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_7.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_7_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 7, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_7_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_restart_groups_7_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 7, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
Loading