INDY-1340: go to next view on timeout for PBFT View Change Service #1330

Merged 6 commits on Sep 13, 2019

Changes from 5 commits

2 changes: 2 additions & 0 deletions plenum/config.py
@@ -412,3 +412,5 @@
# TAA acceptance time valid deviations (secs)
TXN_AUTHOR_AGREEMENT_ACCEPTANCE_TIME_BEFORE_TAA_TIME = 120
TXN_AUTHOR_AGREEMENT_ACCEPTANCE_TIME_AFTER_PP_TIME = 120

NEW_VIEW_TIMEOUT = 30 # in secs
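Note: NEW_VIEW_TIMEOUT bounds how long a replica waits for a started view change to complete before proposing the next view. A minimal, self-contained sketch of the repeating-timeout idea; SimpleRepeatingTimer here is an illustrative stand-in, not plenum's actual RepeatingTimer:

```python
import threading

class SimpleRepeatingTimer:
    """Illustrative stand-in for plenum's RepeatingTimer: invokes
    `callback` every `interval` seconds until stop() is called."""

    def __init__(self, interval, callback):
        self._interval = interval
        self._callback = callback
        self._stopped = False
        self._timer = None
        self._schedule()

    def _schedule(self):
        self._timer = threading.Timer(self._interval, self._fire)
        self._timer.daemon = True
        self._timer.start()

    def _fire(self):
        if not self._stopped:
            self._callback()
            self._schedule()  # re-arm so the timeout fires repeatedly

    def stop(self):
        self._stopped = True
        if self._timer is not None:
            self._timer.cancel()


NEW_VIEW_TIMEOUT = 30  # in secs, matching the value added above

# Fires as long as the view change has not finished.
watchdog = SimpleRepeatingTimer(NEW_VIEW_TIMEOUT,
                                lambda: print("timed out, propose next view"))
watchdog.stop()  # e.g. called once a valid NewView is accepted
```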
6 changes: 6 additions & 0 deletions plenum/server/consensus/ordering_service.py
@@ -46,6 +46,7 @@
PP_CHECK_WRONG_TIME, Stats, OrderedTracker, TPCStat, generateName, getNodeName
from plenum.server.replica_freshness_checker import FreshnessChecker
from plenum.server.replica_helper import replica_batch_digest
from plenum.server.replica_validator_enums import STASH_WAITING_NEW_VIEW
from plenum.server.request_managers.write_request_manager import WriteRequestManager
from plenum.server.suspicion_codes import Suspicions
from stp_core.common.log import getlogger
@@ -2243,6 +2244,8 @@ def process_new_view_checkpoints_applied(self, msg: NewViewCheckpointsApplied):
if result != PROCESS:
return result, reason

# apply PrePrepares from NewView that we have
# request missing PrePrepares from NewView
missing_batches = []
for batch_id in msg.batches:
pp = self.old_view_preprepares.get((batch_id.pp_view_no, batch_id.pp_seq_no, batch_id.pp_digest))
@@ -2254,6 +2257,9 @@ def process_new_view_checkpoints_applied(self, msg: NewViewCheckpointsApplied):
if missing_batches:
self._request_old_view_pre_prepares(missing_batches)

# unstash messages that were waiting for the New View
self._stasher.process_all_stashed(STASH_WAITING_NEW_VIEW)

return PROCESS, None

def process_old_view_preprepare_request(self, msg: OldViewPrePrepareRequest, sender):
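Note: the unstash call added above is the replay half of a stash/replay pattern: 3PC messages that arrive while the replica is still waiting for a NewView are stashed under STASH_WAITING_NEW_VIEW and replayed once the NewView's PrePrepares have been applied. A toy model of that flow; only stash_size() and process_all_stashed() mirror calls visible in this diff, the rest is illustrative:

```python
from collections import defaultdict

STASH_WAITING_NEW_VIEW = "WAITING_NEW_VIEW"  # illustrative reason code

class StashingRouterSketch:
    def __init__(self, handler):
        self._handler = handler
        self._stash = defaultdict(list)

    def stash(self, reason, msg, sender):
        # e.g. a Commit arriving while waiting_for_new_view is True
        self._stash[reason].append((msg, sender))

    def stash_size(self, reason):
        return len(self._stash[reason])

    def process_all_stashed(self, reason):
        # replay in arrival order once the blocking condition clears
        stashed, self._stash[reason] = self._stash[reason], []
        for msg, sender in stashed:
            self._handler(msg, sender)


router = StashingRouterSketch(lambda msg, sender: print("processing", msg))
router.stash(STASH_WAITING_NEW_VIEW, "COMMIT(2, 10)", "Alpha")
assert router.stash_size(STASH_WAITING_NEW_VIEW) == 1
router.process_all_stashed(STASH_WAITING_NEW_VIEW)  # as done above
assert router.stash_size(STASH_WAITING_NEW_VIEW) == 0
```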
12 changes: 6 additions & 6 deletions plenum/server/consensus/replica_service.py
@@ -29,21 +29,21 @@ def __init__(self, name: str, validators: List[str], primary_name: str,
bls_bft_replica: BlsBftReplica=None):
self._data = ConsensusSharedData(name, validators, 0)
self._data.primary_name = primary_name
-        config = getConfig()
-        stasher = StashingRouter(config.REPLICA_STASH_LIMIT, buses=[bus, network])
+        self.config = getConfig()
+        self.stasher = StashingRouter(self.config.REPLICA_STASH_LIMIT, buses=[bus, network])
self._orderer = OrderingService(data=self._data,
timer=timer,
bus=bus,
network=network,
write_manager=write_manager,
bls_bft_replica=bls_bft_replica,
freshness_checker=FreshnessChecker(
-                                            freshness_timeout=config.STATE_FRESHNESS_UPDATE_INTERVAL),
-                                        stasher=stasher)
+                                            freshness_timeout=self.config.STATE_FRESHNESS_UPDATE_INTERVAL),
+                                        stasher=self.stasher)
self._orderer._validator = OrderingServiceMsgValidator(self._orderer._data)
-        self._checkpointer = CheckpointService(self._data, bus, network, stasher,
+        self._checkpointer = CheckpointService(self._data, bus, network, self.stasher,
write_manager.database_manager)
-        self._view_changer = ViewChangeService(self._data, timer, bus, network, stasher)
+        self._view_changer = ViewChangeService(self._data, timer, bus, network, self.stasher)
self._message_requestor = MessageReq3pcService(self._data, bus, network)

# TODO: This is just for testing purposes only
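Note: promoting config and stasher from locals to instance attributes is what lets the new tests reach into a running ReplicaService and assert on its stash. Excerpted from the new plenum/test/consensus/test_stash.py below (replica_service and internal_bus are test fixtures):

```python
# Messages from a future view are stashed...
assert replica_service.stasher.stash_size(STASH_VIEW) == 2

# ...and replayed when the view change actually starts:
internal_bus.send(NeedViewChange(view_no=2))

assert replica_service.stasher.stash_size(STASH_VIEW) == 0
assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1
```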
30 changes: 25 additions & 5 deletions plenum/server/consensus/view_change_service.py
@@ -8,15 +8,16 @@
from plenum.common.config_util import getConfig
from plenum.common.event_bus import InternalBus, ExternalBus
from plenum.common.messages.internal_messages import NeedViewChange, NewViewAccepted, ViewChangeStarted
-from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint
+from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint, InstanceChange
from plenum.common.router import Subscription
from plenum.common.stashing_router import StashingRouter, DISCARD, PROCESS
-from plenum.common.timer import TimerService
+from plenum.common.timer import TimerService, RepeatingTimer
from plenum.server.consensus.consensus_shared_data import ConsensusSharedData, BatchID
from plenum.server.consensus.primary_selector import RoundRobinPrimariesSelector
from plenum.server.quorums import Quorums
from plenum.server.replica_helper import generateName, getNodeName
from plenum.server.replica_validator_enums import STASH_VIEW
from plenum.server.suspicion_codes import Suspicions
from stp_core.common.log import getlogger


@@ -41,6 +42,7 @@ def __init__(self, data: ConsensusSharedData, timer: TimerService, bus: InternalBus,
self._router = stasher
self._votes = ViewChangeVotesForView(self._data.quorums)
self._new_view = None # type: Optional[NewView]
self._resend_inst_change_timer = None

self._router.subscribe(ViewChange, self.process_view_change_message)
self._router.subscribe(ViewChangeAck, self.process_view_change_ack_message)
@@ -85,7 +87,16 @@ def process_need_view_change(self, msg: NeedViewChange):
self._votes.add_view_change(vc, self._data.name)

# 6. Unstash messages for new view
-        self._router.process_all_stashed()
+        self._router.process_all_stashed(STASH_VIEW)

# 7. Schedule re-proposal of the view change on timeout
if self._resend_inst_change_timer is not None:
self._resend_inst_change_timer.stop()
        self._resend_inst_change_timer = \
            RepeatingTimer(self._timer,
                           self._config.NEW_VIEW_TIMEOUT,
                           partial(self._propose_view_change, Suspicions.INSTANCE_CHANGE_TIMEOUT.code),
                           active=True)

Contributor review comment: I think it would be better in the future to use one RepeatingTimer rather than recreating it each time.

def _clean_on_view_change_start(self):
self._clear_old_batches(self._old_prepared)
@@ -234,7 +245,7 @@ def _finish_view_change_if_needed(self):
self._data.view_no,
cp)
)
-            self._bus.send(NeedViewChange())
+            self._propose_view_change(Suspicions.NEW_VIEW_INVALID_CHECKPOINTS.code)
return

batches = self._new_view_builder.calc_batches(cp, view_changes)
@@ -246,7 +257,7 @@ def _finish_view_change_if_needed(self):
self._data.view_no,
batches)
)
-            self._bus.send(NeedViewChange())
+            self._propose_view_change(Suspicions.NEW_VIEW_INVALID_BATCHES.code)
return

self._finish_view_change()
@@ -255,12 +266,21 @@ def _finish_view_change(self):
# Update shared data
self._data.waiting_for_new_view = False

# Cancel View Change timeout task
if self._resend_inst_change_timer is not None:
self._resend_inst_change_timer.stop()
self._resend_inst_change_timer = None

# send message to other services
self._bus.send(NewViewAccepted(view_no=self._new_view.viewNo,
view_changes=self._new_view.viewChanges,
checkpoint=self._new_view.checkpoint,
batches=self._new_view.batches))

    def _propose_view_change(self, suspicion_code):
        msg = InstanceChange(self._data.view_no + 1, suspicion_code)
        self._network.send(msg)


class NewViewBuilder:

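Note: taken together, the changes in this file form a per-replica escalation loop. A simplified, self-contained sketch of its shape; the real code uses RepeatingTimer, InstanceChange, and the Suspicions codes added below:

```python
class ViewChangeWatchdog:
    """Sketch: (re)arm a repeating timeout when a view change starts,
    vote for the *next* view each time it fires, disarm on NewView."""

    def __init__(self, timer_factory, timeout, send_instance_change):
        self._timer_factory = timer_factory    # e.g. SimpleRepeatingTimer above
        self._timeout = timeout                # config.NEW_VIEW_TIMEOUT
        self._send_instance_change = send_instance_change
        self._timer = None
        self.view_no = 0

    def on_view_change_started(self, view_no):
        self.view_no = view_no
        self._disarm()                         # mirrors stopping the old timer first
        self._timer = self._timer_factory(self._timeout, self._on_timeout)

    def _on_timeout(self):
        # mirrors _propose_view_change(Suspicions.INSTANCE_CHANGE_TIMEOUT.code):
        # this view change is stuck, so vote to move one view further
        self._send_instance_change(self.view_no + 1)

    def on_new_view_accepted(self):
        self._disarm()                         # mirrors _finish_view_change

    def _disarm(self):
        if self._timer is not None:
            self._timer.stop()
            self._timer = None
```

The same _propose_view_change path now also handles a primary that sends a NewView with bad checkpoints or batches (NEW_VIEW_INVALID_CHECKPOINTS / NEW_VIEW_INVALID_BATCHES), replacing the old direct NeedViewChange re-send.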
2 changes: 2 additions & 0 deletions plenum/server/suspicion_codes.py
@@ -88,6 +88,8 @@ class Suspicions:
"incorrect audit ledger transaction root hash")

REPLICAS_COUNT_CHANGED = Suspicion(46, "Replica's count changed")
NEW_VIEW_INVALID_CHECKPOINTS = Suspicion(47, "New View's Primary sent NewView with invalid checkpoints")
NEW_VIEW_INVALID_BATCHES = Suspicion(48, "New View's Primary sent NewView with invalid batches")

@classmethod
def get_list(cls):
31 changes: 27 additions & 4 deletions plenum/test/consensus/conftest.py
@@ -2,6 +2,7 @@

import pytest

from plenum.bls.bls_crypto_factory import create_default_bls_crypto_factory
from plenum.common.constants import DOMAIN_LEDGER_ID, AUDIT_LEDGER_ID
from plenum.common.messages.internal_messages import RequestPropagates
from plenum.common.startable import Mode
@@ -11,14 +12,15 @@
from plenum.common.util import get_utc_epoch
from plenum.server.consensus.consensus_shared_data import ConsensusSharedData
from plenum.common.messages.node_messages import Checkpoint
from plenum.server.consensus.replica_service import ReplicaService
from plenum.server.consensus.view_change_service import ViewChangeService
from plenum.server.database_manager import DatabaseManager
from plenum.server.replica_helper import generateName
from plenum.server.request_managers.write_request_manager import WriteRequestManager
from plenum.test.checkpoints.helper import cp_digest
-from plenum.test.consensus.helper import primary_in_view
+from plenum.test.consensus.helper import primary_in_view, create_test_write_req_manager
from plenum.test.greek import genNodeNames
-from plenum.test.helper import MockTimer, MockNetwork
+from plenum.test.helper import MockTimer, MockNetwork, create_pool_txn_data
from plenum.test.testing_utils import FakeSomething


@@ -59,9 +61,14 @@ def _data(name):


 @pytest.fixture
-def view_change_service(internal_bus, external_bus, stasher):
+def timer():
+    return MockTimer(0)
+
+
+@pytest.fixture
+def view_change_service(internal_bus, external_bus, timer, stasher):
     data = ConsensusSharedData("some_name", genNodeNames(4), 0)
-    return ViewChangeService(data, MockTimer(0), internal_bus, external_bus, stasher)
+    return ViewChangeService(data, timer, internal_bus, external_bus, stasher)


@pytest.fixture
@@ -161,3 +168,19 @@ def write_manager(db_manager):
@pytest.fixture()
def stasher(internal_bus, external_bus):
return StashingRouter(limit=100000, buses=[internal_bus, external_bus])


@pytest.fixture()
def replica_service(validators, primary, timer,
                    internal_bus, external_bus):
    genesis_txns = create_pool_txn_data(
        node_names=validators,
        crypto_factory=create_default_bls_crypto_factory(),
        get_free_port=lambda: 8090)['txns']
    return ReplicaService("Alpha:0",
                          validators, primary,
                          timer,
                          internal_bus,
                          external_bus,
                          write_manager=create_test_write_req_manager("Alpha", genesis_txns),
                          bls_bft_replica=FakeSomething(gc=lambda key: None))
2 changes: 2 additions & 0 deletions plenum/test/consensus/helper.py
@@ -89,8 +89,10 @@ def __init__(self, node_count: int = 4, random: Optional[SimRandom] = None):
self.network.create_peer(name, handler),
write_manager=create_test_write_req_manager(name, genesis_txns),
bls_bft_replica=FakeSomething(gc=lambda key: None))
replica.config.NEW_VIEW_TIMEOUT = 30 * 1000
self._nodes.append(replica)


@property
def timer(self) -> MockTimer:
return self._timer
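Note: plenum/config.py above sets NEW_VIEW_TIMEOUT = 30 in seconds, while the simulation pool overrides it to 30 * 1000, which is consistent with the MockTimer-driven sim tests counting time in milliseconds; it is also why test_sim_view_change.py below waits 5 * 30 * 1000. A quick check of the arithmetic (the unit assumption is inferred from these two hunks, not from plenum docs):

```python
NEW_VIEW_TIMEOUT_MS = 30 * 1000       # sim override: 30 s expressed in ms (assumed units)
SIM_WAIT = 5 * NEW_VIEW_TIMEOUT_MS    # five timeout intervals
assert SIM_WAIT == 150_000            # matches the 5 * 30 * 1000 used in the test
```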
41 changes: 41 additions & 0 deletions plenum/test/consensus/test_stash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from plenum.common.messages.internal_messages import NeedViewChange, NewViewCheckpointsApplied
from plenum.common.startable import Mode
from plenum.server.replica_validator_enums import STASH_VIEW, STASH_WAITING_NEW_VIEW
from plenum.test.consensus.helper import create_new_view
from plenum.test.helper import create_commit_no_bls_sig


def test_unstash_future_view_on_need_view_change(external_bus, internal_bus,
                                                 replica_service):
    replica_service._data.view_no = 1
    replica_service._data.node_mode = Mode.participating
    external_bus.process_incoming(create_new_view(initial_view_no=1, stable_cp=200),
                                  replica_service._data.primary_name)
    external_bus.process_incoming(create_commit_no_bls_sig(req_key=(2, 10)),
                                  replica_service._data.primary_name)
    assert replica_service.stasher.stash_size(STASH_VIEW) == 2

    internal_bus.send(NeedViewChange(view_no=2))

    assert replica_service.stasher.stash_size(STASH_VIEW) == 0
    assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1


def test_unstash_waiting_new_view_on_new_view_checkpoint_applied(external_bus, internal_bus,
                                                                 replica_service):
    replica_service._data.view_no = 2
    replica_service._data.node_mode = Mode.participating
    replica_service._data.waiting_for_new_view = True

    external_bus.process_incoming(create_commit_no_bls_sig(req_key=(2, 10)),
                                  replica_service._data.primary_name)
    assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1

    new_view = create_new_view(initial_view_no=1, stable_cp=200)
    replica_service._data.waiting_for_new_view = False
    internal_bus.send(NewViewCheckpointsApplied(view_no=2,
                                                view_changes=new_view.viewChanges,
                                                checkpoint=new_view.checkpoint,
                                                batches=new_view.batches))

    assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 0
3 changes: 2 additions & 1 deletion plenum/test/consensus/view_change/test_sim_view_change.py
@@ -21,7 +21,8 @@ def check_view_change_completes_under_normal_conditions(random: SimRandom):
# Make sure all nodes complete view change
pool.timer.wait_for(lambda: all(not node._data.waiting_for_new_view
and node._data.view_no > 0
-                                   for node in pool.nodes))
+                                   for node in pool.nodes),
+                          timeout=5 * 30 * 1000)  # 5 NEW_VIEW_TIMEOUT intervals

# Make sure all nodes end up in same state
for node_a, node_b in zip(pool.nodes, pool.nodes[1:]):