Skip to content

Commit 9ae0e15

Browse files
authored
Merge pull request #1330 from ashcherbakov/indy-1340
INDY-1340: go to next view on timeuout for PBFT View Change Service
2 parents bbd24aa + 8c284e2 commit 9ae0e15

10 files changed

+311
-82
lines changed

plenum/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,3 +412,5 @@
412412
# TAA acceptance time valid deviations (secs)
413413
TXN_AUTHOR_AGREEMENT_ACCEPTANCE_TIME_BEFORE_TAA_TIME = 120
414414
TXN_AUTHOR_AGREEMENT_ACCEPTANCE_TIME_AFTER_PP_TIME = 120
415+
416+
NEW_VIEW_TIMEOUT = 30 # in secs

plenum/server/consensus/ordering_service.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
PP_CHECK_WRONG_TIME, Stats, OrderedTracker, TPCStat, generateName, getNodeName
4747
from plenum.server.replica_freshness_checker import FreshnessChecker
4848
from plenum.server.replica_helper import replica_batch_digest
49+
from plenum.server.replica_validator_enums import STASH_WAITING_NEW_VIEW
4950
from plenum.server.request_managers.write_request_manager import WriteRequestManager
5051
from plenum.server.suspicion_codes import Suspicions
5152
from stp_core.common.log import getlogger
@@ -2245,6 +2246,8 @@ def process_new_view_checkpoints_applied(self, msg: NewViewCheckpointsApplied):
22452246
if result != PROCESS:
22462247
return result, reason
22472248

2249+
# apply PrePrepares from NewView that we have
2250+
# request missing PrePrepares from NewView
22482251
missing_batches = []
22492252
for batch_id in msg.batches:
22502253
pp = self.old_view_preprepares.get((batch_id.pp_view_no, batch_id.pp_seq_no, batch_id.pp_digest))
@@ -2256,6 +2259,9 @@ def process_new_view_checkpoints_applied(self, msg: NewViewCheckpointsApplied):
22562259
if missing_batches:
22572260
self._request_old_view_pre_prepares(missing_batches)
22582261

2262+
# unstash waiting for New View messages
2263+
self._stasher.process_all_stashed(STASH_WAITING_NEW_VIEW)
2264+
22592265
return PROCESS, None
22602266

22612267
def process_old_view_preprepare_request(self, msg: OldViewPrePrepareRequest, sender):

plenum/server/consensus/replica_service.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def __init__(self, name: str, validators: List[str], primary_name: str,
3333
# ToDo: Maybe ConsensusSharedData should be initiated before and passed already prepared?
3434
self._data = ConsensusSharedData(name, validators, 0)
3535
self._data.primary_name = generateName(primary_name, self._data.inst_id)
36-
config = getConfig()
37-
stasher = StashingRouter(config.REPLICA_STASH_LIMIT, buses=[bus, network])
36+
self.config = getConfig()
37+
self.stasher = StashingRouter(self.config.REPLICA_STASH_LIMIT, buses=[bus, network])
3838
self._write_manager = write_manager
3939
self._orderer = OrderingService(data=self._data,
4040
timer=timer,
@@ -43,12 +43,12 @@ def __init__(self, name: str, validators: List[str], primary_name: str,
4343
write_manager=self._write_manager,
4444
bls_bft_replica=bls_bft_replica,
4545
freshness_checker=FreshnessChecker(
46-
freshness_timeout=config.STATE_FRESHNESS_UPDATE_INTERVAL),
47-
stasher=stasher)
46+
freshness_timeout=self.config.STATE_FRESHNESS_UPDATE_INTERVAL),
47+
stasher=self.stasher)
4848
self._orderer._validator = OrderingServiceMsgValidator(self._orderer._data)
49-
self._checkpointer = CheckpointService(self._data, bus, network, stasher,
49+
self._checkpointer = CheckpointService(self._data, bus, network, self.stasher,
5050
write_manager.database_manager)
51-
self._view_changer = ViewChangeService(self._data, timer, bus, network, stasher)
51+
self._view_changer = ViewChangeService(self._data, timer, bus, network, self.stasher)
5252
self._message_requestor = MessageReq3pcService(self._data, bus, network)
5353

5454
self._add_ledgers()

plenum/server/consensus/view_change_service.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@
88
from plenum.common.config_util import getConfig
99
from plenum.common.event_bus import InternalBus, ExternalBus
1010
from plenum.common.messages.internal_messages import NeedViewChange, NewViewAccepted, ViewChangeStarted
11-
from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint
11+
from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint, InstanceChange
1212
from plenum.common.router import Subscription
1313
from plenum.common.stashing_router import StashingRouter, DISCARD, PROCESS
14-
from plenum.common.timer import TimerService
14+
from plenum.common.timer import TimerService, RepeatingTimer
1515
from plenum.server.consensus.consensus_shared_data import ConsensusSharedData, BatchID
1616
from plenum.server.consensus.primary_selector import RoundRobinPrimariesSelector
1717
from plenum.server.quorums import Quorums
1818
from plenum.server.replica_helper import generateName, getNodeName
1919
from plenum.server.replica_validator_enums import STASH_VIEW
20+
from plenum.server.suspicion_codes import Suspicions
2021
from stp_core.common.log import getlogger
2122

2223

@@ -41,6 +42,7 @@ def __init__(self, data: ConsensusSharedData, timer: TimerService, bus: Internal
4142
self._router = stasher
4243
self._votes = ViewChangeVotesForView(self._data.quorums)
4344
self._new_view = None # type: Optional[NewView]
45+
self._resend_inst_change_timer = None
4446

4547
self._router.subscribe(ViewChange, self.process_view_change_message)
4648
self._router.subscribe(ViewChangeAck, self.process_view_change_ack_message)
@@ -85,7 +87,16 @@ def process_need_view_change(self, msg: NeedViewChange):
8587
self._votes.add_view_change(vc, self._data.name)
8688

8789
# 6. Unstash messages for new view
88-
self._router.process_all_stashed()
90+
self._router.process_all_stashed(STASH_VIEW)
91+
92+
# 7. Schedule New View after timeout
93+
if self._resend_inst_change_timer is not None:
94+
self._resend_inst_change_timer.stop()
95+
self._resend_inst_change_timer = \
96+
RepeatingTimer(self._timer,
97+
self._config.NEW_VIEW_TIMEOUT,
98+
partial(self._propose_view_change, Suspicions.INSTANCE_CHANGE_TIMEOUT.code),
99+
active=True)
89100

90101
def _clean_on_view_change_start(self):
91102
self._clear_old_batches(self._old_prepared)
@@ -234,7 +245,7 @@ def _finish_view_change_if_needed(self):
234245
self._data.view_no,
235246
cp)
236247
)
237-
self._bus.send(NeedViewChange())
248+
self._propose_view_change(Suspicions.NEW_VIEW_INVALID_CHECKPOINTS.code)
238249
return
239250

240251
batches = self._new_view_builder.calc_batches(cp, view_changes)
@@ -246,7 +257,7 @@ def _finish_view_change_if_needed(self):
246257
self._data.view_no,
247258
batches)
248259
)
249-
self._bus.send(NeedViewChange())
260+
self._propose_view_change(Suspicions.NEW_VIEW_INVALID_BATCHES.code)
250261
return
251262

252263
self._finish_view_change()
@@ -255,12 +266,21 @@ def _finish_view_change(self):
255266
# Update shared data
256267
self._data.waiting_for_new_view = False
257268

269+
# Cancel View Change timeout task
270+
if self._resend_inst_change_timer is not None:
271+
self._resend_inst_change_timer.stop()
272+
self._resend_inst_change_timer = None
273+
258274
# send message to other services
259275
self._bus.send(NewViewAccepted(view_no=self._new_view.viewNo,
260276
view_changes=self._new_view.viewChanges,
261277
checkpoint=self._new_view.checkpoint,
262278
batches=self._new_view.batches))
263279

280+
def _propose_view_change(self, suspision_code):
281+
msg = InstanceChange(self._data.view_no + 1, suspision_code)
282+
self._network.send(msg)
283+
264284

265285
class NewViewBuilder:
266286

plenum/server/suspicion_codes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ class Suspicions:
8888
"incorrect audit ledger transaction root hash")
8989

9090
REPLICAS_COUNT_CHANGED = Suspicion(46, "Replica's count changed")
91+
NEW_VIEW_INVALID_CHECKPOINTS = Suspicion(47, "New View's Primary sent NewView with invalid checkpoints")
92+
NEW_VIEW_INVALID_BATCHES = Suspicion(48, "New View's Primary sent NewView with invalid batches")
9193

9294
@classmethod
9395
def get_list(cls):

plenum/test/consensus/conftest.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pytest
44

5+
from plenum.bls.bls_crypto_factory import create_default_bls_crypto_factory
56
from plenum.common.constants import DOMAIN_LEDGER_ID, AUDIT_LEDGER_ID
67
from plenum.common.messages.internal_messages import RequestPropagates
78
from plenum.common.startable import Mode
@@ -11,14 +12,15 @@
1112
from plenum.common.util import get_utc_epoch
1213
from plenum.server.consensus.consensus_shared_data import ConsensusSharedData
1314
from plenum.common.messages.node_messages import Checkpoint
15+
from plenum.server.consensus.replica_service import ReplicaService
1416
from plenum.server.consensus.view_change_service import ViewChangeService
1517
from plenum.server.database_manager import DatabaseManager
1618
from plenum.server.replica_helper import generateName
1719
from plenum.server.request_managers.write_request_manager import WriteRequestManager
1820
from plenum.test.checkpoints.helper import cp_digest
19-
from plenum.test.consensus.helper import primary_in_view
21+
from plenum.test.consensus.helper import primary_in_view, create_test_write_req_manager
2022
from plenum.test.greek import genNodeNames
21-
from plenum.test.helper import MockTimer, MockNetwork
23+
from plenum.test.helper import MockTimer, MockNetwork, create_pool_txn_data
2224
from plenum.test.testing_utils import FakeSomething
2325

