Commit 220c11d

Merge pull request #854 from Toktar/bugfix-1545-checkponts-cleaning-preprepares
INDY-1545: stop processing checkpoints in view change
2 parents: b664ab5 + eef5bc6
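In short, the commit makes a replica whose primary is not yet known (as during a view change) discard Checkpoint messages that belong to a previous view, and it threads the checkpoint's own view number through addToCheckpoint and processStashedCheckpoints instead of always using the replica's current viewNo. A rough, hypothetical restatement of the new guard (not the actual method):

    # Illustrative sketch only: while a replica has no known primary -- as during
    # a view change -- it discards Checkpoint messages from an older view.
    def should_discard_checkpoint(is_primary, msg_view_no, replica_view_no):
        return is_primary is None and msg_view_no < replica_view_no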

File tree

2 files changed: +144, -19 lines

plenum/server/replica.py

Lines changed: 25 additions & 19 deletions

@@ -1751,7 +1751,7 @@ def order_3pc_key(self, key):
         if self.isMaster:
             self.metrics.add_event(MetricsName.MASTER_ORDERED_BATCH_SIZE, pp.discarded)

-        self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId)
+        self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId, pp.viewNo)

         # BLS multi-sig:
         self._bls_bft_replica.process_order(key, self.quorums, pp)
@@ -1782,6 +1782,12 @@ def processCheckpoint(self, msg: Checkpoint, sender: str) -> bool:
             self.discard(msg, reason="Checkpoint already stable", logMethod=self.logger.debug)
             return True

+        if self.isPrimary is None and msg.viewNo < self.viewNo:
+            self.discard(msg,
+                         reason="Checkpoint from previous view",
+                         logMethod=self.logger.debug)
+            return True
+
         seqNoStart = msg.seqNoStart
         key = (seqNoStart, seqNoEnd)

@@ -1830,7 +1836,7 @@ def __start_catchup_if_needed(self):
                               format(self, lag_in_checkpoints))
             self.node.start_catchup()

-    def addToCheckpoint(self, ppSeqNo, digest, ledger_id):
+    def addToCheckpoint(self, ppSeqNo, digest, ledger_id, view_no):
         for (s, e) in self.checkpoints.keys():
             if s <= ppSeqNo <= e:
                 state = self.checkpoints[s, e]  # type: CheckpointState
@@ -1858,11 +1864,11 @@ def addToCheckpoint(self, ppSeqNo, digest, ledger_id):
             self.checkpoints[s, e] = state
             self.logger.info("{} sending Checkpoint {} view {} checkpointState digest {}. Ledger {} "
                              "txn root hash {}. Committed state root hash {} Uncommitted state root hash {}".
-                             format(self, (s, e), self.viewNo, state.digest, ledger_id,
+                             format(self, (s, e), view_no, state.digest, ledger_id,
                                     self.txnRootHash(ledger_id), self.stateRootHash(ledger_id, committed=True),
                                     self.stateRootHash(ledger_id, committed=False)))
-            self.send(Checkpoint(self.instId, self.viewNo, s, e, state.digest))
-            self.processStashedCheckpoints((s, e))
+            self.send(Checkpoint(self.instId, view_no, s, e, state.digest))
+            self.processStashedCheckpoints((s, e), view_no)

     def markCheckPointStable(self, seqNo):
         previousCheckpoints = []
@@ -1929,16 +1935,16 @@ def stashed_checkpoints_with_quorum(self):
             end_pp_seq_numbers.append(seq_no_end)
         return sorted(end_pp_seq_numbers)

-    def processStashedCheckpoints(self, key):
+    def processStashedCheckpoints(self, key, view_no):
         # Remove all checkpoints from previous views if any
         self._remove_stashed_checkpoints(till_3pc_key=(self.viewNo, 0))

-        if key not in self.stashedRecvdCheckpoints.get(self.viewNo, {}):
+        if key not in self.stashedRecvdCheckpoints.get(view_no, {}):
             self.logger.trace("{} have no stashed checkpoints for {}")
-            return 0
+            return

         # Get a snapshot of all the senders of stashed checkpoints for `key`
-        senders = list(self.stashedRecvdCheckpoints[self.viewNo][key].keys())
+        senders = list(self.stashedRecvdCheckpoints[view_no][key].keys())
         total_processed = 0
         consumed = 0

@@ -1947,11 +1953,11 @@ def processStashedCheckpoints(self, key):
             # `stashedRecvdCheckpoints` because it might be removed from there
             # in case own checkpoint was stabilized when we were processing
             # stashed checkpoints from previous senders in this loop
-            if self.viewNo in self.stashedRecvdCheckpoints \
-                    and key in self.stashedRecvdCheckpoints[self.viewNo] \
-                    and sender in self.stashedRecvdCheckpoints[self.viewNo][key]:
+            if view_no in self.stashedRecvdCheckpoints \
+                    and key in self.stashedRecvdCheckpoints[view_no] \
+                    and sender in self.stashedRecvdCheckpoints[view_no][key]:
                 if self.processCheckpoint(
-                        self.stashedRecvdCheckpoints[self.viewNo][key].pop(sender),
+                        self.stashedRecvdCheckpoints[view_no][key].pop(sender),
                         sender):
                     consumed += 1
                 # Note that if `processCheckpoint` returned False then the
@@ -1961,12 +1967,12 @@ def processStashedCheckpoints(self, key):

         # If we have consumed stashed checkpoints for `key` from all the
         # senders then remove entries which have become empty
-        if self.viewNo in self.stashedRecvdCheckpoints \
-                and key in self.stashedRecvdCheckpoints[self.viewNo] \
-                and len(self.stashedRecvdCheckpoints[self.viewNo][key]) == 0:
-            del self.stashedRecvdCheckpoints[self.viewNo][key]
-            if len(self.stashedRecvdCheckpoints[self.viewNo]) == 0:
-                del self.stashedRecvdCheckpoints[self.viewNo]
+        if view_no in self.stashedRecvdCheckpoints \
+                and key in self.stashedRecvdCheckpoints[view_no] \
+                and len(self.stashedRecvdCheckpoints[view_no][key]) == 0:
+            del self.stashedRecvdCheckpoints[view_no][key]
+            if len(self.stashedRecvdCheckpoints[view_no]) == 0:
+                del self.stashedRecvdCheckpoints[view_no]

         restashed = total_processed - consumed
         self.logger.info('{} processed {} stashed checkpoints for {}, '
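
Read together, these changes mean that stashed checkpoints for a completed window are consumed under the view number passed in (the view of the ordered PrePrepare) rather than under the replica's current self.viewNo, and that any containers emptied along the way are removed per view. A minimal, simplified sketch of that lookup pattern (hypothetical helper, not the real Replica API; the stash is a dict of view_no -> key -> sender -> message):

    def process_stashed(stash, key, view_no, process):
        """Consume checkpoints stashed for `key` in `view_no`, one sender at a time."""
        if key not in stash.get(view_no, {}):
            return
        for sender in list(stash[view_no][key].keys()):
            # Re-check on every iteration: processing one checkpoint may stabilize
            # our own checkpoint and drop the remaining stashed entries.
            if view_no in stash and key in stash[view_no] and sender in stash[view_no][key]:
                process(stash[view_no][key].pop(sender), sender)
        # Remove containers that have become empty so old views do not linger.
        if view_no in stash and key in stash[view_no] and not stash[view_no][key]:
            del stash[view_no][key]
            if not stash[view_no]:
                del stash[view_no]

    # Example usage with dummy data:
    stash = {1: {(1, 2): {"Node2": "CHECKPOINT(1,2)", "Node3": "CHECKPOINT(1,2)"}}}
    process_stashed(stash, (1, 2), 1, lambda msg, sender: print(sender, msg))
    assert stash == {}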
New test file

Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@
from numbers import Rational

import math
import pytest
import sys

from plenum.common.constants import CHECKPOINT, COMMIT
from plenum.test.delayers import cDelay, chk_delay, lsDelay, vcd_delay
from plenum.test.helper import sdk_send_random_requests, \
    sdk_get_and_check_replies, sdk_send_random_and_check
from plenum.test.node_catchup.helper import ensure_all_nodes_have_same_data
from plenum.test.test_node import ensureElectionsDone
from stp_core.loop.eventually import eventually

CHK_FREQ = 2


def test_checkpoints_removed_in_view_change(chkFreqPatched,
                                            txnPoolNodeSet,
                                            looper,
                                            sdk_pool_handle,
                                            sdk_wallet_client):
    '''
    Check that finalizing a checkpoint during a view change, before catch-up,
    does not clean necessary data from the request and 3PC queues.
    '''
    slow_nodes = txnPoolNodeSet[:3]
    fast_nodes = txnPoolNodeSet[3:]
    # delay checkpoint processing for slow_nodes
    delay_msg(slow_nodes, chk_delay)
    # send txns to finalize the current checkpoint
    sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle,
                              sdk_wallet_client, CHK_FREQ)
    ensure_all_nodes_have_same_data(looper, nodes=txnPoolNodeSet)
    # delay commit processing for slow_nodes
    delay_msg(slow_nodes, cDelay)

    requests = sdk_send_random_requests(looper, sdk_pool_handle,
                                        sdk_wallet_client, 1)
    # check that slow nodes have a prepared certificate with the new txn
    looper.run(eventually(last_prepared_certificate,
                          slow_nodes,
                          (0, CHK_FREQ + 1)))
    # check that fast_nodes ordered the new txn
    looper.run(eventually(last_ordered_check,
                          fast_nodes,
                          (0, CHK_FREQ + 1)))
    # check that fast_nodes finalized the first checkpoint and slow_nodes did not
    looper.run(eventually(check_checkpoint_finalize,
                          fast_nodes,
                          1, CHK_FREQ))
    for n in slow_nodes:
        assert not n.master_replica.checkpoints[(1, CHK_FREQ)].isStable

    # Emulate the start of a view change to bump viewNo and fix the last
    # prepared certificate; if we started a real view change, the checkpoints
    # would be cleaned and the first checkpoint would no longer need finalizing.
    for node in txnPoolNodeSet:
        node.viewNo += 1
        node.master_replica.on_view_change_start()

    # reset the delay for checkpoints
    reset_delay(slow_nodes, CHECKPOINT)
    # undo the view change emulation and start a real view change so that it
    # finishes normally, with catch-up
    for node in txnPoolNodeSet:
        node.viewNo -= 1
        node.view_changer.on_master_degradation()
    for n in slow_nodes:
        assert not n.master_replica.checkpoints[(1, CHK_FREQ)].isStable
    # Check that the last txn is ordered before catch-up. Checking the client
    # reply is enough because slow_nodes contains 3 nodes, and without their
    # replies the SDK call that collects replies would not complete.
    reset_delay(slow_nodes, COMMIT)
    sdk_get_and_check_replies(looper, requests)
    looper.run(eventually(last_ordered_check,
                          txnPoolNodeSet,
                          (0, CHK_FREQ + 1)))
    # check that the view change finished and the checkpoints were cleaned
    ensureElectionsDone(looper, txnPoolNodeSet)
    for n in slow_nodes:
        assert (1, CHK_FREQ) not in n.master_replica.checkpoints
    # check that all nodes have the same data after ordering new txns
    sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle,
                              sdk_wallet_client, CHK_FREQ)
    ensure_all_nodes_have_same_data(looper, nodes=txnPoolNodeSet)


def delay_msg(nodes, delay_function, types=None):
    for n in nodes:
        if types is None:
            n.nodeIbStasher.delay(delay_function(sys.maxsize))
        else:
            n.nodeIbStasher.delay(delay_function(sys.maxsize, types))


def reset_delay(nodes, message):
    for n in nodes:
        n.nodeIbStasher.reset_delays_and_process_delayeds(message)


def last_prepared_certificate(nodes, num):
    for n in nodes:
        assert n.master_replica.last_prepared_certificate_in_view() == num


def check_checkpoint_finalize(nodes, start_pp_seq_no, end_pp_seq_no):
    for n in nodes:
        checkpoint = n.master_replica.checkpoints[(start_pp_seq_no, end_pp_seq_no)]
        assert checkpoint.isStable


def last_ordered_check(nodes, last_ordered, instance_id=None):
    for n in nodes:
        last_ordered_3pc = n.master_last_ordered_3PC \
            if instance_id is None \
            else n.replicas[instance_id].last_ordered_3pc
        assert last_ordered_3pc == last_ordered
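
As a quick sanity check of the numbers used in the test (assuming, as the test does via chkFreqPatched, a checkpoint window of CHK_FREQ = 2 pre-prepares, and that each request is ordered as its own 3PC batch):

    CHK_FREQ = 2

    # The first checkpoint window covers ppSeqNo 1..CHK_FREQ, which is the
    # (1, CHK_FREQ) == (1, 2) key asserted on in the test above.
    first_window = (1, CHK_FREQ)

    # After CHK_FREQ ordered requests fill that window, the single extra request
    # lands at ppSeqNo CHK_FREQ + 1 in view 0, hence the expectation that
    # last_ordered_3pc reaches (0, CHK_FREQ + 1) == (0, 3).
    expected_last_ordered = (0, CHK_FREQ + 1)

    assert first_window == (1, 2)
    assert expected_last_ordered == (0, 3)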
