-
Notifications
You must be signed in to change notification settings - Fork 377
INDY-1199: Implement inconsistent 3PC state detection #811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
9398e99
408719d
793a401
dd8288c
ca7dce2
68fe9f8
a9389aa
df27108
2d379a3
bd8137f
47e2bd1
ced1050
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -310,3 +310,5 @@ | |
# 1 for recorder | ||
# 2 during replay | ||
STACK_COMPANION = 0 | ||
|
||
# Switch for the network watcher of inconsistent 3PC state ("I3PC"); off by default.
ENABLE_NETWORK_I3PC_WATCHER = False | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from typing import Callable, Iterable | ||
|
||
|
||
class NetworkI3PCWatcher:
    """Invoke a callback when connectivity falls to ``f`` nodes or fewer.

    "I3PC" is short for "inconsistent 3PC state". The watcher tracks two
    sets of node names: the known pool members (``nodes``) and the ones
    currently connected (``connected``). The callback fires from
    :meth:`disconnect` on the transition from more than ``f`` connected
    nodes to ``f`` or fewer, with ``f = (n - 1) // 3`` and ``n`` the
    number of known nodes.

    NOTE(review): renaming the class to ``NetworkInconsistencyWatcher``
    was suggested during review.
    """

    def __init__(self, cb: Callable) -> None:
        """Start with no known nodes and no connections.

        :param cb: zero-argument callable invoked when the number of
            connected nodes drops to ``f`` or below.
        """
        self.nodes = set()
        self.connected = set()
        self.callback = cb

    def connect(self, name: str) -> None:
        """Record *name* as connected; never triggers the callback."""
        self.connected.add(name)

    def disconnect(self, name: str) -> None:
        """Record *name* as disconnected and fire the callback on loss of consensus.

        The callback runs only if there were more than ``f`` connections
        before this call and there are ``f`` or fewer after it.
        Disconnecting a name that was not connected is a no-op.

        NOTE(review): a disconnect while connections are still being
        established can trip this check as well. Reviewers suggested
        firing only if at least ``n - f`` nodes had been connected
        before; that change is not implemented here.
        """
        had_consensus = self._has_consensus()
        self.connected.discard(name)
        if had_consensus and not self._has_consensus():
            self.callback()

    def set_nodes(self, nodes: Iterable[str]) -> None:
        """Replace the known pool members with a copy of *nodes*.

        Does not re-evaluate consensus, so it never triggers the callback.
        """
        self.nodes = set(nodes)

    def _has_consensus(self) -> bool:
        """Return True when more than ``f`` nodes are connected.

        NOTE(review): reviewers asked for an existing helper to be used
        instead of computing ``f`` inline (helper name not visible here
        - TODO confirm).
        """
        n = len(self.nodes)
        # Floor division makes f == -1 for an empty node set, so the
        # check below is True until set_nodes() has been called.
        f = (n - 1) // 3
        return len(self.connected) > f
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import pytest | ||
|
||
from plenum.server.i3pc_watchers import NetworkI3PCWatcher | ||
|
||
# Node names that the `watcher` fixture registers through set_nodes().
DEFAULT_NODE_SET = {'Alpha', 'Beta', 'Gamma', 'Delta'} | ||
|
||
|
||
class WatcherCallbackMock:
    """Callable test double that counts how often it has been invoked."""

    def __init__(self) -> None:
        # Number of times the instance has been called so far.
        self.call_count = 0

    def __call__(self) -> None:
        """Register one more invocation."""
        self.call_count += 1
|
||
|
||
@pytest.fixture
def watcher():
    """Provide a watcher wired to a counting callback and the default node set."""
    instance = NetworkI3PCWatcher(WatcherCallbackMock())
    instance.set_nodes(DEFAULT_NODE_SET)
    return instance
