diff --git a/plenum/server/replica.py b/plenum/server/replica.py
index d8a3117577..2825ee562d 100644
--- a/plenum/server/replica.py
+++ b/plenum/server/replica.py
@@ -1751,7 +1751,7 @@ def order_3pc_key(self, key):
         if self.isMaster:
             self.metrics.add_event(MetricsName.MASTER_ORDERED_BATCH_SIZE, pp.discarded)
 
-        self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId)
+        self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId, pp.viewNo)
 
         # BLS multi-sig:
         self._bls_bft_replica.process_order(key, self.quorums, pp)
@@ -1782,6 +1782,12 @@ def processCheckpoint(self, msg: Checkpoint, sender: str) -> bool:
             self.discard(msg,
                         reason="Checkpoint already stable",
                         logMethod=self.logger.debug)
             return True
 
+        if self.isPrimary is None and msg.viewNo < self.viewNo:
+            self.discard(msg,
+                        reason="Checkpoint from previous view",
+                        logMethod=self.logger.debug)
+            return True
+
         seqNoStart = msg.seqNoStart
         key = (seqNoStart, seqNoEnd)
 
@@ -1830,7 +1836,7 @@ def __start_catchup_if_needed(self):
                                format(self, lag_in_checkpoints))
             self.node.start_catchup()
 
-    def addToCheckpoint(self, ppSeqNo, digest, ledger_id):
+    def addToCheckpoint(self, ppSeqNo, digest, ledger_id, view_no):
         for (s, e) in self.checkpoints.keys():
             if s <= ppSeqNo <= e:
                 state = self.checkpoints[s, e]  # type: CheckpointState
@@ -1858,11 +1864,11 @@ def addToCheckpoint(self, ppSeqNo, digest, ledger_id):
                 self.checkpoints[s, e] = state
                 self.logger.info("{} sending Checkpoint {} view {} checkpointState digest {}. Ledger {} "
                                  "txn root hash {}. Committed state root hash {} Uncommitted state root hash {}".
-                                 format(self, (s, e), self.viewNo, state.digest, ledger_id,
+                                 format(self, (s, e), view_no, state.digest, ledger_id,
                                         self.txnRootHash(ledger_id), self.stateRootHash(ledger_id, committed=True),
                                         self.stateRootHash(ledger_id, committed=False)))
-                self.send(Checkpoint(self.instId, self.viewNo, s, e, state.digest))
-                self.processStashedCheckpoints((s, e))
+                self.send(Checkpoint(self.instId, view_no, s, e, state.digest))
+                self.processStashedCheckpoints((s, e), view_no)
 
     def markCheckPointStable(self, seqNo):
         previousCheckpoints = []
@@ -1929,16 +1935,16 @@ def stashed_checkpoints_with_quorum(self):
                 end_pp_seq_numbers.append(seq_no_end)
         return sorted(end_pp_seq_numbers)
 
-    def processStashedCheckpoints(self, key):
+    def processStashedCheckpoints(self, key, view_no):
         # Remove all checkpoints from previous views if any
         self._remove_stashed_checkpoints(till_3pc_key=(self.viewNo, 0))
 
-        if key not in self.stashedRecvdCheckpoints.get(self.viewNo, {}):
+        if key not in self.stashedRecvdCheckpoints.get(view_no, {}):
            self.logger.trace("{} have no stashed checkpoints for {}")
-            return 0
+            return
 
         # Get a snapshot of all the senders of stashed checkpoints for `key`
-        senders = list(self.stashedRecvdCheckpoints[self.viewNo][key].keys())
+        senders = list(self.stashedRecvdCheckpoints[view_no][key].keys())
 
         total_processed = 0
         consumed = 0
@@ -1947,11 +1953,11 @@ def processStashedCheckpoints(self, key):
            # `stashedRecvdCheckpoints` because it might be removed from there
            # in case own checkpoint was stabilized when we were processing
            # stashed checkpoints from previous senders in this loop
-            if self.viewNo in self.stashedRecvdCheckpoints \
-                    and key in self.stashedRecvdCheckpoints[self.viewNo] \
-                    and sender in self.stashedRecvdCheckpoints[self.viewNo][key]:
+            if view_no in self.stashedRecvdCheckpoints \
+                    and key in self.stashedRecvdCheckpoints[view_no] \
+                    and sender in self.stashedRecvdCheckpoints[view_no][key]:
                 if self.processCheckpoint(
-                        self.stashedRecvdCheckpoints[self.viewNo][key].pop(sender),
+                        self.stashedRecvdCheckpoints[view_no][key].pop(sender),
                         sender):
                     consumed += 1
                 # Note that if `processCheckpoint` returned False then the
@@ -1961,12 +1967,12 @@ def processStashedCheckpoints(self, key):
 
         # If we have consumed stashed checkpoints for `key` from all the
         # senders then remove entries which have become empty
-        if self.viewNo in self.stashedRecvdCheckpoints \
-                and key in self.stashedRecvdCheckpoints[self.viewNo] \
-                and len(self.stashedRecvdCheckpoints[self.viewNo][key]) == 0:
-            del self.stashedRecvdCheckpoints[self.viewNo][key]
-            if len(self.stashedRecvdCheckpoints[self.viewNo]) == 0:
-                del self.stashedRecvdCheckpoints[self.viewNo]
+        if view_no in self.stashedRecvdCheckpoints \
+                and key in self.stashedRecvdCheckpoints[view_no] \
+                and len(self.stashedRecvdCheckpoints[view_no][key]) == 0:
+            del self.stashedRecvdCheckpoints[view_no][key]
+            if len(self.stashedRecvdCheckpoints[view_no]) == 0:
+                del self.stashedRecvdCheckpoints[view_no]
 
         restashed = total_processed - consumed
         self.logger.info('{} processed {} stashed checkpoints for {}, '
diff --git a/plenum/test/checkpoints/test_checkpoints_removal_in_view_change.py b/plenum/test/checkpoints/test_checkpoints_removal_in_view_change.py
new file mode 100644
index 0000000000..3480032930
--- /dev/null
+++ b/plenum/test/checkpoints/test_checkpoints_removal_in_view_change.py
@@ -0,0 +1,119 @@
+from numbers import Rational
+
+import math
+import pytest
+import sys
+
+from plenum.common.constants import CHECKPOINT, COMMIT
+from plenum.test.delayers import cDelay, chk_delay, lsDelay, vcd_delay
+from plenum.test.helper import sdk_send_random_requests, \
+    sdk_get_and_check_replies, sdk_send_random_and_check
+from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data
+from plenum.test.test_node import ensureElectionsDone
+from stp_core.loop.eventually import eventually
+
+CHK_FREQ = 2
+
+
+def test_checkpoints_removed_in_view_change(chkFreqPatched,
+                                            txnPoolNodeSet,
+                                            looper,
+                                            sdk_pool_handle,
+                                            sdk_wallet_client):
+    '''
+    Check that a checkpoint finalized during a view change before catchup
+    doesn't clean necessary data from the request and 3PC queues.
+    '''
+    slow_nodes = txnPoolNodeSet[:3]
+    fast_nodes = txnPoolNodeSet[3:]
+    # delay checkpoint processing for slow_nodes
+    delay_msg(slow_nodes, chk_delay)
+    # send txns to finalize the current checkpoint
+    sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle,
+                              sdk_wallet_client, CHK_FREQ)
+    ensure_all_nodes_have_same_data(looper, nodes=txnPoolNodeSet)
+    # delay commit processing for slow_nodes
+    delay_msg(slow_nodes, cDelay)
+
+    requests = sdk_send_random_requests(looper, sdk_pool_handle,
+                                        sdk_wallet_client, 1)
+    # check that slow_nodes have a prepared certificate with the new txn
+    looper.run(eventually(last_prepared_certificate,
+                          slow_nodes,
+                          (0, CHK_FREQ + 1)))
+    # check that fast_nodes have ordered the new txn
+    looper.run(eventually(last_ordered_check,
+                          fast_nodes,
+                          (0, CHK_FREQ + 1)))
+    # check that fast_nodes finalized the first checkpoint, slow_nodes did not
+    looper.run(eventually(check_checkpoint_finalize,
+                          fast_nodes,
+                          1, CHK_FREQ))
+    for n in slow_nodes:
+        assert not n.master_replica.checkpoints[(1, CHK_FREQ)].isStable
+
+    # Emulate a view change start to bump viewNo and fix the last prepared
+    # certificate; if we started a real view change, the checkpoints would be
+    # cleaned and the first checkpoint would no longer need to be finalized.
+    for node in txnPoolNodeSet:
+        node.viewNo += 1
+        node.master_replica.on_view_change_start()
+
+    # reset delay for checkpoints
+    reset_delay(slow_nodes, CHECKPOINT)
+    # revert the view change emulation and start a real view change to finish
+    # it in normal mode with catchup
+    for node in txnPoolNodeSet:
+        node.viewNo -= 1
+        node.view_changer.on_master_degradation()
+    for n in slow_nodes:
+        assert not n.master_replica.checkpoints[(1, CHK_FREQ)].isStable
+    # Check that the last txn is ordered before catchup. Checking the client
+    # reply is enough because slow_nodes contains 3 nodes, and without their
+    # replies the sdk method for getting replies will not finish successfully.
+    reset_delay(slow_nodes, COMMIT)
+    sdk_get_and_check_replies(looper, requests)
+    looper.run(eventually(last_ordered_check,
+                          txnPoolNodeSet,
+                          (0, CHK_FREQ + 1)))
+    # check that the view change finished and the checkpoints were cleaned
+    ensureElectionsDone(looper, txnPoolNodeSet)
+    for n in slow_nodes:
+        assert (1, CHK_FREQ) not in n.master_replica.checkpoints
+    # check that all nodes have the same data after ordering new txns
+    sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle,
+                              sdk_wallet_client, CHK_FREQ)
+    ensure_all_nodes_have_same_data(looper, nodes=txnPoolNodeSet)
+
+
+def delay_msg(nodes, delay_function, types=None):
+    for n in nodes:
+        if types is None:
+            n.nodeIbStasher.delay(delay_function(sys.maxsize))
+        else:
+            n.nodeIbStasher.delay(delay_function(sys.maxsize, types))
+
+
+def reset_delay(nodes, message):
+    for n in nodes:
+        n.nodeIbStasher.reset_delays_and_process_delayeds(message)
+
+
+def last_prepared_certificate(nodes, num):
+    for n in nodes:
+        assert n.master_replica.last_prepared_certificate_in_view() == num
+
+
+def check_checkpoint_finalize(nodes, start_pp_seq_no, end_pp_seq_no):
+    for n in nodes:
+        checkpoint = n.master_replica.checkpoints[(start_pp_seq_no, end_pp_seq_no)]
+        assert checkpoint.isStable
+
+
+def last_ordered_check(nodes, last_ordered, instance_id=None):
+    for n in nodes:
+        last_ordered_3pc = n.master_last_ordered_3PC \
+            if instance_id is None \
+            else n.replicas[instance_id].last_ordered_3pc
+        assert last_ordered_3pc == last_ordered
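For reviewers, a minimal self-contained sketch of the idea behind this change: the replica's own checkpoint is tagged with the view in which its batches were ordered (the view_no now threaded through addToCheckpoint, the Checkpoint message, and processStashedCheckpoints), and stashed checkpoints are then matched against that view rather than against the replica's current viewNo. This uses only the Python standard library; ToyReplica, ToyCheckpoint, and the helper names are hypothetical simplifications, not plenum APIs.

from collections import defaultdict
from typing import NamedTuple


class ToyCheckpoint(NamedTuple):
    view_no: int
    seq_no_start: int
    seq_no_end: int
    digest: str


class ToyReplica:
    def __init__(self):
        self.view_no = 0
        # (view_no, (start, end)) -> senders whose checkpoints matched ours
        self.received = defaultdict(list)
        # view_no -> (start, end) -> sender -> stashed checkpoint message
        self.stashed = defaultdict(lambda: defaultdict(dict))

    def add_own_checkpoint(self, start, end, digest, view_no):
        # The checkpoint carries the view in which its batches were ordered;
        # this may lag self.view_no if ordering finished during a view change.
        self.received[(view_no, (start, end))].append('self')
        self.process_stashed((start, end), view_no)

    def process_checkpoint(self, msg, sender, have_own):
        key = (msg.seq_no_start, msg.seq_no_end)
        if not have_own:
            # No matching own checkpoint yet: stash under the message's view.
            self.stashed[msg.view_no][key][sender] = msg
            return False
        self.received[(msg.view_no, key)].append(sender)
        return True

    def process_stashed(self, key, view_no):
        # Consume stashed checkpoints for the view of our own checkpoint,
        # which is not necessarily the replica's current view.
        consumed = 0
        for sender, msg in list(self.stashed.get(view_no, {}).get(key, {}).items()):
            if self.process_checkpoint(msg, sender, have_own=True):
                del self.stashed[view_no][key][sender]
                consumed += 1
        return consumed


if __name__ == '__main__':
    r = ToyReplica()
    r.view_no = 1  # the replica has already entered view 1
    # A view-0 checkpoint arrives before we have our own, so it is stashed.
    msg = ToyCheckpoint(view_no=0, seq_no_start=1, seq_no_end=2, digest='d')
    r.process_checkpoint(msg, 'NodeB', have_own=False)
    # Our own checkpoint for the same range was built from batches ordered in
    # view 0, so the stashed view-0 checkpoint is found and consumed.
    r.add_own_checkpoint(1, 2, 'd', view_no=0)
    assert r.received[(0, (1, 2))] == ['self', 'NodeB']

The same view-awareness motivates the new guard in processCheckpoint: while the primary is still undecided (isPrimary is None), checkpoints whose viewNo is older than the replica's current view are discarded instead of being stashed indefinitely.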