INDY-1545: stop processing checkpoints in view change #854
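In short, this change threads the view number of the ordered PrePrepare (`pp.viewNo`) through `addToCheckpoint`, the outgoing `Checkpoint` message and `processStashedCheckpoints` instead of always using the replica's current `self.viewNo`, and `processCheckpoint` now discards a Checkpoint that belongs to an earlier view while no primary is known yet. A minimal, self-contained sketch of the new guard, paraphrased from the diff below (the helper name is hypothetical, not part of the plenum code):

def should_discard_checkpoint(replica_view_no, replica_is_primary, checkpoint_view_no):
    # Mirrors the guard added to Replica.processCheckpoint: while the primary for
    # the instance is still undecided (isPrimary is None), a Checkpoint created in
    # an earlier view is discarded instead of being processed or stashed.
    return replica_is_primary is None and checkpoint_view_no < replica_view_no

# A replica already in view 2 with no primary elected yet drops a view-1 checkpoint:
assert should_discard_checkpoint(replica_view_no=2, replica_is_primary=None, checkpoint_view_no=1)
# Once a primary is known for the instance (isPrimary is True or False),
# the same message goes through normal checkpoint processing:
assert not should_discard_checkpoint(replica_view_no=2, replica_is_primary=False, checkpoint_view_no=1)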

44 changes: 25 additions & 19 deletions plenum/server/replica.py
@@ -1751,7 +1751,7 @@ def order_3pc_key(self, key):
if self.isMaster:
self.metrics.add_event(MetricsName.MASTER_ORDERED_BATCH_SIZE, pp.discarded)

- self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId)
+ self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId, pp.viewNo)

# BLS multi-sig:
self._bls_bft_replica.process_order(key, self.quorums, pp)
@@ -1782,6 +1782,12 @@ def processCheckpoint(self, msg: Checkpoint, sender: str) -> bool:
self.discard(msg, reason="Checkpoint already stable", logMethod=self.logger.debug)
return True

+ if self.isPrimary is None and msg.viewNo < self.viewNo:
Review comment (Contributor): Why do we have a check `self.isPrimary is None`?

+ self.discard(msg,
+ reason="Checkpoint from previous view",
+ logMethod=self.logger.debug)
+ return True

seqNoStart = msg.seqNoStart
key = (seqNoStart, seqNoEnd)

@@ -1830,7 +1836,7 @@ def __start_catchup_if_needed(self):
format(self, lag_in_checkpoints))
self.node.start_catchup()

- def addToCheckpoint(self, ppSeqNo, digest, ledger_id):
+ def addToCheckpoint(self, ppSeqNo, digest, ledger_id, view_no):
for (s, e) in self.checkpoints.keys():
if s <= ppSeqNo <= e:
state = self.checkpoints[s, e] # type: CheckpointState
@@ -1858,11 +1864,11 @@ def addToCheckpoint(self, ppSeqNo, digest, ledger_id):
self.checkpoints[s, e] = state
self.logger.info("{} sending Checkpoint {} view {} checkpointState digest {}. Ledger {} "
"txn root hash {}. Committed state root hash {} Uncommitted state root hash {}".
- format(self, (s, e), self.viewNo, state.digest, ledger_id,
+ format(self, (s, e), view_no, state.digest, ledger_id,
self.txnRootHash(ledger_id), self.stateRootHash(ledger_id, committed=True),
self.stateRootHash(ledger_id, committed=False)))
- self.send(Checkpoint(self.instId, self.viewNo, s, e, state.digest))
- self.processStashedCheckpoints((s, e))
+ self.send(Checkpoint(self.instId, view_no, s, e, state.digest))
+ self.processStashedCheckpoints((s, e), view_no)

def markCheckPointStable(self, seqNo):
previousCheckpoints = []
@@ -1929,16 +1935,16 @@ def stashed_checkpoints_with_quorum(self):
end_pp_seq_numbers.append(seq_no_end)
return sorted(end_pp_seq_numbers)

- def processStashedCheckpoints(self, key):
+ def processStashedCheckpoints(self, key, view_no):
# Remove all checkpoints from previous views if any
self._remove_stashed_checkpoints(till_3pc_key=(self.viewNo, 0))

- if key not in self.stashedRecvdCheckpoints.get(self.viewNo, {}):
+ if key not in self.stashedRecvdCheckpoints.get(view_no, {}):
self.logger.trace("{} have no stashed checkpoints for {}")
- return 0
+ return

# Get a snapshot of all the senders of stashed checkpoints for `key`
- senders = list(self.stashedRecvdCheckpoints[self.viewNo][key].keys())
+ senders = list(self.stashedRecvdCheckpoints[view_no][key].keys())
total_processed = 0
consumed = 0

# `stashedRecvdCheckpoints` because it might be removed from there
# in case own checkpoint was stabilized when we were processing
# stashed checkpoints from previous senders in this loop
- if self.viewNo in self.stashedRecvdCheckpoints \
- and key in self.stashedRecvdCheckpoints[self.viewNo] \
- and sender in self.stashedRecvdCheckpoints[self.viewNo][key]:
+ if view_no in self.stashedRecvdCheckpoints \
+ and key in self.stashedRecvdCheckpoints[view_no] \
+ and sender in self.stashedRecvdCheckpoints[view_no][key]:
if self.processCheckpoint(
- self.stashedRecvdCheckpoints[self.viewNo][key].pop(sender),
+ self.stashedRecvdCheckpoints[view_no][key].pop(sender),
sender):
consumed += 1
# Note that if `processCheckpoint` returned False then the

# If we have consumed stashed checkpoints for `key` from all the
# senders then remove entries which have become empty
- if self.viewNo in self.stashedRecvdCheckpoints \
- and key in self.stashedRecvdCheckpoints[self.viewNo] \
- and len(self.stashedRecvdCheckpoints[self.viewNo][key]) == 0:
- del self.stashedRecvdCheckpoints[self.viewNo][key]
- if len(self.stashedRecvdCheckpoints[self.viewNo]) == 0:
- del self.stashedRecvdCheckpoints[self.viewNo]
+ if view_no in self.stashedRecvdCheckpoints \
+ and key in self.stashedRecvdCheckpoints[view_no] \
+ and len(self.stashedRecvdCheckpoints[view_no][key]) == 0:
+ del self.stashedRecvdCheckpoints[view_no][key]
+ if len(self.stashedRecvdCheckpoints[view_no]) == 0:
+ del self.stashedRecvdCheckpoints[view_no]

restashed = total_processed - consumed
self.logger.info('{} processed {} stashed checkpoints for {}, '
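For orientation: the lookups above show that `stashedRecvdCheckpoints` is a nested mapping keyed by view number, then by the checkpoint range `(seqNoStart, seqNoEnd)`, then by sender. Threading `view_no` through `addToCheckpoint` and `processStashedCheckpoints` lets a checkpoint built for a batch ordered in an earlier view consume the checkpoints stashed under that view rather than under the replica's current `self.viewNo`. A toy, self-contained illustration of that structure (hypothetical node names and placeholder values, not the plenum API):

# Toy model of the nested stash indexed above:
# view number -> (seqNoStart, seqNoEnd) -> sender -> stashed Checkpoint message.
# Plain strings stand in for the Checkpoint objects.
stashed_recvd_checkpoints = {
    0: {
        (1, 2): {
            "Beta": "Checkpoint(instId=0, viewNo=0, seqNoStart=1, seqNoEnd=2, digest=...)",
            "Gamma": "Checkpoint(instId=0, viewNo=0, seqNoStart=1, seqNoEnd=2, digest=...)",
        },
    },
}

view_no, key = 0, (1, 2)
# processStashedCheckpoints(key, view_no) now indexes the stash by the view the
# checkpoint was created in, not by the replica's current viewNo:
senders = list(stashed_recvd_checkpoints.get(view_no, {}).get(key, {}).keys())
assert senders == ["Beta", "Gamma"]

# Once every stashed checkpoint for `key` has been consumed, the empty entries
# are removed, exactly as in the last hunk of the diff:
del stashed_recvd_checkpoints[view_no][key]
if len(stashed_recvd_checkpoints[view_no]) == 0:
    del stashed_recvd_checkpoints[view_no]
assert view_no not in stashed_recvd_checkpoints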
119 changes: 119 additions & 0 deletions plenum/test/checkpoints/test_checkpoints_removal_in_view_change.py
@@ -0,0 +1,119 @@
import pytest
import sys

from plenum.common.constants import CHECKPOINT, COMMIT
from plenum.test.delayers import cDelay, chk_delay, lsDelay, vcd_delay
from plenum.test.helper import sdk_send_random_requests, \
sdk_get_and_check_replies, sdk_send_random_and_check
from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data
from plenum.test.test_node import ensureElectionsDone
from stp_core.loop.eventually import eventually

CHK_FREQ = 2


def test_checkpoints_removed_in_view_change(chkFreqPatched,
txnPoolNodeSet,
looper,
sdk_pool_handle,
sdk_wallet_client):
'''
Check that finalizing a checkpoint during a view change, before catchup,
does not clean necessary data from the requests and 3PC queues.
'''
slow_nodes = txnPoolNodeSet[:3]
fast_nodes = txnPoolNodeSet[3:]
# delay checkpoint processing for slow_nodes
delay_msg(slow_nodes, chk_delay)
# send txns to finalize the current checkpoint
sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle,
sdk_wallet_client, CHK_FREQ)
ensure_all_nodes_have_same_data(looper, nodes=txnPoolNodeSet)
# delay commit processing for slow_nodes
delay_msg(slow_nodes, cDelay)

requests = sdk_send_random_requests(looper, sdk_pool_handle,
sdk_wallet_client, 1)
# check that slow_nodes have a prepared certificate containing the new txn
looper.run(eventually(last_prepared_certificate,
slow_nodes,
(0, CHK_FREQ + 1)))
# check that fast_nodes ordered the new txn
looper.run(eventually(last_ordered_check,
fast_nodes,
(0, CHK_FREQ + 1)))
# check that fast_nodes have finalized the first checkpoint and slow_nodes have not
looper.run(eventually(check_checkpoint_finalize,
fast_nodes,
1, CHK_FREQ))
for n in slow_nodes:
assert not n.master_replica.checkpoints[(1, CHK_FREQ)].isStable

# Emulate the start of a view change to bump viewNo and fix the last prepared
# certificate; if we started a real view change, the checkpoints would be
# cleaned and the first checkpoint would no longer need finalizing.
for node in txnPoolNodeSet:
node.viewNo += 1
node.master_replica.on_view_change_start()

# reset delay for checkpoints
reset_delay(slow_nodes, CHECKPOINT)
# undo the view change emulation and start a real view change so that it
# finishes normally, with catchup
for node in txnPoolNodeSet:
node.viewNo -= 1
node.view_changer.on_master_degradation()
for n in slow_nodes:
assert not n.master_replica.checkpoints[(1, CHK_FREQ)].isStable
# Check that the last txn is ordered before catchup. Checking the client reply
# is enough because slow_nodes contains 3 nodes, and without their replies the
# SDK call that gets the replies cannot finish successfully.
reset_delay(slow_nodes, COMMIT)
sdk_get_and_check_replies(looper, requests)
looper.run(eventually(last_ordered_check,
txnPoolNodeSet,
(0, CHK_FREQ + 1)))
# check that the view change finished and the checkpoints were cleaned
ensureElectionsDone(looper, txnPoolNodeSet)
for n in slow_nodes:
assert (1, CHK_FREQ) not in n.master_replica.checkpoints
# check that all nodes have the same data after ordering new txns
sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle,
sdk_wallet_client, CHK_FREQ)
ensure_all_nodes_have_same_data(looper, nodes=txnPoolNodeSet)


def delay_msg(nodes, delay_function, types=None):
for n in nodes:
if types is None:
n.nodeIbStasher.delay(delay_function(sys.maxsize))
else:
n.nodeIbStasher.delay(delay_function(sys.maxsize, types))


def reset_delay(nodes, message):
for n in nodes:
n.nodeIbStasher.reset_delays_and_process_delayeds(message)


def last_prepared_certificate(nodes, num):
for n in nodes:
assert n.master_replica.last_prepared_certificate_in_view() == num


def check_checkpoint_finalize(nodes, start_pp_seq_no, end_pp_seq_no):
for n in nodes:
checkpoint = n.master_replica.checkpoints[(start_pp_seq_no, end_pp_seq_no)]
assert checkpoint.isStable


def last_ordered_check(nodes, last_ordered, instance_id=None):
for n in nodes:
last_ordered_3pc = n.master_last_ordered_3PC \
if instance_id is None \
else n.replicas[instance_id].last_ordered_3pc
assert last_ordered_3pc == last_ordered