-
Notifications
You must be signed in to change notification settings - Fork 377
INDY-2032: Fix phantom transactions in audit ledger #1151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
e118a3c
a16851f
6d94993
e3455a4
c14ee26
012aa93
e8e35a3
d8b78b1
f9b0661
e4e9781
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -355,15 +355,6 @@ def __init__(self, | |
else: | ||
self.quota_control = StaticQuotaControl(node_quota=node_quota, client_quota=client_quota) | ||
|
||
# Ordered requests received from replicas while the node was not | ||
# participating | ||
self.stashedOrderedReqs = deque() | ||
|
||
# Set of (identifier, reqId) of all transactions that were received | ||
# while catching up. Used to detect overlap between stashed requests | ||
# and received replies while catching up. | ||
# self.reqsFromCatchupReplies = set() | ||
|
||
# Any messages that are intended for view numbers higher than the | ||
# current view. | ||
self.msgsForFutureViews = {} | ||
|
@@ -720,6 +711,11 @@ def view_change_in_progress(self): | |
return False if self.view_changer is None \ | ||
else self.view_changer.view_change_in_progress | ||
|
||
@property | ||
def pre_view_change_in_progress(self): | ||
return False if self.view_changer is None \ | ||
else self.view_changer.pre_view_change_in_progress | ||
|
||
def _add_config_ledger(self): | ||
self.ledgerManager.addLedger( | ||
CONFIG_LEDGER_ID, | ||
|
@@ -2153,17 +2149,6 @@ def postAuditLedgerCaughtUp(self, **kwargs): | |
self.audit_handler.on_catchup_finished() | ||
|
||
def preLedgerCatchUp(self, ledger_id): | ||
# Process any Ordered requests. This causes less transactions to be | ||
# requested during catchup. Also commits any uncommitted state that | ||
# can be committed | ||
logger.info('{} going to process any ordered requests before starting catchup.'.format(self)) | ||
self.force_process_ordered() | ||
self.processStashedOrderedReqs() | ||
|
||
# revert uncommitted txns and state for unordered requests | ||
r = self.master_replica.revert_unordered_batches() | ||
logger.info('{} reverted {} batches before starting catch up for ledger {}'.format(self, r, ledger_id)) | ||
|
||
if len(self.auditLedger.uncommittedTxns) > 0: | ||
raise LogicError( | ||
'{} audit ledger has uncommitted txns before catching up ledger {}'.format(self, ledger_id)) | ||
|
@@ -2242,10 +2227,6 @@ def allLedgersCaughtUp(self): | |
if self.view_change_in_progress: | ||
self._process_replica_messages() | ||
|
||
# TODO: Maybe a slight optimisation is to check result of | ||
# `self.num_txns_caught_up_in_last_catchup()` | ||
self.processStashedOrderedReqs() | ||
|
||
# More than one catchup may be needed during the current ViewChange protocol | ||
# TODO: separate view change and catchup logic | ||
if self.is_catchup_needed(): | ||
|
@@ -2416,26 +2397,6 @@ def applyReq(self, request: Request, cons_time: int): | |
cons_time=cons_time, ledger_id=ledger_id, | ||
seq_no=seq_no, txn=txn) | ||
|
||
def apply_stashed_reqs(self, ordered): | ||
request_ids = ordered.valid_reqIdr | ||
requests = [] | ||
for req_key in request_ids: | ||
if req_key in self.requests: | ||
req = self.requests[req_key].finalised | ||
else: | ||
logger.warning("Could not apply stashed requests due to non-existent requests") | ||
return | ||
_, seq_no = self.seqNoDB.get(req.digest) | ||
if seq_no is None: | ||
requests.append(req) | ||
three_pc_batch = ThreePcBatch.from_ordered(ordered) | ||
self.apply_reqs(requests, three_pc_batch) | ||
|
||
def apply_reqs(self, requests, three_pc_batch: ThreePcBatch): | ||
for req in requests: | ||
self.applyReq(req, three_pc_batch.pp_time) | ||
self.onBatchCreated(three_pc_batch) | ||
|
||
def handle_request_if_forced(self, request: Request): | ||
if request.isForced(): | ||
req_handler = self.get_req_handler( | ||
|
@@ -2680,13 +2641,13 @@ def processOrdered(self, ordered: Ordered): | |
logger.trace("{} got ordered requests from master replica" | ||
.format(self)) | ||
|
||
logger.info("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}" | ||
.format(self.name, | ||
ordered.viewNo, | ||
ordered.ppSeqNo, | ||
len(ordered.valid_reqIdr), | ||
ordered.stateRootHash, | ||
ordered.txnRootHash)) | ||
logger.debug("{} executing Ordered batch {} {} of {} requests; state root {}; txn root {}" | ||
.format(self.name, | ||
ordered.viewNo, | ||
ordered.ppSeqNo, | ||
len(ordered.valid_reqIdr), | ||
ordered.stateRootHash, | ||
ordered.txnRootHash)) | ||
|
||
self.executeBatch(ThreePcBatch.from_ordered(ordered), | ||
ordered.valid_reqIdr, | ||
|
@@ -2725,11 +2686,10 @@ def force_process_ordered(self): | |
.format(self, num_processed, instance_id)) | ||
|
||
def try_processing_ordered(self, msg): | ||
if self.isParticipating: | ||
if self.master_replica.validator.can_order(): | ||
self.processOrdered(msg) | ||
else: | ||
logger.info("{} stashing {} since mode is {}".format(self, msg, self.mode)) | ||
self.stashedOrderedReqs.append(msg) | ||
logger.info("{} can not process Ordered message {} since mode is {}".format(self, msg, self.mode)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be at least warning, since here we're discarding ordering result, and if I understand correctly we should never reach this point in production code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
|
||
def processEscalatedException(self, ex): | ||
""" | ||
|
@@ -2833,7 +2793,6 @@ def sum_for_values(obj): | |
self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_REPLICAS, len(self.msgsForFutureReplicas)) | ||
self.metrics.add_event(MetricsName.MSGS_TO_VIEW_CHANGER, len(self.msgsToViewChanger)) | ||
self.metrics.add_event(MetricsName.REQUEST_SENDER, len(self.requestSender)) | ||
self.metrics.add_event(MetricsName.STASHED_ORDERED_REQS, len(self.stashedOrderedReqs)) | ||
|
||
self.metrics.add_event(MetricsName.MSGS_FOR_FUTURE_VIEWS, len(self.msgsForFutureViews)) | ||
|
||
|
@@ -3277,11 +3236,6 @@ def executeBatch(self, three_pc_batch: ThreePcBatch, | |
format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no, | ||
three_pc_batch.ledger_id, three_pc_batch.state_root, | ||
three_pc_batch.txn_root, [key for key in valid_reqs_keys])) | ||
logger.info("{} committed batch request, view no {}, ppSeqNo {}, " | ||
"ledger {}, state root {}, txn root {}, requests: {}". | ||
format(self, three_pc_batch.view_no, three_pc_batch.pp_seq_no, | ||
three_pc_batch.ledger_id, three_pc_batch.state_root, | ||
three_pc_batch.txn_root, len(valid_reqs_keys))) | ||
|
||
for txn in committedTxns: | ||
self.execute_hook(NodeHooks.POST_REQUEST_COMMIT, txn=txn, | ||
|
@@ -3446,37 +3400,6 @@ def defaultAuthNr(self) -> ReqAuthenticator: | |
req_authnr.register_authenticator(self.init_core_authenticator()) | ||
return req_authnr | ||
|
||
def processStashedOrderedReqs(self): | ||
i = 0 | ||
while self.stashedOrderedReqs: | ||
msg = self.stashedOrderedReqs.popleft() | ||
if msg.instId == 0: | ||
if compare_3PC_keys( | ||
(msg.viewNo, | ||
msg.ppSeqNo), | ||
self.ledgerManager.last_caught_up_3PC) >= 0: | ||
logger.info( | ||
'{} ignoring stashed ordered msg {} since ledger ' | ||
'manager has last_caught_up_3PC as {}'.format( | ||
self, msg, self.ledgerManager.last_caught_up_3PC)) | ||
continue | ||
logger.info( | ||
'{} applying stashed Ordered msg {}'.format(self, msg)) | ||
# Since the PRE-PREPAREs ans PREPAREs corresponding to these | ||
# stashed ordered requests was not processed. | ||
self.apply_stashed_reqs(msg) | ||
|
||
self.processOrdered(msg) | ||
i += 1 | ||
|
||
logger.info( | ||
"{} processed {} stashed ordered requests".format( | ||
self, i)) | ||
# Resetting monitor after executing all stashed requests so no view | ||
# change can be proposed | ||
self.monitor.reset() | ||
return i | ||
|
||
def ensureKeysAreSetup(self): | ||
""" | ||
Check whether the keys are setup in the local STP keep. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -145,13 +145,18 @@ def _do_apply_batch(self, batch): | |
|
||
ledger_id = batch[f.LEDGER_ID.nm] | ||
three_pc_batch = ThreePcBatch.from_batch_committed_dict(batch) | ||
self._node.apply_reqs(reqs, three_pc_batch) | ||
self._apply_reqs(reqs, three_pc_batch) | ||
|
||
# We need hashes in apply and str in commit | ||
three_pc_batch.txn_root = Ledger.hashToStr(three_pc_batch.txn_root) | ||
three_pc_batch.state_root = Ledger.hashToStr(three_pc_batch.state_root) | ||
self._node.get_executer(ledger_id)(three_pc_batch) | ||
|
||
def _apply_reqs(self, requests, three_pc_batch: ThreePcBatch): | ||
for req in requests: | ||
self._node.applyReq(req, three_pc_batch.pp_time) | ||
self._node.onBatchCreated(three_pc_batch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use one from node? It looks like this should be really an atomic operation available through node interface. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The one from node was deleted, since it's not used anywhere else. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. The method is in Node again. |
||
|
||
def _process_stashed_messages(self): | ||
while True: | ||
if not self._batches: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
from plenum.common.messages.node_messages import Commit | ||
from plenum.common.types import f | ||
from plenum.common.util import compare_3PC_keys | ||
from plenum.server.replica_validator_enums import DISCARD, INCORRECT_INSTANCE, PROCESS, ALREADY_ORDERED, FUTURE_VIEW, \ | ||
|
@@ -43,6 +44,8 @@ def validate_3pc_msg(self, msg): | |
if view_no < self.replica.viewNo - 1: | ||
return DISCARD, OLD_VIEW | ||
if view_no == self.replica.viewNo - 1: | ||
if not isinstance(msg, Commit): | ||
return DISCARD, OLD_VIEW | ||
if not node.view_change_in_progress: | ||
return DISCARD, OLD_VIEW | ||
if self.replica.last_prepared_before_view_change is None: | ||
|
@@ -52,7 +55,7 @@ def validate_3pc_msg(self, msg): | |
if view_no == self.replica.viewNo and node.view_change_in_progress: | ||
return STASH_VIEW, FUTURE_VIEW | ||
|
||
# If Catchup in View Change finished then process a message | ||
# If Catchup in View Change finished then process Commit messages | ||
if node.is_synced and node.view_change_in_progress: | ||
return PROCESS, None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explicit check that this is a really commit message would be good here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The explicit check is already done above. |
||
|
||
|
@@ -94,3 +97,22 @@ def validate_checkpoint_msg(self, msg): | |
return STASH_CATCH_UP, CATCHING_UP | ||
|
||
return PROCESS, None | ||
|
||
def can_send_3pc_batch(self): | ||
if not self.replica.isPrimary: | ||
return False | ||
if not self.replica.node.isParticipating: | ||
return False | ||
if self.replica.node.pre_view_change_in_progress: | ||
return False | ||
if self.replica.viewNo < self.replica.last_ordered_3pc[0]: | ||
return False | ||
return True | ||
|
||
def can_order(self): | ||
node = self.replica.node | ||
if node.isParticipating: | ||
return True | ||
if node.is_synced and node.view_change_in_progress: | ||
return True | ||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,6 +159,7 @@ def __init__(self, provider: ViewChangerDataProvider, timer: TimerService): | |
self._next_view_indications = {} | ||
|
||
self._view_change_in_progress = False | ||
self._pre_view_change_in_progress = False | ||
|
||
self.previous_view_no = None | ||
self.previous_master_primary = None | ||
|
@@ -225,6 +226,14 @@ def view_change_in_progress(self) -> bool: | |
def view_change_in_progress(self, value: bool): | ||
self._view_change_in_progress = value | ||
|
||
@property | ||
def pre_view_change_in_progress(self) -> bool: | ||
return self._pre_view_change_in_progress | ||
|
||
@pre_view_change_in_progress.setter | ||
def pre_view_change_in_progress(self, value: bool): | ||
self._pre_view_change_in_progress = value | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need to make this a property if we are just using trivial getter and setter without any additional logic? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a quick fix, I did it similar to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
@property | ||
def quorum(self) -> int: | ||
return self.quorums.view_change_done.value | ||
|
@@ -570,13 +579,15 @@ def start_view_change(self, proposed_view_no: int, continue_vc=False): | |
# implementations - we need to make this logic pluggable | ||
|
||
if self.pre_vc_strategy and (not continue_vc): | ||
self.pre_view_change_in_progress = True | ||
self.pre_vc_strategy.prepare_view_change(proposed_view_no) | ||
return | ||
elif self.pre_vc_strategy: | ||
self.pre_vc_strategy.on_strategy_complete() | ||
|
||
self.previous_view_no = self.view_no | ||
self.view_no = proposed_view_no | ||
self.pre_view_change_in_progress = False | ||
self.view_change_in_progress = True | ||
self.previous_master_primary = self.provider.current_primary_name() | ||
self.set_defaults() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer something more readable and more clear intentions like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done