
INDY-2215: add re-ask LedgerStatuses for the init catchup #1305

Merged: 5 commits, Aug 26, 2019
14 changes: 12 additions & 2 deletions plenum/common/timer.py
@@ -1,11 +1,14 @@
 from abc import ABC, abstractmethod
 from functools import wraps
+from logging import getLogger
 from typing import Callable, NamedTuple
 
 import time
 
 from sortedcontainers import SortedListWithKey
 
+logger = getLogger()
+
 
 class TimerService(ABC):
     @abstractmethod
@@ -64,14 +67,15 @@ def wrapped_callback():
             self._timer.schedule(self._interval, self._callback)
 
         self._timer = timer
-        self._interval = interval
+        self._interval = None
+        self.update_interval(interval)
         self._callback = wrapped_callback
         self._active = False
         if active:
             self.start()
 
     def start(self):
-        if self._active:
+        if self._active or not self._interval:
Review comment (Contributor):

Suggested change:
-        if self._active or not self._interval:
+        if self._active or self._interval is None:

@Toktar (Contributor, Author), Aug 26, 2019:

In this case we would not catch interval = 0.
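To illustrate the reply (a toy example, not code from the PR): a zero interval is falsy but not None, so only the `not` form blocks it.

interval = 0
print(not interval)      # True  -> `not self._interval` keeps start() a no-op
print(interval is None)  # False -> an `is None` guard would let 0 through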

             return
         self._active = True
         self._timer.schedule(self._interval, self._callback)
@@ -81,3 +85,9 @@ def stop(self):
             return
         self._active = False
         self._timer.cancel(self._callback)
+
+    def update_interval(self, interval):
+        if interval <= 0:
+            logger.debug("RepeatingTimer - incorrect interval {}".format(interval))
+            return
+        self._interval = interval
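For context, here is a minimal sketch of how the reworked RepeatingTimer is driven. It assumes plenum's QueueTimer as the concrete TimerService and omits the event loop that actually services the timer:

from plenum.common.timer import QueueTimer, RepeatingTimer

timer = QueueTimer()

# Created inactive with an invalid interval: update_interval(0) is rejected,
# so _interval stays None and the first start() below is a no-op.
reask = RepeatingTimer(timer, 0, lambda: print("re-ask"), active=False)
reask.start()             # does nothing yet: no valid interval

reask.update_interval(3)  # positive interval is accepted
reask.start()             # callback is now rescheduled every 3 seconds
reask.stop()              # cancels the pending callback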
30 changes: 17 additions & 13 deletions plenum/server/catchup/cons_proof_service.py
@@ -48,8 +48,16 @@ def __init__(self,
         self._cons_proofs = {}
         self._already_asked_for_cons_proofs_without_timeout = False
         self._last_txn_3PC_key = {}
-        self._ledger_status_timer = None
-        self._consistency_proof_timer = None
+        self._ledger_status_timer = \
+            RepeatingTimer(self._timer,
+                           self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1),
+                           self._reask_for_ledger_status,
+                           active=False)
+        self._consistency_proof_timer = \
+            RepeatingTimer(self._timer,
+                           self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1),
+                           self._reask_for_last_consistency_proof,
+                           active=False)
 
     def __repr__(self) -> str:
         return "{}:ConsProofService:{}".format(self._provider.node_name(), self._ledger_id)
@@ -66,7 +74,7 @@ def start(self, request_ledger_statuses: bool):

         if request_ledger_statuses:
             self._request_ledger_status_from_nodes()
-            self._schedule_reask_ledger_status()
+        self._schedule_reask_ledger_status()
 
     def process_ledger_status(self, ledger_status: LedgerStatus, frm: str):
         if not self._can_process_ledger_status(ledger_status):
@@ -432,22 +440,18 @@ def _schedule_reask_cons_proof(self):
         )
 
     def _schedule_reask_ledger_status(self):
-        self._ledger_status_timer = \
-            RepeatingTimer(self._timer,
-                           self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1),
-                           self._reask_for_ledger_status)
+        self._ledger_status_timer.update_interval(
+            self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1))
         self._ledger_status_timer.start()
 
     def _schedule_reask_last_cons_proof(self):
-        if self._consistency_proof_timer is None:
-            self._consistency_proof_timer = \
-                RepeatingTimer(self._timer,
-                               self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1),
-                               self._reask_for_last_consistency_proof)
+        self._consistency_proof_timer.update_interval(
+            self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1))
         self._consistency_proof_timer.start()
 
     def _cancel_reask(self):
-        if self._consistency_proof_timer:
-            self._consistency_proof_timer.stop()
-            self._consistency_proof_timer = None
-        if self._ledger_status_timer:
-            self._ledger_status_timer.stop()
+        self._consistency_proof_timer.stop()
+        self._ledger_status_timer.stop()
         self._timer.cancel(self._request_CPs_if_needed)
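The net effect on ConsProofService: both re-ask timers now exist for the whole life of the service, and scheduling becomes update-and-start instead of create-on-demand, which is what lets _cancel_reask drop its None checks. A condensed sketch of the pattern (ReaskingService and _compute_timeout are illustrative stand-ins for the service and the `Timeout * (number of other nodes)` expressions above):

from plenum.common.timer import RepeatingTimer

class ReaskingService:
    def __init__(self, timer):
        # Create once, inactive: the timer object always exists afterwards.
        self._reask_timer = RepeatingTimer(timer, 0, self._reask, active=False)

    def _schedule_reask(self):
        # Pool size can change between catchups, so the interval is
        # recomputed at scheduling time rather than fixed at construction.
        self._reask_timer.update_interval(self._compute_timeout())
        self._reask_timer.start()

    def _cancel_reask(self):
        # Safe unconditionally; stop() itself checks the active flag.
        self._reask_timer.stop()

    def _reask(self):
        pass  # re-send the requests

    def _compute_timeout(self):
        return 5  # illustrative value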
@@ -1,6 +1,8 @@
 import pytest
 from plenum.common.config_helper import PNodeConfigHelper
+from plenum.common.messages.node_messages import LedgerStatus, ConsistencyProof
+from plenum.common.util import getCallableName
 from plenum.server.router import Route
 from plenum.test.helper import sdk_send_random_and_check
 from plenum.test.node_catchup.helper import waitNodeDataEquality
 from plenum.test.pool_transactions.helper import \
@@ -12,7 +14,7 @@
 call_count = 0
 
 
-@pytest.fixture(scope='function', params=range(1, 4))
+@pytest.fixture(scope='function', params=range(1, 5))
 def lost_count(request):
     return request.param
 
@@ -31,14 +33,6 @@ def test_catchup_with_lost_ledger_status(txnPoolNodeSet,

     node_to_disconnect = txnPoolNodeSet[-1]
 
-    def unpatch_after_call(status, frm):
-        global call_count
-        call_count += 1
-        if call_count >= lost_count:
-            # unpatch processLedgerStatus after lost_count calls
-            monkeypatch.undo()
-            call_count = 0
-
     sdk_send_random_and_check(looper, txnPoolNodeSet,
                               sdk_pool_handle, sdk_wallet_steward, 5)
 
@@ -60,9 +54,17 @@ def unpatch_after_call(status, frm):
                                      config=tconf,
                                      ha=nodeHa, cliha=nodeCHa,
                                      pluginPaths=allPluginsPath)
+
+    def unpatch_after_call(status, frm):
+        global call_count
+        call_count += 1
+        if call_count >= lost_count:
+            # unpatch processLedgerStatus after lost_count calls
+            node_to_disconnect.nodeMsgRouter.add((LedgerStatus, node_to_disconnect.ledgerManager.processLedgerStatus))
+            call_count = 0
+
     # patch processLedgerStatus
-    monkeypatch.setattr(node_to_disconnect.ledgerManager, 'processLedgerStatus',
-                        unpatch_after_call)
+    node_to_disconnect.nodeMsgRouter.add((LedgerStatus, unpatch_after_call))
 
     # add node_to_disconnect to pool
     looper.add(node_to_disconnect)
@@ -88,14 +90,6 @@ def test_catchup_with_lost_first_consistency_proofs(txnPoolNodeSet,
     Test makes sure that the node eventually finishes catchup'''
     node_to_disconnect = txnPoolNodeSet[-1]
 
-    def unpatch_after_call(proof, frm):
-        global call_count
-        call_count += 1
-        if call_count >= lost_count:
-            # unpatch processConsistencyProof after lost_count calls
-            monkeypatch.undo()
-            call_count = 0
-
     sdk_send_random_and_check(looper, txnPoolNodeSet,
                               sdk_pool_handle, sdk_wallet_steward, 5)
 
@@ -117,10 +111,18 @@ def unpatch_after_call(proof, frm):
                                      config=tconf,
                                      ha=nodeHa, cliha=nodeCHa,
                                      pluginPaths=allPluginsPath)
+
+    def unpatch_after_call(proof, frm):
+        global call_count
+        call_count += 1
+        if call_count >= lost_count:
+            # unpatch processConsistencyProof after lost_count calls
+            node_to_disconnect.nodeMsgRouter.add((ConsistencyProof,
+                                                  node_to_disconnect.ledgerManager.processConsistencyProof))
+            call_count = 0
+
     # patch processConsistencyProof
-    monkeypatch.setattr(node_to_disconnect.ledgerManager,
-                        'processConsistencyProof',
-                        unpatch_after_call)
+    node_to_disconnect.nodeMsgRouter.add((ConsistencyProof, unpatch_after_call))
     # add node_to_disconnect to pool
     looper.add(node_to_disconnect)
     txnPoolNodeSet[-1] = node_to_disconnect
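Both tests now swap handlers directly in the node's message router instead of monkeypatching the ledger manager, and "unpatching" means re-registering the original handler through nodeMsgRouter.add, presumably because the router keeps its own reference to the handler, so restoring the attribute alone would not affect routing. The pattern in general form (make_lossy is illustrative, not a helper from the PR):

def make_lossy(node, msg_type, real_handler, lost_count):
    """Swallow the first lost_count messages of msg_type, then restore routing."""
    state = {"count": 0}

    def lossy_handler(msg, frm):
        state["count"] += 1
        if state["count"] >= lost_count:
            # Re-register the real handler; later messages are processed again.
            node.nodeMsgRouter.add((msg_type, real_handler))
        # Until then the message is simply dropped, simulating loss.

    node.nodeMsgRouter.add((msg_type, lossy_handler))

# Usage, mirroring the first test:
# make_lossy(node_to_disconnect, LedgerStatus,
#            node_to_disconnect.ledgerManager.processLedgerStatus, lost_count)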
@@ -1,6 +1,7 @@
 from logging import getLogger
 
 import pytest
+from plenum.common.constants import LEDGER_STATUS
 
 from plenum.common.messages.node_messages import Checkpoint, LedgerStatus
 from plenum.common.startable import Mode
@@ -88,7 +89,7 @@ def test_3pc_while_catchup_with_chkpoints_only(tdir, tconf,
     lagging_node.nodeIbStasher.delay(pDelay())
     lagging_node.nodeIbStasher.delay(cDelay())
 
-    with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay()):
+    with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay(), msg_rep_delay(types_to_delay=[LEDGER_STATUS])):
         looper.add(lagging_node)
         txnPoolNodeSet[-1] = lagging_node
         looper.run(checkNodesConnected(txnPoolNodeSet))
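A plausible reading of this one-line change: lsDelay() only holds back direct LEDGER_STATUS messages, while the re-ask added by this PR can also fetch statuses via MessageReq/MessageRep, so MessageRep replies carrying LEDGER_STATUS must be delayed as well to keep the lagging node status-starved for the duration of the test. In context (import paths assumed from plenum's test helpers):

from plenum.common.constants import LEDGER_STATUS
from plenum.test.delayers import cr_delay, lsDelay, msg_rep_delay
from plenum.test.stasher import delay_rules

with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay(),
                 msg_rep_delay(types_to_delay=[LEDGER_STATUS])):
    ...  # both direct and re-asked ledger statuses are now held back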