Skip to content

Commit a2a74a5

Browse files
authored
Merge pull request #1305 from Toktar/bug-2215-ls-reask
INDY-2215: add re-ask LedgerStatuses for the init catchup
2 parents a6d062f + 47c09cf commit a2a74a5

File tree

4 files changed

+55
-38
lines changed

4 files changed

+55
-38
lines changed

plenum/common/timer.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
from abc import ABC, abstractmethod
22
from functools import wraps
3+
from logging import getLogger
34
from typing import Callable, NamedTuple
45

56
import time
67

78
from sortedcontainers import SortedListWithKey
89

10+
logger = getLogger()
11+
912

1013
class TimerService(ABC):
1114
@abstractmethod
@@ -64,14 +67,15 @@ def wrapped_callback():
6467
self._timer.schedule(self._interval, self._callback)
6568

6669
self._timer = timer
67-
self._interval = interval
70+
self._interval = None
71+
self.update_interval(interval)
6872
self._callback = wrapped_callback
6973
self._active = False
7074
if active:
7175
self.start()
7276

7377
def start(self):
74-
if self._active:
78+
if self._active or not self._interval:
7579
return
7680
self._active = True
7781
self._timer.schedule(self._interval, self._callback)
@@ -81,3 +85,9 @@ def stop(self):
8185
return
8286
self._active = False
8387
self._timer.cancel(self._callback)
88+
89+
def update_interval(self, interval):
90+
if interval <= 0:
91+
logger.debug("RepeatingTimer - incorrect interval {}".format(interval))
92+
return
93+
self._interval = interval

plenum/server/catchup/cons_proof_service.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,16 @@ def __init__(self,
4848
self._cons_proofs = {}
4949
self._already_asked_for_cons_proofs_without_timeout = False
5050
self._last_txn_3PC_key = {}
51-
self._ledger_status_timer = None
52-
self._consistency_proof_timer = None
51+
self._ledger_status_timer = \
52+
RepeatingTimer(self._timer,
53+
self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1),
54+
self._reask_for_ledger_status,
55+
active=False)
56+
self._consistency_proof_timer = \
57+
RepeatingTimer(self._timer,
58+
self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1),
59+
self._reask_for_last_consistency_proof,
60+
active=False)
5361

5462
def __repr__(self) -> str:
5563
return "{}:ConsProofService:{}".format(self._provider.node_name(), self._ledger_id)
@@ -66,7 +74,7 @@ def start(self, request_ledger_statuses: bool):
6674

6775
if request_ledger_statuses:
6876
self._request_ledger_status_from_nodes()
69-
self._schedule_reask_ledger_status()
77+
self._schedule_reask_ledger_status()
7078

7179
def process_ledger_status(self, ledger_status: LedgerStatus, frm: str):
7280
if not self._can_process_ledger_status(ledger_status):
@@ -432,22 +440,18 @@ def _schedule_reask_cons_proof(self):
432440
)
433441

434442
def _schedule_reask_ledger_status(self):
435-
self._ledger_status_timer = \
436-
RepeatingTimer(self._timer,
437-
self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1),
438-
self._reask_for_ledger_status)
443+
self._ledger_status_timer.update_interval(
444+
self._config.LedgerStatusTimeout * (len(self._provider.all_nodes_names()) - 1))
445+
self._ledger_status_timer.start()
439446

440447
def _schedule_reask_last_cons_proof(self):
441-
if self._consistency_proof_timer is None:
442-
self._consistency_proof_timer = \
443-
RepeatingTimer(self._timer,
444-
self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1),
445-
self._reask_for_last_consistency_proof)
448+
self._consistency_proof_timer.update_interval(
449+
self._config.ConsistencyProofsTimeout * (len(self._provider.all_nodes_names()) - 1))
450+
self._consistency_proof_timer.start()
446451

447452
def _cancel_reask(self):
448453
if self._consistency_proof_timer:
449454
self._consistency_proof_timer.stop()
450-
self._consistency_proof_timer = None
451455
if self._ledger_status_timer:
452456
self._ledger_status_timer.stop()
453457
self._timer.cancel(self._request_CPs_if_needed)

plenum/test/node_catchup/test_node_catchup_with_connection_problem.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import pytest
22
from plenum.common.config_helper import PNodeConfigHelper
3+
from plenum.common.messages.node_messages import LedgerStatus, ConsistencyProof
34
from plenum.common.util import getCallableName
5+
from plenum.server.router import Route
46
from plenum.test.helper import sdk_send_random_and_check
57
from plenum.test.node_catchup.helper import waitNodeDataEquality
68
from plenum.test.pool_transactions.helper import \
@@ -12,7 +14,7 @@
1214
call_count = 0
1315

