
INDY-1199: Implement inconsistent 3PC state detection #811


Merged: 12 commits, Jul 18, 2018
2 changes: 2 additions & 0 deletions plenum/config.py
@@ -310,3 +310,5 @@
# 1 for recorder
# 2 during replay
STACK_COMPANION = 0

ENABLE_NETWORK_I3PC_WATCHER = False
Contributor:

What is I3PC?

Contributor (author):

It stands for "Inconsistent 3PC state". Should I add a comment or change the name?

Contributor:

ENABLE_INCONSISTENCY_WATCHER_NETWORK?

25 changes: 25 additions & 0 deletions plenum/server/i3pc_watchers.py
@@ -0,0 +1,25 @@
from typing import Callable, Iterable


class NetworkI3PCWatcher:
Contributor:

NetworkInconsistencyWatcher?

def __init__(self, cb: Callable):
self.nodes = set()
self.connected = set()
self.callback = cb

def connect(self, name: str):
self.connected.add(name)

def disconnect(self, name: str):
had_consensus = self._has_consensus()
Contributor:

What if we face a disconnection while the initial connections are still being established:

  • consider a pool of 4 nodes
  • the node connects to Node1
  • the node connects to Node2 (so f+1 connections now)
  • the node disconnects from Node2 (back to f) => restart???

Maybe we should do the restart only if we drop to <= f connections, AND we were connected to at least n-f nodes before?

Contributor (author):

Yes, I missed that case, going to fix soon

self.connected.discard(name)
if had_consensus and not self._has_consensus():
self.callback()

def set_nodes(self, nodes: Iterable[str]):
self.nodes = set(nodes)

def _has_consensus(self):
Contributor:

Please use the Quorums class (add a new field there if needed)

Contributor (author):

Thanks, will use it. One possible shape combining this with the earlier suggestion is sketched right after this file's diff.

n = len(self.nodes)
f = (n - 1) // 3
return len(self.connected) > f
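
A minimal sketch of how the two review suggestions above might fit together: delegating the threshold math to the Quorums class and arming the watcher only once a strong quorum (n - f) of connections has been seen, so that a disconnection during the initial connection phase does not trigger the callback. The Quorums(n) constructor usage and the weak (f + 1) / strong (n - f) field names are assumptions here and may need to be added or adjusted, as the reviewer notes; this is not necessarily the code that was finally merged.

from typing import Callable, Iterable

from plenum.server.quorums import Quorums


class NetworkInconsistencyWatcher:
    def __init__(self, cb: Callable):
        self.callback = cb
        self._nodes = set()
        self._connected = set()
        self._quorums = None           # created once the node set is known
        self._reached_consensus = False

    def set_nodes(self, nodes: Iterable[str]):
        self._nodes = set(nodes)
        self._quorums = Quorums(len(self._nodes))

    def connect(self, name: str):
        self._connected.add(name)
        # Arm the watcher only after a strong quorum (n - f) of connections was seen
        if self._quorums is not None and self._quorums.strong.is_reached(len(self._connected)):
            self._reached_consensus = True

    def disconnect(self, name: str):
        self._connected.discard(name)
        # Fire only when dropping below a weak quorum (f + 1) after having been armed,
        # so a disconnect during the initial connection phase does not cause a restart
        if self._reached_consensus and not self._quorums.weak.is_reached(len(self._connected)):
            self._reached_consensus = False
            self.callback()
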
23 changes: 22 additions & 1 deletion plenum/server/node.py
@@ -11,6 +11,7 @@

from common.exceptions import LogicError
from crypto.bls.bls_key_manager import LoadBLSKeyError
from plenum.server.i3pc_watchers import NetworkI3PCWatcher
from state.pruning_state import PruningState
from state.state import State
from storage.helper import initKeyValueStorage, initHashStore, initKeyValueStorageIntKeys
@@ -59,7 +60,7 @@
from plenum.common.signer_simple import SimpleSigner
from plenum.common.stacks import nodeStackClass, clientStackClass
from plenum.common.startable import Status, Mode
from plenum.common.txn_util import idr_from_req_data, get_from, get_req_id, \
from plenum.common.txn_util import idr_from_req_data, get_req_id, \
get_seq_no, get_type, get_payload_data, \
get_txn_time, get_digest
from plenum.common.types import PLUGIN_TYPE_VERIFICATION, \
@@ -246,8 +247,13 @@ def __init__(self,
self.nodeInBox = deque()
self.clientInBox = deque()

# 3PC state consistency watchdog based on network events
self.network_i3pc_watcher = NetworkI3PCWatcher(self.on_inconsistent_3pc_state_from_network)

self.setPoolParams()

self.network_i3pc_watcher.connect(self.name)

self.clientBlacklister = SimpleBlacklister(
self.name + CLIENT_BLACKLISTER_SUFFIX) # type: Blacklister

@@ -607,6 +613,14 @@ def on_view_change_complete(self):
for replica in self.replicas:
replica.clear_requests_and_fix_last_ordered()

def on_inconsistent_3pc_state_from_network(self):
if self.config.ENABLE_NETWORK_I3PC_WATCHER:
self.on_inconsistent_3pc_state()

def on_inconsistent_3pc_state(self):
logger.warning("There is high probability that current 3PC state is inconsistent,"
"immediate restart is recommended")

def create_replicas(self) -> Replicas:
return Replicas(self, self.monitor, self.config)

@@ -654,6 +668,7 @@ def loadSeqNoDB(self):
def setPoolParams(self):
# TODO should be always called when nodeReg is changed - automate
self.allNodeNames = set(self.nodeReg.keys())
self.network_i3pc_watcher.set_nodes(self.allNodeNames)
self.totalNodes = len(self.allNodeNames)
self.f = getMaxFailures(self.totalNodes)
self.requiredNumberOfInstances = self.f + 1 # per RBFT
@@ -1197,6 +1212,12 @@ def onConnsChanged(self, joined: Set[str], left: Set[str]):
self.send_current_state_to_lagging_node(node)
self.send_ledger_status_to_newly_connected_node(node)

for node in left:
self.network_i3pc_watcher.disconnect(node)

for node in joined:
self.network_i3pc_watcher.connect(node)

def request_ledger_status_from_nodes(self, ledger_id, nodes=None):
for node_name in nodes if nodes else self.nodeReg:
if node_name == self.name:
9 changes: 5 additions & 4 deletions plenum/test/restart/helper.py
@@ -20,7 +20,7 @@ def get_group(nodeSet, group_cnt, include_primary=False):


def restart_nodes(looper, nodeSet, restart_set, tconf, tdir, allPluginsPath,
after_restart_timeout=None, restart_one_by_one=True):
after_restart_timeout=None, start_one_by_one=True, wait_for_elections=True):
for node_to_stop in restart_set:
node_to_stop.cleanupOnStopping = True
node_to_stop.stop()
@@ -42,10 +42,11 @@ def restart_nodes(looper, nodeSet, restart_set, tconf, tdir, allPluginsPath,
idx = nodeSet.index(node_to_restart)
nodeSet[idx] = restarted_node
rest_nodes += [restarted_node]
if restart_one_by_one:
if start_one_by_one:
looper.run(checkNodesConnected(rest_nodes))

if not restart_one_by_one:
if not start_one_by_one:
looper.run(checkNodesConnected(nodeSet))

ensureElectionsDone(looper=looper, nodes=nodeSet)
if wait_for_elections:
ensureElectionsDone(looper=looper, nodes=nodeSet)
101 changes: 101 additions & 0 deletions plenum/test/restart/test_network_i3pc_watcher.py
@@ -0,0 +1,101 @@
import pytest

from plenum.server.i3pc_watchers import NetworkI3PCWatcher

DEFAULT_NODE_SET = {'Alpha', 'Beta', 'Gamma', 'Delta'}


class WatcherCallbackMock:
def __init__(self):
self.call_count = 0

def __call__(self):
self.call_count += 1


@pytest.fixture
def watcher():
cb = WatcherCallbackMock()
watcher = NetworkI3PCWatcher(cb)
watcher.set_nodes(DEFAULT_NODE_SET)
return watcher


def _add_node(watcher: NetworkI3PCWatcher, node: str):
nodes = watcher.nodes
nodes.add(node)
watcher.set_nodes(nodes)
Contributor:

Why do we call set_nodes again here? Why not watcher.nodes.add(node)?

Contributor (author):

At first I was thinking that the watchdog could potentially react to a change in the number of nodes, which is why I'm using the setter (which is also called from production code) instead of manipulating the field directly. Also, I'm a little afraid of adding _add_node/_remove_node to the Watcher class, because someone might "optimize" them to change nodes directly instead of going through set_nodes, which would render these tests useless.

I think renaming nodes to _nodes and adding a read-only property getter could make the intentions clearer, what do you think?
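
A small sketch of the read-only property idea, assuming the field is renamed to _nodes; returning a frozenset copy is just one possible way to discourage direct mutation, and the test helpers would then copy before editing:

class NetworkI3PCWatcher:
    # ...other methods unchanged, operating on self._nodes internally...

    def set_nodes(self, nodes):
        self._nodes = set(nodes)

    @property
    def nodes(self):
        # Read-only view: tests and production code must go through set_nodes()
        return frozenset(self._nodes)


def _add_node(watcher: NetworkI3PCWatcher, node: str):
    nodes = set(watcher.nodes)  # copy, since the property returns a frozenset
    nodes.add(node)
    watcher.set_nodes(nodes)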



def _remove_node(watcher: NetworkI3PCWatcher, node: str):
nodes = watcher.nodes
nodes.discard(node)
watcher.set_nodes(nodes)


def test_watcher_is_not_triggered_when_created(watcher: NetworkI3PCWatcher):
assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_nodes_are_initially_connected(watcher: NetworkI3PCWatcher):
watcher.connect('Alpha')
watcher.connect('Beta')
watcher.connect('Gamma')
watcher.connect('Delta')

assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_just_one_node_connects_and_disconnects(watcher: NetworkI3PCWatcher):
watcher.connect('Alpha')
watcher.disconnect('Alpha')

assert watcher.callback.call_count == 0


def test_watcher_is_triggered_when_going_below_consensus(watcher: NetworkI3PCWatcher):
watcher.connect('Alpha')
watcher.connect('Beta')
watcher.disconnect('Beta')

assert watcher.callback.call_count == 1


def test_watcher_is_not_triggered_when_adding_nodes_while_on_edge_of_consensus(watcher: NetworkI3PCWatcher):
watcher.connect('Alpha')
watcher.connect('Beta')
_add_node(watcher, 'Epsilon')
_add_node(watcher, 'Zeta')
_add_node(watcher, 'Eta')

assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_removing_nodes_below_minimum_count(watcher: NetworkI3PCWatcher):
watcher.connect('Alpha')
watcher.connect('Beta')
watcher.connect('Gamma')
watcher.connect('Delta')
_remove_node(watcher, 'Delta')

assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_removing_nodes_and_going_below_consensus(watcher: NetworkI3PCWatcher):
_add_node(watcher, 'Theta')
watcher.connect('Alpha')
watcher.connect('Theta')
_remove_node(watcher, 'Theta')

assert watcher.callback.call_count == 0


def test_watcher_is_not_triggered_when_just_two_nodes_connect_and_disconnect_in_7_node_pool(
watcher: NetworkI3PCWatcher):
watcher.set_nodes(['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon', 'Zeta', 'Eta'])
watcher.connect('Alpha')
watcher.connect('Beta')
watcher.disconnect('Alpha')
watcher.disconnect('Beta')

assert watcher.callback.call_count == 0
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_node_4_all.py
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_np_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes.py
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_np_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_4_all_wp.py
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_4_np.py
@@ -12,5 +12,5 @@ def test_restart_groups_4_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 4, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_all_np.py
@@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_np_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_all_wp.py
@@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_np.py
@@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_np_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_6_wp.py
@@ -12,5 +12,5 @@ def test_restart_groups_6_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 6, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_7.py
@@ -12,5 +12,5 @@ def test_restart_groups_7_of_7_wp_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 7, include_primary=False)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=True)
after_restart_timeout=tm, start_one_by_one=True)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
2 changes: 1 addition & 1 deletion plenum/test/restart/test_restart_nodes_7_all.py
@@ -12,5 +12,5 @@ def test_restart_groups_7_of_7_wp_no_tm(looper, txnPoolNodeSet, tconf, tdir,
restart_group = get_group(txnPoolNodeSet, 7, include_primary=True)

restart_nodes(looper, txnPoolNodeSet, restart_group, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, restart_one_by_one=False)
after_restart_timeout=tm, start_one_by_one=False)
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
70 changes: 70 additions & 0 deletions plenum/test/restart/test_restart_to_lower_view.py
@@ -0,0 +1,70 @@
import pytest

from plenum.test import waits
from plenum.test.helper import sdk_send_random_and_check
from plenum.test.node_request.helper import sdk_ensure_pool_functional
from plenum.test.restart.helper import restart_nodes
from plenum.test.view_change.helper import ensure_view_change_complete

nodeCount = 4


@pytest.fixture(scope="module")
def tconf(tconf):
old_network_3pc_watcher_state = tconf.ENABLE_NETWORK_I3PC_WATCHER
tconf.ENABLE_NETWORK_I3PC_WATCHER = True
yield tconf
tconf.ENABLE_NETWORK_I3PC_WATCHER = old_network_3pc_watcher_state


def test_restart_majority_to_lower_view(looper, txnPoolNodeSet, tconf, tdir, allPluginsPath,
sdk_pool_handle, sdk_wallet_client):
# Add transaction to ledger
sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 1)

# Move to higher view
ensure_view_change_complete(looper, txnPoolNodeSet)

majority = txnPoolNodeSet[:3]
minority = txnPoolNodeSet[3:]

# Restart majority group
tm = tconf.ToleratePrimaryDisconnection + waits.expectedPoolElectionTimeout(len(txnPoolNodeSet))
restart_nodes(looper, txnPoolNodeSet, majority, tconf, tdir, allPluginsPath,
Contributor:

Why don't we wait for election here?

Contributor (author):

Because they don't finish (or at least ensureElectionsDone is not satisfied with the result), and the test just fails before it has a chance to check that the inconsistent 3PC state was indeed detected.

Contributor (author):

Probably ensureElectionsDone looking only at the majority of nodes would still pass, although I'm not sure whether we should call it here manually, improve restart_nodes, or even improve ensureElectionsDone itself.

Contributor (author):

Done

after_restart_timeout=tm, start_one_by_one=False, wait_for_elections=False)

# Check that nodes in minority group are aware that they might have inconsistent 3PC state
for node in minority:
assert node.spylog.count(node.on_inconsistent_3pc_state) == 1

# Check that nodes in majority group don't think they might have inconsistent 3PC state
for node in majority:
assert node.spylog.count(node.on_inconsistent_3pc_state) == 0

# Restart minority group
restart_nodes(looper, txnPoolNodeSet, minority, tconf, tdir, allPluginsPath,
after_restart_timeout=tm, start_one_by_one=False)

# Check that all nodes are still functional
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
Contributor:

What if the election is still in progress and we are trying to send requests?

Contributor (author):

Requests will probably still propagate and will be ordered once elections are done. I can add an explicit call to ensureElectionsDone before this if necessary.

Contributor (author):

On second look, elections are surely done here: ensureElectionsDone is called from the second restart_nodes.



def test_restart_half_to_lower_view(looper, txnPoolNodeSet, tconf, tdir, allPluginsPath,
sdk_pool_handle, sdk_wallet_client):
# Add transaction to ledger
sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 1)

# Move to higher view
ensure_view_change_complete(looper, txnPoolNodeSet)

# Restart half of nodes
tm = tconf.ToleratePrimaryDisconnection + waits.expectedPoolElectionTimeout(len(txnPoolNodeSet))
restart_nodes(looper, txnPoolNodeSet, txnPoolNodeSet[2:], tconf, tdir, allPluginsPath,
after_restart_timeout=tm, start_one_by_one=False, wait_for_elections=False)

# Check that nodes don't think they may have inconsistent 3PC state
for node in txnPoolNodeSet:
assert node.spylog.count(node.on_inconsistent_3pc_state) == 0

# Check that all nodes are still functional
sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
3 changes: 2 additions & 1 deletion plenum/test/test_node.py
@@ -350,7 +350,8 @@ def processRequest(self, request, frm):
Node.send_current_state_to_lagging_node,
Node.process_current_state_message,
Node.transmitToClient,
Node.has_ordered_till_last_prepared_certificate
Node.has_ordered_till_last_prepared_certificate,
Node.on_inconsistent_3pc_state
]