|
||
|
||
def _add_node(watcher: NetworkI3PCWatcher, node: str):
    """Add *node* to the watcher's known nodes and re-apply the set via ``set_nodes``."""
    grown = watcher.nodes
    grown.add(node)
    watcher.set_nodes(grown)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At first I was thinking that watchdog could potentially react to change in number of nodes, that's why I'm using setter (which is also called from production code) instead of directly manipulating field. Also I'm a little bit afraid of adding _add_node/_remove_node to I think renaming |
||
|
||
|
||
def _remove_node(watcher: NetworkI3PCWatcher, node: str):
    """Drop *node* from the watcher's known nodes and re-apply the set via ``set_nodes``."""
    remaining = watcher.nodes
    remaining.discard(node)  # silently ignores a node that is not present
    watcher.set_nodes(remaining)
|
||
|
||
def test_watcher_is_not_triggered_when_created(watcher: NetworkI3PCWatcher):
    """A freshly built watcher has not invoked its callback."""
    callback = watcher.callback
    assert callback.call_count == 0
|
||
|
||
def test_watcher_is_not_triggered_when_nodes_are_initially_connected(watcher: NetworkI3PCWatcher):
    """Connecting the four default nodes one after another fires nothing."""
    for name in ('Alpha', 'Beta', 'Gamma', 'Delta'):
        watcher.connect(name)

    assert watcher.callback.call_count == 0
|
||
|
||
def test_watcher_is_not_triggered_when_just_one_node_connects_and_disconnects(watcher: NetworkI3PCWatcher):
    """A single node connecting and then dropping again fires nothing."""
    only_node = 'Alpha'
    watcher.connect(only_node)
    watcher.disconnect(only_node)

    assert watcher.callback.call_count == 0
|
||
|
||
def test_watcher_is_triggered_when_going_below_consensus(watcher: NetworkI3PCWatcher):
    """Two nodes connect, one of them drops: the callback fires exactly once."""
    for name in ('Alpha', 'Beta'):
        watcher.connect(name)
    watcher.disconnect('Beta')

    assert watcher.callback.call_count == 1
|
||
|
||
def test_watcher_is_not_triggered_when_adding_nodes_while_on_edge_of_consensus(watcher: NetworkI3PCWatcher):
    """Growing the pool while only two nodes are connected fires nothing."""
    for name in ('Alpha', 'Beta'):
        watcher.connect(name)
    for newcomer in ('Epsilon', 'Zeta', 'Eta'):
        _add_node(watcher, newcomer)

    assert watcher.callback.call_count == 0
|
||
|
||
def test_watcher_is_not_triggered_when_removing_nodes_below_minimum_count(watcher: NetworkI3PCWatcher):
    """Removing a node from a fully connected four-node pool fires nothing."""
    for name in ('Alpha', 'Beta', 'Gamma', 'Delta'):
        watcher.connect(name)
    _remove_node(watcher, 'Delta')

    assert watcher.callback.call_count == 0
|
||
|
||
def test_watcher_is_not_triggered_when_removing_nodes_and_going_below_consensus(watcher: NetworkI3PCWatcher):
    """Removing a connected node from the pool definition fires nothing."""
    extra_node = 'Theta'
    _add_node(watcher, extra_node)
    for name in ('Alpha', extra_node):
        watcher.connect(name)
    _remove_node(watcher, extra_node)

    assert watcher.callback.call_count == 0
|
||
|
||
def test_watcher_is_not_triggered_when_just_two_nodes_connect_and_disconnect_in_7_node_pool(
        watcher: NetworkI3PCWatcher):
    """In a seven-node pool, two nodes connecting and dropping again fire nothing."""
    watcher.set_nodes(['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon', 'Zeta', 'Eta'])
    pair = ('Alpha', 'Beta')
    for name in pair:
        watcher.connect(name)
    for name in pair:
        watcher.disconnect(name)

    assert watcher.callback.call_count == 0
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import pytest | ||
|
||
from plenum.test import waits | ||
from plenum.test.helper import sdk_send_random_and_check | ||
from plenum.test.node_request.helper import sdk_ensure_pool_functional | ||
from plenum.test.restart.helper import restart_nodes | ||
from plenum.test.view_change.helper import ensure_view_change_complete | ||
|
||
# Pool size for this module - presumably read by the pool fixtures; TODO confirm.
nodeCount = 4 | ||
|
||
|
||
@pytest.fixture(scope="module")
def tconf(tconf):
    """Turn the network I3PC watcher on for this module, then put the previous value back."""
    saved_flag = tconf.ENABLE_NETWORK_I3PC_WATCHER
    tconf.ENABLE_NETWORK_I3PC_WATCHER = True
    yield tconf
    tconf.ENABLE_NETWORK_I3PC_WATCHER = saved_flag
