Skip to content

[INDY-465] Warning and callback on unordered requests #619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@

# Monitoring configuration
PerfCheckFreq = 10
UnorderedCheckFreq = 60

# Temporarily reducing DELTA till the calculations for extra work are not
# incorporated
Expand Down
46 changes: 16 additions & 30 deletions plenum/server/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ class Monitor(HasActionQueue, PluginLoaderHelper):
monitors the performance of each instance. Throughput of requests and
latency per client request are measured.
"""
WARN_NOT_PARTICIPATING_WINDOW_MINS = 5
WARN_NOT_PARTICIPATING_UNORDERED_NUM = 10
WARN_NOT_PARTICIPATING_MIN_DIFF_SEC = 3

def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
instances: Instances, nodestack,
Expand Down Expand Up @@ -174,6 +171,8 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
# value is a tuple of ordering time and latency of a request
self.latenciesByBackupsInLast = {}

self.unordered_requests_handlers = [] # type: List[Callable]

# Monitoring suspicious spikes in cluster throughput
self.clusterThroughputSpikeMonitorData = {
'value': 0,
Expand All @@ -196,6 +195,8 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
self.checkPerformance,
self.config.notifierEventTriggeringConfig['clusterThroughputSpike']['freq'])

self.startRepeating(self.check_unordered, self.config.UnorderedCheckFreq)

if 'disable_view_change' in self.config.unsafe:
self.isMasterDegraded = lambda: False
if 'disable_monitor' in self.config.unsafe:
Expand Down Expand Up @@ -289,9 +290,8 @@ def requestOrdered(self, reqIdrs: List[Tuple[str, int]], instId: int,
durations = {}
for identifier, reqId in reqIdrs:
if (identifier, reqId) not in self.requestTracker:
logger.debug(
"Got untracked ordered request with identifier {} and reqId {}".
format(identifier, reqId))
logger.debug("Got untracked ordered request with identifier {} and reqId {}".
format(identifier, reqId))
continue
duration = self.requestTracker.order(instId, identifier, reqId, now)
if byMaster:
Expand Down Expand Up @@ -335,30 +335,17 @@ def requestUnOrdered(self, identifier: str, reqId: int):
Record the time at which request ordering started.
"""
self.requestTracker.start(identifier, reqId, time.perf_counter())
self.warn_has_lot_unordered_requests()

def warn_has_lot_unordered_requests(self):
unordered_started_at = []
def check_unordered(self):
now = time.perf_counter()
sorted_by_started_at = sorted(self.requestTracker.unordered(), key=itemgetter(1))
for _, started_at in sorted_by_started_at:
in_window = (now - started_at) < self.WARN_NOT_PARTICIPATING_WINDOW_MINS * 60
if not in_window:
continue
dt = (started_at - unordered_started_at[-1]) if unordered_started_at else None
if dt is None or dt > self.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC:
unordered_started_at.append(started_at)

if len(unordered_started_at) >= self.WARN_NOT_PARTICIPATING_UNORDERED_NUM:
logger.warning('It looks like {} does not participate in processing messages '
'because it has {} unordered requests '
'in the last {} minutes (assumed that minimum difference between unordered '
'requests is at least {} seconds)'
.format(self, len(unordered_started_at),
self.WARN_NOT_PARTICIPATING_WINDOW_MINS,
self.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC))
return True
return False
unordered = [req for req, started in self.requestTracker.unordered()
if now - started < self.config.UnorderedCheckFreq]
if len(unordered) == 0:
return
for handler in self.unordered_requests_handlers:
handler(unordered)
logger.warning('Following requests were not ordered for more than {} seconds: {}'
.format(self.config.UnorderedCheckFreq, unordered))

def isMasterDegraded(self):
"""
Expand Down Expand Up @@ -441,8 +428,7 @@ def isMasterAvgReqLatencyTooHigh(self):
"threshold".format(MONITORING_PREFIX, self, d))
logger.trace(
"{}'s master's avg request latency is {} and backup's "
"avg request latency is {} ".
format(self, avgLatM, avgLatB))
"avg request latency is {}".format(self, avgLatM, avgLatB))
return True
logger.trace("{} found difference between master and backups "
"avg latencies to be acceptable".format(self))
Expand Down
2 changes: 1 addition & 1 deletion plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ def postToNodeInBox(self, msg, frm):
:param msg: a node message
:param frm: the name of the node that sent this `msg`
"""
logger.debug("{} appending to nodeInbox {}".format(self, msg))
logger.trace("{} appending to nodeInbox {}".format(self, msg))
self.nodeInBox.append((msg, frm))

async def processNodeInBox(self):
Expand Down
135 changes: 47 additions & 88 deletions plenum/test/monitoring/test_warn_unordered_log_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,111 +3,70 @@
from plenum.test.malicious_behaviors_node import delaysCommitProcessing
from plenum.test.test_node import getNonPrimaryReplicas
from stp_core.common.log import getlogger
from plenum.test.helper import sdk_send_random_and_check
from plenum.test.helper import sdk_send_random_requests

nodeCount = 4
logger = getlogger()

UNORDERED_CHECK_FREQ = 5


@pytest.fixture(scope="module")
def tconf(tconf):
oldUnorderedCheckFreq = tconf.UnorderedCheckFreq
tconf.UnorderedCheckFreq = UNORDERED_CHECK_FREQ
yield tconf
tconf.UnorderedCheckFreq = oldUnorderedCheckFreq


@pytest.fixture(scope="module")
def txnPoolNodeSet(txnPoolNodeSet):
for node in txnPoolNodeSet:
install_unordered_requests_watcher(node.monitor)
yield txnPoolNodeSet


