diff --git a/plenum/common/messages/fields.py b/plenum/common/messages/fields.py index 6aa92dd1d7..86bfed39c3 100644 --- a/plenum/common/messages/fields.py +++ b/plenum/common/messages/fields.py @@ -35,6 +35,7 @@ def __init__(self, optional=False, nullable=False): self.optional = optional self.nullable = nullable + # TODO: `validate` should be renamed to `validation_error` def validate(self, val): """ Performs basic validation of field value and then passes it for @@ -78,6 +79,9 @@ def _wrong_type_msg(self, val): "".format(types_str, type(val).__name__) +# TODO: The fields below should be singleton. + + class AnyField(FieldBase): _base_types = (object,) diff --git a/plenum/common/messages/node_messages.py b/plenum/common/messages/node_messages.py index e077974b98..3eb59a67ed 100644 --- a/plenum/common/messages/node_messages.py +++ b/plenum/common/messages/node_messages.py @@ -113,7 +113,7 @@ class Propagate(MessageBase): typename = PROPAGATE schema = ( (f.REQUEST.nm, ClientMessageValidator(operation_schema_is_strict=True)), - (f.SENDER_CLIENT.nm, NonEmptyStringField()), + (f.SENDER_CLIENT.nm, NonEmptyStringField(nullable=True)), ) @@ -243,7 +243,6 @@ class CatchupReq(MessageBase): class CatchupRep(MessageBase): - typename = CATCHUP_REP schema = ( (f.LEDGER_ID.nm, LedgerIdField()), @@ -282,10 +281,11 @@ class MessageReq(MessageBase): """ Purpose: ask node for any message """ + allowed_types = {LEDGER_STATUS, CONSISTENCY_PROOF, PREPREPARE, + PROPAGATE} typename = MESSAGE_REQUEST schema = ( - (f.MSG_TYPE.nm, ChooseField(values={LEDGER_STATUS, - CONSISTENCY_PROOF, PREPREPARE})), + (f.MSG_TYPE.nm, ChooseField(values=allowed_types)), (f.PARAMS.nm, AnyMapField()) ) @@ -296,8 +296,7 @@ class MessageRep(MessageBase): """ typename = MESSAGE_RESPONSE schema = ( - (f.MSG_TYPE.nm, ChooseField(values={LEDGER_STATUS, - CONSISTENCY_PROOF, PREPREPARE})), + (f.MSG_TYPE.nm, ChooseField(values=MessageReq.allowed_types)), (f.PARAMS.nm, AnyMapField()), (f.MSG.nm, AnyField()) ) diff 
--git a/plenum/server/message_req_processor.py b/plenum/server/message_req_processor.py new file mode 100644 index 0000000000..5a2ad59728 --- /dev/null +++ b/plenum/server/message_req_processor.py @@ -0,0 +1,259 @@ +from typing import Dict +from typing import List + +from plenum.common.constants import LEDGER_STATUS, PREPREPARE, CONSISTENCY_PROOF, \ + PROPAGATE +from plenum.common.messages.fields import RequestIdentifierField +from plenum.common.messages.node_messages import MessageReq, MessageRep, \ + LedgerStatus, PrePrepare, ConsistencyProof, Propagate +from plenum.common.request import Request +from plenum.common.types import f +from plenum.server import replica +from stp_core.common.log import getlogger + + +logger = getlogger() + + +class MessageReqProcessor: + # This is a mixin, it's mixed with node. + def __init__(self): + self.validation_handlers = { + LEDGER_STATUS: self._validate_requested_ledger_status, + CONSISTENCY_PROOF: self._validate_requested_cons_proof, + PREPREPARE: self._validate_requested_preprepare, + PROPAGATE: self._validate_requested_propagate + } + + self.req_handlers = { + LEDGER_STATUS: self._serve_ledger_status_request, + CONSISTENCY_PROOF: self._serve_cons_proof_request, + PREPREPARE: self._serve_preprepare_request, + PROPAGATE: self._serve_propagate_request + } + + self.rep_handlers = { + LEDGER_STATUS: self._process_requested_ledger_status, + CONSISTENCY_PROOF: self._process_requested_cons_proof, + PREPREPARE: self._process_requested_preprepare, + PROPAGATE: self._process_requested_propagate + } + + def process_message_req(self, msg: MessageReq, frm): + # Assumes a shared memory architecture. In case of multiprocessing, + # RPC architecture, use deques to communicate the message and node will + # maintain a unique internal message id to correlate responses. 
+ msg_type = msg.msg_type + resp = self.req_handlers[msg_type](msg) + + if resp is False: + return + + self.sendToNodes(MessageRep(**{ + f.MSG_TYPE.nm: msg_type, + f.PARAMS.nm: msg.params, + f.MSG.nm: resp + }), names=[frm, ]) + + def process_message_rep(self, msg: MessageRep, frm): + msg_type = msg.msg_type + if msg.msg is None: + logger.debug('{} got null response for requested {} from {}'. + format(self, msg_type, frm)) + return + return self.rep_handlers[msg_type](msg, frm) + + def valid_requested_msg(self, msg_type, **kwargs): + return self.validation_handlers[msg_type](**kwargs) + + def request_msg(self, typ, params: Dict, frm: List[str]=None): + self.sendToNodes(MessageReq(**{ + f.MSG_TYPE.nm: typ, + f.PARAMS.nm: params + }), names=frm) + + def _validate_requested_ledger_status(self, **kwargs): + if kwargs['ledger_id'] in self.ledger_ids: + if 'ledger_status' in kwargs: + try: + return LedgerStatus(*kwargs['ledger_status']) + except TypeError as ex: + logger.warning( + '{} could not create LEDGER_STATUS out of {}'. 
+ format(self, *kwargs['ledger_status'])) + else: + return True + + def _serve_ledger_status_request(self, msg): + params = msg.params + ledger_id = params.get(f.LEDGER_ID.nm) + if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id): + return self.getLedgerStatus(ledger_id) + else: + self.discard(msg, 'cannot serve request', + logMethod=logger.debug) + return False + + def _process_requested_ledger_status(self, msg, frm): + params = msg.params + ledger_id = params.get(f.LEDGER_ID.nm) + ledger_status = msg.msg + ledger_status = self.valid_requested_msg(msg.msg_type, + ledger_id=ledger_id, + ledger_status=ledger_status) + if ledger_status: + self.ledgerManager.processLedgerStatus(ledger_status, frm=frm) + return + self.discard(msg, + 'cannot process requested message response', + logMethod=logger.debug) + + def _validate_requested_cons_proof(self, **kwargs): + if kwargs['ledger_id'] in self.ledger_ids and \ + (isinstance(kwargs['seq_no_start'], int) and kwargs[ + 'seq_no_start'] > 0) and \ + (isinstance(kwargs['seq_no_end'], int) and kwargs[ + 'seq_no_end'] > 0): + if 'cons_proof' in kwargs: + try: + return ConsistencyProof(*kwargs['cons_proof']) + except TypeError as ex: + logger.warning( + '{} could not create CONSISTENCY_PROOF out of {}'. 
+ format(self, *kwargs['cons_proof'])) + else: + return True + + def _serve_cons_proof_request(self, msg): + params = msg.params + ledger_id = params.get(f.LEDGER_ID.nm) + seq_no_start = params.get(f.SEQ_NO_START.nm) + seq_no_end = params.get(f.SEQ_NO_END.nm) + if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id, + seq_no_start=seq_no_start, + seq_no_end=seq_no_end): + return self.ledgerManager._buildConsistencyProof(ledger_id, + seq_no_start, + seq_no_end) + else: + self.discard(msg, 'cannot serve request', + logMethod=logger.debug) + return False + + def _process_requested_cons_proof(self, msg, frm): + params = msg.params + ledger_id = params.get(f.LEDGER_ID.nm) + seq_no_start = params.get(f.SEQ_NO_START.nm) + seq_no_end = params.get(f.SEQ_NO_END.nm) + cons_proof = msg.msg + cons_proof = self.valid_requested_msg(msg.msg_type, + ledger_id=ledger_id, + seq_no_start=seq_no_start, + seq_no_end=seq_no_end, + cons_proof=cons_proof) + if cons_proof: + self.ledgerManager.processConsistencyProof(cons_proof, frm=frm) + return + self.discard(msg, + 'cannot process requested message response', + logMethod=logger.debug) + + def _validate_requested_preprepare(self, **kwargs): + if kwargs['inst_id'] in range(len(self.replicas)) and \ + kwargs['view_no'] == self.viewNo and \ + isinstance(kwargs['pp_seq_no'], int) and \ + kwargs['pp_seq_no'] > 0: + if 'pp' in kwargs: + try: + pp = PrePrepare(*kwargs['pp']) + if pp.instId != kwargs['inst_id'] or pp.viewNo != kwargs['view_no']: + logger.warning('{} found PREPREPARE {} not satisfying ' + 'query criteria'.format(self, *kwargs['pp'])) + return + return pp + except TypeError as ex: + logger.warning( + '{} could not create PREPREPARE out of {}'. 
+ format(self, *kwargs['pp'])) + else: + return True + + def _serve_preprepare_request(self, msg): + params = msg.params + inst_id = params.get(f.INST_ID.nm) + view_no = params.get(f.VIEW_NO.nm) + pp_seq_no = params.get(f.PP_SEQ_NO.nm) + if self.valid_requested_msg(msg.msg_type, inst_id=inst_id, + view_no=view_no, pp_seq_no=pp_seq_no): + return self.replicas[inst_id].getPrePrepare(view_no, pp_seq_no) + else: + self.discard(msg, 'cannot serve request', + logMethod=logger.debug) + return False + + def _process_requested_preprepare(self, msg, frm): + params = msg.params + inst_id = params.get(f.INST_ID.nm) + view_no = params.get(f.VIEW_NO.nm) + pp_seq_no = params.get(f.PP_SEQ_NO.nm) + pp = msg.msg + pp = self.valid_requested_msg(msg.msg_type, inst_id=inst_id, + view_no=view_no, pp_seq_no=pp_seq_no, + pp=pp) + if pp: + frm = replica.Replica.generateName(frm, inst_id) + self.replicas[inst_id].process_requested_pre_prepare(pp, + sender=frm) + return + self.discard(msg, + 'cannot process requested message response', + logMethod=logger.debug) + + def _validate_requested_propagate(self, **kwargs): + if not (RequestIdentifierField().validate((kwargs['identifier'], + kwargs['req_id']))): + if 'propagate' in kwargs: + try: + ppg = Propagate(*kwargs['propagate']) + if ppg.request[f.IDENTIFIER.nm] != kwargs['identifier'] or \ + ppg.request[f.REQ_ID.nm] != kwargs['req_id']: + logger.warning('{} found PROPAGATE {} not ' + 'satisfying query criteria'.format(self, *kwargs['ppg'])) + return + return ppg + except TypeError as ex: + logger.warning( + '{} could not create PROPAGATE out of {}'. 
+ format(self, *kwargs['propagate'])) + else: + return True + + def _serve_propagate_request(self, msg): + params = msg.params + identifier = params.get(f.IDENTIFIER.nm) + req_id = params.get(f.REQ_ID.nm) + if self.valid_requested_msg(msg.msg_type, identifier=identifier, + req_id=req_id): + req_key = (identifier, req_id) + if req_key in self.requests and self.requests[req_key].finalised: + sender_client = self.requestSender.get(req_key) + req = self.requests[req_key].finalised + return self.createPropagate(req, sender_client) + else: + self.discard(msg, 'cannot serve request', + logMethod=logger.debug) + return False + + def _process_requested_propagate(self, msg, frm): + params = msg.params + identifier = params.get(f.IDENTIFIER.nm) + req_id = params.get(f.REQ_ID.nm) + ppg = msg.msg + ppg = self.valid_requested_msg(msg.msg_type, identifier=identifier, + req_id=req_id, propagate=ppg) + if ppg: + self.processPropagate(ppg, frm) + else: + self.discard(msg, + 'cannot process requested message response', + logMethod=logger.debug) diff --git a/plenum/server/node.py b/plenum/server/node.py index 9f9d6d45cd..3f25d6f762 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -1,4 +1,3 @@ -import json import os import time from binascii import unhexlify @@ -45,7 +44,7 @@ from plenum.persistence.req_id_to_txn import ReqIdrToTxn from plenum.persistence.storage import Storage, initStorage, initKeyValueStorage -from plenum.server.msg_filter import MessageFilterEngine +from plenum.server.message_req_processor import MessageReqProcessor from plenum.server.primary_selector import PrimarySelector from plenum.server import replica from plenum.server.blacklister import Blacklister @@ -82,7 +81,7 @@ class Node(HasActionQueue, Motor, Propagator, MessageProcessor, HasFileStorage, - HasPoolManager, PluginLoaderHelper): + HasPoolManager, PluginLoaderHelper, MessageReqProcessor): """ A node in a plenum system. 
""" @@ -178,9 +177,11 @@ def __init__(self, self.cliNodeReg = self.poolManager.cliNodeReg HasActionQueue.__init__(self) - # Motor.__init__(self) + Propagator.__init__(self) + MessageReqProcessor.__init__(self) + self.primaryDecider = primaryDecider self.nodeInBox = deque() @@ -1750,6 +1751,10 @@ def processPropagate(self, msg: Propagate, frm): if not self.isProcessingReq(*request.key): self.startedProcessingReq(*request.key, clientName) + elif clientName is not None and not self.is_sender_known_for_req(*request.key): + # Since some propagates might not include the client name + self.set_sender_for_req(*request.key, clientName) + self.requests.addPropagate(request, frm) self.propagate(request, clientName) @@ -1764,6 +1769,12 @@ def isProcessingReq(self, identifier, reqId) -> bool: def doneProcessingReq(self, identifier, reqId): self.requestSender.pop((identifier, reqId)) + def is_sender_known_for_req(self, identifier, reqId): + return self.requestSender.get((identifier, reqId)) is not None + + def set_sender_for_req(self, identifier, reqId, frm): + self.requestSender[identifier, reqId] = frm + def send_ack_to_client(self, req_key, to_client): self.transmitToClient(RequestAck(*req_key), to_client) @@ -2124,190 +2135,6 @@ def _is_there_pool_ledger(self): # TODO isinstance is not OK return isinstance(self.poolManager, TxnPoolManager) - def process_message_req(self, msg: MessageReq, frm): - # Assumes a shared memory architecture. In case of multiprocessing, - # RPC architecture, use deques to communicate the message and node will - # maintain a unique internal message id to correlate responses. 
- resp = None - msg_type = msg.msg_type - if msg_type == LEDGER_STATUS: - resp = self._serve_ledger_status_request(msg) - elif msg_type == CONSISTENCY_PROOF: - resp = self._serve_cons_proof_request(msg) - elif msg_type == PREPREPARE: - resp = self._serve_preprepare_request(msg) - else: - raise RuntimeError('{} encountered request for unknown message ' - 'type {}'.format(self, msg_type)) - - if resp is False: - return - - self.sendToNodes(MessageRep(**{ - f.MSG_TYPE.nm: msg_type, - f.PARAMS.nm: msg.params, - f.MSG.nm: resp - }), names=[frm,]) - - def process_message_rep(self, msg: MessageRep, frm): - msg_type = msg.msg_type - if msg.msg is None: - logger.debug('{} got null response for requested {} from {}'. - format(self, msg_type, frm)) - return - if msg_type == LEDGER_STATUS: - return self._process_requested_ledger_status(msg, frm) - elif msg_type == CONSISTENCY_PROOF: - return self._process_requested_cons_proof(msg, frm) - elif msg_type == PREPREPARE: - return self._process_requested_preprepare(msg, frm) - else: - raise RuntimeError('{} encountered response for unknown message ' - 'type {}'.format(self, msg_type)) - - def valid_requested_msg(self, msg_type, **kwargs): - if msg_type == LEDGER_STATUS: - return self._validate_requested_ledger_status(**kwargs) - elif msg_type == CONSISTENCY_PROOF: - return self._validate_requested_cons_proof(**kwargs) - elif msg_type == PREPREPARE: - return self._validate_requested_preprepare(**kwargs) - - def request_msg(self, typ, params, frm): - self.sendToNodes(MessageReq(**{ - f.MSG_TYPE.nm: typ, - f.PARAMS.nm: params - }), names=frm) - - def _validate_requested_ledger_status(self, **kwargs): - if kwargs['ledger_id'] in self.ledger_ids: - if 'ledger_status' in kwargs: - try: - return LedgerStatus(*kwargs['ledger_status']) - except TypeError as ex: - logger.warning( - '{} could not create LEDGER_STATUS out of {}'. 
- format(self, *kwargs['ledger_status'])) - else: - return True - - def _validate_requested_cons_proof(self, **kwargs): - if kwargs['ledger_id'] in self.ledger_ids and \ - (isinstance(kwargs['seq_no_start'], int) and kwargs[ - 'seq_no_start'] > 0) and \ - (isinstance(kwargs['seq_no_end'], int) and kwargs[ - 'seq_no_end'] > 0): - if 'cons_proof' in kwargs: - try: - return ConsistencyProof(*kwargs['cons_proof']) - except TypeError as ex: - logger.warning( - '{} could not create CONSISTENCY_PROOF out of {}'. - format(self, *kwargs['cons_proof'])) - else: - return True - - def _validate_requested_preprepare(self, **kwargs): - if kwargs['inst_id'] in range(len(self.replicas)) and \ - kwargs['view_no'] == self.viewNo and \ - isinstance(kwargs['pp_seq_no'], int) and \ - kwargs['pp_seq_no'] > 0: - if 'pp' in kwargs: - try: - return PrePrepare(*kwargs['pp']) - except TypeError as ex: - logger.warning('{} could not create PREPREPARE out of {}'. - format(self, *kwargs['pp'])) - else: - return True - - def _process_requested_ledger_status(self, msg, frm): - params = msg.params - ledger_id = params.get(f.LEDGER_ID.nm) - ledger_status = msg.msg - ledger_status = self.valid_requested_msg(msg.msg_type, - ledger_id=ledger_id, - ledger_status=ledger_status) - if ledger_status: - self.ledgerManager.processLedgerStatus(ledger_status, frm=frm) - return - self.discard(msg, - 'cannot process requested message resposnse', - logMethod=logger.debug) - - def _process_requested_cons_proof(self, msg, frm): - params = msg.params - ledger_id = params.get(f.LEDGER_ID.nm) - seq_no_start = params.get(f.SEQ_NO_START.nm) - seq_no_end = params.get(f.SEQ_NO_END.nm) - cons_proof = msg.msg - cons_proof = self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id, - seq_no_start=seq_no_start, - seq_no_end=seq_no_end, - cons_proof=cons_proof) - if cons_proof: - self.ledgerManager.processConsistencyProof(cons_proof, frm=frm) - return - self.discard(msg, - 'cannot process requested message resposnse', - 
logMethod=logger.debug) - - def _process_requested_preprepare(self, msg, frm): - params = msg.params - inst_id = params.get(f.INST_ID.nm) - view_no = params.get(f.VIEW_NO.nm) - pp_seq_no = params.get(f.PP_SEQ_NO.nm) - pp = msg.msg - pp = self.valid_requested_msg(msg.msg_type, inst_id=inst_id, - view_no=view_no, pp_seq_no=pp_seq_no, - pp=pp) - if pp: - frm = replica.Replica.generateName(frm, inst_id) - self.replicas[inst_id].process_requested_pre_prepare(pp, - sender=frm) - return - self.discard(msg, - 'cannot process requested message resposnse', - logMethod=logger.debug) - - def _serve_ledger_status_request(self, msg): - params = msg.params - ledger_id = params.get(f.LEDGER_ID.nm) - if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id): - return self.getLedgerStatus(ledger_id) - else: - self.discard(msg, 'cannot serve request', - logMethod=logger.debug) - return False - - def _serve_cons_proof_request(self, msg): - params = msg.params - ledger_id = params.get(f.LEDGER_ID.nm) - seq_no_start = params.get(f.SEQ_NO_START.nm) - seq_no_end = params.get(f.SEQ_NO_END.nm) - if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id, - seq_no_start=seq_no_start, - seq_no_end=seq_no_end): - return self.ledgerManager._buildConsistencyProof(ledger_id, - seq_no_start, - seq_no_end) - else: - self.discard(msg, 'cannot serve request', - logMethod=logger.debug) - return False - - def _serve_preprepare_request(self, msg): - params = msg.params - inst_id = params.get(f.INST_ID.nm) - view_no = params.get(f.VIEW_NO.nm) - pp_seq_no = params.get(f.PP_SEQ_NO.nm) - if self.valid_requested_msg(msg.msg_type, inst_id=inst_id, - view_no=view_no, pp_seq_no=pp_seq_no): - return self.replicas[inst_id].getPrePrepare(view_no, pp_seq_no) - else: - self.discard(msg, 'cannot serve request', logMethod=logger.debug) - return False - def ordered_prev_view_msgs(self, inst_id, pp_seqno): logger.debug('{} ordered previous view batch {} by instance {}'. 
format(self, pp_seqno, inst_id)) @@ -2454,8 +2281,13 @@ def sendRepliesToClients(self, committedTxns, ppTime): def sendReplyToClient(self, reply, reqKey): if self.isProcessingReq(*reqKey): - logger.debug('{} sending reply for {} to client'.format(self, reqKey)) - self.transmitToClient(reply, self.requestSender[reqKey]) + sender = self.requestSender[reqKey] + if sender: + logger.debug('{} sending reply for {} to client'.format(self, reqKey)) + self.transmitToClient(reply, self.requestSender[reqKey]) + else: + logger.debug('{} not sending reply for {}, since do not ' + 'know client'.format(self, reqKey)) self.doneProcessingReq(*reqKey) def addNewRole(self, txn): @@ -2673,9 +2505,9 @@ def send(self, msg: Any, *rids: Iterable[int], signer: Signer = None): .format(self, msg, recipientsNum, remoteNames)) self.nodestack.send(msg, *rids, signer=signer) - def sendToNodes(self, msg: Any, names: Iterable[str]): + def sendToNodes(self, msg: Any, names: Iterable[str]=None): # TODO: This method exists in `Client` too, refactor to avoid duplication - rids = [rid for rid, r in self.nodestack.remotes.items() if r.name in names] + rids = [rid for rid, r in self.nodestack.remotes.items() if r.name in names] if names else [] self.send(msg, *rids) def getReplyFromLedger(self, ledger, request=None, seq_no=None): diff --git a/plenum/server/pool_manager.py b/plenum/server/pool_manager.py index 772064ed25..08c396ea5c 100644 --- a/plenum/server/pool_manager.py +++ b/plenum/server/pool_manager.py @@ -20,7 +20,7 @@ from plenum.common.stack_manager import TxnStackManager from plenum.common.types import NodeDetail from plenum.persistence.storage import initKeyValueStorage -from plenum.persistence.util import txnsWithMerkleInfo, pop_merkle_info +from plenum.persistence.util import pop_merkle_info from plenum.server.pool_req_handler import PoolRequestHandler from plenum.server.suspicion_codes import Suspicions from state.pruning_state import PruningState diff --git a/plenum/server/propagator.py 
b/plenum/server/propagator.py index cd0fb421fc..fc5bc86340 100644 --- a/plenum/server/propagator.py +++ b/plenum/server/propagator.py @@ -3,8 +3,12 @@ from typing import Dict, Tuple, Union, Optional +from orderedset._orderedset import OrderedSet +from plenum.common.constants import PROPAGATE from plenum.common.messages.node_messages import Propagate from plenum.common.request import Request, ReqKey +from plenum.common.types import f +from plenum.server.quorums import Quorum from stp_core.common.log import getlogger logger = getlogger() @@ -23,16 +27,14 @@ def __init__(self, request: Request): self.propagates = {} self.finalised = None - @property - def most_propagated_request_with_senders(self): - groups = defaultdict(set) + def req_with_acceptable_quorum(self, quorum: Quorum): + digests = defaultdict(set) # this is workaround because we are getting a propagate from somebody with # non-str (byte) name - propagates = filter(lambda x: type(x[0]) == str, self.propagates.items()) - for key, value in sorted(propagates): - groups[value].add(key) - most_common_requests = sorted(groups.items(), key=lambda x: len(x[1]), reverse=True) - return most_common_requests[0] if most_common_requests else (None, set()) + for sender, req in filter(lambda x: type(x[0]) == str, self.propagates.items()): + digests[req.digest].add(sender) + if quorum.is_reached(len(digests[req.digest])): + return req def set_finalised(self, req): # TODO: make it much explicitly and simpler @@ -94,9 +96,9 @@ def votes(self, req) -> int: votes = 0 return votes - def most_propagated_request_with_senders(self, req: Request): + def req_with_acceptable_quorum(self, req: Request, quorum: Quorum): state = self[req.key] - return state.most_propagated_request_with_senders + return state.req_with_acceptable_quorum(quorum) def set_finalised(self, req: Request): state = self[req.key] @@ -119,8 +121,11 @@ def digest(self, reqKey: Tuple) -> str: class Propagator: + MAX_REQUESTED_KEYS_TO_KEEP = 1000 + def __init__(self): 
self.requests = Requests() + self.requested_propagates_for = OrderedSet() # noinspection PyUnresolvedReferences def propagate(self, request: Request, clientName): @@ -141,7 +146,7 @@ def propagate(self, request: Request, clientName): self.send(propagate) @staticmethod - def createPropagate(request: Union[Request, dict], identifier) -> Propagate: + def createPropagate(request: Union[Request, dict], client_name) -> Propagate: """ Create a new PROPAGATE for the given REQUEST. @@ -154,9 +159,9 @@ def createPropagate(request: Union[Request, dict], identifier) -> Propagate: logger.debug("Creating PROPAGATE for REQUEST {}".format(request)) request = request.as_dict if isinstance(request, Request) else \ request - if isinstance(identifier, bytes): - identifier = identifier.decode() - return Propagate(request, identifier) + if isinstance(client_name, bytes): + client_name = client_name.decode() + return Propagate(request, client_name) # noinspection PyUnresolvedReferences def canForward(self, request: Request): @@ -179,10 +184,13 @@ def canForward(self, request: Request): if self.requests.forwarded(request): return 'already forwarded' - req, senders = self.requests.most_propagated_request_with_senders(request) - if self.name in senders: - senders.remove(self.name) - if req and self.quorums.propagate.is_reached(len(senders)): + # If not enough Propagates, don't bother comparing + if not self.quorums.propagate.is_reached(self.requests.votes(request)): + return 'not finalised' + + req = self.requests.req_with_acceptable_quorum(request, + self.quorums.propagate) + if req: self.requests.set_finalised(req) return None else: @@ -230,3 +238,28 @@ def tryForwarding(self, request: Request): else: logger.debug("{} not forwarding request {} to its replicas " "since {}".format(self, request, cannot_reason_msg)) + + def request_propagates(self, req_keys): + """ + Request PROPAGATEs for the given request keys.
Since replicas can + request PROPAGATEs independently of each other, check if it has + been requested recently + :param req_keys: + :return: + """ + i = 0 + for (idr, req_id) in req_keys: + if (idr, req_id) not in self.requested_propagates_for: + self.request_msg(PROPAGATE, {f.IDENTIFIER.nm: idr, + f.REQ_ID.nm: req_id}) + self._add_to_recently_requested((idr, req_id)) + i += 1 + else: + logger.debug('{} already requested PROPAGATE recently for {}'. + format(self, (idr, req_id))) + return i + + def _add_to_recently_requested(self, key): + while len(self.requested_propagates_for) > self.MAX_REQUESTED_KEYS_TO_KEEP: + self.requested_propagates_for.pop(last=False) + self.requested_propagates_for.add(key) diff --git a/plenum/server/replica.py b/plenum/server/replica.py index 1aa8922dcd..63809c4627 100644 --- a/plenum/server/replica.py +++ b/plenum/server/replica.py @@ -14,11 +14,9 @@ import plenum.server.node from plenum.common.config_util import getConfig -from plenum.common.constants import PREPREPARE from plenum.common.exceptions import SuspiciousNode, \ InvalidClientMessageException, UnknownIdentifier from plenum.common.signing import serialize -from plenum.common.types import f from plenum.common.messages.node_messages import * from plenum.common.request import ReqDigest, Request, ReqKey from plenum.common.message_processor import MessageProcessor @@ -1019,12 +1017,22 @@ def canProcessPrePrepare(self, pp: PrePrepare, sender: str) -> bool: if compare_3PC_keys((pp.viewNo, pp.ppSeqNo), self.__last_pp_3pc) > 0: return False # ignore old pre-prepare + # Do not combine the next if conditions, the idea is to exit as soon + # as possible non_fin_reqs = self.nonFinalisedReqs(pp.reqIdr) + if non_fin_reqs: + self.enqueue_pre_prepare(pp, sender, non_fin_reqs) + # TODO: An optimisation might be to not request PROPAGATEs if some + # PROPAGATEs are present or a client request is present and + # sufficient PREPAREs and PRE-PREPARE are present, then the digest + # can be compared 
but this is expensive as the PREPARE + # and PRE-PREPARE contain a combined digest + self.node.request_propagates(non_fin_reqs) + return False non_next_upstream_pp = not self.__is_next_pre_prepare(pp.viewNo, pp.ppSeqNo) - - if non_fin_reqs or non_next_upstream_pp: - self.enqueue_pre_prepare(pp, sender, non_fin_reqs) + if non_next_upstream_pp: + self.enqueue_pre_prepare(pp, sender) return False self.validate_pre_prepare(pp, sender) @@ -1087,7 +1095,7 @@ def validatePrepare(self, prepare: Prepare, sender: str) -> bool: raise SuspiciousNode(sender, Suspicions.DUPLICATE_PR_SENT, prepare) # If PRE-PREPARE not received for the PREPARE, might be slow network if not ppReq: - self.enqueuePrepare(prepare, sender) + self.enqueue_prepare(prepare, sender) return False # If primary replica if primaryStatus is True: @@ -1099,7 +1107,7 @@ def validatePrepare(self, prepare: Prepare, sender: str) -> bool: raise SuspiciousNode(sender, Suspicions.UNKNOWN_PR_SENT, prepare) if primaryStatus is None and not ppReq: - self.enqueuePrepare(prepare, sender) + self.enqueue_prepare(prepare, sender) return False if prepare.digest != ppReq.digest: @@ -1677,7 +1685,7 @@ def dequeuePrePrepares(self): r += 1 return r - def enqueuePrepare(self, pMsg: Prepare, sender: str): + def enqueue_prepare(self, pMsg: Prepare, sender: str): logger.debug("Queueing prepare due to unavailability of PRE-PREPARE. 
" "Prepare {} from {}".format(pMsg, sender)) key = (pMsg.viewNo, pMsg.ppSeqNo) @@ -1789,11 +1797,12 @@ def _request_pre_prepare_if_possible(self, three_pc_key) -> bool: 'stashed for {}'.format(self, three_pc_key)) return False + digest, state_root, txn_root, prepare_senders = \ + self.get_acceptable_stashed_prepare_state(three_pc_key) + # Choose a better data structure for `prePreparesPendingFinReqs` pre_prepares = [pp for pp, _, _ in self.prePreparesPendingFinReqs if (pp.viewNo, pp.ppSeqNo) == three_pc_key] - digest, state_root, txn_root, prepare_senders = \ - self.get_acceptable_stashed_prepare_state(three_pc_key) if pre_prepares: if [pp for pp in pre_prepares if (pp.digest, pp.stateRootHash, pp.txnRootHash) == (digest, state_root, txn_root)]: @@ -1804,6 +1813,9 @@ def _request_pre_prepare_if_possible(self, three_pc_key) -> bool: # TODO: Using a timer to retry would be a better thing to do logger.debug('{} requesting PRE-PREPARE({}) from {}'. format(self, three_pc_key, prepare_senders)) + # An optimisation can be to request PRE-PREPARE from f+1 or + # f+x (f+x<2f) nodes only rather than 2f since only 1 correct + # PRE-PREPARE is needed. 
self.node.request_msg(PREPREPARE, {f.INST_ID.nm: self.instId, f.VIEW_NO.nm: three_pc_key[0], f.PP_SEQ_NO.nm: three_pc_key[1]}, diff --git a/plenum/test/delayers.py b/plenum/test/delayers.py index 84f2d6bcc9..da87d663e9 100644 --- a/plenum/test/delayers.py +++ b/plenum/test/delayers.py @@ -1,6 +1,7 @@ import random -from typing import Iterable +from typing import Iterable, List +from plenum.common.request import Request from plenum.common.types import f from plenum.common.messages.node_messages import * from plenum.common.constants import OP_FIELD_NAME @@ -79,11 +80,6 @@ def rel_delay(delay: float, inst_id=None, sender_filter: str=None): return delayerMsgTuple(delay, Reelection, instFilter=inst_id, senderFilter=sender_filter) -def vcdDelay(delay: float): - # Delayer of VIEW_CHANGE_DONE requests - return delayerMsgTuple(delay, ViewChangeDone) - - def ppgDelay(delay: float, sender_filter: str=None): # Delayer of PROPAGATE requests return delayerMsgTuple(delay, Propagate, senderFilter=sender_filter) @@ -135,6 +131,31 @@ def cr_delay(delay: float): return delayerMsgTuple(delay, CatchupRep) +def req_delay(delay: float): + # Delayer of Request requests + return delayerMsgTuple(delay, Request) + + +def msg_req_delay(delay: float, types_to_delay: List=None): + # Delayer of MessageReq messages + def specific_msgs(msg): + if isinstance(msg[0], MessageReq) and (not types_to_delay or + msg[0].msg_type in types_to_delay): + return delay + + return specific_msgs + + +def msg_rep_delay(delay: float, types_to_delay: List=None): + # Delayer of MessageRep messages + def specific_msgs(msg): + if isinstance(msg[0], MessageRep) and (not types_to_delay or + msg[0].msg_type in types_to_delay): + return delay + + return specific_msgs + + def delay(what, frm, to, howlong): from plenum.test.test_node import TestNode diff --git a/plenum/test/input_validation/message_validation/test_primary_message.py b/plenum/test/input_validation/message_validation/test_primary_message.py index 
6c70fdb1b6..69c5cb03e2 100644 --- a/plenum/test/input_validation/message_validation/test_primary_message.py +++ b/plenum/test/input_validation/message_validation/test_primary_message.py @@ -1,5 +1,3 @@ -import pytest - from collections import OrderedDict from plenum.common.messages.fields import NonNegativeNumberField, \ NonEmptyStringField diff --git a/plenum/test/input_validation/message_validation/test_propagate_message.py b/plenum/test/input_validation/message_validation/test_propagate_message.py index f24383ae27..94f0b545cb 100644 --- a/plenum/test/input_validation/message_validation/test_propagate_message.py +++ b/plenum/test/input_validation/message_validation/test_propagate_message.py @@ -1,5 +1,3 @@ -import pytest - from collections import OrderedDict from plenum.common.messages.fields import NonEmptyStringField from plenum.common.messages.client_request import ClientMessageValidator diff --git a/plenum/test/input_validation/message_validation/test_reelection_message.py b/plenum/test/input_validation/message_validation/test_reelection_message.py index 8b9fc1db72..ceb220400c 100644 --- a/plenum/test/input_validation/message_validation/test_reelection_message.py +++ b/plenum/test/input_validation/message_validation/test_reelection_message.py @@ -1,5 +1,3 @@ -import pytest - from collections import OrderedDict from plenum.common.messages.fields import NonNegativeNumberField, \ IterableField diff --git a/plenum/test/input_validation/message_validation/test_threepcstate_message.py b/plenum/test/input_validation/message_validation/test_threepcstate_message.py index 0a92d38e46..503f525e73 100644 --- a/plenum/test/input_validation/message_validation/test_threepcstate_message.py +++ b/plenum/test/input_validation/message_validation/test_threepcstate_message.py @@ -1,4 +1,3 @@ -import pytest from plenum.common.messages.node_messages import ThreePCState from collections import OrderedDict from plenum.common.messages.fields import \ diff --git 
a/plenum/test/input_validation/message_validation/test_viewchangedone_messsage.py b/plenum/test/input_validation/message_validation/test_viewchangedone_messsage.py index 41024d3dd9..c4f81a2f82 100644 --- a/plenum/test/input_validation/message_validation/test_viewchangedone_messsage.py +++ b/plenum/test/input_validation/message_validation/test_viewchangedone_messsage.py @@ -1,5 +1,3 @@ -import pytest - from collections import OrderedDict from plenum.common.messages.fields import NonNegativeNumberField, \ IterableField, NonEmptyStringField diff --git a/plenum/test/node_catchup/test_node_request_consistency_proof.py b/plenum/test/node_catchup/test_node_request_consistency_proof.py index cd5a0a39dc..a37056997c 100644 --- a/plenum/test/node_catchup/test_node_request_consistency_proof.py +++ b/plenum/test/node_catchup/test_node_request_consistency_proof.py @@ -11,7 +11,6 @@ from plenum.common.messages.node_messages import LedgerStatus from plenum.test.helper import sendRandomRequests from plenum.test.node_catchup.helper import waitNodeDataEquality -from plenum.test.test_ledger_manager import TestLedgerManager from plenum.test.test_node import checkNodesConnected # Do not remove the next imports diff --git a/plenum/test/node_request/message_request/test_preprepare_request.py b/plenum/test/node_request/message_request/test_preprepare_request.py index 8b78c6516a..89266c94c0 100644 --- a/plenum/test/node_request/message_request/test_preprepare_request.py +++ b/plenum/test/node_request/message_request/test_preprepare_request.py @@ -1,5 +1,6 @@ +from plenum.common.constants import PROPAGATE from plenum.common.messages.node_messages import Prepare -from plenum.test.delayers import ppDelay, pDelay, ppgDelay +from plenum.test.delayers import ppDelay, pDelay, ppgDelay, msg_rep_delay from plenum.test.helper import send_reqs_batches_and_get_suff_replies from plenum.test.node_catchup.helper import checkNodeDataForInequality, \ waitNodeDataEquality @@ -84,6 +85,7 @@ def 
test_no_preprepare_requested(looper, txnPoolNodeSet, client1, """ slow_node, other_nodes, _, _ = split_nodes(txnPoolNodeSet) slow_node.nodeIbStasher.delay(ppgDelay(20)) + slow_node.nodeIbStasher.delay(msg_rep_delay(20, [PROPAGATE, ])) old_count_resp = count_requested_preprepare_resp(slow_node) send_reqs_batches_and_get_suff_replies(looper, wallet1, client1, 4, 2) diff --git a/plenum/test/node_request/message_request/test_requested_preprepare_handling.py b/plenum/test/node_request/message_request/test_requested_preprepare_handling.py index 353e3aeff8..eb035da446 100644 --- a/plenum/test/node_request/message_request/test_requested_preprepare_handling.py +++ b/plenum/test/node_request/message_request/test_requested_preprepare_handling.py @@ -29,7 +29,7 @@ def patched_method(self, msg): last_pp = orig_method(msg) return last_pp - confused_node._serve_preprepare_request = types.MethodType(patched_method, + confused_node.req_handlers[PREPREPARE] = types.MethodType(patched_method, confused_node) # Delay PRE-PREPAREs by large amount simulating loss diff --git a/plenum/test/node_request/message_request/test_valid_message_request.py b/plenum/test/node_request/message_request/test_valid_message_request.py index 639481d951..10ccf4b4b3 100644 --- a/plenum/test/node_request/message_request/test_valid_message_request.py +++ b/plenum/test/node_request/message_request/test_valid_message_request.py @@ -1,16 +1,19 @@ import pytest +import time from plenum.common.constants import LEDGER_STATUS, CONSISTENCY_PROOF, \ - PREPREPARE + PREPREPARE, PROPAGATE from plenum.common.messages.node_messages import MessageReq, ChooseField, \ - AnyMapField, MessageRep, AnyField + AnyMapField, MessageRep, AnyField, LedgerStatus, ConsistencyProof, \ + PrePrepare, Propagate from plenum.common.types import f from plenum.test.helper import countDiscarded from stp_core.loop.eventually import eventually invalid_type_discard_log = "unknown value 'invalid_type'" -invalid_value_discard_log = "cannot serve 
request" +invalid_req_discard_log = "cannot serve request" +invalid_rep_discard_log = "cannot process requested message response" whitelist = [invalid_type_discard_log, ] @@ -18,8 +21,8 @@ patched_schema = ( (f.MSG_TYPE.nm, ChooseField(values={'invalid_type', LEDGER_STATUS, - CONSISTENCY_PROOF, - PREPREPARE})), + CONSISTENCY_PROOF, PREPREPARE, + PROPAGATE})), (f.PARAMS.nm, AnyMapField()) ) @@ -41,6 +44,45 @@ class PMessageRep(MessageRep): discard_counts = {} +pre_prepare_msg = PrePrepare( + 0, + 1, + 3, + time.time(), + [['4AdS22kC7xzb4bcqg9JATuCfAMNcQYcZa1u5eWzs6cSJ', 1499707723017300]], + 1, + 'f99937241d4c891c08e92a3cc25966607315ca66b51827b170d492962d58a9be', + 1, + 'CZecK1m7VYjSNCC7pGHj938DSW2tfbqoJp1bMJEtFqvG', + '7WrAMboPTcMaQCU1raoj28vnhu2bPMMd2Lr9tEcsXeCJ', + ) + +propagate_msg = Propagate(**{'request': + {'identifier': '5rArie7XKukPCaEwq5XGQJnM9Fc5aZE3M9HAPVfMU2xC', + 'signature': 'ZbZG68WiaK67eU3CsgpVi85jpgCztW9Yqe7D5ezDUfWbKdiPPVbWq4Tb5m4Ur3jcR5wJ8zmBUZXZudjvMN63Aa9', + 'operation': {'amount': 62, 'type': 'buy'}, + 'reqId': 1499782864169193}, + 'senderClient': '+DG1:vO9#de6?R?>:3RwdAXSdefgLLfxSoN4WMEe'}) + +bad_msgs = [ + (LEDGER_STATUS, {'p1': 'v1', 'p2': 'v2'}, LedgerStatus( + 1, 20, 1, 2, '77wuDUSr4FtAJzJbSqSW7bBw8bKAbra8ABSAjR72Nipq')), + (LEDGER_STATUS, {f.LEDGER_ID.nm: 100}, LedgerStatus( + 1, 20, 1, 2, '77wuDUSr4FtAJzJbSqSW7bBw8bKAbra8ABSAjR72Nipq')), + (CONSISTENCY_PROOF, {f.LEDGER_ID.nm: 1, f.SEQ_NO_START.nm: 10}, + ConsistencyProof(1, 2, 20, 1, 3, + 'BvmagFYpXAYNTuNW8Qssk9tMhEEPucLqL55YuwngUvMw', + 'Dce684wcwhV2wNZCuYTzdW9Kr13ZXFgiuAuAGibFZc4v', + ['58qasGZ9y3TB1pMz7ARKjJeccEbvbx6FT6g3NFnjYsTS'])), + (PREPREPARE, {f.INST_ID.nm: 1, f.VIEW_NO.nm: 0, f.SEQ_NO_START.nm: 10}, + pre_prepare_msg), + (PREPREPARE, {f.INST_ID.nm: -1, f.VIEW_NO.nm: 1, f.PP_SEQ_NO.nm: 10}, + pre_prepare_msg), + (PROPAGATE, {f.IDENTIFIER.nm: 'aa', f.REQ_ID.nm: 'fr'}, propagate_msg), + (PROPAGATE, {f.IDENTIFIER.nm: '4AdS22kC7xzb4bcqg9JATuCfAMNcQYcZa1u5eWzs6cSJ'}, 
propagate_msg), + (PROPAGATE, {f.REQ_ID.nm: 1499707723017300}, propagate_msg), + ] + def fill_counters(nodes, log_message): global discard_counts @@ -82,29 +124,31 @@ def test_node_reject_invalid_req_resp_type(looper, nodes): looper.run(eventually(chk, other_nodes, invalid_type_discard_log, retryWait=1)) -def test_node_reject_invalid_req_resp_params(looper, nodes): +def test_node_reject_invalid_req_params(looper, nodes): """ Node does not accept invalid `MessageReq`, with missing params. Also it does not accept invalid `MessageRep` """ - global discard_counts + global discard_counts, bad_msgs bad_node, other_nodes = nodes - bad_msgs = [ - patched_MessageReq()(LEDGER_STATUS, {'p1': 'v1', 'p2': 'v2'}), - patched_MessageReq()(CONSISTENCY_PROOF, {f.LEDGER_ID.nm: 1, - f.SEQ_NO_START.nm: 10}), - patched_MessageReq()(PREPREPARE, {f.INST_ID.nm: 1, - f.VIEW_NO.nm: 0, - f.SEQ_NO_START.nm: 10}), - patched_MessageReq()(PREPREPARE, {f.INST_ID.nm: -1, - f.VIEW_NO.nm: 1, - f.PP_SEQ_NO.nm: 10}), - patched_MessageReq()(LEDGER_STATUS, {f.LEDGER_ID.nm: 100}), - ] - for bad_msg in bad_msgs: - fill_counters(other_nodes, invalid_value_discard_log) - bad_node.send(bad_msg) - looper.run(eventually(chk, other_nodes, invalid_value_discard_log, + fill_counters(other_nodes, invalid_req_discard_log) + bad_node.send(patched_MessageReq()(*bad_msg[:2])) + looper.run(eventually(chk, other_nodes, invalid_req_discard_log, retryWait=1)) + + +def test_node_reject_invalid_resp_params(looper, nodes): + """ + Node does not accept invalid `MessageReq`, with missing params. 
+ Also it does not accept invalid `MessageRep` + """ + global discard_counts, bad_msgs + bad_node, other_nodes = nodes + + for bad_msg in bad_msgs: + fill_counters(other_nodes, invalid_rep_discard_log) + bad_node.send(patched_MessageRep()(*bad_msg)) + looper.run(eventually(chk, other_nodes, invalid_rep_discard_log, + retryWait=1)) \ No newline at end of file diff --git a/plenum/test/node_request/test_propagate/test_node_lacks_finalised_requests.py b/plenum/test/node_request/test_propagate/test_node_lacks_finalised_requests.py new file mode 100644 index 0000000000..668e5408f3 --- /dev/null +++ b/plenum/test/node_request/test_propagate/test_node_lacks_finalised_requests.py @@ -0,0 +1,65 @@ +import pytest +from plenum.test.delayers import ppgDelay, req_delay +from plenum.test.helper import send_reqs_to_nodes_and_verify_all_replies +from plenum.test.pool_transactions.conftest import looper, clientAndWallet1, \ + client1, wallet1, client1Connected +from plenum.test.primary_selection.test_primary_selection_pool_txn import \ + ensure_pool_functional +from plenum.test.spy_helpers import get_count, getAllReturnVals +from plenum.test.test_node import getNonPrimaryReplicas + + +@pytest.fixture(scope='function', params=['client_requests', + 'no_client_requests']) +def setup(request, txnPoolNodeSet): + # Test once when client request is received and once when not received + + # Choosing a faulty node which is primary in neither instance, this helps + # in the that same PROPAGATEs are not requested again by the node + faulty_node = getNonPrimaryReplicas(txnPoolNodeSet, 0)[1].node + if request.param == 'client_requests': + # Long delay in PROPAGATEs + faulty_node.nodeIbStasher.delay(ppgDelay(90)) + return faulty_node, True + if request.param == 'no_client_requests': + # Long delay in PROPAGATEs + faulty_node.nodeIbStasher.delay(ppgDelay(90)) + # Long delay in Client Requests + faulty_node.clientIbStasher.delay(req_delay(90)) + return faulty_node, False + + +def 
test_node_request_propagates(looper, setup, txnPoolNodeSet, client1, + wallet1, client1Connected, request): + """ + One of node lacks sufficient propagates + """ + faulty_node, recv_client_requests = setup + + old_count_recv_ppg = get_count(faulty_node, faulty_node.processPropagate) + old_count_recv_req = get_count(faulty_node, faulty_node.processRequest) + old_count_request_propagates = get_count(faulty_node, faulty_node.request_propagates) + + sent_reqs = 5 + send_reqs_to_nodes_and_verify_all_replies(looper, wallet1, client1, sent_reqs) + + assert get_count(faulty_node, faulty_node.processPropagate) > old_count_recv_ppg + if recv_client_requests: + assert get_count(faulty_node, faulty_node.processRequest) > old_count_recv_req + else: + assert get_count(faulty_node, faulty_node.processRequest) == old_count_recv_req + + # Attempt to request PROPAGATEs was made twice, since the faulty node has 2 replicas + assert get_count(faulty_node, faulty_node.request_propagates) - old_count_request_propagates == 2 + + requested_propagate_counts = getAllReturnVals(faulty_node, + faulty_node.request_propagates) + + # The last attempt to request PROPAGATEs was not successful + assert requested_propagate_counts[0] == 0 + # The first attempt to request PROPAGATEs was successful as PROPAGATEs + # were requested for all nodes + assert requested_propagate_counts[1] == sent_reqs + + faulty_node.nodeIbStasher.reset_delays_and_process_delayeds() + ensure_pool_functional(looper, txnPoolNodeSet, wallet1, client1, 4) diff --git a/plenum/test/propagate/test_propagate_recvd_after_request.py b/plenum/test/propagate/test_propagate_recvd_after_request.py index fc3ec17320..819122be00 100644 --- a/plenum/test/propagate/test_propagate_recvd_after_request.py +++ b/plenum/test/propagate/test_propagate_recvd_after_request.py @@ -1,7 +1,7 @@ import pytest from stp_core.loop.eventually import eventually -from plenum.common.messages.node_messages import Propagate +from plenum.common.messages.node_messages 
import Propagate, MessageRep from plenum.test import waits from plenum.test.delayers import delay from plenum.test.propagate.helper import recvdRequest, recvdPropagate, \ @@ -15,6 +15,8 @@ def setup(nodeSet): A, B, C, D = nodeSet.nodes.values() # type: TestNode delay(Propagate, frm=[B, C, D], to=A, howlong=howlong) + # Delay MessageRep by long simulating loss as if Propagate is missing, it is requested + delay(MessageRep, frm=[B, C, D], to=A, howlong=10*howlong) def testPropagateRecvdAfterRequest(setup, looper, nodeSet, up, sent1): diff --git a/plenum/test/test_node.py b/plenum/test/test_node.py index 946fb0156e..9e348b4bdd 100644 --- a/plenum/test/test_node.py +++ b/plenum/test/test_node.py @@ -226,41 +226,43 @@ def getDomainReqHandler(self): self.states[DOMAIN_LEDGER_ID], self.reqProcessors) + node_spyables = [Node.handleOneNodeMsg, - Node.handleInvalidClientMsg, - Node.processRequest, - Node.processOrdered, - Node.postToClientInBox, - Node.postToNodeInBox, - "eatTestMsg", - Node.decidePrimaries, - Node.startViewChange, - Node.discard, - Node.reportSuspiciousNode, - Node.reportSuspiciousClient, - Node.processPropagate, - Node.propagate, - Node.forward, - Node.send, - Node.sendInstanceChange, - Node.processInstanceChange, - Node.checkPerformance, - Node.processStashedOrderedReqs, - Node.lost_master_primary, - Node.propose_view_change, - Node.getReplyFromLedger, - Node.recordAndPropagate, - Node.allLedgersCaughtUp, - Node.start_catchup, - Node.is_catchup_needed, - Node.no_more_catchups_needed, - Node.caught_up_for_current_view, - Node._check_view_change_completed, - Node.primary_selected, - Node.num_txns_caught_up_in_last_catchup, - Node.process_message_req, - Node.process_message_rep - ] + Node.handleInvalidClientMsg, + Node.processRequest, + Node.processOrdered, + Node.postToClientInBox, + Node.postToNodeInBox, + "eatTestMsg", + Node.decidePrimaries, + Node.startViewChange, + Node.discard, + Node.reportSuspiciousNode, + Node.reportSuspiciousClient, + 
Node.processPropagate, + Node.propagate, + Node.forward, + Node.send, + Node.sendInstanceChange, + Node.processInstanceChange, + Node.checkPerformance, + Node.processStashedOrderedReqs, + Node.lost_master_primary, + Node.propose_view_change, + Node.getReplyFromLedger, + Node.recordAndPropagate, + Node.allLedgersCaughtUp, + Node.start_catchup, + Node.is_catchup_needed, + Node.no_more_catchups_needed, + Node.caught_up_for_current_view, + Node._check_view_change_completed, + Node.primary_selected, + Node.num_txns_caught_up_in_last_catchup, + Node.process_message_req, + Node.process_message_rep, + Node.request_propagates + ] @spyable(methods=node_spyables)