1416

15-
@pytest.fixture(scope='function', params=range(1, 4))
17+
@pytest.fixture(scope='function', params=range(1, 5))
1618
def lost_count(request):
1719
return request.param
1820

@@ -31,14 +33,6 @@ def test_catchup_with_lost_ledger_status(txnPoolNodeSet,
3133

3234
node_to_disconnect = txnPoolNodeSet[-1]
3335

34-
def unpatch_after_call(status, frm):
35-
global call_count
36-
call_count += 1
37-
if call_count >= lost_count:
38-
# unpatch processLedgerStatus after lost_count calls
39-
monkeypatch.undo()
40-
call_count = 0
41-
4236
sdk_send_random_and_check(looper, txnPoolNodeSet,
4337
sdk_pool_handle, sdk_wallet_steward, 5)
4438

@@ -60,9 +54,17 @@ def unpatch_after_call(status, frm):
6054
config=tconf,
6155
ha=nodeHa, cliha=nodeCHa,
6256
pluginPaths=allPluginsPath)
57+
58+
def unpatch_after_call(status, frm):
59+
global call_count
60+
call_count += 1
61+
if call_count >= lost_count:
62+
# unpatch processLedgerStatus after lost_count calls
63+
node_to_disconnect.nodeMsgRouter.add((LedgerStatus, node_to_disconnect.ledgerManager.processLedgerStatus))
64+
call_count = 0
65+
6366
# patch processLedgerStatus
64-
monkeypatch.setattr(node_to_disconnect.ledgerManager, 'processLedgerStatus',
65-
unpatch_after_call)
67+
node_to_disconnect.nodeMsgRouter.add((LedgerStatus, unpatch_after_call))
6668

6769
# add node_to_disconnect to pool
6870
looper.add(node_to_disconnect)
@@ -88,14 +90,6 @@ def test_catchup_with_lost_first_consistency_proofs(txnPoolNodeSet,
8890
Test makes sure that the node eventually finishes catchup'''
8991
node_to_disconnect = txnPoolNodeSet[-1]
9092

91-
def unpatch_after_call(proof, frm):
92-
global call_count
93-
call_count += 1
94-
if call_count >= lost_count:
95-
# unpatch processConsistencyProof after lost_count calls
96-
monkeypatch.undo()
97-
call_count = 0
98-
9993
sdk_send_random_and_check(looper, txnPoolNodeSet,
10094
sdk_pool_handle, sdk_wallet_steward, 5)
10195

@@ -117,10 +111,18 @@ def unpatch_after_call(proof, frm):
117111
config=tconf,
118112
ha=nodeHa, cliha=nodeCHa,
119113
pluginPaths=allPluginsPath)
114+
115+
def unpatch_after_call(proof, frm):
116+
global call_count
117+
call_count += 1
118+
if call_count >= lost_count:
119+
# unpatch processConsistencyProof after lost_count calls
120+
node_to_disconnect.nodeMsgRouter.add((ConsistencyProof,
121+
node_to_disconnect.ledgerManager.processConsistencyProof))
122+
call_count = 0
123+
120124
# patch processConsistencyProof
121-
monkeypatch.setattr(node_to_disconnect.ledgerManager,
122-
'processConsistencyProof',
123-
unpatch_after_call)
125+
node_to_disconnect.nodeMsgRouter.add((ConsistencyProof, unpatch_after_call))
124126
# add node_to_disconnect to pool
125127
looper.add(node_to_disconnect)
126128
txnPoolNodeSet[-1] = node_to_disconnect

plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from logging import getLogger
22

33
import pytest
4+
from plenum.common.constants import LEDGER_STATUS
45

56
from plenum.common.messages.node_messages import Checkpoint, LedgerStatus
67
from plenum.common.startable import Mode
@@ -88,7 +89,7 @@ def test_3pc_while_catchup_with_chkpoints_only(tdir, tconf,
8889
lagging_node.nodeIbStasher.delay(pDelay())
8990
lagging_node.nodeIbStasher.delay(cDelay())
9091

91-
with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay()):
92+
with delay_rules(lagging_node.nodeIbStasher, lsDelay(), cr_delay(), msg_rep_delay(types_to_delay=[LEDGER_STATUS])):
9293
looper.add(lagging_node)
9394
txnPoolNodeSet[-1] = lagging_node
9495
looper.run(checkNodesConnected(txnPoolNodeSet))

0 commit comments

Comments
 (0)