diff --git a/plenum/server/batch_handlers/audit_batch_handler.py b/plenum/server/batch_handlers/audit_batch_handler.py index cf22d1542a..4d5463cbb0 100644 --- a/plenum/server/batch_handlers/audit_batch_handler.py +++ b/plenum/server/batch_handlers/audit_batch_handler.py @@ -25,20 +25,20 @@ def __init__(self, database_manager: DatabaseManager): def post_batch_applied(self, three_pc_batch: ThreePcBatch, prev_handler_result=None): txn = self._add_to_ledger(three_pc_batch) self.tracker.apply_batch(None, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size) - logger.info("applied audit txn {}; uncommitted root hash is {}; uncommitted size is {}". - format(str(txn), self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)) + logger.debug("applied audit txn {}; uncommitted root hash is {}; uncommitted size is {}". + format(str(txn), self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)) def post_batch_rejected(self, ledger_id, prev_handler_result=None): _, _, txn_count = self.tracker.reject_batch() self.ledger.discardTxns(txn_count) - logger.info("rejected {} audit txns; uncommitted root hash is {}; uncommitted size is {}". - format(txn_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)) + logger.debug("rejected {} audit txns; uncommitted root hash is {}; uncommitted size is {}". + format(txn_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)) def commit_batch(self, three_pc_batch, prev_handler_result=None): _, _, txns_count = self.tracker.commit_batch() _, committedTxns = self.ledger.commitTxns(txns_count) - logger.info("committed {} audit txns; uncommitted root hash is {}; uncommitted size is {}". - format(txns_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)) + logger.debug("committed {} audit txns; uncommitted root hash is {}; uncommitted size is {}". + format(txns_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)) return committedTxns def on_catchup_finished(self): diff --git a/plenum/server/database_manager.py b/plenum/server/database_manager.py index d0d143ff8e..095145a579 100644 --- a/plenum/server/database_manager.py +++ b/plenum/server/database_manager.py @@ -10,11 +10,17 @@ class DatabaseManager(): def __init__(self): self.databases = {} # type: Dict[int, Database] self.stores = {} + self._init_db_list() + + def _init_db_list(self): + self._ledgers = {lid: db.ledger for lid, db in self.databases.items()} + self._states = {lid: db.state for lid, db in self.databases.items()} def register_new_database(self, lid, ledger: Ledger, state: Optional[State] = None): if lid in self.databases: raise LogicError('Trying to add already existing database') self.databases[lid] = Database(ledger, state) + self._init_db_list() def get_database(self, lid): if lid not in self.databases: @@ -43,13 +49,11 @@ def get_store(self, label): @property def states(self): - # TODO: change this. Too inefficient to build dict every time - return {lid: db.state for lid, db in self.databases.items()} + return self._states @property def ledgers(self): - # TODO: change this. 
Too inefficient to build dict every time - return {lid: db.ledger for lid, db in self.databases.items()} + return self._ledgers @property def bls_store(self): diff --git a/plenum/server/node.py b/plenum/server/node.py index 96280e81f4..77579c0e0b 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -355,15 +355,6 @@ def __init__(self, else: self.quota_control = StaticQuotaControl(node_quota=node_quota, client_quota=client_quota) - # Ordered requests received from replicas while the node was not - # participating - self.stashedOrderedReqs = deque() - - # Set of (identifier, reqId) of all transactions that were received - # while catching up. Used to detect overlap between stashed requests - # and received replies while catching up. - # self.reqsFromCatchupReplies = set() - # Any messages that are intended for view numbers higher than the # current view. self.msgsForFutureViews = {} @@ -717,8 +708,15 @@ def viewNo(self, value): @property def view_change_in_progress(self): - return False if self.view_changer is None \ - else self.view_changer.view_change_in_progress + if self.view_changer is None: + return False + return self.view_changer.view_change_in_progress + + @property + def pre_view_change_in_progress(self): + if self.view_changer is None: + return False + return self.view_changer.pre_view_change_in_progress def _add_config_ledger(self): self.ledgerManager.addLedger( @@ -2153,17 +2151,6 @@ def postAuditLedgerCaughtUp(self, **kwargs): self.audit_handler.on_catchup_finished() def preLedgerCatchUp(self, ledger_id): - # Process any Ordered requests. This causes less transactions to be - # requested during catchup. Also commits any uncommitted state that - # can be committed - logger.info('{} going to process any ordered requests before starting catchup.'.format(self)) - self.force_process_ordered() - self.processStashedOrderedReqs() - - # revert uncommitted txns and state for unordered requests - r = self.master_replica.revert_unordered_batches() - logger.info('{} reverted {} batches before starting catch up for ledger {}'.format(self, r, ledger_id)) - if len(self.auditLedger.uncommittedTxns) > 0: raise LogicError( '{} audit ledger has uncommitted txns before catching up ledger {}'.format(self, ledger_id)) @@ -2242,10 +2229,6 @@ def allLedgersCaughtUp(self): if self.view_change_in_progress: self._process_replica_messages() - # TODO: Maybe a slight optimisation is to check result of - # `self.num_txns_caught_up_in_last_catchup()` - self.processStashedOrderedReqs() - # More than one catchup may be needed during the current ViewChange protocol # TODO: separate view change and catchup logic if self.is_catchup_needed(): @@ -2416,8 +2399,8 @@ def applyReq(self, request: Request, cons_time: int): cons_time=cons_time, ledger_id=ledger_id, seq_no=seq_no, txn=txn) - def apply_stashed_reqs(self, ordered): - request_ids = ordered.valid_reqIdr + def apply_stashed_reqs(self, three_pc_batch): + request_ids = three_pc_batch.valid_digests requests = [] for req_key in request_ids: if req_key in self.requests: @@ -2428,7 +2411,6 @@ def apply_stashed_reqs(self, ordered): _, seq_no = self.seqNoDB.get(req.digest) if seq_no is None: requests.append(req) - three_pc_batch = ThreePcBatch.from_ordered(ordered) self.apply_reqs(requests, three_pc_batch) def apply_reqs(self, requests, three_pc_batch: ThreePcBatch): @@ -2680,15 +2662,31 @@ def processOrdered(self, ordered: Ordered): logger.trace("{} got ordered requests from master replica" .format(self)) - logger.info("{} executing Ordered batch {} {} of 
{} requests; state root {}; txn root {}" - .format(self.name, - ordered.viewNo, - ordered.ppSeqNo, - len(ordered.valid_reqIdr), - ordered.stateRootHash, - ordered.txnRootHash)) + logger.debug("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}" + .format(self.name, + ordered.viewNo, + ordered.ppSeqNo, + len(ordered.valid_reqIdr), + ordered.stateRootHash, + ordered.txnRootHash)) - self.executeBatch(ThreePcBatch.from_ordered(ordered), + three_pc_batch = ThreePcBatch.from_ordered(ordered) + if self.db_manager.ledgers[AUDIT_LEDGER_ID].uncommittedRootHash is None: + # if we order request during view change + # in between catchup rounds, then the 3PC batch will not be applied, + # since it was reverted before catchup started, and only COMMITs were + # processed in between catchup that led to this ORDERED msg + logger.info("{} applying stashed requests for batch {} {} of {} requests; state root {}; txn root {}" + .format(self.name, + three_pc_batch.view_no, + three_pc_batch.pp_seq_no, + len(three_pc_batch.valid_digests), + three_pc_batch.state_root, + three_pc_batch.txn_root)) + + self.apply_stashed_reqs(three_pc_batch) + + self.executeBatch(three_pc_batch, ordered.valid_reqIdr, ordered.invalid_reqIdr, ordered.auditTxnRootHash) @@ -2725,11 +2723,10 @@ def force_process_ordered(self): .format(self, num_processed, instance_id)) def try_processing_ordered(self, msg): - if self.isParticipating: + if self.master_replica.validator.can_order(): self.processOrdered(msg) else: - logger.info("{} stashing {} since mode is {}".format(self, msg, self.mode)) - self.stashedOrderedReqs.append(msg) + logger.warning("{} can not process Ordered message {} since mode is {}".format(self, msg, self.mode)) def processEscalatedException(self, ex): """ @@ -2833,7 +2830,6 @@ def sum_for_values(obj): self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_REPLICAS, len(self.msgsForFutureReplicas)) self.metrics.add_event(MetricsName.MSGS_TO_VIEW_CHANGER, len(self.msgsToViewChanger)) self.metrics.add_event(MetricsName.REQUEST_SENDER, len(self.requestSender)) - self.metrics.add_event(MetricsName.STASHED_ORDERED_REQS, len(self.stashedOrderedReqs)) self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_VIEWS, len(self.msgsForFutureViews)) @@ -3277,11 +3273,6 @@ def executeBatch(self, three_pc_batch: ThreePcBatch, format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no, three_pc_batch.ledger_id, three_pc_batch.state_root, three_pc_batch.txn_root, [key for key in valid_reqs_keys])) - logger.info("{} committed batch request, view no {}, ppSeqNo {}, " - "ledger {}, state root {}, txn root {}, requests: {}". 
- format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no, - three_pc_batch.ledger_id, three_pc_batch.state_root, - three_pc_batch.txn_root, len(valid_reqs_keys))) for txn in committedTxns: self.execute_hook(NodeHooks.POST_REQUEST_COMMIT, txn=txn, @@ -3446,37 +3437,6 @@ def defaultAuthNr(self) -> ReqAuthenticator: req_authnr.register_authenticator(self.init_core_authenticator()) return req_authnr - def processStashedOrderedReqs(self): - i = 0 - while self.stashedOrderedReqs: - msg = self.stashedOrderedReqs.popleft() - if msg.instId == 0: - if compare_3PC_keys( - (msg.viewNo, - msg.ppSeqNo), - self.ledgerManager.last_caught_up_3PC) >= 0: - logger.info( - '{} ignoring stashed ordered msg {} since ledger ' - 'manager has last_caught_up_3PC as {}'.format( - self, msg, self.ledgerManager.last_caught_up_3PC)) - continue - logger.info( - '{} applying stashed Ordered msg {}'.format(self, msg)) - # Since the PRE-PREPAREs ans PREPAREs corresponding to these - # stashed ordered requests was not processed. - self.apply_stashed_reqs(msg) - - self.processOrdered(msg) - i += 1 - - logger.info( - "{} processed {} stashed ordered requests".format( - self, i)) - # Resetting monitor after executing all stashed requests so no view - # change can be proposed - self.monitor.reset() - return i - def ensureKeysAreSetup(self): """ Check whether the keys are setup in the local STP keep. diff --git a/plenum/server/replica.py b/plenum/server/replica.py index 5f5c49250a..6aac510252 100644 --- a/plenum/server/replica.py +++ b/plenum/server/replica.py @@ -636,12 +636,6 @@ def on_view_change_start(self): lst = self.last_prepared_certificate_in_view() self.last_prepared_before_view_change = lst self.logger.info('{} setting last prepared for master to {}'.format(self, lst)) - # It can be that last_ordered_3pc was set for the previous view, since it's set during catch-up - # Example: a Node has last_ordered = (1, 300), and then the whole pool except this node restarted - # The new viewNo is 0, but last_ordered is (1, 300), so all new requests will be discarded by this Node - # if we don't reset last_ordered_3pc - if self.viewNo <= self.last_ordered_3pc[0]: - self.last_ordered_3pc = (self.viewNo, 0) def on_view_change_done(self): if self.isMaster: @@ -807,6 +801,9 @@ def trackBatches(self, pp: PrePrepare, prevStateRootHash): pp.ppTime, prevStateRootHash, len(pp.reqIdr)] def send_3pc_batch(self): + if not self.validator.can_send_3pc_batch(): + return 0 + sent_batches = set() # 1. 
send 3PC batches with requests for every ledger @@ -1028,10 +1025,9 @@ def serviceQueues(self, limit=None): :return: the number of messages successfully processed """ # TODO should handle SuspiciousNode here - r = self.dequeue_pre_prepares() if self.node.isParticipating else 0 + r = self.dequeue_pre_prepares() r += self.inBoxRouter.handleAllSync(self.inBox, limit) - r += self.send_3pc_batch() if (self.isPrimary and - self.node.isParticipating) else 0 + r += self.send_3pc_batch() r += self._serviceActions() return r # Messages that can be processed right now needs to be added back to the @@ -1506,13 +1502,6 @@ def _can_process_pre_prepare(self, pre_prepare: PrePrepare, sender: str) -> Opti if (pre_prepare.viewNo, pre_prepare.ppSeqNo) in self.prePrepares: return PP_CHECK_DUPLICATE - if not self.node.isParticipating: - # Let the node stash the pre-prepare - # TODO: The next processed pre-prepare needs to take consider if - # the last pre-prepare was stashed or not since stashed requests - # do not make change to state or ledger - return None - if compare_3PC_keys((pre_prepare.viewNo, pre_prepare.ppSeqNo), self.__last_pp_3pc) > 0: return PP_CHECK_OLD # ignore old pre-prepare @@ -1564,8 +1553,6 @@ def canPrepare(self, ppReq) -> (bool, str): :param ppReq: any object with identifier and requestId attributes """ - if not self.node.isParticipating: - return False, 'node is not participating' if self.has_sent_prepare(ppReq): return False, 'has already sent PREPARE for {}'.format(ppReq) return True, '' @@ -1713,8 +1700,6 @@ def canCommit(self, prepare: Prepare) -> (bool, str): :param prepare: the PREPARE """ - if not self.node.isParticipating: - return False, 'node is not participating' quorum = self.quorums.prepare.value if not self.prepares.hasQuorum(prepare, quorum): return False, 'does not have prepare quorum for {}'.format(prepare) @@ -1836,8 +1821,7 @@ def process_stashed_out_of_order_commits(self): # were stashed due to lack of commits before them and orders them if it # can - is_between_catchups_during_view_change = self.node.is_synced and self.node.view_change_in_progress - if not self.node.isParticipating and not is_between_catchups_during_view_change: + if not self.validator.can_order(): return self.logger.debug('{} trying to order from out of order commits. 
' diff --git a/plenum/server/replica_validator.py b/plenum/server/replica_validator.py index 4f7145bbb4..46456f6ef6 100644 --- a/plenum/server/replica_validator.py +++ b/plenum/server/replica_validator.py @@ -1,3 +1,4 @@ +from plenum.common.messages.node_messages import Commit from plenum.common.types import f from plenum.common.util import compare_3PC_keys from plenum.server.replica_validator_enums import DISCARD, INCORRECT_INSTANCE, PROCESS, ALREADY_ORDERED, FUTURE_VIEW, \ @@ -43,6 +44,8 @@ def validate_3pc_msg(self, msg): if view_no < self.replica.viewNo - 1: return DISCARD, OLD_VIEW if view_no == self.replica.viewNo - 1: + if not isinstance(msg, Commit): + return DISCARD, OLD_VIEW if not node.view_change_in_progress: return DISCARD, OLD_VIEW if self.replica.last_prepared_before_view_change is None: @@ -52,7 +55,7 @@ def validate_3pc_msg(self, msg): if view_no == self.replica.viewNo and node.view_change_in_progress: return STASH_VIEW, FUTURE_VIEW - # If Catchup in View Change finished then process a message + # If Catchup in View Change finished then process Commit messages if node.is_synced and node.view_change_in_progress: return PROCESS, None @@ -94,3 +97,25 @@ def validate_checkpoint_msg(self, msg): return STASH_CATCH_UP, CATCHING_UP return PROCESS, None + + def can_send_3pc_batch(self): + if not self.replica.isPrimary: + return False + if not self.replica.node.isParticipating: + return False + if self.replica.node.pre_view_change_in_progress: + return False + if self.replica.viewNo < self.replica.last_ordered_3pc[0]: + return False + if self.replica.viewNo == self.replica.last_ordered_3pc[0] and \ + self.replica.lastPrePrepareSeqNo < self.replica.last_ordered_3pc[1]: + return False + return True + + def can_order(self): + node = self.replica.node + if node.isParticipating: + return True + if node.is_synced and node.view_change_in_progress: + return True + return False diff --git a/plenum/server/view_change/view_changer.py b/plenum/server/view_change/view_changer.py index 50f4b5093b..523671d6c8 100644 --- a/plenum/server/view_change/view_changer.py +++ b/plenum/server/view_change/view_changer.py @@ -159,6 +159,7 @@ def __init__(self, provider: ViewChangerDataProvider, timer: TimerService): self._next_view_indications = {} self._view_change_in_progress = False + self.pre_view_change_in_progress = False self.previous_view_no = None self.previous_master_primary = None @@ -570,6 +571,7 @@ def start_view_change(self, proposed_view_no: int, continue_vc=False): # implementations - we need to make this logic pluggable if self.pre_vc_strategy and (not continue_vc): + self.pre_view_change_in_progress = True self.pre_vc_strategy.prepare_view_change(proposed_view_no) return elif self.pre_vc_strategy: @@ -577,6 +579,7 @@ def start_view_change(self, proposed_view_no: int, continue_vc=False): self.previous_view_no = self.view_no self.view_no = proposed_view_no + self.pre_view_change_in_progress = False self.view_change_in_progress = True self.previous_master_primary = self.provider.current_primary_name() self.set_defaults() diff --git a/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc.py b/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc.py index f32041c15a..4c10247f73 100644 --- a/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc.py +++ b/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc.py @@ -1,33 +1,57 @@ import pytest -from plenum.test.batching_3pc.helper import add_txns_to_ledger_before_order, checkNodesHaveSameRoots -from plenum.test.test_node import 
getNonPrimaryReplicas -import json -from plenum.test.helper import sdk_signed_random_requests, sdk_send_and_check + +from plenum.common.startable import Mode +from plenum.test import waits +from plenum.test.delayers import cDelay +from plenum.test.node_catchup.helper import waitNodeDataEquality, ensure_all_nodes_have_same_data +from plenum.test.helper import sdk_send_random_requests, check_last_ordered_3pc_on_master, assertExp, \ + sdk_send_random_and_check, sdk_get_replies, max_3pc_batch_limits +from plenum.test.stasher import delay_rules +from stp_core.loop.eventually import eventually @pytest.fixture(scope="module") -def tconf(tconf, request): - oldSize = tconf.Max3PCBatchSize - oldTimeout = tconf.Max3PCBatchWait - tconf.Max3PCBatchSize = 10 - tconf.Max3PCBatchWait = 1 +def tconf(tconf): + with max_3pc_batch_limits(tconf, size=10) as tconf: + yield tconf - def reset(): - tconf.Max3PCBatchSize = oldSize - tconf.Max3PCBatchWait = oldTimeout - request.addfinalizer(reset) - return tconf +def test_catchup_during_3pc(tconf, looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle): + ''' + 1) Send 1 3PC batch + 2 reqs + 2) Delay commits on one node + 3) Make sure the batch is ordered on all nodes except the lagged one + 4) start catchup of the lagged node + 5) Make sure that all nodes are equal + 6) Send more requests that we have 3 batches in total + 7) Make sure that all nodes are equal + ''' + lagging_node = txnPoolNodeSet[-1] + rest_nodes = txnPoolNodeSet[:-1] -def test_catchup_during_3pc(tconf, looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle): - reqs = sdk_signed_random_requests(looper, sdk_wallet_client, tconf.Max3PCBatchSize) - non_primary_replica = getNonPrimaryReplicas(txnPoolNodeSet, instId=0)[0] - - # Simulate catch-up (add txns to ledger): - # add txns corresponding to the requests after we got enough COMMITs to - # order, but before ordering. 
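Review note: the ledger-injection simulation being removed in this hunk is replaced further down in the same file by a real catch-up driven through commit delays. A minimal sketch of that pattern, assuming the standard pool-test fixtures used throughout this diff; the helper name below is illustrative, not part of the patch:

from plenum.common.startable import Mode
from plenum.test.delayers import cDelay
from plenum.test.helper import assertExp, sdk_send_random_requests
from plenum.test.stasher import delay_rules
from stp_core.loop.eventually import eventually


def _catchup_while_commits_delayed(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client):
    # illustrative helper, not part of the patch
    lagging_node = txnPoolNodeSet[-1]
    with delay_rules(lagging_node.nodeIbStasher, cDelay()):
        # requests get ordered on every node except the lagging one,
        # whose COMMITs are held back by the stasher
        sdk_send_random_requests(looper, sdk_pool_handle, sdk_wallet_client, 2)
        # run a real catch-up instead of injecting txns into the ledger
        # behind the replica's back
        lagging_node.start_catchup()
        looper.run(eventually(
            lambda: assertExp(lagging_node.mode == Mode.participating)))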
- add_txns_to_ledger_before_order( - non_primary_replica, [json.loads(req) for req in reqs[:tconf.Max3PCBatchSize]]) - sdk_send_and_check(reqs, looper, txnPoolNodeSet, sdk_pool_handle) - checkNodesHaveSameRoots(txnPoolNodeSet) + with delay_rules(lagging_node.nodeIbStasher, cDelay()): + sdk_reqs = sdk_send_random_requests(looper, sdk_pool_handle, + sdk_wallet_client, tconf.Max3PCBatchSize + 2) + + looper.run( + eventually(check_last_ordered_3pc_on_master, rest_nodes, (0, 1)) + ) + + lagging_node.start_catchup() + + looper.run( + eventually( + lambda: assertExp(lagging_node.mode == Mode.participating), retryWait=1, + timeout=waits.expectedPoolCatchupTime(len(txnPoolNodeSet)) + ) + ) + + waitNodeDataEquality(looper, *txnPoolNodeSet, customTimeout=5) + + sdk_get_replies(looper, sdk_reqs) + + sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, + sdk_wallet_client, 2 * tconf.Max3PCBatchSize - 2) + + ensure_all_nodes_have_same_data(looper, txnPoolNodeSet) diff --git a/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc_continue_working.py b/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc_continue_working.py deleted file mode 100644 index 602fd06b8d..0000000000 --- a/plenum/test/batching_3pc/catch-up/test_catchup_during_3pc_continue_working.py +++ /dev/null @@ -1,39 +0,0 @@ -import pytest -from plenum.test.batching_3pc.helper import add_txns_to_ledger_before_order, checkNodesHaveSameRoots -from plenum.test.test_node import getNonPrimaryReplicas -import json -from plenum.test.helper import sdk_signed_random_requests, sdk_send_and_check - - -@pytest.fixture(scope="module") -def tconf(tconf, request): - oldSize = tconf.Max3PCBatchSize - oldTimeout = tconf.Max3PCBatchWait - tconf.Max3PCBatchSize = 10 - tconf.Max3PCBatchWait = 1 - - def reset(): - tconf.Max3PCBatchSize = oldSize - tconf.Max3PCBatchWait = oldTimeout - - request.addfinalizer(reset) - return tconf - - -def test_catchup_during_3pc_continue_sending(tconf, looper, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle): - reqs = sdk_signed_random_requests(looper, sdk_wallet_client, tconf.Max3PCBatchSize + 2) - non_primary_replica = getNonPrimaryReplicas(txnPoolNodeSet, instId=0)[0] - - # Simulate catch-up (add txns to ledger): - # add txns corresponding to the requests after we got enough COMMITs to - # order, but before ordering. 
- add_txns_to_ledger_before_order( - non_primary_replica, [json.loads(req) for req in reqs[:tconf.Max3PCBatchSize]]) - - sdk_send_and_check(reqs, looper, txnPoolNodeSet, sdk_pool_handle) - checkNodesHaveSameRoots(txnPoolNodeSet) - - # send another requests and check that they are received - reqs = sdk_signed_random_requests(looper, sdk_wallet_client, 2 * tconf.Max3PCBatchSize - 2) - sdk_send_and_check(reqs, looper, txnPoolNodeSet, sdk_pool_handle) - checkNodesHaveSameRoots(txnPoolNodeSet) diff --git a/plenum/test/batching_3pc/catch-up/test_state_reverted_before_catchup.py b/plenum/test/batching_3pc/catch-up/test_state_reverted_before_catchup.py index a25370a1bc..0c46bb8f51 100644 --- a/plenum/test/batching_3pc/catch-up/test_state_reverted_before_catchup.py +++ b/plenum/test/batching_3pc/catch-up/test_state_reverted_before_catchup.py @@ -1,5 +1,5 @@ from plenum.common.constants import DOMAIN_LEDGER_ID -from plenum.test.delayers import cDelay +from plenum.test.delayers import cDelay, cpDelay from plenum.test.test_node import getNonPrimaryReplicas from plenum.test.batching_3pc.helper import checkNodesHaveSameRoots from plenum.test.helper import sdk_signed_random_requests, sdk_send_and_check, \ @@ -39,8 +39,9 @@ def test_unordered_state_reverted_before_catchup( # EXECUTE # Delay commit requests on the node - delay_c = 60 - non_primary_node.nodeIbStasher.delay(cDelay(delay_c)) + non_primary_node.nodeIbStasher.delay(cDelay()) + # Delay Consistency proofs to not finish catchup + non_primary_node.nodeIbStasher.delay(cpDelay()) # send requests reqs = sdk_send_random_requests(looper, sdk_pool_handle, sdk_wallet_client, tconf.Max3PCBatchSize) @@ -56,7 +57,7 @@ def test_unordered_state_reverted_before_catchup( ledger_id).headHash # start catchup - non_primary_node.ledgerManager.preCatchupClbk(ledger_id) + non_primary_node.start_catchup() committed_ledger_reverted = non_primary_ledger.tree.root_hash uncommitted_ledger_reverted = non_primary_ledger.uncommittedRootHash diff --git a/plenum/test/batching_3pc/helper.py b/plenum/test/batching_3pc/helper.py index 4d02987daa..854e58d4f9 100644 --- a/plenum/test/batching_3pc/helper.py +++ b/plenum/test/batching_3pc/helper.py @@ -4,12 +4,7 @@ from plenum.common.constants import DOMAIN_LEDGER_ID from plenum.common.messages.node_messages import ThreePhaseType from plenum.common.startable import Mode -from plenum.common.txn_util import reqToTxn, append_txn_metadata from plenum.common.util import check_if_all_equal_in_list -from plenum.server.batch_handlers.three_pc_batch import ThreePcBatch -from plenum.test.testing_utils import FakeSomething -from plenum.server.catchup.catchup_rep_service import LedgerCatchupComplete -from plenum.server.catchup.utils import NodeCatchupComplete def checkNodesHaveSameRoots(nodes, checkUnCommitted=True, @@ -60,62 +55,6 @@ def addRoot(root, collection): assert len(txnRoots) == 1 -def add_txns_to_ledger_before_order(replica, reqs): - replica.added = False - origMethod = replica.tryOrder - - def tryOrderAndAddTxns(self, commit): - canOrder, _ = self.canOrder(commit) - node = replica.node - if not replica.added and canOrder: - pp = self.getPrePrepare(commit.viewNo, commit.ppSeqNo) - ledger_manager = node.ledgerManager - ledger_id = DOMAIN_LEDGER_ID - catchup_rep_service = ledger_manager._node_leecher._leechers[ledger_id]._catchup_rep_service - - # simulate audit ledger catchup - three_pc_batch = ThreePcBatch.from_pre_prepare(pre_prepare=pp, - state_root=pp.stateRootHash, - txn_root=pp.txnRootHash, - primaries=self.node.primaries, - 
valid_digests=pp.reqIdr) - node.audit_handler.post_batch_applied(three_pc_batch) - node.audit_handler.commit_batch(FakeSomething()) - - ledger_manager.preCatchupClbk(ledger_id) - pp = self.getPrePrepare(commit.viewNo, commit.ppSeqNo) - for req in reqs: - txn = append_txn_metadata(reqToTxn(req), txn_time=pp.ppTime) - catchup_rep_service._add_txn(txn) - ledger_manager._on_ledger_sync_complete(LedgerCatchupComplete( - ledger_id=DOMAIN_LEDGER_ID, - num_caught_up=len(reqs))) - ledger_manager._on_catchup_complete(NodeCatchupComplete()) - replica.added = True - - return origMethod(commit) - - replica.tryOrder = types.MethodType(tryOrderAndAddTxns, replica) - - -def start_precatchup_before_order(replica): - called = False - origMethod = replica.tryOrder - - def tryOrderAndAddTxns(self, commit): - nonlocal called - canOrder, _ = self.canOrder(commit) - - if not called and canOrder: - ledger_manager = replica.node.ledgerManager - ledger_manager.preCatchupClbk(DOMAIN_LEDGER_ID) - called = True - - return origMethod(commit) - - replica.tryOrder = types.MethodType(tryOrderAndAddTxns, replica) - - def make_node_syncing(replica, three_phase_type: ThreePhaseType): added = False diff --git a/plenum/test/delayers.py b/plenum/test/delayers.py index 9efec73ad5..71b9c855e3 100644 --- a/plenum/test/delayers.py +++ b/plenum/test/delayers.py @@ -1,6 +1,7 @@ import random from typing import Iterable, List +from plenum.common.messages.message_base import MessageBase from plenum.common.request import Request from plenum.common.messages.node_messages import Nomination, Reelection, Primary, \ @@ -27,7 +28,8 @@ def inner(rx): def delayerMsgTuple(seconds, opType, senderFilter=None, instFilter: int = None, - ledgerFilter: int = None): + ledgerFilter: int = None, + viewFilter: int = None): """ Used for nodeInBoxStasher @@ -47,10 +49,13 @@ def inner(wrappedMsg): getattr(msg, f.INST_ID.nm) == instFilter)) and \ (ledgerFilter is None or f.LEDGER_ID.nm in msg._fields and - getattr(msg, f.LEDGER_ID.nm) == ledgerFilter): + getattr(msg, f.LEDGER_ID.nm) == ledgerFilter) and \ + (viewFilter is None or + f.VIEW_NO.nm in msg._fields and + getattr(msg, f.VIEW_NO.nm) == viewFilter): return seconds - if hasattr(opType, 'typename'): + if hasattr(opType, 'typename') and opType.typename is not None: inner.__name__ = opType.typename else: inner.__name__ = opType.__name__ @@ -118,14 +123,14 @@ def cDelay(delay: float = DEFAULT_DELAY, instId: int = None, sender_filter: str delay, Commit, instFilter=instId, senderFilter=sender_filter) -def icDelay(delay: float = DEFAULT_DELAY): +def icDelay(delay: float = DEFAULT_DELAY, viewNo: int = None): # Delayer of INSTANCE-CHANGE requests - return delayerMsgTuple(delay, InstanceChange) + return delayerMsgTuple(delay, InstanceChange, viewFilter=viewNo) -def vcd_delay(delay: float = DEFAULT_DELAY): +def vcd_delay(delay: float = DEFAULT_DELAY, viewNo: int = None): # Delayer of VIEW_CHANGE_DONE requests - return delayerMsgTuple(delay, ViewChangeDone) + return delayerMsgTuple(delay, ViewChangeDone, viewFilter=viewNo) def cs_delay(delay: float = DEFAULT_DELAY): @@ -187,6 +192,12 @@ def specific_msgs(msg): return specific_msgs +def delay_for_view(viewNo: int, delay: float = DEFAULT_DELAY): + d = delayerMsgTuple(delay, MessageBase, viewFilter=viewNo) + d.__name__ = "view_no" + str(viewNo) + return d + + def delay(what, frm, to, howlong): from plenum.test.test_node import TestNode @@ -250,6 +261,16 @@ def delay_3pc_messages(nodes, inst_id, delay=None, min_delay=None, delay_messages('3pc', nodes, inst_id, delay, 
min_delay, max_delay) +def all_delay(delay: float = DEFAULT_DELAY, no_check_delays=[]): + def inner(msg): + for d in no_check_delays: + if d(msg): + return 0 + return delay + + return inner + + def reset_delays_and_process_delayeds(nodes): for node in nodes: node.reset_delays_and_process_delayeds() @@ -258,4 +279,3 @@ def reset_delays_and_process_delayeds(nodes): def reset_delays_and_process_delayeds_for_client(nodes): for node in nodes: node.reset_delays_and_process_delayeds_for_clients() - diff --git a/plenum/test/freshness/test_replica_freshness.py b/plenum/test/freshness/test_replica_freshness.py index fa0820ec20..0fc7bbbb81 100644 --- a/plenum/test/freshness/test_replica_freshness.py +++ b/plenum/test/freshness/test_replica_freshness.py @@ -39,18 +39,18 @@ def inst_id(request): @pytest.fixture(scope='function') -def replica_with_valid_requests(replica): +def replica_with_valid_requests(primary_replica): requests = {ledger_id: sdk_random_request_objects(1, identifier="did", protocol_version=CURRENT_PROTOCOL_VERSION)[0] for ledger_id in LEDGER_IDS} def patched_consume_req_queue_for_pre_prepare(ledger_id, tm, view_no, pp_seq_no): - reqs = [requests[ledger_id]] if len(replica.requestQueues[ledger_id]) > 0 else [] + reqs = [requests[ledger_id]] if len(primary_replica.requestQueues[ledger_id]) > 0 else [] return [reqs, [], []] - replica.consume_req_queue_for_pre_prepare = patched_consume_req_queue_for_pre_prepare + primary_replica.consume_req_queue_for_pre_prepare = patched_consume_req_queue_for_pre_prepare - return replica, requests + return primary_replica, requests def set_current_time(replica, ts): @@ -81,76 +81,76 @@ def check_and_pop_freshness_pre_prepare(replica, ledger_id): assert msg.reqIdr == [] -def test_no_freshness_pre_prepare_when_disabled(tconf, replica): +def test_no_freshness_pre_prepare_when_disabled(tconf, primary_replica): with freshness(tconf, enabled=False, timeout=FRESHNESS_TIMEOUT): - assert len(replica.outBox) == 0 + assert len(primary_replica.outBox) == 0 - replica.send_3pc_batch() - assert len(replica.outBox) == 0 + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 0 - set_current_time(replica, FRESHNESS_TIMEOUT + 1) - replica.send_3pc_batch() - assert len(replica.outBox) == 0 + set_current_time(primary_replica, FRESHNESS_TIMEOUT + 1) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 0 -def test_no_freshness_pre_prepare_for_non_master(tconf, replica): - replica.isMaster = False - replica.instId = 1 - assert len(replica.outBox) == 0 +def test_no_freshness_pre_prepare_for_non_master(tconf, primary_replica): + primary_replica.isMaster = False + primary_replica.instId = 1 + assert len(primary_replica.outBox) == 0 - replica.send_3pc_batch() - assert len(replica.outBox) == 0 + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 0 - set_current_time(replica, FRESHNESS_TIMEOUT + 1) - replica.send_3pc_batch() - assert len(replica.outBox) == 0 + set_current_time(primary_replica, FRESHNESS_TIMEOUT + 1) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 0 -def test_freshness_pre_prepare_initially(replica): - assert len(replica.outBox) == 0 - replica.send_3pc_batch() - assert len(replica.outBox) == 0 +def test_freshness_pre_prepare_initially(primary_replica): + assert len(primary_replica.outBox) == 0 + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 0 @pytest.mark.parametrize('ts', [ 0, 1, FRESHNESS_TIMEOUT, -1, -FRESHNESS_TIMEOUT ]) -def 
test_freshness_pre_prepare_before_timeout(replica, ts): - assert len(replica.outBox) == 0 - set_current_time(replica, ts) - replica.send_3pc_batch() - assert len(replica.outBox) == 0 +def test_freshness_pre_prepare_before_timeout(primary_replica, ts): + assert len(primary_replica.outBox) == 0 + set_current_time(primary_replica, ts) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 0 -def test_freshness_pre_prepare_after_timepout(replica): - assert len(replica.outBox) == 0 - replica.send_3pc_batch() - set_current_time(replica, FRESHNESS_TIMEOUT + 1) - replica.send_3pc_batch() - assert len(replica.outBox) == 3 +def test_freshness_pre_prepare_after_timeout(primary_replica): + assert len(primary_replica.outBox) == 0 + primary_replica.send_3pc_batch() + set_current_time(primary_replica, FRESHNESS_TIMEOUT + 1) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 3 - check_and_pop_freshness_pre_prepare(replica, POOL_LEDGER_ID) - check_and_pop_freshness_pre_prepare(replica, DOMAIN_LEDGER_ID) - check_and_pop_freshness_pre_prepare(replica, CONFIG_LEDGER_ID) + check_and_pop_freshness_pre_prepare(primary_replica, POOL_LEDGER_ID) + check_and_pop_freshness_pre_prepare(primary_replica, DOMAIN_LEDGER_ID) + check_and_pop_freshness_pre_prepare(primary_replica, CONFIG_LEDGER_ID) -def test_freshness_pre_prepare_not_resend_before_next_timeout(replica): - assert len(replica.outBox) == 0 +def test_freshness_pre_prepare_not_resend_before_next_timeout(primary_replica): + assert len(primary_replica.outBox) == 0 - set_current_time(replica, FRESHNESS_TIMEOUT + 1) - replica.send_3pc_batch() - assert len(replica.outBox) == 3 + set_current_time(primary_replica, FRESHNESS_TIMEOUT + 1) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 3 - replica.send_3pc_batch() - assert len(replica.outBox) == 3 + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 3 - set_current_time(replica, FRESHNESS_TIMEOUT + 1 + FRESHNESS_TIMEOUT) - replica.send_3pc_batch() - assert len(replica.outBox) == 3 + set_current_time(primary_replica, FRESHNESS_TIMEOUT + 1 + FRESHNESS_TIMEOUT) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 3 - set_current_time(replica, FRESHNESS_TIMEOUT + 1 + FRESHNESS_TIMEOUT + 1) - replica.send_3pc_batch() - assert len(replica.outBox) == 6 + set_current_time(primary_replica, FRESHNESS_TIMEOUT + 1 + FRESHNESS_TIMEOUT + 1) + primary_replica.send_3pc_batch() + assert len(primary_replica.outBox) == 6 @pytest.mark.parametrize('ordered, refreshed', [ diff --git a/plenum/test/node_catchup/test_catchup_scenarios.py b/plenum/test/node_catchup/test_catchup_scenarios.py deleted file mode 100644 index 48b316fd88..0000000000 --- a/plenum/test/node_catchup/test_catchup_scenarios.py +++ /dev/null @@ -1,57 +0,0 @@ -import pytest - -from stp_core.loop.eventually import eventually -from stp_core.common.log import getlogger -from plenum.common.startable import Mode -from plenum.test.delayers import cqDelay -from plenum.test.helper import sdk_send_random_requests -from plenum.test.node_request.helper import sdk_ensure_pool_functional -from plenum.test.test_node import checkNodesConnected -from plenum.test import waits - -logger = getlogger() - -txnCount = 10 - - -@pytest.fixture("module") -def nodeStashingOrderedRequests(txnPoolNodeSet, sdk_node_created_after_some_txns): - looper, new_node, sdk_pool_handle, new_steward_wallet_handle = sdk_node_created_after_some_txns - for node in txnPoolNodeSet: - 
node.nodeIbStasher.delay(cqDelay(5)) - txnPoolNodeSet.append(new_node) - - sdk_ensure_pool_functional(looper, txnPoolNodeSet, new_steward_wallet_handle, sdk_pool_handle) - sdk_send_random_requests(looper, sdk_pool_handle, new_steward_wallet_handle, 10) - looper.run(checkNodesConnected(txnPoolNodeSet)) - - def stashing(): - assert new_node.mode != Mode.participating - assert len(new_node.stashedOrderedReqs) > 0 - # assert len(newNode.reqsFromCatchupReplies) > 0 - - timeout = waits.expectedTransactionExecutionTime(len(txnPoolNodeSet)) - looper.run(eventually(stashing, retryWait=1, timeout=timeout)) - - -@pytest.mark.skip(reason="SOV-552. Incomplete") -def testNodeNotProcessingOrderedReqsWhileCatchingUp( - nodeStashingOrderedRequests): - """ - Check that node does not execute requests while catching up - :return: - """ - - -@pytest.mark.skip(reason="SOV-553. Incomplete") -def testExecutedInOrderAfterCatchingUp(txnPoolNodeSet, - nodeStashingOrderedRequests): - """ - After catching up, while executing check for already see client id and - request id., maintain a list of seen client id and request ids, the node - while catching up keeps track of seen client ids and request id - Reset monitor after executing all stashed requests so no view change can - be proposed - :return: - """ - newNode = txnPoolNodeSet[-1] diff --git a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup.py b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup.py index 124e6ed885..c9994de296 100644 --- a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup.py +++ b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup.py @@ -94,7 +94,6 @@ def test_3pc_while_catchup(tdir, tconf, lagging_node.spylog.count(Node.allLedgersCaughtUp) == initial_all_ledgers_caught_up + 1) ) ) - # check that the node was able to order requests stashed during catch-up - # do not check for audit ledger since we didn't catch-up audit ledger when txns were ordering during catch-up - waitNodeDataEquality(looper, *txnPoolNodeSet, exclude_from_check='check_audit', customTimeout=5) + + waitNodeDataEquality(looper, *txnPoolNodeSet, customTimeout=5) assert all(replica.stasher.num_stashed_catchup == 0 for inst_id, replica in lagging_node.replicas.items()) diff --git a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_checkpoints.py b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_checkpoints.py index 618437516b..f6422964ba 100644 --- a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_checkpoints.py +++ b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_checkpoints.py @@ -149,9 +149,7 @@ def test_3pc_while_catchup_with_chkpoints(tdir, tconf, ) ) - # check that the node was able to order requests stashed during catch-up - # do not check for audit ledger since we didn't catch-up audit ledger when txns were ordering durinf catch-up - waitNodeDataEquality(looper, *txnPoolNodeSet, exclude_from_check='check_audit', customTimeout=5) + waitNodeDataEquality(looper, *txnPoolNodeSet, customTimeout=5) def get_stashed_checkpoints(node): diff --git a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py index 6833bc70c3..cb83b59c3a 100644 --- a/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py +++ b/plenum/test/node_catchup_with_3pc/test_stashing_3pc_while_catchup_only_checkpoints.py @@ -143,9 +143,7 @@ 
def test_3pc_while_catchup_with_chkpoints_only(tdir, tconf, assert lagging_node.spylog.count(Node.allLedgersCaughtUp) == initial_all_ledgers_caught_up + 1 assert lagging_node.spylog.count(Node.start_catchup) == 1 - # do not check for audit ledger since we didn't catch-up audit ledger when txns were ordering durinf catch-up - waitNodeDataEquality(looper, *txnPoolNodeSet, customTimeout=5, - exclude_from_check=['check_audit', 'check_last_ordered_3pc_backup']) + waitNodeDataEquality(looper, *txnPoolNodeSet, customTimeout=5) def get_stashed_checkpoints(node): diff --git a/plenum/test/node_request/test_apply_stashed_partially_ordered.py b/plenum/test/node_request/test_apply_stashed_partially_ordered.py deleted file mode 100644 index cfdaf09a18..0000000000 --- a/plenum/test/node_request/test_apply_stashed_partially_ordered.py +++ /dev/null @@ -1,75 +0,0 @@ -import pytest - -from plenum.common.constants import DOMAIN_LEDGER_ID -from plenum.common.startable import Mode -from plenum.test.delayers import cDelay -from plenum.test.helper import sdk_get_and_check_replies, sdk_send_random_requests, assertExp -from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data -from plenum.test.stasher import delay_rules -from plenum.test.test_node import getNonPrimaryReplicas -from stp_core.loop.eventually import eventually - -TOTAL_REQUESTS = 10 - - -@pytest.fixture(scope="module") -def tconf(tconf): - old_max_batch_wait = tconf.Max3PCBatchWait - old_max_batch_size = tconf.Max3PCBatchSize - # Make sure that all requests in test will end up in one batch - tconf.Max3PCBatchWait = 1000 - tconf.Max3PCBatchSize = TOTAL_REQUESTS - yield tconf - tconf.Max3PCBatchWait = old_max_batch_wait - tconf.Max3PCBatchSize = old_max_batch_size - - -def test_apply_stashed_partially_ordered(looper, - txnPoolNodeSet, - sdk_pool_handle, - sdk_wallet_client): - test_node = getNonPrimaryReplicas(txnPoolNodeSet)[0].node - test_stasher = test_node.nodeIbStasher - ledger_size = max(node.domainLedger.size for node in txnPoolNodeSet) - - def check_pool_ordered_some_requests(): - assert max(node.domainLedger.size for node in txnPoolNodeSet) > ledger_size - - def check_test_node_has_stashed_ordered_requests(): - assert len(test_node.stashedOrderedReqs) > 0 - - # Delay COMMITs so requests are not ordered on test node - with delay_rules(test_stasher, cDelay()): - reqs = sdk_send_random_requests(looper, sdk_pool_handle, sdk_wallet_client, TOTAL_REQUESTS) - looper.run(eventually(check_pool_ordered_some_requests)) - - # Get some of txns that need to be ordered - ledger_info = test_node.ledgerManager.ledgerRegistry[DOMAIN_LEDGER_ID] - txns = ledger_info.ledger.uncommittedTxns - txns = txns[:len(txns) // 2] - assert len(txns) > 1 - - # Emulate incomplete catchup simultaneous with generation of ORDERED message - origin_fun = test_node.try_processing_ordered - ordered_msgs = [] - test_node.try_processing_ordered = lambda msg: ordered_msgs.append(msg) - test_node.master_replica.revert_unordered_batches() - looper.run(eventually(lambda: assertExp(len(ordered_msgs) > 0))) - - test_node.mode = Mode.synced - test_node.try_processing_ordered = origin_fun - for msg in ordered_msgs: - test_node.try_processing_ordered(msg) - - looper.run(eventually(check_test_node_has_stashed_ordered_requests)) - for txn in txns: - ledger_info.ledger.add(txn) - ledger_info.postTxnAddedToLedgerClbk(DOMAIN_LEDGER_ID, txn) - test_node.mode = Mode.participating - test_node.processStashedOrderedReqs() - for r in test_node.replicas.values(): - 
r.stasher.unstash_catchup() - - ensure_all_nodes_have_same_data(looper, txnPoolNodeSet) - - sdk_get_and_check_replies(looper, reqs) diff --git a/plenum/test/primary_selection/test_primary_selector.py b/plenum/test/primary_selection/test_primary_selector.py index 21311ae360..5eae2f9318 100644 --- a/plenum/test/primary_selection/test_primary_selector.py +++ b/plenum/test/primary_selection/test_primary_selector.py @@ -142,9 +142,6 @@ def _clean_req_from_verified(self, request): def doneProcessingReq(self, key): pass - def processStashedOrderedReqs(self): - pass - def is_catchup_needed(self): return False diff --git a/plenum/test/replica/conftest.py b/plenum/test/replica/conftest.py index e9da121dd0..9fb7b80e94 100644 --- a/plenum/test/replica/conftest.py +++ b/plenum/test/replica/conftest.py @@ -31,6 +31,7 @@ def __init__(self, viewNo, quorums, ledger_ids): utc_epoch=lambda *args: get_utc_epoch(), mode=Mode.participating, view_change_in_progress=False, + pre_view_change_in_progress=False, requests=Requests(), onBatchCreated=lambda self, *args, **kwargs: True, applyReq=lambda self, *args, **kwargs: True, @@ -107,7 +108,7 @@ def replica(tconf, viewNo, inst_id, ledger_ids, mock_timestamp, fake_requests, t update_prepare=lambda a, b: a, process_prepare=lambda a, b: None, process_pre_prepare=lambda a, b: None, - process_order =lambda *args: None + process_order=lambda *args: None ) replica = Replica( node, instId=inst_id, isMaster=inst_id == 0, @@ -137,6 +138,12 @@ def reportSuspiciousNodeEx(ex): return replica +@pytest.fixture(scope='function') +def primary_replica(replica): + replica.primaryName = replica.name + return replica + + @pytest.fixture(scope='function') def replica_with_requests(replica, fake_requests): replica._apply_pre_prepare = lambda a: (fake_requests, [], []) @@ -173,4 +180,4 @@ def pre_prepare(replica, state_roots, txn_roots, multi_sig, fake_requests): @pytest.fixture(scope="function") def prepare(pre_prepare): - return create_prepare_from_pre_prepare(pre_prepare) \ No newline at end of file + return create_prepare_from_pre_prepare(pre_prepare) diff --git a/plenum/test/replica/stashing/test_unstash_after_catchup_in_view_change.py b/plenum/test/replica/stashing/test_unstash_after_catchup_in_view_change.py index 299f623bbf..3e2563a35e 100644 --- a/plenum/test/replica/stashing/test_unstash_after_catchup_in_view_change.py +++ b/plenum/test/replica/stashing/test_unstash_after_catchup_in_view_change.py @@ -4,10 +4,11 @@ from plenum.common.constants import COMMIT, PREPREPARE, PREPARE, LEDGER_STATUS from plenum.common.startable import Mode -from plenum.test.delayers import vcd_delay, msg_rep_delay, cDelay, cr_delay, lsDelay, cpDelay, cs_delay +from plenum.test.delayers import vcd_delay, msg_rep_delay, cDelay, cr_delay from plenum.test.helper import waitForViewChange, sdk_send_random_and_check, assertExp, sdk_send_random_request, \ sdk_get_and_check_replies -from plenum.test.node_catchup.helper import waitNodeDataEquality +from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data +from plenum.test.node_request.helper import sdk_ensure_pool_functional from plenum.test.stasher import delay_rules from plenum.test.test_node import ensureElectionsDone from stp_core.loop.eventually import eventually @@ -34,7 +35,7 @@ def test_unstash_three_phase_msg_after_catchup_in_view_change(txnPoolNodeSet, lo 10. Reset Ledger Status on Nodes1-3 11. Check that 3 nodes finished VC while Node4 is syncing and not finished 12. Reset CatchupRep on Node4 - 13. 
Check that Node4 finished VC, and there was just 1 round of cacth-up (edited) + 13. Check that Node4 finished VC, and there was just 1 round of catch-up """ slow_node = txnPoolNodeSet[-1] fast_nodes = txnPoolNodeSet[:-1] @@ -107,6 +108,9 @@ def check_commits(commit_key): for n in txnPoolNodeSet) assert slow_node.catchup_rounds_without_txns == 1 + ensure_all_nodes_have_same_data(looper, txnPoolNodeSet) + sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_steward, sdk_pool_handle) + def _check_nodes_stashed(nodes, old_stashed, new_stashed): for n in nodes: diff --git a/plenum/test/replica/test_replica_3pc_validation.py b/plenum/test/replica/test_replica_3pc_validation.py index 4485ba4fab..1538ca8bb0 100644 --- a/plenum/test/replica/test_replica_3pc_validation.py +++ b/plenum/test/replica/test_replica_3pc_validation.py @@ -1,9 +1,6 @@ -import functools - import pytest from plenum.common.startable import Mode -from plenum.server.node import Node from plenum.server.replica_validator import ReplicaValidator from plenum.server.replica_validator_enums import DISCARD, INCORRECT_INSTANCE, PROCESS, ALREADY_ORDERED, FUTURE_VIEW, \ GREATER_PREP_CERT, OLD_VIEW, CATCHING_UP, OUTSIDE_WATERMARKS, INCORRECT_PP_SEQ_NO, STASH_VIEW, STASH_WATERMARKS, \ @@ -26,6 +23,11 @@ def validator(replica, inst_id): return ReplicaValidator(replica=replica) +@pytest.fixture(scope='function') +def primary_validator(primary_replica, inst_id): + return ReplicaValidator(replica=primary_replica) + + def create_3pc_msgs(view_no, pp_seq_no, inst_id): pre_prepare = create_pre_prepare_no_bls(generate_state_root(), view_no=view_no, @@ -113,16 +115,40 @@ def test_check_previous_view_view_change_no_prep_cert(validator): (Mode.synced, (PROCESS, None)), (Mode.participating, (PROCESS, None)) ]) -def test_check_catchup_modes_in_view_change_for_prep_cert(validator, result, mode): +def test_check_catchup_modes_in_view_change_for_prep_cert_for_commit(validator, result, mode): pp_seq_no = 10 validator.replica.node.view_change_in_progress = True validator.replica.node.mode = mode validator.replica.last_prepared_before_view_change = (validator.view_no - 1, pp_seq_no) - for msg in create_3pc_msgs(view_no=validator.view_no - 1, - pp_seq_no=pp_seq_no, - inst_id=validator.inst_id): - assert validator.validate_3pc_msg(msg) == result + commit = create_commit_no_bls_sig(req_key=(validator.view_no - 1, pp_seq_no), + inst_id=validator.inst_id) + assert validator.validate_3pc_msg(commit) == result + + +@pytest.mark.parametrize('mode', [ + Mode.starting, + Mode.discovering, + Mode.discovered, + Mode.syncing, + Mode.synced, + Mode.participating +]) +def test_check_catchup_modes_in_view_change_for_prep_cert_for_non_commit(validator, mode): + pp_seq_no = 10 + validator.replica.node.view_change_in_progress = True + validator.replica.node.mode = mode + validator.replica.last_prepared_before_view_change = (validator.view_no - 1, + pp_seq_no) + pre_prepare = create_pre_prepare_no_bls(generate_state_root(), + view_no=validator.view_no - 1, + pp_seq_no=pp_seq_no, + inst_id=validator.inst_id) + prepare = create_prepare(req_key=(validator.view_no - 1, pp_seq_no), + state_root=generate_state_root(), + inst_id=validator.inst_id) + assert validator.validate_3pc_msg(pre_prepare) == (DISCARD, OLD_VIEW) + assert validator.validate_3pc_msg(prepare) == (DISCARD, OLD_VIEW) @pytest.mark.parametrize('pp_seq_no, result', [ @@ -135,13 +161,29 @@ def test_check_catchup_modes_in_view_change_for_prep_cert(validator, result, mod (12, (DISCARD, GREATER_PREP_CERT)), (100, 
(DISCARD, GREATER_PREP_CERT)), ]) -def test_check_previous_view_view_change_prep_cert(validator, pp_seq_no, result): +def test_check_previous_view_view_change_prep_cert_commit(validator, pp_seq_no, result): validator.replica.node.view_change_in_progress = True validator.replica.last_prepared_before_view_change = (validator.view_no - 1, 10) - for msg in create_3pc_msgs(view_no=validator.view_no - 1, - pp_seq_no=pp_seq_no, - inst_id=validator.inst_id): - assert validator.validate_3pc_msg(msg) == result + commit = create_commit_no_bls_sig(req_key=(validator.view_no - 1, pp_seq_no), + inst_id=validator.inst_id) + assert validator.validate_3pc_msg(commit) == result + + +@pytest.mark.parametrize('pp_seq_no', [ + 1, 9, 10, 11, 12, 100 +]) +def test_check_previous_view_view_change_prep_cert_non_commit(validator, pp_seq_no): + validator.replica.node.view_change_in_progress = True + validator.replica.last_prepared_before_view_change = (validator.view_no - 1, 10) + pre_prepare = create_pre_prepare_no_bls(generate_state_root(), + view_no=validator.view_no - 1, + pp_seq_no=pp_seq_no, + inst_id=validator.inst_id) + prepare = create_prepare(req_key=(validator.view_no - 1, pp_seq_no), + state_root=generate_state_root(), + inst_id=validator.inst_id) + assert validator.validate_3pc_msg(pre_prepare) == (DISCARD, OLD_VIEW) + assert validator.validate_3pc_msg(prepare) == (DISCARD, OLD_VIEW) @pytest.mark.parametrize('pp_seq_no, result', [ @@ -242,3 +284,88 @@ def test_check_ordered_not_participating(validator, pp_seq_no, result): pp_seq_no=pp_seq_no, inst_id=validator.inst_id): assert validator.validate_3pc_msg(msg) == result + + +def test_can_send_3pc_batch_by_primary_only(primary_validator): + assert primary_validator.can_send_3pc_batch() + primary_validator.replica.primaryName = "SomeNode:0" + assert not primary_validator.can_send_3pc_batch() + + +@pytest.mark.parametrize('mode', [ + Mode.starting, + Mode.discovering, + Mode.discovered, + Mode.syncing, + Mode.synced, + Mode.participating +]) +def test_can_send_3pc_batch_not_participating(primary_validator, mode): + primary_validator.replica.node.mode = mode + result = primary_validator.can_send_3pc_batch() + assert result == (mode == Mode.participating) + + +@pytest.mark.parametrize('mode', [ + Mode.starting, + Mode.discovering, + Mode.discovered, + Mode.syncing, + Mode.synced, + Mode.participating +]) +def test_can_send_3pc_batch_pre_view_change(primary_validator, mode): + primary_validator.replica.node.pre_view_change_in_progress = True + primary_validator.replica.node.mode = mode + assert not primary_validator.can_send_3pc_batch() + + +@pytest.mark.parametrize('mode', [ + Mode.starting, + Mode.discovering, + Mode.discovered, + Mode.syncing, + Mode.synced, + Mode.participating +]) +def test_can_send_3pc_batch_old_view(primary_validator, mode): + primary_validator.replica.last_ordered_3pc = (primary_validator.replica.viewNo + 1, 0) + primary_validator.replica.node.mode = mode + assert not primary_validator.can_send_3pc_batch() + + +@pytest.mark.parametrize('mode', [ + Mode.starting, + Mode.discovering, + Mode.discovered, + Mode.syncing, + Mode.synced, + Mode.participating +]) +def test_can_send_3pc_batch_old_pp_seq_no_for_view(primary_validator, mode): + primary_validator.replica.last_ordered_3pc = (primary_validator.replica.viewNo, 100) + primary_validator.replica.lastPrePrepareSeqNo = 0 + primary_validator.replica.node.mode = mode + assert not primary_validator.can_send_3pc_batch() + + +def test_can_order(validator): + assert validator.can_order() + + 
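These can_order tests exercise the gate added to ReplicaValidator earlier in this diff. Restated here as a standalone predicate for readability; the FakeNode stub and helper name are illustrative, the logic itself is copied from the patch:

from collections import namedtuple

FakeNode = namedtuple('FakeNode', 'isParticipating is_synced view_change_in_progress')


def can_order_sketch(node):
    # mirrors ReplicaValidator.can_order(): order while participating, or while
    # synced in the middle of a view change (i.e. between catch-up rounds)
    if node.isParticipating:
        return True
    if node.is_synced and node.view_change_in_progress:
        return True
    return False


assert can_order_sketch(FakeNode(False, True, True))       # synced during view change
assert not can_order_sketch(FakeNode(False, True, False))  # merely synced, no view change
assert can_order_sketch(FakeNode(True, True, False))       # participating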
+@pytest.mark.parametrize('mode', [ + Mode.starting, + Mode.discovering, + Mode.discovered, + Mode.syncing, + Mode.synced +]) +def test_cant_order_not_participating(validator, mode): + validator.replica.node.mode = mode + assert not validator.can_order() + + +def test_can_order_synced_and_view_change(validator): + validator.replica.node.mode = Mode.synced + validator.replica.node.view_change_in_progress = True + assert validator.can_order() diff --git a/plenum/test/test_node.py b/plenum/test/test_node.py index 009d2a8f2b..9a4e0a9045 100644 --- a/plenum/test/test_node.py +++ b/plenum/test/test_node.py @@ -324,7 +324,6 @@ def processRequest(self, request, frm): Node.forward, Node.send, Node.checkPerformance, - Node.processStashedOrderedReqs, Node.lost_master_primary, Node.propose_view_change, Node.getReplyFromLedger, @@ -344,8 +343,7 @@ def processRequest(self, request, frm): Node.request_propagates, Node.transmitToClient, Node.has_ordered_till_last_prepared_certificate, - Node.on_inconsistent_3pc_state, - Node.apply_stashed_reqs + Node.on_inconsistent_3pc_state ] diff --git a/plenum/test/view_change/test_catchup_to_next_view_during_view_change.py b/plenum/test/view_change/test_catchup_to_next_view_during_view_change.py new file mode 100644 index 0000000000..e11887c739 --- /dev/null +++ b/plenum/test/view_change/test_catchup_to_next_view_during_view_change.py @@ -0,0 +1,131 @@ +import pytest + +from plenum.test.delayers import icDelay, vcd_delay, delay_for_view +from plenum.test.helper import checkViewNoForNodes, sdk_send_random_and_check, waitForViewChange +from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data +from plenum.test.node_request.helper import sdk_ensure_pool_functional +from plenum.test.stasher import delay_rules +from plenum.test.test_node import checkProtocolInstanceSetup, ensureElectionsDone + +nodeCount = 7 + + +def test_catchup_to_next_view_during_view_change_0_to_1_then_1_to_2(txnPoolNodeSet, looper, + sdk_pool_handle, sdk_wallet_steward): + ''' + 1) Lagging node is not a primary for new views + 2) All nodes except the lagging one go to view=1 + 3) All nodes except the lagging one order txns on view=1 + 4) All nodes except the lagging one go to view=2 + 5) All nodes except the lagging one order txns on view=2 + 6) Lagging node gets InstanceChanges for view=1 => it changes to view=1, and catches up till txns from view=2 + 7) Lagging node gets InstanceChanges for view=2 => it changes to view=2 + 8) Make sure that the lagging node is up to date, and canc participate in consensus + ''' + lagging_node = txnPoolNodeSet[0] + other_nodes = txnPoolNodeSet[1:] + initial_view_no = checkViewNoForNodes(txnPoolNodeSet) + initial_last_ordered = lagging_node.master_last_ordered_3PC + + with delay_rules(lagging_node.nodeIbStasher, icDelay(viewNo=2), vcd_delay(viewNo=2)): + with delay_rules(lagging_node.nodeIbStasher, delay_for_view(viewNo=0), delay_for_view(viewNo=1)): + # view change to viewNo=1 + for n in txnPoolNodeSet: + n.view_changer.on_master_degradation() + waitForViewChange(looper, + other_nodes, + expectedViewNo=initial_view_no + 1) + checkProtocolInstanceSetup(looper=looper, nodes=other_nodes, instances=range(3)) + ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + + # order some txns + sdk_send_random_and_check(looper, txnPoolNodeSet, + sdk_pool_handle, sdk_wallet_steward, 5) + + # view change to viewNo=2 + for n in txnPoolNodeSet: + n.view_changer.on_master_degradation() + waitForViewChange(looper, + other_nodes, + expectedViewNo=initial_view_no 
+ 2) +            checkProtocolInstanceSetup(looper=looper, nodes=other_nodes, instances=range(3)) +            ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +            # order some txns +            sdk_send_random_and_check(looper, txnPoolNodeSet, +                                      sdk_pool_handle, sdk_wallet_steward, 5) + +            # view change to viewNo=2 +            for n in txnPoolNodeSet: +                n.view_changer.on_master_degradation() +            waitForViewChange(looper, +                              other_nodes, +                              expectedViewNo=initial_view_no + 2) +            checkProtocolInstanceSetup(looper=looper, nodes=other_nodes, instances=range(3)) +            ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +            # order some txns +            sdk_send_random_and_check(looper, txnPoolNodeSet, +                                      sdk_pool_handle, sdk_wallet_steward, 5) + +            assert initial_view_no == lagging_node.viewNo +            assert initial_last_ordered == lagging_node.master_last_ordered_3PC + +        # make sure that the first View Change happened on the lagging node +        waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 1, +                          customTimeout=20) +        assert initial_view_no + 1 == lagging_node.viewNo + +    # make sure that the second View Change happened on the lagging node +    waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 2, +                      customTimeout=20) +    ensureElectionsDone(looper=looper, nodes=txnPoolNodeSet) +    ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +    # make sure that the pool is functional +    sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_steward, sdk_pool_handle) + + +@pytest.mark.skip("INDY-2044") +def test_catchup_to_next_view_during_view_change_0_to_2(txnPoolNodeSet, looper, +                                                        sdk_pool_handle, sdk_wallet_steward): +    ''' +    1) Lagging node is not a primary for new views +    2) All nodes except the lagging one go to view=1 +    3) All nodes except the lagging one order txns on view=1 +    4) All nodes except the lagging one go to view=2 +    5) All nodes except the lagging one order txns on view=2 +    6) Lagging node gets InstanceChanges for view=1 and view=2 => it changes to view=2, and catches up to the txns from view=2 +    7) Make sure that the lagging node is up to date, and can participate in consensus +    ''' +    lagging_node = txnPoolNodeSet[0] +    other_nodes = txnPoolNodeSet[1:] +    initial_view_no = checkViewNoForNodes(txnPoolNodeSet) +    initial_last_ordered = lagging_node.master_last_ordered_3PC + +    with delay_rules(lagging_node.nodeIbStasher, delay_for_view(viewNo=0), delay_for_view(viewNo=1), +                     delay_for_view(viewNo=2)): +        # view change to viewNo=1 +        for n in txnPoolNodeSet: +            n.view_changer.on_master_degradation() +        waitForViewChange(looper, +                          other_nodes, +                          expectedViewNo=initial_view_no + 1) +        checkProtocolInstanceSetup(looper=looper, nodes=other_nodes, instances=range(3)) +        ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +        # order some txns +        sdk_send_random_and_check(looper, txnPoolNodeSet, +                                  sdk_pool_handle, sdk_wallet_steward, 5) + +        # view change to viewNo=2 +        for n in txnPoolNodeSet: +            n.view_changer.on_master_degradation() +        waitForViewChange(looper, +                          other_nodes, +                          expectedViewNo=initial_view_no + 2) +        checkProtocolInstanceSetup(looper=looper, nodes=other_nodes, instances=range(3)) +        ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +        # order some txns +        sdk_send_random_and_check(looper, txnPoolNodeSet, +                                  sdk_pool_handle, sdk_wallet_steward, 5) + +        assert initial_view_no == lagging_node.viewNo +        assert initial_last_ordered == lagging_node.master_last_ordered_3PC + +    # make sure that the second View Change happened on the lagging node +    waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 2, +                      customTimeout=20) +    ensureElectionsDone(looper=looper, nodes=txnPoolNodeSet) +    ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +    # make sure that the pool is functional +    sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_steward, sdk_pool_handle)
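The two tests above depend on the order in which the nested delay_rules contexts are released: the inner context hides all view=0 and view=1 traffic so the lagging node stays in the old view while the rest of the pool advances, and the outer context keeps view=2 messages (InstanceChange via icDelay, ViewChangeDone via vcd_delay) stashed until the inner context has exited, which is what forces the lagging node through view=1 before it can reach view=2. The sketch below condenses that pattern; the helper name and exact structure are illustrative only and are not part of the test suite, but every call it makes already appears in the tests above.

from plenum.test.delayers import icDelay, vcd_delay, delay_for_view
from plenum.test.helper import waitForViewChange
from plenum.test.stasher import delay_rules


def _lag_node_through_two_view_changes(looper, other_nodes, lagging_node, initial_view_no, start_view_change):
    # Hypothetical helper: a condensed version of the delay pattern used above.
    # Outer context: keep view=2 InstanceChange/ViewChangeDone away from the lagging node.
    with delay_rules(lagging_node.nodeIbStasher, icDelay(viewNo=2), vcd_delay(viewNo=2)):
        # Inner context: hide view=0 and view=1 traffic so the lagging node stays in the old view.
        with delay_rules(lagging_node.nodeIbStasher, delay_for_view(viewNo=0), delay_for_view(viewNo=1)):
            start_view_change()  # e.g. n.view_changer.on_master_degradation() on every node
            waitForViewChange(looper, other_nodes, expectedViewNo=initial_view_no + 1)
            start_view_change()
            waitForViewChange(looper, other_nodes, expectedViewNo=initial_view_no + 2)
        # Inner context released: the lagging node now sees the view=1 messages and moves
        # to view=1, while the view=2 messages are still being delayed.
        waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 1)
    # Outer context released: the view=2 messages arrive and the lagging node joins view=2.
    waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 2)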
diff --git a/plenum/test/view_change/test_catchup_to_next_view_during_view_change_by_primary.py b/plenum/test/view_change/test_catchup_to_next_view_during_view_change_by_primary.py new file mode 100644 index 0000000000..591bb4873a --- /dev/null +++ b/plenum/test/view_change/test_catchup_to_next_view_during_view_change_by_primary.py @@ -0,0 +1,70 @@ +import pytest + +from plenum.common.constants import DOMAIN_LEDGER_ID +from plenum.test.delayers import delay_for_view +from plenum.test.helper import checkViewNoForNodes, sdk_send_random_and_check, waitForViewChange, view_change_timeout +from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data +from plenum.test.node_request.helper import sdk_ensure_pool_functional +from plenum.test.stasher import delay_rules +from plenum.test.test_node import checkProtocolInstanceSetup, ensureElectionsDone + +nodeCount = 7 +VIEW_CHANGE_TIMEOUT = 5 + + +@pytest.fixture(scope="module") +def tconf(tconf): +    with view_change_timeout(tconf, VIEW_CHANGE_TIMEOUT): +        yield tconf + + +def test_catchup_to_next_view_during_view_change_by_primary(txnPoolNodeSet, looper, +                                                            sdk_pool_handle, sdk_wallet_steward): +    ''' +    1) Lagging node is a primary for view=1 +    2) All nodes except the lagging one start a view change (to view=1) +    3) The nodes cannot finish it in time since the Primary for view=1 is lagging +    4) All nodes except the lagging one then go to view=2 +    5) All nodes except the lagging one order txns on view=2 +    6) Lagging node gets InstanceChanges for view=1 => it changes to view=1, and catches up to the txns from view=2 +    7) Lagging node gets InstanceChanges for view=2 => it changes to view=2 +    8) Make sure that the lagging node is up to date, and can participate in consensus +    ''' +    lagging_node = txnPoolNodeSet[1] +    other_nodes = list(set(txnPoolNodeSet) - {lagging_node}) +    initial_view_no = checkViewNoForNodes(txnPoolNodeSet) +    initial_last_ordered = lagging_node.master_last_ordered_3PC + +    with delay_rules(lagging_node.nodeIbStasher, delay_for_view(viewNo=2)): +        with delay_rules(lagging_node.nodeIbStasher, delay_for_view(viewNo=0), delay_for_view(viewNo=1)): +            # view change to viewNo=2 since a primary for viewNo=1 is a lagging node +            for n in txnPoolNodeSet: +                n.view_changer.on_master_degradation() +            waitForViewChange(looper, +                              other_nodes, +                              expectedViewNo=initial_view_no + 2, +                              customTimeout=30) +            checkProtocolInstanceSetup(looper=looper, nodes=other_nodes, instances=range(3)) +            ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +            # order some txns +            sdk_send_random_and_check(looper, txnPoolNodeSet, +                                      sdk_pool_handle, sdk_wallet_steward, 5) + +            assert initial_view_no == lagging_node.viewNo +            assert initial_last_ordered == lagging_node.master_last_ordered_3PC +            assert len(lagging_node.master_replica.requestQueues[DOMAIN_LEDGER_ID]) > 0 + +        # make sure that the first View Change happened on lagging node +        waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 1, +                          customTimeout=20) +        assert initial_view_no + 1 == lagging_node.viewNo + +    # make sure that the second View Change happened on lagging node +    waitForViewChange(looper, [lagging_node], expectedViewNo=initial_view_no + 2, +                      customTimeout=20) +    ensureElectionsDone(looper=looper, nodes=txnPoolNodeSet) +    ensure_all_nodes_have_same_data(looper, nodes=other_nodes) + +    # make sure that the pool is functional +    sdk_ensure_pool_functional(looper, txnPoolNodeSet, sdk_wallet_steward, sdk_pool_handle)
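In the test above the lagging node is txnPoolNodeSet[1] precisely so that it is the would-be primary for view=1. This relies on plenum's default round-robin primary selection, under which the master primary for a view is, roughly, the node whose index in the pool matches viewNo modulo the pool size; that selection rule is an assumption spelled out here for clarity, and the helper below is hypothetical, not part of the test utilities.

def expected_master_primary(pool_nodes, view_no):
    # Hypothetical helper: assumes the default round-robin selection, so the master
    # primary for view_no is roughly pool_nodes[view_no % N]. For a 7-node pool this
    # makes txnPoolNodeSet[1] the primary candidate for view=1, which is why the test
    # above picks it as the lagging node.
    return pool_nodes[view_no % len(pool_nodes)]

# e.g. expected_master_primary(txnPoolNodeSet, initial_view_no + 1) is txnPoolNodeSet[1]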
diff --git a/plenum/test/view_change/test_last_ordered_reset_for_new_view.py b/plenum/test/view_change/test_last_ordered_reset_for_new_view.py index dedbaa8a9c..452092ee29 100644 --- a/plenum/test/view_change/test_last_ordered_reset_for_new_view.py +++ b/plenum/test/view_change/test_last_ordered_reset_for_new_view.py @@ -3,66 +3,44 @@ from plenum.test.view_change.helper import ensure_view_change_complete -def test_last_ordered_3pc_reset_if_more_than_new_view(txnPoolNodeSet, looper, sdk_pool_handle, sdk_wallet_client): +def test_last_ordered_3pc_not_reset_if_more_than_new_view(txnPoolNodeSet, looper, sdk_pool_handle, sdk_wallet_client): """ Check that if last_ordered_3pc's viewNo on a Replica is greater than the new viewNo after view change, - then last_ordered_3pc is reset to (0,0). - It can be that last_ordered_3pc was set for the previous view, since it's set during catch-up + then last_ordered_3pc is not reset. + It can be that last_ordered_3pc was set for the next view, since it's set during catch-up - Example: a Node has last_ordered = (1, 300), and then the whole pool except this node restarted. - The new viewNo is 0, but last_ordered is (1, 300), so all new requests will be discarded by this Node - if we don't reset last_ordered_3pc """ old_view_no = checkViewNoForNodes(txnPoolNodeSet) for node in txnPoolNodeSet: node.master_replica.last_ordered_3pc = (old_view_no + 2, 100) ensure_view_change_complete(looper, txnPoolNodeSet, customTimeout=60) - view_no = checkViewNoForNodes(txnPoolNodeSet) - # After view_change, master primary must initiate 3pc batch for node in txnPoolNodeSet: - assert (view_no, 1) == node.master_replica.last_ordered_3pc + assert (old_view_no + 2, 100) == node.master_replica.last_ordered_3pc - # Make sure the pool is working - sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 5) - ensure_all_nodes_have_same_data(looper, txnPoolNodeSet) - -def test_last_ordered_3pc_reset_if_equal_to_new_view(txnPoolNodeSet, looper, sdk_pool_handle, sdk_wallet_client): +def test_last_ordered_3pc_not_reset_if_equal_to_new_view(txnPoolNodeSet, looper, sdk_pool_handle, sdk_wallet_client): """ Check that if last_ordered_3pc's viewNo on a Replica is equal to the new viewNo after view change, - then last_ordered_3pc is reset to (0,0). - It can be that last_ordered_3pc was set for the previous view, since it's set during catch-up + then last_ordered_3pc is not reset. + It can be that last_ordered_3pc was set for the next view, since it's set during catch-up - Example: a Node has last_ordered = (1, 300), and then the whole pool except this node restarted. - The new viewNo is 0, but last_ordered is (1, 300), so all new requests will be discarded by this Node - if we don't reset last_ordered_3pc """ old_view_no = checkViewNoForNodes(txnPoolNodeSet) for node in txnPoolNodeSet: node.master_replica.last_ordered_3pc = (old_view_no + 1, 100) ensure_view_change_complete(looper, txnPoolNodeSet, customTimeout=60) - view_no = checkViewNoForNodes(txnPoolNodeSet) for node in txnPoolNodeSet: - assert (view_no, 1) == node.master_replica.last_ordered_3pc - - # Make sure the pool is working - sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 5) - ensure_all_nodes_have_same_data(looper, txnPoolNodeSet) + assert (old_view_no + 1, 100) == node.master_replica.last_ordered_3pc def test_last_ordered_3pc_not_reset_if_less_than_new_view(txnPoolNodeSet, looper, sdk_pool_handle, sdk_wallet_client): """ - Check that if last_ordered_3pc's viewNo on a Replica is equal to the new viewNo after view change, - then last_ordered_3pc is reset to (0,0).
- It can be that last_ordered_3pc was set for the previous view, since it's set during catch-up - - Example: a Node has last_ordered = (1, 300), and then the whole pool except this node restarted. - The new viewNo is 0, but last_ordered is (1, 300), so all new requests will be discarded by this Node - if we don't reset last_ordered_3pc + Check that if last_ordered_3pc's viewNo on a Replica is less than the new viewNo after view change, + then last_ordered_3pc is not reset. """ old_view_no = checkViewNoForNodes(txnPoolNodeSet) for node in txnPoolNodeSet: