INDY-2032: Fix phantom transactions in audit ledger #1151

Merged · 10 commits · Apr 8, 2019 · showing changes from 9 commits
12 changes: 6 additions & 6 deletions plenum/server/batch_handlers/audit_batch_handler.py
@@ -25,20 +25,20 @@ def __init__(self, database_manager: DatabaseManager):
def post_batch_applied(self, three_pc_batch: ThreePcBatch, prev_handler_result=None):
txn = self._add_to_ledger(three_pc_batch)
self.tracker.apply_batch(None, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)
logger.info("applied audit txn {}; uncommitted root hash is {}; uncommitted size is {}".
format(str(txn), self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
logger.debug("applied audit txn {}; uncommitted root hash is {}; uncommitted size is {}".
format(str(txn), self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))

def post_batch_rejected(self, ledger_id, prev_handler_result=None):
_, _, txn_count = self.tracker.reject_batch()
self.ledger.discardTxns(txn_count)
logger.info("rejected {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txn_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
logger.debug("rejected {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txn_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))

def commit_batch(self, three_pc_batch, prev_handler_result=None):
_, _, txns_count = self.tracker.commit_batch()
_, committedTxns = self.ledger.commitTxns(txns_count)
logger.info("committed {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txns_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
logger.debug("committed {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txns_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
return committedTxns

def on_catchup_finished(self):
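This file only demotes per-batch logging from info to debug; the apply/reject/commit calls themselves are unchanged. For orientation, here is a minimal, runnable sketch of the lifecycle the handler drives against the audit ledger; `SimpleLedger` and its method names are illustrative stand-ins, not plenum's actual Ledger API:

```python
# Stand-in for plenum's ledger (illustrative only): batches are first
# applied as uncommitted txns, then either rejected (dropped) or committed.
class SimpleLedger:
    def __init__(self):
        self.committed = []      # txns that reached consensus
        self.uncommitted = []    # applied but not yet committed

    @property
    def uncommitted_size(self):
        # plenum's convention: committed txns plus pending uncommitted ones
        return len(self.committed) + len(self.uncommitted)

    def append_txn(self, txn):       # ~ post_batch_applied
        self.uncommitted.append(txn)

    def discard_txns(self, count):   # ~ post_batch_rejected
        if count:
            del self.uncommitted[-count:]

    def commit_txns(self, count):    # ~ commit_batch
        batch, self.uncommitted = self.uncommitted[:count], self.uncommitted[count:]
        self.committed.extend(batch)
        return batch


ledger = SimpleLedger()
ledger.append_txn({"viewNo": 0, "ppSeqNo": 1})   # batch applied
ledger.append_txn({"viewNo": 0, "ppSeqNo": 2})   # next batch applied
ledger.discard_txns(1)                           # newest batch rejected
assert ledger.commit_txns(1) == [{"viewNo": 0, "ppSeqNo": 1}]
assert ledger.uncommitted_size == 1              # 1 committed, 0 pending
```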
12 changes: 8 additions & 4 deletions plenum/server/database_manager.py
@@ -10,11 +10,17 @@ class DatabaseManager():
def __init__(self):
self.databases = {} # type: Dict[int, Database]
self.stores = {}
+        self._init_db_list()

+    def _init_db_list(self):
+        self._ledgers = {lid: db.ledger for lid, db in self.databases.items()}
+        self._states = {lid: db.state for lid, db in self.databases.items()}

def register_new_database(self, lid, ledger: Ledger, state: Optional[State] = None):
if lid in self.databases:
raise LogicError('Trying to add already existing database')
self.databases[lid] = Database(ledger, state)
+        self._init_db_list()

def get_database(self, lid):
if lid not in self.databases:
@@ -43,13 +49,11 @@ def get_store(self, label):

@property
def states(self):
-        # TODO: change this. Too inefficient to build dict every time
-        return {lid: db.state for lid, db in self.databases.items()}
+        return self._states

@property
def ledgers(self):
-        # TODO: change this. Too inefficient to build dict every time
-        return {lid: db.ledger for lid, db in self.databases.items()}
+        return self._ledgers

@property
def bls_store(self):
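The TODOs above are resolved by rebuilding the `_ledgers`/`_states` dicts once per registration instead of on every property read. A minimal sketch of that rebuild-on-write pattern, using stand-in names (`CachingManager`, `register`) rather than plenum's API:

```python
from typing import Dict, NamedTuple, Optional


class Database(NamedTuple):
    ledger: object
    state: Optional[object]


class CachingManager:
    """Rebuilds lookup views on (rare) registration, so (hot) reads are O(1)."""

    def __init__(self):
        self.databases: Dict[int, Database] = {}
        self._rebuild_views()

    def _rebuild_views(self):
        self._ledgers = {lid: db.ledger for lid, db in self.databases.items()}
        self._states = {lid: db.state for lid, db in self.databases.items()}

    def register(self, lid: int, ledger, state=None):
        if lid in self.databases:
            raise ValueError('database {} already registered'.format(lid))
        self.databases[lid] = Database(ledger, state)
        self._rebuild_views()   # pay the O(n) cost here, not on every read

    @property
    def ledgers(self):
        return self._ledgers

    @property
    def states(self):
        return self._states


mgr = CachingManager()
mgr.register(0, "pool_ledger")
mgr.register(1, "domain_ledger", "domain_state")
assert mgr.ledgers == {0: "pool_ledger", 1: "domain_ledger"}
```

The trade-off: property reads now share a single dict instance, so callers must treat it as read-only. Since registration happens a handful of times at startup while reads sit on the hot path, rebuild-on-write is the right side of the trade.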
112 changes: 36 additions & 76 deletions plenum/server/node.py
@@ -355,15 +355,6 @@ def __init__(self,
else:
self.quota_control = StaticQuotaControl(node_quota=node_quota, client_quota=client_quota)

-        # Ordered requests received from replicas while the node was not
-        # participating
-        self.stashedOrderedReqs = deque()
-
-        # Set of (identifier, reqId) of all transactions that were received
-        # while catching up. Used to detect overlap between stashed requests
-        # and received replies while catching up.
-        # self.reqsFromCatchupReplies = set()

# Any messages that are intended for view numbers higher than the
# current view.
self.msgsForFutureViews = {}
@@ -717,8 +708,15 @@ def viewNo(self, value):

@property
def view_change_in_progress(self):
-        return False if self.view_changer is None \
-            else self.view_changer.view_change_in_progress
+        if self.view_changer is None:
+            return False
+        return self.view_changer.view_change_in_progress

+    @property
+    def pre_view_change_in_progress(self):
+        if self.view_changer is None:
+            return False
+        return self.view_changer.pre_view_change_in_progress

def _add_config_ledger(self):
self.ledgerManager.addLedger(
@@ -2153,17 +2151,6 @@ def postAuditLedgerCaughtUp(self, **kwargs):
self.audit_handler.on_catchup_finished()

def preLedgerCatchUp(self, ledger_id):
-        # Process any Ordered requests. This causes less transactions to be
-        # requested during catchup. Also commits any uncommitted state that
-        # can be committed
-        logger.info('{} going to process any ordered requests before starting catchup.'.format(self))
-        self.force_process_ordered()
-        self.processStashedOrderedReqs()
-
-        # revert uncommitted txns and state for unordered requests
-        r = self.master_replica.revert_unordered_batches()
-        logger.info('{} reverted {} batches before starting catch up for ledger {}'.format(self, r, ledger_id))

if len(self.auditLedger.uncommittedTxns) > 0:
raise LogicError(
'{} audit ledger has uncommitted txns before catching up ledger {}'.format(self, ledger_id))
@@ -2242,10 +2229,6 @@ def allLedgersCaughtUp(self):
if self.view_change_in_progress:
self._process_replica_messages()

-        # TODO: Maybe a slight optimisation is to check result of
-        # `self.num_txns_caught_up_in_last_catchup()`
-        self.processStashedOrderedReqs()

# More than one catchup may be needed during the current ViewChange protocol
# TODO: separate view change and catchup logic
if self.is_catchup_needed():
@@ -2416,8 +2399,8 @@ def applyReq(self, request: Request, cons_time: int):
cons_time=cons_time, ledger_id=ledger_id,
seq_no=seq_no, txn=txn)

-    def apply_stashed_reqs(self, ordered):
-        request_ids = ordered.valid_reqIdr
+    def apply_stashed_reqs(self, three_pc_batch):
+        request_ids = three_pc_batch.valid_digests
requests = []
for req_key in request_ids:
if req_key in self.requests:
@@ -2428,7 +2411,6 @@ def apply_stashed_reqs(self, ordered):
_, seq_no = self.seqNoDB.get(req.digest)
if seq_no is None:
requests.append(req)
-        three_pc_batch = ThreePcBatch.from_ordered(ordered)
self.apply_reqs(requests, three_pc_batch)

def apply_reqs(self, requests, three_pc_batch: ThreePcBatch):
@@ -2680,13 +2662,13 @@ def processOrdered(self, ordered: Ordered):
logger.trace("{} got ordered requests from master replica"
.format(self))

logger.info("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}"
.format(self.name,
ordered.viewNo,
ordered.ppSeqNo,
len(ordered.valid_reqIdr),
ordered.stateRootHash,
ordered.txnRootHash))
logger.debug("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}"
.format(self.name,
ordered.viewNo,
ordered.ppSeqNo,
len(ordered.valid_reqIdr),
ordered.stateRootHash,
ordered.txnRootHash))

self.executeBatch(ThreePcBatch.from_ordered(ordered),
ordered.valid_reqIdr,
@@ -2725,11 +2707,10 @@ def force_process_ordered(self):
.format(self, num_processed, instance_id))

def try_processing_ordered(self, msg):
-        if self.isParticipating:
+        if self.master_replica.validator.can_order():
self.processOrdered(msg)
else:
logger.info("{} stashing {} since mode is {}".format(self, msg, self.mode))
self.stashedOrderedReqs.append(msg)
logger.warning("{} can not process Ordered message {} since mode is {}".format(self, msg, self.mode))

def processEscalatedException(self, ex):
"""
@@ -2833,7 +2814,6 @@ def sum_for_values(obj):
self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_REPLICAS, len(self.msgsForFutureReplicas))
self.metrics.add_event(MetricsName.MSGS_TO_VIEW_CHANGER, len(self.msgsToViewChanger))
self.metrics.add_event(MetricsName.REQUEST_SENDER, len(self.requestSender))
-        self.metrics.add_event(MetricsName.STASHED_ORDERED_REQS, len(self.stashedOrderedReqs))

self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_VIEWS, len(self.msgsForFutureViews))

@@ -3277,11 +3257,6 @@ def executeBatch(self, three_pc_batch: ThreePcBatch,
format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no,
three_pc_batch.ledger_id, three_pc_batch.state_root,
three_pc_batch.txn_root, [key for key in valid_reqs_keys]))
logger.info("{} committed batch request, view no {}, ppSeqNo {}, "
"ledger {}, state root {}, txn root {}, requests: {}".
format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no,
three_pc_batch.ledger_id, three_pc_batch.state_root,
three_pc_batch.txn_root, len(valid_reqs_keys)))

for txn in committedTxns:
self.execute_hook(NodeHooks.POST_REQUEST_COMMIT, txn=txn,
Expand Down Expand Up @@ -3323,6 +3298,22 @@ def updateSeqNoMap(self, committedTxns, ledger_id):

def commitAndSendReplies(self, three_pc_batch: ThreePcBatch) -> List:
logger.trace('{} going to commit and send replies to client'.format(self))

+        if self.db_manager.ledgers[AUDIT_LEDGER_ID].uncommittedRootHash is None:
+            # if we order request during view change
+            # in between catchup rounds, then the 3PC batch will not be applied,
+            # since it was reverted before catchup started, and only COMMITs were
+            # processed in between catchup that led to this ORDERED msg
+            logger.info("{} applying stashed requests for batch {} {} of {} requests; state root {}; txn root {}"
+                        .format(self.name,
+                                three_pc_batch.view_no,
+                                three_pc_batch.pp_seq_no,
+                                len(three_pc_batch.valid_digests),
+                                three_pc_batch.state_root,
+                                three_pc_batch.txn_root))
+
+            self.apply_stashed_reqs(three_pc_batch)
+
reqHandler = self.get_req_handler(three_pc_batch.ledger_id)
committedTxns = reqHandler.commit(len(three_pc_batch.valid_digests),
three_pc_batch.state_root, three_pc_batch.txn_root,
@@ -3446,37 +3437,6 @@ def defaultAuthNr(self) -> ReqAuthenticator:
req_authnr.register_authenticator(self.init_core_authenticator())
return req_authnr

-    def processStashedOrderedReqs(self):
-        i = 0
-        while self.stashedOrderedReqs:
-            msg = self.stashedOrderedReqs.popleft()
-            if msg.instId == 0:
-                if compare_3PC_keys(
-                        (msg.viewNo,
-                         msg.ppSeqNo),
-                        self.ledgerManager.last_caught_up_3PC) >= 0:
-                    logger.info(
-                        '{} ignoring stashed ordered msg {} since ledger '
-                        'manager has last_caught_up_3PC as {}'.format(
-                            self, msg, self.ledgerManager.last_caught_up_3PC))
-                    continue
-                logger.info(
-                    '{} applying stashed Ordered msg {}'.format(self, msg))
-                # Since the PRE-PREPAREs ans PREPAREs corresponding to these
-                # stashed ordered requests was not processed.
-                self.apply_stashed_reqs(msg)
-
-            self.processOrdered(msg)
-            i += 1
-
-        logger.info(
-            "{} processed {} stashed ordered requests".format(
-                self, i))
-        # Resetting monitor after executing all stashed requests so no view
-        # change can be proposed
-        self.monitor.reset()
-        return i

def ensureKeysAreSetup(self):
"""
Check whether the keys are setup in the local STP keep.
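The block added to commitAndSendReplies is the core of the phantom-transaction fix. During a view change a batch can be reverted before catchup and then ordered again purely from COMMITs, so its requests were never re-applied; committing it anyway would append audit txns for state that was never written. A self-contained sketch of that scenario (all names are illustrative stand-ins, not plenum's API):

```python
class AuditLedgerStub:
    """uncommittedRootHash is None exactly when no batch is applied."""

    def __init__(self):
        self.uncommittedRootHash = None

    def apply_batch(self, root):
        self.uncommittedRootHash = root

    def revert(self):
        self.uncommittedRootHash = None


def commit_batch(ledger, batch, apply_stashed):
    if ledger.uncommittedRootHash is None:
        # Batch was reverted before catchup and re-ordered from COMMITs
        # alone: re-apply the stashed requests before committing, otherwise
        # we would commit a "phantom" audit txn with nothing behind it.
        apply_stashed(batch)
    assert ledger.uncommittedRootHash is not None
    return ledger.uncommittedRootHash


ledger = AuditLedgerStub()
ledger.apply_batch("root-abc")   # batch applied during normal 3PC flow
ledger.revert()                  # catchup starts: uncommitted state reverted
root = commit_batch(ledger, batch={"pp_seq_no": 10},
                    apply_stashed=lambda b: ledger.apply_batch("root-abc"))
assert root == "root-abc"        # commit now matches real applied state
```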
28 changes: 6 additions & 22 deletions plenum/server/replica.py
@@ -636,12 +636,6 @@ def on_view_change_start(self):
lst = self.last_prepared_certificate_in_view()
self.last_prepared_before_view_change = lst
self.logger.info('{} setting last prepared for master to {}'.format(self, lst))
-        # It can be that last_ordered_3pc was set for the previous view, since it's set during catch-up
-        # Example: a Node has last_ordered = (1, 300), and then the whole pool except this node restarted
-        # The new viewNo is 0, but last_ordered is (1, 300), so all new requests will be discarded by this Node
-        # if we don't reset last_ordered_3pc
-        if self.viewNo <= self.last_ordered_3pc[0]:
-            self.last_ordered_3pc = (self.viewNo, 0)

def on_view_change_done(self):
if self.isMaster:
@@ -807,6 +801,9 @@ def trackBatches(self, pp: PrePrepare, prevStateRootHash):
pp.ppTime, prevStateRootHash, len(pp.reqIdr)]

def send_3pc_batch(self):
+        if not self.validator.can_send_3pc_batch():
+            return 0
+
sent_batches = set()

# 1. send 3PC batches with requests for every ledger
@@ -1028,10 +1025,9 @@ def serviceQueues(self, limit=None):
:return: the number of messages successfully processed
"""
# TODO should handle SuspiciousNode here
-        r = self.dequeue_pre_prepares() if self.node.isParticipating else 0
+        r = self.dequeue_pre_prepares()
r += self.inBoxRouter.handleAllSync(self.inBox, limit)
-        r += self.send_3pc_batch() if (self.isPrimary and
-                                       self.node.isParticipating) else 0
+        r += self.send_3pc_batch()
r += self._serviceActions()
return r
# Messages that can be processed right now needs to be added back to the
@@ -1506,13 +1502,6 @@ def _can_process_pre_prepare(self, pre_prepare: PrePrepare, sender: str) -> Opti
if (pre_prepare.viewNo, pre_prepare.ppSeqNo) in self.prePrepares:
return PP_CHECK_DUPLICATE

-        if not self.node.isParticipating:
-            # Let the node stash the pre-prepare
-            # TODO: The next processed pre-prepare needs to take consider if
-            # the last pre-prepare was stashed or not since stashed requests
-            # do not make change to state or ledger
-            return None

if compare_3PC_keys((pre_prepare.viewNo, pre_prepare.ppSeqNo),
self.__last_pp_3pc) > 0:
return PP_CHECK_OLD # ignore old pre-prepare
@@ -1564,8 +1553,6 @@ def canPrepare(self, ppReq) -> (bool, str):

:param ppReq: any object with identifier and requestId attributes
"""
-        if not self.node.isParticipating:
-            return False, 'node is not participating'
if self.has_sent_prepare(ppReq):
return False, 'has already sent PREPARE for {}'.format(ppReq)
return True, ''
@@ -1713,8 +1700,6 @@ def canCommit(self, prepare: Prepare) -> (bool, str):

:param prepare: the PREPARE
"""
-        if not self.node.isParticipating:
-            return False, 'node is not participating'
quorum = self.quorums.prepare.value
if not self.prepares.hasQuorum(prepare, quorum):
return False, 'does not have prepare quorum for {}'.format(prepare)
@@ -1836,8 +1821,7 @@ def process_stashed_out_of_order_commits(self):
# were stashed due to lack of commits before them and orders them if it
# can

-        is_between_catchups_during_view_change = self.node.is_synced and self.node.view_change_in_progress
-        if not self.node.isParticipating and not is_between_catchups_during_view_change:
+        if not self.validator.can_order():
return

self.logger.debug('{} trying to order from out of order commits. '
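Taken together, these hunks delete the scattered `self.node.isParticipating` checks and funnel every ordering decision through the replica's validator. A minimal sketch of that centralization, with stand-in classes (in the real code `try_processing_ordered` lives on the Node and the validator is `ReplicaValidator`):

```python
class ValidatorStub:
    def __init__(self, node):
        self.node = node

    def can_order(self):
        if self.node.isParticipating:
            return True
        # Between catchup rounds of a view change the node is synced but
        # not yet participating, and may still order from COMMITs.
        return self.node.is_synced and self.node.view_change_in_progress


class ReplicaStub:
    def __init__(self, node):
        self.validator = ValidatorStub(node)

    def try_processing_ordered(self, msg):
        # one gate instead of ad-hoc isParticipating checks at every call site
        return "processed" if self.validator.can_order() else "dropped"


class NodeStub:
    isParticipating = False
    is_synced = True
    view_change_in_progress = True


assert ReplicaStub(NodeStub()).try_processing_ordered("Ordered") == "processed"
```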
27 changes: 26 additions & 1 deletion plenum/server/replica_validator.py
@@ -1,3 +1,4 @@
+from plenum.common.messages.node_messages import Commit
from plenum.common.types import f
from plenum.common.util import compare_3PC_keys
from plenum.server.replica_validator_enums import DISCARD, INCORRECT_INSTANCE, PROCESS, ALREADY_ORDERED, FUTURE_VIEW, \
@@ -43,6 +44,8 @@ def validate_3pc_msg(self, msg):
if view_no < self.replica.viewNo - 1:
return DISCARD, OLD_VIEW
if view_no == self.replica.viewNo - 1:
+            if not isinstance(msg, Commit):
+                return DISCARD, OLD_VIEW
if not node.view_change_in_progress:
return DISCARD, OLD_VIEW
if self.replica.last_prepared_before_view_change is None:
@@ -52,7 +55,7 @@
if view_no == self.replica.viewNo and node.view_change_in_progress:
return STASH_VIEW, FUTURE_VIEW

-        # If Catchup in View Change finished then process a message
+        # If Catchup in View Change finished then process Commit messages
if node.is_synced and node.view_change_in_progress:
return PROCESS, None
Review comment (Contributor): An explicit check that this is really a Commit message would be good here.

Reply (Author): The explicit check is already done above.

@@ -94,3 +97,25 @@ def validate_checkpoint_msg(self, msg):
return STASH_CATCH_UP, CATCHING_UP

return PROCESS, None

+    def can_send_3pc_batch(self):
+        if not self.replica.isPrimary:
+            return False
+        if not self.replica.node.isParticipating:
+            return False
+        if self.replica.node.pre_view_change_in_progress:
+            return False
+        if self.replica.viewNo < self.replica.last_ordered_3pc[0]:
+            return False
+        if self.replica.viewNo == self.replica.last_ordered_3pc[0] and \
+                self.replica.lastPrePrepareSeqNo < self.replica.last_ordered_3pc[1]:
+            return False
+        return True
+
+    def can_order(self):
+        node = self.replica.node
+        if node.isParticipating:
+            return True
+        if node.is_synced and node.view_change_in_progress:
+            return True
+        return False
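A condensed sketch of the old-view rule that `validate_3pc_msg` now enforces (names are illustrative; the real check also bounds the Commit by `last_prepared_before_view_change`, omitted here): during a view change, Commits from the immediately preceding view may still be processed, while anything older, or of any other message type, is discarded.

```python
DISCARD, PROCESS = "DISCARD", "PROCESS"


def old_view_decision(msg_type, msg_view_no, current_view_no,
                      view_change_in_progress):
    if msg_view_no < current_view_no - 1:
        return DISCARD                  # too old under any circumstances
    if msg_view_no == current_view_no - 1:
        if msg_type != "Commit":
            return DISCARD              # the explicit Commit-only check
        if not view_change_in_progress:
            return DISCARD
        return PROCESS                  # a Commit we may still order
    return PROCESS


assert old_view_decision("Commit", 4, 5, True) == PROCESS
assert old_view_decision("Prepare", 4, 5, True) == DISCARD
assert old_view_decision("Commit", 3, 5, True) == DISCARD
```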