# noinspection PyIncorrectDocstring
def test_working_has_no_warn_log_msg(looper, txnPoolNodeSet,
sdk_pool_handle, sdk_wallet_client, patch_monitors):
monitor = txnPoolNodeSet[0].monitor
assert no_any_warn(*txnPoolNodeSet)
sdk_pool_handle, sdk_wallet_client):
clear_unordered_requests(*txnPoolNodeSet)

for i in range(monitor.WARN_NOT_PARTICIPATING_UNORDERED_NUM):
sdk_send_random_and_check(looper, txnPoolNodeSet,
sdk_pool_handle,
sdk_wallet_client,
1)
looper.runFor(monitor.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC)
sdk_send_random_requests(looper, sdk_pool_handle, sdk_wallet_client, 5)
looper.runFor(1.2 * UNORDERED_CHECK_FREQ)

assert no_any_warn(*txnPoolNodeSet)
assert all(len(node.monitor.unordered_requests) == 0 for node in txnPoolNodeSet)


# noinspection PyIncorrectDocstring
def test_slow_node_has_warn_unordered_log_msg(looper,
txnPoolNodeSet,
sdk_pool_handle,
sdk_wallet_client,
patch_monitors):
npr = getNonPrimaryReplicas(txnPoolNodeSet, 0)[0]
slow_node = npr.node

monitor = txnPoolNodeSet[0].monitor
delay = monitor.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC * \
monitor.WARN_NOT_PARTICIPATING_UNORDERED_NUM + 10
delaysCommitProcessing(slow_node, delay=delay)

assert no_any_warn(*txnPoolNodeSet), \
'all nodes do not have warnings before test'

for i in range(monitor.WARN_NOT_PARTICIPATING_UNORDERED_NUM):
sdk_send_random_and_check(looper, txnPoolNodeSet,
sdk_pool_handle,
sdk_wallet_client,
1)
looper.runFor(monitor.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC)

others = [node for node in txnPoolNodeSet if node.name != slow_node.name]
assert no_any_warn(*others), \
'others do not have warning after test'
assert has_some_warn(slow_node), \
'slow node has the warning'

# wait at least windows time
looper.runFor(monitor.WARN_NOT_PARTICIPATING_WINDOW_MINS * 60)
sdk_send_random_and_check(looper, txnPoolNodeSet,
sdk_pool_handle,
sdk_wallet_client,
1)
assert no_any_warn(*others), 'others do not have warning'
assert no_last_warn(slow_node), \
'the last call of warn_has_lot_unordered_requests returned False ' \
'so slow node has no the warning for now'


def no_any_warn(*nodes):
for node in nodes:
calls = node.monitor.spylog.getAll(node.monitor.warn_has_lot_unordered_requests)
if any(call.result for call in calls):
return False
return True
sdk_wallet_client):
clear_unordered_requests(*txnPoolNodeSet)

slow_node = getNonPrimaryReplicas(txnPoolNodeSet, 0)[0].node
delaysCommitProcessing(slow_node, delay=1.5 * UNORDERED_CHECK_FREQ)

def has_some_warn(*nodes):
for node in nodes:
calls = node.monitor.spylog.getAll(node.monitor.warn_has_lot_unordered_requests)
if not any(call.result for call in calls):
return False
return True
sdk_send_random_requests(looper, sdk_pool_handle, sdk_wallet_client, 5)
looper.runFor(1.2 * UNORDERED_CHECK_FREQ)

assert all(len(node.monitor.unordered_requests) == 0 for node in txnPoolNodeSet if node.name != slow_node.name)
assert len(slow_node.monitor.unordered_requests) != 0

def no_last_warn(*nodes):
# Check that after being ordered request is no longer logged
clear_unordered_requests(*txnPoolNodeSet)
looper.runFor(1.2 * UNORDERED_CHECK_FREQ)
assert all(len(node.monitor.unordered_requests) == 0 for node in txnPoolNodeSet)


def install_unordered_requests_watcher(monitor):
def handler(requests):
monitor.unordered_requests.extend(requests)

monitor.unordered_requests = []
monitor.unordered_requests_handlers.append(handler)


def clear_unordered_requests(*nodes):
for node in nodes:
call = node.monitor.spylog.getLast(node.monitor.warn_has_lot_unordered_requests)
if call.result:
return False
return True


@pytest.fixture(scope="function")
def patch_monitors(txnPoolNodeSet):
backup = {}
req_num = 3
diff_sec = 1
window_mins = 0.25
for node in txnPoolNodeSet:
backup[node.name] = (
node.monitor.WARN_NOT_PARTICIPATING_UNORDERED_NUM,
node.monitor.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC,
node.monitor.WARN_NOT_PARTICIPATING_WINDOW_MINS,
)
node.monitor.WARN_NOT_PARTICIPATING_UNORDERED_NUM = req_num
node.monitor.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC = diff_sec
node.monitor.WARN_NOT_PARTICIPATING_WINDOW_MINS = window_mins
yield req_num, diff_sec, window_mins
for node in txnPoolNodeSet:
node.monitor.WARN_NOT_PARTICIPATING_UNORDERED_NUM = backup[node.name][0]
node.monitor.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC = backup[node.name][1]
node.monitor.WARN_NOT_PARTICIPATING_WINDOW_MINS = backup[node.name][2]
node.monitor.unordered_requests = []
3 changes: 1 addition & 2 deletions plenum/test/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,7 @@ def getAllMsgReceived(self, node: NodeRef, method: str = None) -> List:
Monitor.isMasterReqLatencyTooHigh,
Monitor.sendThroughput,
Monitor.requestOrdered,
Monitor.reset,
Monitor.warn_has_lot_unordered_requests
Monitor.reset
]


Expand Down