2426

@@ -59,9 +61,14 @@ def _data(name):
5961

6062

6163
@pytest.fixture
62-
def view_change_service(internal_bus, external_bus, stasher):
64+
def timer():
65+
return MockTimer(0)
66+
67+
68+
@pytest.fixture
69+
def view_change_service(internal_bus, external_bus, timer, stasher):
6370
data = ConsensusSharedData("some_name", genNodeNames(4), 0)
64-
return ViewChangeService(data, MockTimer(0), internal_bus, external_bus, stasher)
71+
return ViewChangeService(data, timer, internal_bus, external_bus, stasher)
6572

6673

6774
@pytest.fixture
@@ -161,3 +168,19 @@ def write_manager(db_manager):
161168
@pytest.fixture()
162169
def stasher(internal_bus, external_bus):
163170
return StashingRouter(limit=100000, buses=[internal_bus, external_bus])
171+
172+
173+
@pytest.fixture()
174+
def replica_service(validators, primary, timer,
175+
internal_bus, external_bus):
176+
genesis_txns = create_pool_txn_data(
177+
node_names=validators,
178+
crypto_factory=create_default_bls_crypto_factory(),
179+
get_free_port=lambda: 8090)['txns']
180+
return ReplicaService("Alpha:0",
181+
validators, primary,
182+
timer,
183+
internal_bus,
184+
external_bus,
185+
write_manager=create_test_write_req_manager("Alpha", genesis_txns),
186+
bls_bft_replica=FakeSomething(gc=lambda key: None))

plenum/test/consensus/helper.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,10 @@ def __init__(self, node_count: int = 4, random: Optional[SimRandom] = None):
143143
self.network.create_peer(name, handler),
144144
write_manager=create_test_write_req_manager(name, genesis_txns),
145145
bls_bft_replica=MockBlsBftReplica())
146+
replica.config.NEW_VIEW_TIMEOUT = 30 * 1000
146147
self._nodes.append(replica)
147148

149+
148150
@property
149151
def timer(self) -> MockTimer:
150152
return self._timer

plenum/test/consensus/test_stash.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from plenum.common.messages.internal_messages import NeedViewChange, NewViewCheckpointsApplied
2+
from plenum.common.startable import Mode
3+
from plenum.server.replica_validator_enums import STASH_VIEW, STASH_WAITING_NEW_VIEW
4+
from plenum.test.consensus.helper import create_new_view
5+
from plenum.test.helper import create_commit_no_bls_sig
6+
7+
8+
def test_unstash_future_view_on_need_view_change(external_bus, internal_bus,
9+
replica_service):
10+
replica_service._data.view_no = 1
11+
replica_service._data.node_mode = Mode.participating
12+
external_bus.process_incoming(create_new_view(initial_view_no=1, stable_cp=200),
13+
replica_service._data.primary_name)
14+
external_bus.process_incoming(create_commit_no_bls_sig(req_key=(2, 10)),
15+
replica_service._data.primary_name)
16+
assert replica_service.stasher.stash_size(STASH_VIEW) == 2
17+
18+
internal_bus.send(NeedViewChange(view_no=2))
19+
20+
assert replica_service.stasher.stash_size(STASH_VIEW) == 0
21+
assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1
22+
23+
24+
def test_unstash_waiting_new_view_on_new_view_checkpoint_applied(external_bus, internal_bus,
25+
replica_service):
26+
replica_service._data.view_no = 2
27+
replica_service._data.node_mode = Mode.participating
28+
replica_service._data.waiting_for_new_view = True
29+
30+
external_bus.process_incoming(create_commit_no_bls_sig(req_key=(2, 10)),
31+
replica_service._data.primary_name)
32+
assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1
33+
34+
new_view = create_new_view(initial_view_no=1, stable_cp=200)
35+
replica_service._data.waiting_for_new_view = False
36+
internal_bus.send(NewViewCheckpointsApplied(view_no=2,
37+
view_changes=new_view.viewChanges,
38+
checkpoint=new_view.checkpoint,
39+
batches=new_view.batches))
40+
41+
assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 0

plenum/test/consensus/view_change/test_sim_view_change.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ def check_view_change_completes_under_normal_conditions(random: SimRandom):
2121
# Make sure all nodes complete view change
2222
pool.timer.wait_for(lambda: all(not node._data.waiting_for_new_view
2323
and node._data.view_no > 0
24-
for node in pool.nodes))
24+
for node in pool.nodes),
25+
timeout=5 * 30 * 1000) # 5 NEW_VIEW_TIMEOUT intervals
2526

2627
# Make sure all nodes end up in same state
2728
for node_a, node_b in zip(pool.nodes, pool.nodes[1:]):

0 commit comments

Comments
 (0)