diff --git a/plenum/config.py b/plenum/config.py index f2663e9f36..b8e63f75f3 100644 --- a/plenum/config.py +++ b/plenum/config.py @@ -412,3 +412,5 @@ # TAA acceptance time valid deviations (secs) TXN_AUTHOR_AGREEMENT_ACCEPTANCE_TIME_BEFORE_TAA_TIME = 120 TXN_AUTHOR_AGREEMENT_ACCEPTANCE_TIME_AFTER_PP_TIME = 120 + +NEW_VIEW_TIMEOUT = 30 # in secs diff --git a/plenum/server/consensus/ordering_service.py b/plenum/server/consensus/ordering_service.py index 5b0770e40a..6ec8d4e258 100644 --- a/plenum/server/consensus/ordering_service.py +++ b/plenum/server/consensus/ordering_service.py @@ -46,6 +46,7 @@ PP_CHECK_WRONG_TIME, Stats, OrderedTracker, TPCStat, generateName, getNodeName from plenum.server.replica_freshness_checker import FreshnessChecker from plenum.server.replica_helper import replica_batch_digest +from plenum.server.replica_validator_enums import STASH_WAITING_NEW_VIEW from plenum.server.request_managers.write_request_manager import WriteRequestManager from plenum.server.suspicion_codes import Suspicions from stp_core.common.log import getlogger @@ -2245,6 +2246,8 @@ def process_new_view_checkpoints_applied(self, msg: NewViewCheckpointsApplied): if result != PROCESS: return result, reason + # apply PrePrepares from NewView that we have + # request missing PrePrepares from NewView missing_batches = [] for batch_id in msg.batches: pp = self.old_view_preprepares.get((batch_id.pp_view_no, batch_id.pp_seq_no, batch_id.pp_digest)) @@ -2256,6 +2259,9 @@ def process_new_view_checkpoints_applied(self, msg: NewViewCheckpointsApplied): if missing_batches: self._request_old_view_pre_prepares(missing_batches) + # unstash waiting for New View messages + self._stasher.process_all_stashed(STASH_WAITING_NEW_VIEW) + return PROCESS, None def process_old_view_preprepare_request(self, msg: OldViewPrePrepareRequest, sender): diff --git a/plenum/server/consensus/replica_service.py b/plenum/server/consensus/replica_service.py index b0b1ddcb9e..cadb6b0941 100644 --- a/plenum/server/consensus/replica_service.py +++ b/plenum/server/consensus/replica_service.py @@ -33,8 +33,8 @@ def __init__(self, name: str, validators: List[str], primary_name: str, # ToDo: Maybe ConsensusSharedData should be initiated before and passed already prepared? self._data = ConsensusSharedData(name, validators, 0) self._data.primary_name = generateName(primary_name, self._data.inst_id) - config = getConfig() - stasher = StashingRouter(config.REPLICA_STASH_LIMIT, buses=[bus, network]) + self.config = getConfig() + self.stasher = StashingRouter(self.config.REPLICA_STASH_LIMIT, buses=[bus, network]) self._write_manager = write_manager self._orderer = OrderingService(data=self._data, timer=timer, @@ -43,12 +43,12 @@ def __init__(self, name: str, validators: List[str], primary_name: str, write_manager=self._write_manager, bls_bft_replica=bls_bft_replica, freshness_checker=FreshnessChecker( - freshness_timeout=config.STATE_FRESHNESS_UPDATE_INTERVAL), - stasher=stasher) + freshness_timeout=self.config.STATE_FRESHNESS_UPDATE_INTERVAL), + stasher=self.stasher) self._orderer._validator = OrderingServiceMsgValidator(self._orderer._data) - self._checkpointer = CheckpointService(self._data, bus, network, stasher, + self._checkpointer = CheckpointService(self._data, bus, network, self.stasher, write_manager.database_manager) - self._view_changer = ViewChangeService(self._data, timer, bus, network, stasher) + self._view_changer = ViewChangeService(self._data, timer, bus, network, self.stasher) self._message_requestor = MessageReq3pcService(self._data, bus, network) self._add_ledgers() diff --git a/plenum/server/consensus/view_change_service.py b/plenum/server/consensus/view_change_service.py index 2d4849a334..7bc4d78202 100644 --- a/plenum/server/consensus/view_change_service.py +++ b/plenum/server/consensus/view_change_service.py @@ -8,15 +8,16 @@ from plenum.common.config_util import getConfig from plenum.common.event_bus import InternalBus, ExternalBus from plenum.common.messages.internal_messages import NeedViewChange, NewViewAccepted, ViewChangeStarted -from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint +from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint, InstanceChange from plenum.common.router import Subscription from plenum.common.stashing_router import StashingRouter, DISCARD, PROCESS -from plenum.common.timer import TimerService +from plenum.common.timer import TimerService, RepeatingTimer from plenum.server.consensus.consensus_shared_data import ConsensusSharedData, BatchID from plenum.server.consensus.primary_selector import RoundRobinPrimariesSelector from plenum.server.quorums import Quorums from plenum.server.replica_helper import generateName, getNodeName from plenum.server.replica_validator_enums import STASH_VIEW +from plenum.server.suspicion_codes import Suspicions from stp_core.common.log import getlogger @@ -41,6 +42,7 @@ def __init__(self, data: ConsensusSharedData, timer: TimerService, bus: Internal self._router = stasher self._votes = ViewChangeVotesForView(self._data.quorums) self._new_view = None # type: Optional[NewView] + self._resend_inst_change_timer = None self._router.subscribe(ViewChange, self.process_view_change_message) self._router.subscribe(ViewChangeAck, self.process_view_change_ack_message) @@ -85,7 +87,16 @@ def process_need_view_change(self, msg: NeedViewChange): self._votes.add_view_change(vc, self._data.name) # 6. Unstash messages for new view - self._router.process_all_stashed() + self._router.process_all_stashed(STASH_VIEW) + + # 7. Schedule New View after timeout + if self._resend_inst_change_timer is not None: + self._resend_inst_change_timer.stop() + self._resend_inst_change_timer = \ + RepeatingTimer(self._timer, + self._config.NEW_VIEW_TIMEOUT, + partial(self._propose_view_change, Suspicions.INSTANCE_CHANGE_TIMEOUT.code), + active=True) def _clean_on_view_change_start(self): self._clear_old_batches(self._old_prepared) @@ -234,7 +245,7 @@ def _finish_view_change_if_needed(self): self._data.view_no, cp) ) - self._bus.send(NeedViewChange()) + self._propose_view_change(Suspicions.NEW_VIEW_INVALID_CHECKPOINTS.code) return batches = self._new_view_builder.calc_batches(cp, view_changes) @@ -246,7 +257,7 @@ def _finish_view_change_if_needed(self): self._data.view_no, batches) ) - self._bus.send(NeedViewChange()) + self._propose_view_change(Suspicions.NEW_VIEW_INVALID_BATCHES.code) return self._finish_view_change() @@ -255,12 +266,21 @@ def _finish_view_change(self): # Update shared data self._data.waiting_for_new_view = False + # Cancel View Change timeout task + if self._resend_inst_change_timer is not None: + self._resend_inst_change_timer.stop() + self._resend_inst_change_timer = None + # send message to other services self._bus.send(NewViewAccepted(view_no=self._new_view.viewNo, view_changes=self._new_view.viewChanges, checkpoint=self._new_view.checkpoint, batches=self._new_view.batches)) + def _propose_view_change(self, suspision_code): + msg = InstanceChange(self._data.view_no + 1, suspision_code) + self._network.send(msg) + class NewViewBuilder: diff --git a/plenum/server/suspicion_codes.py b/plenum/server/suspicion_codes.py index 835133314e..5c2fe7252f 100644 --- a/plenum/server/suspicion_codes.py +++ b/plenum/server/suspicion_codes.py @@ -88,6 +88,8 @@ class Suspicions: "incorrect audit ledger transaction root hash") REPLICAS_COUNT_CHANGED = Suspicion(46, "Replica's count changed") + NEW_VIEW_INVALID_CHECKPOINTS = Suspicion(47, "New View's Primary sent NewView with invalid checkpoints") + NEW_VIEW_INVALID_BATCHES = Suspicion(48, "New View's Primary sent NewView with invalid batches") @classmethod def get_list(cls): diff --git a/plenum/test/consensus/conftest.py b/plenum/test/consensus/conftest.py index a73b09edad..a112bc9f87 100644 --- a/plenum/test/consensus/conftest.py +++ b/plenum/test/consensus/conftest.py @@ -2,6 +2,7 @@ import pytest +from plenum.bls.bls_crypto_factory import create_default_bls_crypto_factory from plenum.common.constants import DOMAIN_LEDGER_ID, AUDIT_LEDGER_ID from plenum.common.messages.internal_messages import RequestPropagates from plenum.common.startable import Mode @@ -11,14 +12,15 @@ from plenum.common.util import get_utc_epoch from plenum.server.consensus.consensus_shared_data import ConsensusSharedData from plenum.common.messages.node_messages import Checkpoint +from plenum.server.consensus.replica_service import ReplicaService from plenum.server.consensus.view_change_service import ViewChangeService from plenum.server.database_manager import DatabaseManager from plenum.server.replica_helper import generateName from plenum.server.request_managers.write_request_manager import WriteRequestManager from plenum.test.checkpoints.helper import cp_digest -from plenum.test.consensus.helper import primary_in_view +from plenum.test.consensus.helper import primary_in_view, create_test_write_req_manager from plenum.test.greek import genNodeNames -from plenum.test.helper import MockTimer, MockNetwork +from plenum.test.helper import MockTimer, MockNetwork, create_pool_txn_data from plenum.test.testing_utils import FakeSomething @@ -59,9 +61,14 @@ def _data(name): @pytest.fixture -def view_change_service(internal_bus, external_bus, stasher): +def timer(): + return MockTimer(0) + + +@pytest.fixture +def view_change_service(internal_bus, external_bus, timer, stasher): data = ConsensusSharedData("some_name", genNodeNames(4), 0) - return ViewChangeService(data, MockTimer(0), internal_bus, external_bus, stasher) + return ViewChangeService(data, timer, internal_bus, external_bus, stasher) @pytest.fixture @@ -161,3 +168,19 @@ def write_manager(db_manager): @pytest.fixture() def stasher(internal_bus, external_bus): return StashingRouter(limit=100000, buses=[internal_bus, external_bus]) + + +@pytest.fixture() +def replica_service(validators, primary, timer, + internal_bus, external_bus): + genesis_txns = create_pool_txn_data( + node_names=validators, + crypto_factory=create_default_bls_crypto_factory(), + get_free_port=lambda: 8090)['txns'] + return ReplicaService("Alpha:0", + validators, primary, + timer, + internal_bus, + external_bus, + write_manager=create_test_write_req_manager("Alpha", genesis_txns), + bls_bft_replica=FakeSomething(gc=lambda key: None)) diff --git a/plenum/test/consensus/helper.py b/plenum/test/consensus/helper.py index d820b8ba9a..7d8181996f 100644 --- a/plenum/test/consensus/helper.py +++ b/plenum/test/consensus/helper.py @@ -143,8 +143,10 @@ def __init__(self, node_count: int = 4, random: Optional[SimRandom] = None): self.network.create_peer(name, handler), write_manager=create_test_write_req_manager(name, genesis_txns), bls_bft_replica=MockBlsBftReplica()) + replica.config.NEW_VIEW_TIMEOUT = 30 * 1000 self._nodes.append(replica) + @property def timer(self) -> MockTimer: return self._timer diff --git a/plenum/test/consensus/test_stash.py b/plenum/test/consensus/test_stash.py new file mode 100644 index 0000000000..f60f48d328 --- /dev/null +++ b/plenum/test/consensus/test_stash.py @@ -0,0 +1,41 @@ +from plenum.common.messages.internal_messages import NeedViewChange, NewViewCheckpointsApplied +from plenum.common.startable import Mode +from plenum.server.replica_validator_enums import STASH_VIEW, STASH_WAITING_NEW_VIEW +from plenum.test.consensus.helper import create_new_view +from plenum.test.helper import create_commit_no_bls_sig + + +def test_unstash_future_view_on_need_view_change(external_bus, internal_bus, + replica_service): + replica_service._data.view_no = 1 + replica_service._data.node_mode = Mode.participating + external_bus.process_incoming(create_new_view(initial_view_no=1, stable_cp=200), + replica_service._data.primary_name) + external_bus.process_incoming(create_commit_no_bls_sig(req_key=(2, 10)), + replica_service._data.primary_name) + assert replica_service.stasher.stash_size(STASH_VIEW) == 2 + + internal_bus.send(NeedViewChange(view_no=2)) + + assert replica_service.stasher.stash_size(STASH_VIEW) == 0 + assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1 + + +def test_unstash_waiting_new_view_on_new_view_checkpoint_applied(external_bus, internal_bus, + replica_service): + replica_service._data.view_no = 2 + replica_service._data.node_mode = Mode.participating + replica_service._data.waiting_for_new_view = True + + external_bus.process_incoming(create_commit_no_bls_sig(req_key=(2, 10)), + replica_service._data.primary_name) + assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 1 + + new_view = create_new_view(initial_view_no=1, stable_cp=200) + replica_service._data.waiting_for_new_view = False + internal_bus.send(NewViewCheckpointsApplied(view_no=2, + view_changes=new_view.viewChanges, + checkpoint=new_view.checkpoint, + batches=new_view.batches)) + + assert replica_service.stasher.stash_size(STASH_WAITING_NEW_VIEW) == 0 diff --git a/plenum/test/consensus/view_change/test_sim_view_change.py b/plenum/test/consensus/view_change/test_sim_view_change.py index 12fd289bd8..3c98c5cf02 100644 --- a/plenum/test/consensus/view_change/test_sim_view_change.py +++ b/plenum/test/consensus/view_change/test_sim_view_change.py @@ -21,7 +21,8 @@ def check_view_change_completes_under_normal_conditions(random: SimRandom): # Make sure all nodes complete view change pool.timer.wait_for(lambda: all(not node._data.waiting_for_new_view and node._data.view_no > 0 - for node in pool.nodes)) + for node in pool.nodes), + timeout=5 * 30 * 1000) # 5 NEW_VIEW_TIMEOUT intervals # Make sure all nodes end up in same state for node_a, node_b in zip(pool.nodes, pool.nodes[1:]): diff --git a/plenum/test/consensus/view_change/test_view_change_service.py b/plenum/test/consensus/view_change/test_view_change_service.py index adbf7310d6..bb0306acbc 100644 --- a/plenum/test/consensus/view_change/test_view_change_service.py +++ b/plenum/test/consensus/view_change/test_view_change_service.py @@ -5,9 +5,10 @@ from plenum.common.messages.internal_messages import NeedViewChange, NewViewAccepted, ViewChangeStarted, \ NewViewCheckpointsApplied -from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint +from plenum.common.messages.node_messages import ViewChange, ViewChangeAck, NewView, Checkpoint, InstanceChange from plenum.server.consensus.view_change_service import ViewChangeService, view_change_digest from plenum.server.replica_helper import generateName, getNodeName +from plenum.server.suspicion_codes import Suspicions from plenum.test.checkpoints.helper import cp_digest from plenum.test.consensus.helper import copy_shared_data, check_service_changed_only_owned_fields_in_shared_data, \ create_new_view, create_view_change, create_new_view_from_vc, create_view_change_acks, create_batches @@ -16,10 +17,10 @@ @pytest.fixture -def view_change_service_builder(consensus_data, mock_timer, internal_bus, external_bus, stasher): +def view_change_service_builder(consensus_data, timer, internal_bus, external_bus, stasher): def _service(name): data = consensus_data(name) - service = ViewChangeService(data, mock_timer, internal_bus, external_bus, stasher) + service = ViewChangeService(data, timer, internal_bus, external_bus, stasher) return service return _service @@ -146,12 +147,12 @@ def test_start_view_change_sends_view_change_started(internal_bus, view_change_s handler.assert_called_with(ViewChangeStarted(view_no=5)) -def test_start_view_change_broadcasts_view_change_message(internal_bus, view_change_service, +def test_start_view_change_broadcasts_view_change_message(internal_bus, external_bus, view_change_service, initial_view_no): internal_bus.send(NeedViewChange()) - assert len(view_change_service._network.sent_messages) == 1 - msg, dst = view_change_service._network.sent_messages[0] + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] assert dst is None # message was broadcast assert isinstance(msg, ViewChange) assert msg.viewNo == initial_view_no + 1 @@ -159,20 +160,21 @@ def test_start_view_change_broadcasts_view_change_message(internal_bus, view_cha def test_non_primary_responds_to_view_change_message_with_view_change_ack_to_new_primary( - internal_bus, some_item, other_item, validators, primary, view_change_service_builder, initial_view_no): + internal_bus, external_bus, some_item, other_item, validators, primary, view_change_service_builder, + initial_view_no): next_view_no = initial_view_no + 1 non_primary_name = some_item(validators, exclude=[primary(next_view_no)]) service = view_change_service_builder(non_primary_name) internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() + external_bus.sent_messages.clear() vc = create_view_change(initial_view_no) frm = other_item(validators, exclude=[non_primary_name]) - service._network.process_incoming(vc, generateName(frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(frm, service._data.inst_id)) - assert len(service._network.sent_messages) == 1 - msg, dst = service._network.sent_messages[0] + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] assert dst == [getNodeName(service._data.primary_name)] assert isinstance(msg, ViewChangeAck) assert msg.viewNo == vc.viewNo @@ -181,45 +183,46 @@ def test_non_primary_responds_to_view_change_message_with_view_change_ack_to_new def test_primary_doesnt_respond_to_view_change_message( - some_item, validators, primary, view_change_service_builder, initial_view_no, view_change_message): + some_item, validators, primary, external_bus, view_change_service_builder, initial_view_no, + view_change_message): name = primary(initial_view_no + 1) service = view_change_service_builder(name) vc = create_view_change(initial_view_no) frm = some_item(validators, exclude=[name]) - service._network.process_incoming(vc, generateName(frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(frm, service._data.inst_id)) - assert len(service._network.sent_messages) == 0 + assert len(external_bus.sent_messages) == 0 def test_new_view_message_is_sent_by_primary_when_view_change_certificate_is_reached( - internal_bus, validators, primary, view_change_service_builder, initial_view_no, + internal_bus, external_bus, validators, primary, view_change_service_builder, initial_view_no, view_change_acks): primary_name = primary(initial_view_no + 1) service = view_change_service_builder(primary_name) # start view change internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() + external_bus.sent_messages.clear() # receive quorum of ViewChanges and ViewChangeAcks non_primaries = [item for item in validators if item != primary_name] vc = create_view_change(initial_view_no) for vc_frm in non_primaries: - service._network.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) for ack, ack_frm in view_change_acks(vc, vc_frm, primary_name, len(validators) - 2): - service._network.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) # check that NewView has been sent - assert len(service._network.sent_messages) == 1 - msg, dst = service._network.sent_messages[0] + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] assert dst is None # message was broadcast assert isinstance(msg, NewView) assert msg.viewNo == initial_view_no + 1 def test_new_view_message_is_not_sent_by_non_primary_when_view_change_certificate_is_reached( - internal_bus, validators, primary, view_change_service_builder, initial_view_no, some_item): + internal_bus, external_bus, validators, primary, view_change_service_builder, initial_view_no, some_item): next_view_no = initial_view_no + 1 primary_name = primary(next_view_no) non_primary_name = some_item(validators, exclude=[primary_name]) @@ -227,21 +230,22 @@ def test_new_view_message_is_not_sent_by_non_primary_when_view_change_certificat # start view change internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() + external_bus.sent_messages.clear() # receive quorum of ViewChanges and ViewChangeAcks non_primaries = [item for item in validators if item != primary_name] vc = create_view_change(initial_view_no) for vc_frm in non_primaries: - service._network.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): - service._network.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) # check that NewView hasn't been sent - assert all(not isinstance(msg, NewView) for msg in service._network.sent_messages) + assert all(not isinstance(msg, NewView) for msg in external_bus.sent_messages) -def test_view_change_finished_is_sent_by_primary_once_view_change_certificate_is_reached(internal_bus, validators, +def test_view_change_finished_is_sent_by_primary_once_view_change_certificate_is_reached(internal_bus, external_bus, + validators, primary, view_change_service_builder, initial_view_no): @@ -253,7 +257,7 @@ def test_view_change_finished_is_sent_by_primary_once_view_change_certificate_is # start view change internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() + external_bus.sent_messages.clear() old_data = copy_shared_data(service._data) # receive quorum of ViewChanges and ViewChangeAcks @@ -262,9 +266,9 @@ def test_view_change_finished_is_sent_by_primary_once_view_change_certificate_is vc = create_view_change(initial_view_no) new_view = create_new_view_from_vc(vc, non_primaries) for vc_frm in non_primaries: - service._network.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): - service._network.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) # check that NewViewAccepted has been sent expected_finish_vc = NewViewAccepted(view_no=initial_view_no + 1, @@ -281,7 +285,7 @@ def test_view_change_finished_is_sent_by_primary_once_view_change_certificate_is def test_view_change_finished_is_sent_by_non_primary_once_view_change_certificate_is_reached_and_new_view_from_primary( - internal_bus, validators, primary, view_change_service_builder, initial_view_no, some_item): + internal_bus, external_bus, validators, primary, view_change_service_builder, initial_view_no, some_item): handler = Mock() internal_bus.subscribe(NewViewAccepted, handler) @@ -298,25 +302,25 @@ def test_view_change_finished_is_sent_by_non_primary_once_view_change_certificat # start view change internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() + external_bus.sent_messages.clear() # receive quorum of ViewChanges and ViewChangeAcks non_primaries = [item for item in validators if item != primary_name] non_primaries = random.sample(non_primaries, service._data.quorums.view_change.value) new_view = create_new_view_from_vc(vc, non_primaries) for vc_frm in non_primaries: - service._network.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): - service._network.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) - # check that NewViewAccepted hasn't been sent if NewView is from non-primary - service._network.process_incoming(new_view, generateName(non_primary_name, service._data.inst_id)) + # check that NewViewAccepted hasn't been sent if NewView is from non-primary + external_bus.process_incoming(new_view, generateName(non_primary_name, service._data.inst_id)) handler.assert_not_called() assert service._data.view_no == initial_view_no + 1 assert service._data.waiting_for_new_view # check that NewViewAccepted has been sent if NewView is from primary - service._network.process_incoming(new_view, generateName(primary_name, service._data.inst_id)) + external_bus.process_incoming(new_view, generateName(primary_name, service._data.inst_id)) expected_finish_vc = NewViewAccepted(view_no=initial_view_no + 1, view_changes=new_view.viewChanges, checkpoint=new_view.checkpoint, @@ -330,8 +334,10 @@ def test_view_change_finished_is_sent_by_non_primary_once_view_change_certificat assert not service._data.waiting_for_new_view -def test_new_view_incorrect_checkpoint(internal_bus, validators, primary, view_change_service_builder, initial_view_no, - some_item): +def test_send_instance_change_on_new_view_with_incorrect_checkpoint(internal_bus, external_bus, validators, primary, + view_change_service_builder, + initial_view_no, + some_item): next_view_no = initial_view_no + 1 primary_name = primary(next_view_no) non_primary_name = some_item(validators, exclude=[primary_name]) @@ -345,35 +351,37 @@ def test_new_view_incorrect_checkpoint(internal_bus, validators, primary, view_c # start view change internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() - - handler = Mock() - internal_bus.subscribe(NeedViewChange, handler) + external_bus.sent_messages.clear() # receive quorum of ViewChanges and ViewChangeAcks non_primaries = [item for item in validators if item != primary_name] non_primaries = random.sample(non_primaries, service._data.quorums.view_change.value) for vc_frm in non_primaries: - service._network.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): - service._network.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) cp = Checkpoint(instId=0, viewNo=initial_view_no, seqNoStart=0, seqNoEnd=1000, digest=cp_digest(1000)) new_view = create_new_view_from_vc(vc, non_primaries, checkpoint=cp) # send NewView by Primary - service._network.process_incoming(new_view, generateName(primary_name, service._data.inst_id)) - - # make sure that NeedViewChange is called - handler.assert_called_with(NeedViewChange()) - - # make sure that we get to the next view - assert service._data.view_no == initial_view_no + 2 - assert service._data.waiting_for_new_view + init_network_msg_count = len(external_bus.sent_messages) + external_bus.process_incoming(new_view, generateName(primary_name, service._data.inst_id)) - -def test_new_view_incorrect_batches(internal_bus, validators, primary, view_change_service_builder, initial_view_no, - some_item): + # we don't go to new view, just send Instance Change + assert service._data.view_no == initial_view_no + 1 + assert init_network_msg_count + 1 == len(external_bus.sent_messages) + msg, dst = external_bus.sent_messages[-1] + assert dst is None # broadcast + assert isinstance(msg, InstanceChange) + assert msg.viewNo == initial_view_no + 2 + assert msg.reason == Suspicions.NEW_VIEW_INVALID_CHECKPOINTS.code + + +def test_send_instance_change_on_new_view_with_incorrect_batches(internal_bus, external_bus, validators, primary, + view_change_service_builder, + initial_view_no, + some_item): next_view_no = initial_view_no + 1 primary_name = primary(next_view_no) non_primary_name = some_item(validators, exclude=[primary_name]) @@ -387,27 +395,151 @@ def test_new_view_incorrect_batches(internal_bus, validators, primary, view_chan # start view change internal_bus.send(NeedViewChange()) - service._network.sent_messages.clear() - - handler = Mock() - internal_bus.subscribe(NeedViewChange, handler) + external_bus.sent_messages.clear() # receive quorum of ViewChanges and ViewChangeAcks non_primaries = [item for item in validators if item != primary_name] non_primaries = random.sample(non_primaries, service._data.quorums.view_change.value) for vc_frm in non_primaries: - service._network.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): - service._network.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) new_view = create_new_view_from_vc(vc, non_primaries, batches=create_batches(view_no=initial_view_no + 2)) # send NewView by Primary - service._network.process_incoming(new_view, generateName(primary_name, service._data.inst_id)) + init_network_msg_count = len(external_bus.sent_messages) + external_bus.process_incoming(new_view, generateName(primary_name, service._data.inst_id)) + + # we don't go to new view, just send Instance Change + assert service._data.view_no == initial_view_no + 1 + assert init_network_msg_count + 1 == len(external_bus.sent_messages) + msg, dst = external_bus.sent_messages[-1] + assert dst is None # broadcast + assert isinstance(msg, InstanceChange) + assert msg.viewNo == initial_view_no + 2 + assert msg.reason == Suspicions.NEW_VIEW_INVALID_BATCHES.code - # make sure that NeedViewChange is called - handler.assert_called_with(NeedViewChange()) - # make sure that we get to the next view +def test_send_instance_change_on_timeout_no_new_view_received(internal_bus, external_bus, + view_change_service, timer, + initial_view_no): + internal_bus.send(NeedViewChange()) + + init_network_msg_count = len(external_bus.sent_messages) + timer.sleep(view_change_service._config.NEW_VIEW_TIMEOUT - 1) + assert view_change_service._data.view_no == initial_view_no + 1 + assert init_network_msg_count == len(external_bus.sent_messages) + + timer.sleep(2) + # we don't go to new view, just send Instance Change + assert view_change_service._data.view_no == initial_view_no + 1 + assert init_network_msg_count + 1 == len(external_bus.sent_messages) + msg, dst = external_bus.sent_messages[-1] + assert dst is None # broadcast + assert isinstance(msg, InstanceChange) + assert msg.viewNo == initial_view_no + 2 + assert msg.reason == Suspicions.INSTANCE_CHANGE_TIMEOUT.code + + timer.sleep(view_change_service._config.NEW_VIEW_TIMEOUT + 1) + # we don't go to new view, just send Instance Change + assert view_change_service._data.view_no == initial_view_no + 1 + assert init_network_msg_count + 2 == len(external_bus.sent_messages) + msg, dst = external_bus.sent_messages[-1] + assert dst is None # broadcast + assert isinstance(msg, InstanceChange) + assert msg.viewNo == initial_view_no + 2 + assert msg.reason == Suspicions.INSTANCE_CHANGE_TIMEOUT.code + + +def test_send_instance_change_on_timeout_when_new_view_received_but_not_processed(internal_bus, external_bus, timer, + view_change_service, initial_view_no): + internal_bus.send(NeedViewChange()) + init_network_msg_count = len(external_bus.sent_messages) + new_view = create_new_view(initial_view_no=0, stable_cp=200) + external_bus.process_incoming(new_view, view_change_service._data.primary_name) + + timer.sleep(view_change_service._config.NEW_VIEW_TIMEOUT + 1) + + # we don't go to new view, just send Instance Change + assert view_change_service._data.view_no == initial_view_no + 1 + assert init_network_msg_count + 1 == len(external_bus.sent_messages) + msg, dst = external_bus.sent_messages[-1] + assert dst is None # broadcast + assert isinstance(msg, InstanceChange) + assert msg.viewNo == initial_view_no + 2 + assert msg.reason == Suspicions.INSTANCE_CHANGE_TIMEOUT.code + + +def test_do_not_send_instance_change_on_timeout_when_view_change_finished_on_time(internal_bus, external_bus, + validators, + primary, view_change_service_builder, + timer, + initial_view_no): + primary_name = primary(initial_view_no + 1) + service = view_change_service_builder(primary_name) + + # start view change + internal_bus.send(NeedViewChange()) + external_bus.sent_messages.clear() + + # receive quorum of ViewChanges and ViewChangeAcks + non_primaries = [item for item in validators if item != primary_name] + vc = create_view_change(initial_view_no) + for vc_frm in non_primaries: + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + + # check that view change is finished + assert service._data.view_no == initial_view_no + 1 + assert not service._data.waiting_for_new_view + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] + assert isinstance(msg, NewView) + + # make sure view change hasn't been started again + timer.sleep(service._config.NEW_VIEW_TIMEOUT + 1) + assert service._data.view_no == initial_view_no + 1 + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] + assert isinstance(msg, NewView) + + +def test_do_not_send_instance_change_on_timeout_when_multiple_view_change_finished_on_time(internal_bus, external_bus, + validators, + primary, + view_change_service_builder, + timer, + initial_view_no): + primary_name = primary(initial_view_no + 2) + service = view_change_service_builder(primary_name) + + # start first view change + internal_bus.send(NeedViewChange()) + + # start second view change + internal_bus.send(NeedViewChange()) + external_bus.sent_messages.clear() + + # receive quorum of ViewChanges and ViewChangeAcks + non_primaries = [item for item in validators if item != primary_name] + vc = create_view_change(initial_view_no + 1) + for vc_frm in non_primaries: + external_bus.process_incoming(vc, generateName(vc_frm, service._data.inst_id)) + for ack, ack_frm in create_view_change_acks(vc, vc_frm, non_primaries): + external_bus.process_incoming(ack, generateName(ack_frm, service._data.inst_id)) + + # check that view change is finished assert service._data.view_no == initial_view_no + 2 - assert service._data.waiting_for_new_view + assert not service._data.waiting_for_new_view + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] + assert isinstance(msg, NewView) + + # make sure view change hasn't been started again + timer.sleep(service._config.NEW_VIEW_TIMEOUT + 1) + assert service._data.view_no == initial_view_no + 2 + assert len(external_bus.sent_messages) == 1 + msg, dst = external_bus.sent_messages[0] + assert isinstance(msg, NewView)