diff --git a/plenum/common/timer.py b/plenum/common/timer.py index 4a288595ec..40a025d570 100644 --- a/plenum/common/timer.py +++ b/plenum/common/timer.py @@ -1,11 +1,14 @@ from abc import ABC, abstractmethod from functools import wraps +from logging import getLogger from typing import Callable, NamedTuple import time from sortedcontainers import SortedListWithKey +logger = getLogger() + class TimerService(ABC): @abstractmethod @@ -64,14 +67,15 @@ def wrapped_callback(): self._timer.schedule(self._interval, self._callback) self._timer = timer - self._interval = interval + self._interval = None + self.update_interval(interval) self._callback = wrapped_callback self._active = False if active: self.start() def start(self): - if self._active: + if self._active or not self._interval: return self._active = True self._timer.schedule(self._interval, self._callback) @@ -81,3 +85,9 @@ def stop(self): return self._active = False self._timer.cancel(self._callback) + + def update_interval(self, interval): + if interval <= 0: + logger.debug("RepeatingTimer - incorrect interval {}".format(interval)) + return + self._interval = interval diff --git a/plenum/server/catchup/cons_proof_service.py b/plenum/server/catchup/cons_proof_service.py index 82ee9097b7..e6e15dc3e8 100644 --- a/plenum/server/catchup/cons_proof_service.py +++ b/plenum/server/catchup/cons_proof_service.py @@ -48,8 +48,16 @@ def __init__(self, self._cons_proofs = {} self._already_asked_for_cons_proofs_without_timeout = False self._last_txn_3PC_key = {} - self._ledger_status_timer = None - self._consistency_proof_timer = None + self._ledger_status_timer = \ + RepeatingTimer(self._timer, + self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1), + self._reask_for_ledger_status, + active=False) + self._consistency_proof_timer = \ + RepeatingTimer(self._timer, + self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1), + self._reask_for_last_consistency_proof, + active=False) def __repr__(self) -> str: return "{}:ConsProofService:{}".format(self._provider.node_name(), self._ledger_id) @@ -66,7 +74,7 @@ def start(self, request_ledger_statuses: bool): if request_ledger_statuses: self._request_ledger_status_from_nodes() - self._schedule_reask_ledger_status() + self._schedule_reask_ledger_status() def process_ledger_status(self, ledger_status: LedgerStatus, frm: str): if not self._can_process_ledger_status(ledger_status): @@ -432,22 +440,18 @@ def _schedule_reask_cons_proof(self): ) def _schedule_reask_ledger_status(self): - self._ledger_status_timer = \ - RepeatingTimer(self._timer, - self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1), - self._reask_for_ledger_status) + self._ledger_status_timer.update_interval( + self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1)) + self._ledger_status_timer.start() def _schedule_reask_last_cons_proof(self): - if self._consistency_proof_timer is None: - self._consistency_proof_timer = \ - RepeatingTimer(self._timer, - self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1), - self._reask_for_last_consistency_proof) + self._consistency_proof_timer.update_interval( + self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1)) + self._consistency_proof_timer.start() def _cancel_reask(self): if self._consistency_proof_timer: self._consistency_proof_timer.stop() - self._consistency_proof_timer = None if self._ledger_status_timer: self._ledger_status_timer.stop() self._timer.cancel(self._request_CPs_if_needed) diff --git a/plenum/test/node_catchup/test_node_catchup_with_connection_problem.py b/plenum/test/node_catchup/test_node_catchup_with_connection_problem.py index 8a50fd55e2..19daaf8330 100644 --- a/plenum/test/node_catchup/test_node_catchup_with_connection_problem.py +++ b/plenum/test/node_catchup/test_node_catchup_with_connection_problem.py @@ -1,6 +1,8 @@ import pytest from plenum.common.config_helper import PNodeConfigHelper +from plenum.common.messages.node_messages import LedgerStatus, ConsistencyProof from plenum.common.util import getCallableName +from plenum.server.router import Route from plenum.test.helper import sdk_send_random_and_check from plenum.test.node_catchup.helper import waitNodeDataEquality from plenum.test.pool_transactions.helper import \ @@ -12,7 +14,7 @@ call_count = 0 -@pytest.fixture(scope='function', params=range(1, 4)) +@pytest.fixture(scope='function', params=range(1, 5)) def lost_count(request): return request.param @@ -31,14 +33,6 @@ def test_catchup_with_lost_ledger_status(txnPoolNodeSet, node_to_disconnect = txnPoolNodeSet[-1] - def unpatch_after_call(status, frm): - global call_count - call_count += 1 - if call_count >= lost_count: - # unpatch processLedgerStatus after lost_count calls - monkeypatch.undo() - call_count = 0 - sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_steward, 5) @@ -60,9 +54,17 @@ def unpatch_after_call(status, frm): config=tconf, ha=nodeHa, cliha=nodeCHa, pluginPaths=allPluginsPath) + + def unpatch_after_call(status, frm): + global call_count + call_count += 1 + if call_count >= lost_count: + # unpatch processLedgerStatus after lost_count calls + node_to_disconnect.nodeMsgRouter.add((LedgerStatus, node_to_disconnect.ledgerManager.processLedgerStatus)) + call_count = 0 + # patch processLedgerStatus - monkeypatch.setattr(node_to_disconnect.ledgerManager, 'processLedgerStatus', - unpatch_after_call) + node_to_disconnect.nodeMsgRouter.add((LedgerStatus, unpatch_after_call)) # add node_to_disconnect to pool looper.add(node_to_disconnect) @@ -88,14 +90,6 @@ def test_catchup_with_lost_first_consistency_proofs(txnPoolNodeSet, Test makes sure that the node eventually finishes catchup''' node_to_disconnect = txnPoolNodeSet[-1] - def unpatch_after_call(proof, frm): - global call_count - call_count += 1 - if call_count >= lost_count: - # unpatch processConsistencyProof after lost_count calls - monkeypatch.undo() - call_count = 0 - sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_steward, 5) @@ -117,10 +111,18 @@ def unpatch_after_call(proof, frm): config=tconf, ha=nodeHa, cliha=nodeCHa, pluginPaths=allPluginsPath) + + def unpatch_after_call(proof, frm): + global call_count + call_count += 1 + if call_count >= lost_count: + # unpatch processConsistencyProof after lost_count calls + node_to_disconnect.nodeMsgRouter.add((ConsistencyProof, + node_to_disconnect.ledgerManager.processConsistencyProof)) + call_count = 0 + # patch processConsistencyProof - monkeypatch.setattr(node_to_disconnect.ledgerManager, - 'processConsistencyProof', - unpatch_after_call) + node_to_disconnect.nodeMsgRouter.add((ConsistencyProof, unpatch_after_call)) # add node_to_disconnect to pool looper.add(node_to_disconnect) txnPoolNodeSet[-1] = node_to_disconnect diff --git a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py index fb7df0b48f..9e55b96620 100644 --- a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py +++ b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py @@ -1,6 +1,7 @@ from logging import getLogger import pytest +from plenum.common.constants import LEDGER_STATUS from plenum.common.messages.node_messages import Checkpoint, LedgerStatus from plenum.common.startable import Mode @@ -88,7 +89,7 @@ def test_3pc_while_catchup_with_chkpoints_only(tdir, tconf, lagging_node.nodeIbStasher.delay(pDelay()) lagging_node.nodeIbStasher.delay(cDelay()) - with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay()): + with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay(), msg_rep_delay(types_to_delay=[LEDGER_STATUS])): looper.add(lagging_node) txnPoolNodeSet[-1] = lagging_node looper.run(checkNodesConnected(txnPoolNodeSet))