|
||
|
||
def test_restart_majority_to_lower_view(looper, txnPoolNodeSet, tconf, tdir, allPluginsPath,
                                        sdk_pool_handle, sdk_wallet_client):
    """Restart the first three nodes after a view change, then the rest.

    Checks that only the nodes that were not restarted first report a
    possibly inconsistent 3PC state, and that the pool works afterwards.
    """
    # Put one transaction into the ledger
    sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 1)

    # Bring the pool to a higher view
    ensure_view_change_complete(looper, txnPoolNodeSet)

    majority, minority = txnPoolNodeSet[:3], txnPoolNodeSet[3:]

    # Restart the majority group
    tolerance = tconf.ToleratePrimaryDisconnection
    election_timeout = waits.expectedPoolElectionTimeout(len(txnPoolNodeSet))
    tm = tolerance + election_timeout
    # NOTE(review): elections are deliberately not awaited here; per the
    # review thread they do not finish at this point - confirm.
    restart_nodes(looper, txnPoolNodeSet, majority, tconf, tdir, allPluginsPath,
                  after_restart_timeout=tm, start_one_by_one=False, wait_for_elections=False)

    # Minority nodes must have noticed that their 3PC state may be inconsistent
    for lagging in minority:
        assert lagging.spylog.count(lagging.on_inconsistent_3pc_state) == 1

    # Majority nodes must not think their 3PC state may be inconsistent
    for restarted in majority:
        assert restarted.spylog.count(restarted.on_inconsistent_3pc_state) == 0

    # Restart the minority group
    restart_nodes(looper, txnPoolNodeSet, minority, tconf, tdir, allPluginsPath,
                  after_restart_timeout=tm, start_one_by_one=False)

    # The whole pool must still be functional
    sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if election is still in progress, and we are trying to send requests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Requests will probably still propagate and when elections are done will be ordered. I can add explicit call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the second look - elections are surely done here, |
||
|
||
|
||
def test_restart_half_to_lower_view(looper, txnPoolNodeSet, tconf, tdir, allPluginsPath,
                                    sdk_pool_handle, sdk_wallet_client):
    """Restart the nodes from index 2 onward after a view change.

    Checks that no node reports a possibly inconsistent 3PC state and that
    the pool works afterwards.
    """
    # Put one transaction into the ledger
    sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 1)

    # Bring the pool to a higher view
    ensure_view_change_complete(looper, txnPoolNodeSet)

    # Restart half of the nodes
    tolerance = tconf.ToleratePrimaryDisconnection
    election_timeout = waits.expectedPoolElectionTimeout(len(txnPoolNodeSet))
    tm = tolerance + election_timeout
    restarted_half = txnPoolNodeSet[2:]
    restart_nodes(looper, txnPoolNodeSet, restarted_half, tconf, tdir, allPluginsPath,
                  after_restart_timeout=tm, start_one_by_one=False, wait_for_elections=False)

    # No node may think its 3PC state might be inconsistent
    for pool_node in txnPoolNodeSet:
        assert pool_node.spylog.count(pool_node.on_inconsistent_3pc_state) == 0

    # The whole pool must still be functional
    sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is I3PC?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent 3PC state. Should I add comments or change the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ENABLE_INCONSISTENCY_WATCHER_NETWORK?