INDY-2032: Fix phantom transactions in audit ledger #1151


Merged
merged 10 commits on Apr 8, 2019
Changes from 5 commits
12 changes: 6 additions & 6 deletions plenum/server/batch_handlers/audit_batch_handler.py
@@ -25,20 +25,20 @@ def __init__(self, database_manager: DatabaseManager):
def post_batch_applied(self, three_pc_batch: ThreePcBatch, prev_handler_result=None):
txn = self._add_to_ledger(three_pc_batch)
self.tracker.apply_batch(None, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size)
logger.info("applied audit txn {}; uncommitted root hash is {}; uncommitted size is {}".
format(str(txn), self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
logger.debug("applied audit txn {}; uncommitted root hash is {}; uncommitted size is {}".
format(str(txn), self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))

def post_batch_rejected(self, ledger_id, prev_handler_result=None):
_, _, txn_count = self.tracker.reject_batch()
self.ledger.discardTxns(txn_count)
logger.info("rejected {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txn_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
logger.debug("rejected {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txn_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))

def commit_batch(self, three_pc_batch, prev_handler_result=None):
_, _, txns_count = self.tracker.commit_batch()
_, committedTxns = self.ledger.commitTxns(txns_count)
logger.info("committed {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txns_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
logger.debug("committed {} audit txns; uncommitted root hash is {}; uncommitted size is {}".
format(txns_count, self.ledger.uncommitted_root_hash, self.ledger.uncommitted_size))
return committedTxns

def on_catchup_finished(self):
105 changes: 14 additions & 91 deletions plenum/server/node.py
@@ -355,15 +355,6 @@ def __init__(self,
else:
self.quota_control = StaticQuotaControl(node_quota=node_quota, client_quota=client_quota)

# Ordered requests received from replicas while the node was not
# participating
self.stashedOrderedReqs = deque()

# Set of (identifier, reqId) of all transactions that were received
# while catching up. Used to detect overlap between stashed requests
# and received replies while catching up.
# self.reqsFromCatchupReplies = set()

# Any messages that are intended for view numbers higher than the
# current view.
self.msgsForFutureViews = {}
@@ -720,6 +711,11 @@ def view_change_in_progress(self):
return False if self.view_changer is None \
else self.view_changer.view_change_in_progress

@property
def pre_view_change_in_progress(self):
return False if self.view_changer is None \
else self.view_changer.pre_view_change_in_progress
Contributor:
I'd prefer something more readable, with clearer intent, like:

if self.view_changer is None:
    return False
return self.view_changer.pre_view_change_in_progress

Contributor Author:
Done


def _add_config_ledger(self):
self.ledgerManager.addLedger(
CONFIG_LEDGER_ID,
@@ -2153,17 +2149,6 @@ def postAuditLedgerCaughtUp(self, **kwargs):
self.audit_handler.on_catchup_finished()

def preLedgerCatchUp(self, ledger_id):
# Process any Ordered requests. This causes less transactions to be
# requested during catchup. Also commits any uncommitted state that
# can be committed
logger.info('{} going to process any ordered requests before starting catchup.'.format(self))
self.force_process_ordered()
self.processStashedOrderedReqs()

# revert uncommitted txns and state for unordered requests
r = self.master_replica.revert_unordered_batches()
logger.info('{} reverted {} batches before starting catch up for ledger {}'.format(self, r, ledger_id))

if len(self.auditLedger.uncommittedTxns) > 0:
raise LogicError(
'{} audit ledger has uncommitted txns before catching up ledger {}'.format(self, ledger_id))
@@ -2242,10 +2227,6 @@ def allLedgersCaughtUp(self):
if self.view_change_in_progress:
self._process_replica_messages()

# TODO: Maybe a slight optimisation is to check result of
# `self.num_txns_caught_up_in_last_catchup()`
self.processStashedOrderedReqs()

# More than one catchup may be needed during the current ViewChange protocol
# TODO: separate view change and catchup logic
if self.is_catchup_needed():
@@ -2416,26 +2397,6 @@ def applyReq(self, request: Request, cons_time: int):
cons_time=cons_time, ledger_id=ledger_id,
seq_no=seq_no, txn=txn)

def apply_stashed_reqs(self, ordered):
request_ids = ordered.valid_reqIdr
requests = []
for req_key in request_ids:
if req_key in self.requests:
req = self.requests[req_key].finalised
else:
logger.warning("Could not apply stashed requests due to non-existent requests")
return
_, seq_no = self.seqNoDB.get(req.digest)
if seq_no is None:
requests.append(req)
three_pc_batch = ThreePcBatch.from_ordered(ordered)
self.apply_reqs(requests, three_pc_batch)

def apply_reqs(self, requests, three_pc_batch: ThreePcBatch):
for req in requests:
self.applyReq(req, three_pc_batch.pp_time)
self.onBatchCreated(three_pc_batch)

def handle_request_if_forced(self, request: Request):
if request.isForced():
req_handler = self.get_req_handler(
@@ -2680,13 +2641,13 @@ def processOrdered(self, ordered: Ordered):
logger.trace("{} got ordered requests from master replica"
.format(self))

logger.info("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}"
.format(self.name,
ordered.viewNo,
ordered.ppSeqNo,
len(ordered.valid_reqIdr),
ordered.stateRootHash,
ordered.txnRootHash))
logger.debug("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}"
.format(self.name,
ordered.viewNo,
ordered.ppSeqNo,
len(ordered.valid_reqIdr),
ordered.stateRootHash,
ordered.txnRootHash))

self.executeBatch(ThreePcBatch.from_ordered(ordered),
ordered.valid_reqIdr,
@@ -2725,11 +2686,10 @@ def force_process_ordered(self):
.format(self, num_processed, instance_id))

def try_processing_ordered(self, msg):
if self.isParticipating:
if self.master_replica.validator.can_order():
self.processOrdered(msg)
else:
logger.info("{} stashing {} since mode is {}".format(self, msg, self.mode))
self.stashedOrderedReqs.append(msg)
logger.info("{} can not process Ordered message {} since mode is {}".format(self, msg, self.mode))
Contributor:
I think this should be at least a warning, since here we're discarding an ordering result, and if I understand correctly we should never reach this point in production code.

Contributor Author:
Agree

Contributor Author:
Fixed
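
For reference, the fix mentioned here presumably just raises the log level in the new else-branch, roughly like the sketch below (based on the lines in this diff; the exact message in the follow-up commit may differ):

def try_processing_ordered(self, msg):
    if self.master_replica.validator.can_order():
        self.processOrdered(msg)
    else:
        # Dropping an ordering result should not happen in production,
        # so make it clearly visible in the logs.
        logger.warning("{} can not process Ordered message {} since mode is {}"
                       .format(self, msg, self.mode))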


def processEscalatedException(self, ex):
"""
@@ -2833,7 +2793,6 @@ def sum_for_values(obj):
self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_REPLICAS, len(self.msgsForFutureReplicas))
self.metrics.add_event(MetricsName.MSGS_TO_VIEW_CHANGER, len(self.msgsToViewChanger))
self.metrics.add_event(MetricsName.REQUEST_SENDER, len(self.requestSender))
self.metrics.add_event(MetricsName.STASHED_ORDERED_REQS, len(self.stashedOrderedReqs))

self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_VIEWS, len(self.msgsForFutureViews))

@@ -3277,11 +3236,6 @@ def executeBatch(self, three_pc_batch: ThreePcBatch,
format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no,
three_pc_batch.ledger_id, three_pc_batch.state_root,
three_pc_batch.txn_root, [key for key in valid_reqs_keys]))
logger.info("{} committed batch request, view no {}, ppSeqNo {}, "
"ledger {}, state root {}, txn root {}, requests: {}".
format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no,
three_pc_batch.ledger_id, three_pc_batch.state_root,
three_pc_batch.txn_root, len(valid_reqs_keys)))

for txn in committedTxns:
self.execute_hook(NodeHooks.POST_REQUEST_COMMIT, txn=txn,
@@ -3446,37 +3400,6 @@ def defaultAuthNr(self) -> ReqAuthenticator:
req_authnr.register_authenticator(self.init_core_authenticator())
return req_authnr

def processStashedOrderedReqs(self):
i = 0
while self.stashedOrderedReqs:
msg = self.stashedOrderedReqs.popleft()
if msg.instId == 0:
if compare_3PC_keys(
(msg.viewNo,
msg.ppSeqNo),
self.ledgerManager.last_caught_up_3PC) >= 0:
logger.info(
'{} ignoring stashed ordered msg {} since ledger '
'manager has last_caught_up_3PC as {}'.format(
self, msg, self.ledgerManager.last_caught_up_3PC))
continue
logger.info(
'{} applying stashed Ordered msg {}'.format(self, msg))
# Since the PRE-PREPAREs ans PREPAREs corresponding to these
# stashed ordered requests was not processed.
self.apply_stashed_reqs(msg)

self.processOrdered(msg)
i += 1

logger.info(
"{} processed {} stashed ordered requests".format(
self, i))
# Resetting monitor after executing all stashed requests so no view
# change can be proposed
self.monitor.reset()
return i

def ensureKeysAreSetup(self):
"""
Check whether the keys are setup in the local STP keep.
7 changes: 6 additions & 1 deletion plenum/server/observer/observer_sync_policy_each_batch.py
@@ -145,13 +145,18 @@ def _do_apply_batch(self, batch):

ledger_id = batch[f.LEDGER_ID.nm]
three_pc_batch = ThreePcBatch.from_batch_committed_dict(batch)
self._node.apply_reqs(reqs, three_pc_batch)
self._apply_reqs(reqs, three_pc_batch)

# We need hashes in apply and str in commit
three_pc_batch.txn_root = Ledger.hashToStr(three_pc_batch.txn_root)
three_pc_batch.state_root = Ledger.hashToStr(three_pc_batch.state_root)
self._node.get_executer(ledger_id)(three_pc_batch)

def _apply_reqs(self, requests, three_pc_batch: ThreePcBatch):
for req in requests:
self._node.applyReq(req, three_pc_batch.pp_time)
self._node.onBatchCreated(three_pc_batch)
Contributor:
Why not use the one from Node? It looks like this should really be an atomic operation available through the node interface.

Contributor Author:
The one from node was deleted, since it's not used anywhere else.

Contributor Author:
Fixed. The method is in Node again.
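
If restoring the helper on Node mirrors the method that was removed from plenum/server/node.py earlier in this PR, the result presumably looks roughly like this sketch, with the observer delegating to it (names taken from this diff; the actual follow-up commit may differ):

# plenum/server/node.py
def apply_reqs(self, requests, three_pc_batch: ThreePcBatch):
    # Apply each finalised request at the batch's ppTime, then let
    # onBatchCreated notify the batch handlers about the new batch.
    for req in requests:
        self.applyReq(req, three_pc_batch.pp_time)
    self.onBatchCreated(three_pc_batch)

# ...and _do_apply_batch in the observer goes back to calling:
self._node.apply_reqs(reqs, three_pc_batch)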


def _process_stashed_messages(self):
while True:
if not self._batches:
28 changes: 6 additions & 22 deletions plenum/server/replica.py
@@ -636,12 +636,6 @@ def on_view_change_start(self):
lst = self.last_prepared_certificate_in_view()
self.last_prepared_before_view_change = lst
self.logger.info('{} setting last prepared for master to {}'.format(self, lst))
# It can be that last_ordered_3pc was set for the previous view, since it's set during catch-up
# Example: a Node has last_ordered = (1, 300), and then the whole pool except this node restarted
# The new viewNo is 0, but last_ordered is (1, 300), so all new requests will be discarded by this Node
# if we don't reset last_ordered_3pc
if self.viewNo <= self.last_ordered_3pc[0]:
self.last_ordered_3pc = (self.viewNo, 0)

def on_view_change_done(self):
if self.isMaster:
@@ -807,6 +801,9 @@ def trackBatches(self, pp: PrePrepare, prevStateRootHash):
pp.ppTime, prevStateRootHash, len(pp.reqIdr)]

def send_3pc_batch(self):
if not self.validator.can_send_3pc_batch():
return 0

sent_batches = set()

# 1. send 3PC batches with requests for every ledger
@@ -1028,10 +1025,9 @@ def serviceQueues(self, limit=None):
:return: the number of messages successfully processed
"""
# TODO should handle SuspiciousNode here
r = self.dequeue_pre_prepares() if self.node.isParticipating else 0
r = self.dequeue_pre_prepares()
r += self.inBoxRouter.handleAllSync(self.inBox, limit)
r += self.send_3pc_batch() if (self.isPrimary and
self.node.isParticipating) else 0
r += self.send_3pc_batch()
r += self._serviceActions()
return r
# Messages that can be processed right now needs to be added back to the
@@ -1506,13 +1502,6 @@ def _can_process_pre_prepare(self, pre_prepare: PrePrepare, sender: str) -> Opti
if (pre_prepare.viewNo, pre_prepare.ppSeqNo) in self.prePrepares:
return PP_CHECK_DUPLICATE

if not self.node.isParticipating:
# Let the node stash the pre-prepare
# TODO: The next processed pre-prepare needs to take consider if
# the last pre-prepare was stashed or not since stashed requests
# do not make change to state or ledger
return None

if compare_3PC_keys((pre_prepare.viewNo, pre_prepare.ppSeqNo),
self.__last_pp_3pc) > 0:
return PP_CHECK_OLD # ignore old pre-prepare
@@ -1564,8 +1553,6 @@ def canPrepare(self, ppReq) -> (bool, str):

:param ppReq: any object with identifier and requestId attributes
"""
if not self.node.isParticipating:
return False, 'node is not participating'
if self.has_sent_prepare(ppReq):
return False, 'has already sent PREPARE for {}'.format(ppReq)
return True, ''
@@ -1713,8 +1700,6 @@ def canCommit(self, prepare: Prepare) -> (bool, str):

:param prepare: the PREPARE
"""
if not self.node.isParticipating:
return False, 'node is not participating'
quorum = self.quorums.prepare.value
if not self.prepares.hasQuorum(prepare, quorum):
return False, 'does not have prepare quorum for {}'.format(prepare)
@@ -1836,8 +1821,7 @@ def process_stashed_out_of_order_commits(self):
# were stashed due to lack of commits before them and orders them if it
# can

is_between_catchups_during_view_change = self.node.is_synced and self.node.view_change_in_progress
if not self.node.isParticipating and not is_between_catchups_during_view_change:
if not self.validator.can_order():
return

self.logger.debug('{} trying to order from out of order commits. '
24 changes: 23 additions & 1 deletion plenum/server/replica_validator.py
@@ -1,3 +1,4 @@
from plenum.common.messages.node_messages import Commit
from plenum.common.types import f
from plenum.common.util import compare_3PC_keys
from plenum.server.replica_validator_enums import DISCARD, INCORRECT_INSTANCE, PROCESS, ALREADY_ORDERED, FUTURE_VIEW, \
@@ -43,6 +44,8 @@ def validate_3pc_msg(self, msg):
if view_no < self.replica.viewNo - 1:
return DISCARD, OLD_VIEW
if view_no == self.replica.viewNo - 1:
if not isinstance(msg, Commit):
return DISCARD, OLD_VIEW
if not node.view_change_in_progress:
return DISCARD, OLD_VIEW
if self.replica.last_prepared_before_view_change is None:
@@ -52,7 +55,7 @@
if view_no == self.replica.viewNo and node.view_change_in_progress:
return STASH_VIEW, FUTURE_VIEW

# If Catchup in View Change finished then process a message
# If Catchup in View Change finished then process Commit messages
if node.is_synced and node.view_change_in_progress:
return PROCESS, None
Contributor:
An explicit check that this is really a Commit message would be good here.

Contributor Author:
The explicit check is already done above.
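
For readers following this thread: the check being referred to is the isinstance guard added earlier in the same hunk, so, as I read this diff, only Commits from the previous view can survive to the catch-up branch below (an annotated restatement of the lines above, not new logic):

# earlier in validate_3pc_msg: anything from viewNo - 1 that is not a Commit
# is discarded outright
if view_no == self.replica.viewNo - 1:
    if not isinstance(msg, Commit):
        return DISCARD, OLD_VIEW

# so the previous-view messages that reach this branch during a view change
# are Commits, and they are processed once catch-up inside the view change
# has finished
if node.is_synced and node.view_change_in_progress:
    return PROCESS, None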


@@ -94,3 +97,22 @@ def validate_checkpoint_msg(self, msg):
return STASH_CATCH_UP, CATCHING_UP

return PROCESS, None

def can_send_3pc_batch(self):
if not self.replica.isPrimary:
return False
if not self.replica.node.isParticipating:
return False
if self.replica.node.pre_view_change_in_progress:
return False
if self.replica.viewNo < self.replica.last_ordered_3pc[0]:
return False
return True

def can_order(self):
node = self.replica.node
if node.isParticipating:
return True
if node.is_synced and node.view_change_in_progress:
return True
return False
11 changes: 11 additions & 0 deletions plenum/server/view_change/view_changer.py
@@ -159,6 +159,7 @@ def __init__(self, provider: ViewChangerDataProvider, timer: TimerService):
self._next_view_indications = {}

self._view_change_in_progress = False
self._pre_view_change_in_progress = False

self.previous_view_no = None
self.previous_master_primary = None
@@ -225,6 +226,14 @@ def view_change_in_progress(self) -> bool:
def view_change_in_progress(self, value: bool):
self._view_change_in_progress = value

@property
def pre_view_change_in_progress(self) -> bool:
return self._pre_view_change_in_progress

@pre_view_change_in_progress.setter
def pre_view_change_in_progress(self, value: bool):
self._pre_view_change_in_progress = value

Contributor:
Do we really need to make this a property if we're just using a trivial getter and setter without any additional logic?

Contributor Author:
As a quick fix, I did it the same way as view_change_in_progress, but yes, I agree.

Contributor Author:
Done
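
The "Done" presumably means the trivial property was replaced with a plain attribute on ViewChanger, roughly like this sketch (names follow this diff; the exact follow-up commit may differ):

# plenum/server/view_change/view_changer.py -- the flag stays a plain attribute
def __init__(self, provider: ViewChangerDataProvider, timer: TimerService):
    ...
    self.pre_view_change_in_progress = False

# start_view_change assigns it directly (as in the hunk below), while Node
# keeps the None-guarded property from plenum/server/node.py:
@property
def pre_view_change_in_progress(self):
    if self.view_changer is None:
        return False
    return self.view_changer.pre_view_change_in_progress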

@property
def quorum(self) -> int:
return self.quorums.view_change_done.value
Expand Down Expand Up @@ -570,13 +579,15 @@ def start_view_change(self, proposed_view_no: int, continue_vc=False):
# implementations - we need to make this logic pluggable

if self.pre_vc_strategy and (not continue_vc):
self.pre_view_change_in_progress = True
self.pre_vc_strategy.prepare_view_change(proposed_view_no)
return
elif self.pre_vc_strategy:
self.pre_vc_strategy.on_strategy_complete()

self.previous_view_no = self.view_no
self.view_no = proposed_view_no
self.pre_view_change_in_progress = False
self.view_change_in_progress = True
self.previous_master_primary = self.provider.current_primary_name()
self.set_defaults()