-
Notifications
You must be signed in to change notification settings - Fork 377
Node requests Propagates if needed #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d1adde1
f40246e
fe45986
9e2cac0
95c3621
4603244
64b96ae
03318e4
259f816
da239de
fa9b973
15d34f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,259 @@ | ||
from typing import Dict | ||
from typing import List | ||
|
||
from plenum.common.constants import LEDGER_STATUS, PREPREPARE, CONSISTENCY_PROOF, \ | ||
PROPAGATE | ||
from plenum.common.messages.fields import RequestIdentifierField | ||
from plenum.common.messages.node_messages import MessageReq, MessageRep, \ | ||
LedgerStatus, PrePrepare, ConsistencyProof, Propagate | ||
from plenum.common.request import Request | ||
from plenum.common.types import f | ||
from plenum.server import replica | ||
from stp_core.common.log import getlogger | ||
|
||
|
||
logger = getlogger() | ||
|
||
|
||
class MessageReqProcessor: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add tests (possibly unit ones) to check the validation of MsgReq messages. Consider negative cases as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are tests present in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok |
||
# This is a mixin, it's mixed with node. | ||
def __init__(self): | ||
self.validation_handlers = { | ||
LEDGER_STATUS: self._validate_requested_ledger_status, | ||
CONSISTENCY_PROOF: self._validate_requested_cons_proof, | ||
PREPREPARE: self._validate_requested_preprepare, | ||
PROPAGATE: self._validate_requested_propagate | ||
} | ||
|
||
self.req_handlers = { | ||
LEDGER_STATUS: self._serve_ledger_status_request, | ||
CONSISTENCY_PROOF: self._serve_cons_proof_request, | ||
PREPREPARE: self._serve_preprepare_request, | ||
PROPAGATE: self._serve_propagate_request | ||
} | ||
|
||
self.rep_handlers = { | ||
LEDGER_STATUS: self._process_requested_ledger_status, | ||
CONSISTENCY_PROOF: self._process_requested_cons_proof, | ||
PREPREPARE: self._process_requested_preprepare, | ||
PROPAGATE: self._process_requested_propagate | ||
} | ||
|
||
def process_message_req(self, msg: MessageReq, frm): | ||
# Assumes a shared memory architecture. In case of multiprocessing, | ||
# RPC architecture, use deques to communicate the message and node will | ||
# maintain a unique internal message id to correlate responses. | ||
msg_type = msg.msg_type | ||
resp = self.req_handlers[msg_type](msg) | ||
|
||
if resp is False: | ||
return | ||
|
||
self.sendToNodes(MessageRep(**{ | ||
f.MSG_TYPE.nm: msg_type, | ||
f.PARAMS.nm: msg.params, | ||
f.MSG.nm: resp | ||
}), names=[frm, ]) | ||
|
||
def process_message_rep(self, msg: MessageRep, frm): | ||
msg_type = msg.msg_type | ||
if msg.msg is None: | ||
logger.debug('{} got null response for requested {} from {}'. | ||
format(self, msg_type, frm)) | ||
return | ||
return self.rep_handlers[msg_type](msg, frm) | ||
|
||
def valid_requested_msg(self, msg_type, **kwargs): | ||
return self.validation_handlers[msg_type](**kwargs) | ||
|
||
def request_msg(self, typ, params: Dict, frm: List[str]=None): | ||
self.sendToNodes(MessageReq(**{ | ||
f.MSG_TYPE.nm: typ, | ||
f.PARAMS.nm: params | ||
}), names=frm) | ||
|
||
def _validate_requested_ledger_status(self, **kwargs): | ||
if kwargs['ledger_id'] in self.ledger_ids: | ||
if 'ledger_status' in kwargs: | ||
try: | ||
return LedgerStatus(*kwargs['ledger_status']) | ||
except TypeError as ex: | ||
logger.warning( | ||
'{} could not create LEDGER_STATUS out of {}'. | ||
format(self, *kwargs['ledger_status'])) | ||
else: | ||
return True | ||
|
||
def _serve_ledger_status_request(self, msg): | ||
params = msg.params | ||
ledger_id = params.get(f.LEDGER_ID.nm) | ||
if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id): | ||
return self.getLedgerStatus(ledger_id) | ||
else: | ||
self.discard(msg, 'cannot serve request', | ||
logMethod=logger.debug) | ||
return False | ||
|
||
def _process_requested_ledger_status(self, msg, frm): | ||
params = msg.params | ||
ledger_id = params.get(f.LEDGER_ID.nm) | ||
ledger_status = msg.msg | ||
ledger_status = self.valid_requested_msg(msg.msg_type, | ||
ledger_id=ledger_id, | ||
ledger_status=ledger_status) | ||
if ledger_status: | ||
self.ledgerManager.processLedgerStatus(ledger_status, frm=frm) | ||
return | ||
self.discard(msg, | ||
'cannot process requested message response', | ||
logMethod=logger.debug) | ||
|
||
def _validate_requested_cons_proof(self, **kwargs): | ||
if kwargs['ledger_id'] in self.ledger_ids and \ | ||
(isinstance(kwargs['seq_no_start'], int) and kwargs[ | ||
'seq_no_start'] > 0) and \ | ||
(isinstance(kwargs['seq_no_end'], int) and kwargs[ | ||
'seq_no_end'] > 0): | ||
if 'cons_proof' in kwargs: | ||
try: | ||
return ConsistencyProof(*kwargs['cons_proof']) | ||
except TypeError as ex: | ||
logger.warning( | ||
'{} could not create CONSISTENCY_PROOF out of {}'. | ||
format(self, *kwargs['cons_proof'])) | ||
else: | ||
return True | ||
|
||
def _serve_cons_proof_request(self, msg): | ||
params = msg.params | ||
ledger_id = params.get(f.LEDGER_ID.nm) | ||
seq_no_start = params.get(f.SEQ_NO_START.nm) | ||
seq_no_end = params.get(f.SEQ_NO_END.nm) | ||
if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id, | ||
seq_no_start=seq_no_start, | ||
seq_no_end=seq_no_end): | ||
return self.ledgerManager._buildConsistencyProof(ledger_id, | ||
seq_no_start, | ||
seq_no_end) | ||
else: | ||
self.discard(msg, 'cannot serve request', | ||
logMethod=logger.debug) | ||
return False | ||
|
||
def _process_requested_cons_proof(self, msg, frm): | ||
params = msg.params | ||
ledger_id = params.get(f.LEDGER_ID.nm) | ||
seq_no_start = params.get(f.SEQ_NO_START.nm) | ||
seq_no_end = params.get(f.SEQ_NO_END.nm) | ||
cons_proof = msg.msg | ||
cons_proof = self.valid_requested_msg(msg.msg_type, | ||
ledger_id=ledger_id, | ||
seq_no_start=seq_no_start, | ||
seq_no_end=seq_no_end, | ||
cons_proof=cons_proof) | ||
if cons_proof: | ||
self.ledgerManager.processConsistencyProof(cons_proof, frm=frm) | ||
return | ||
self.discard(msg, | ||
'cannot process requested message response', | ||
logMethod=logger.debug) | ||
|
||
def _validate_requested_preprepare(self, **kwargs): | ||
if kwargs['inst_id'] in range(len(self.replicas)) and \ | ||
kwargs['view_no'] == self.viewNo and \ | ||
isinstance(kwargs['pp_seq_no'], int) and \ | ||
kwargs['pp_seq_no'] > 0: | ||
if 'pp' in kwargs: | ||
try: | ||
pp = PrePrepare(*kwargs['pp']) | ||
if pp.instId != kwargs['inst_id'] or pp.viewNo != kwargs['view_no']: | ||
logger.warning('{} found PREPREPARE {} not satisfying ' | ||
'query criteria'.format(self, *kwargs['pp'])) | ||
return | ||
return pp | ||
except TypeError as ex: | ||
logger.warning( | ||
'{} could not create PREPREPARE out of {}'. | ||
format(self, *kwargs['pp'])) | ||
else: | ||
return True | ||
|
||
def _serve_preprepare_request(self, msg): | ||
params = msg.params | ||
inst_id = params.get(f.INST_ID.nm) | ||
view_no = params.get(f.VIEW_NO.nm) | ||
pp_seq_no = params.get(f.PP_SEQ_NO.nm) | ||
if self.valid_requested_msg(msg.msg_type, inst_id=inst_id, | ||
view_no=view_no, pp_seq_no=pp_seq_no): | ||
return self.replicas[inst_id].getPrePrepare(view_no, pp_seq_no) | ||
else: | ||
self.discard(msg, 'cannot serve request', | ||
logMethod=logger.debug) | ||
return False | ||
|
||
def _process_requested_preprepare(self, msg, frm): | ||
params = msg.params | ||
inst_id = params.get(f.INST_ID.nm) | ||
view_no = params.get(f.VIEW_NO.nm) | ||
pp_seq_no = params.get(f.PP_SEQ_NO.nm) | ||
pp = msg.msg | ||
pp = self.valid_requested_msg(msg.msg_type, inst_id=inst_id, | ||
view_no=view_no, pp_seq_no=pp_seq_no, | ||
pp=pp) | ||
if pp: | ||
frm = replica.Replica.generateName(frm, inst_id) | ||
self.replicas[inst_id].process_requested_pre_prepare(pp, | ||
sender=frm) | ||
return | ||
self.discard(msg, | ||
'cannot process requested message response', | ||
logMethod=logger.debug) | ||
|
||
def _validate_requested_propagate(self, **kwargs): | ||
if not (RequestIdentifierField().validate((kwargs['identifier'], | ||
kwargs['req_id']))): | ||
if 'propagate' in kwargs: | ||
try: | ||
ppg = Propagate(*kwargs['propagate']) | ||
if ppg.request[f.IDENTIFIER.nm] != kwargs['identifier'] or \ | ||
ppg.request[f.REQ_ID.nm] != kwargs['req_id']: | ||
logger.warning('{} found PROPAGATE {} not ' | ||
'satisfying query criteria'.format(self, *kwargs['ppg'])) | ||
return | ||
return ppg | ||
except TypeError as ex: | ||
logger.warning( | ||
'{} could not create PROPAGATE out of {}'. | ||
format(self, *kwargs['propagate'])) | ||
else: | ||
return True | ||
|
||
def _serve_propagate_request(self, msg): | ||
params = msg.params | ||
identifier = params.get(f.IDENTIFIER.nm) | ||
req_id = params.get(f.REQ_ID.nm) | ||
if self.valid_requested_msg(msg.msg_type, identifier=identifier, | ||
req_id=req_id): | ||
req_key = (identifier, req_id) | ||
if req_key in self.requests and self.requests[req_key].finalised: | ||
sender_client = self.requestSender.get(req_key) | ||
req = self.requests[req_key].finalised | ||
return self.createPropagate(req, sender_client) | ||
else: | ||
self.discard(msg, 'cannot serve request', | ||
logMethod=logger.debug) | ||
return False | ||
|
||
def _process_requested_propagate(self, msg, frm): | ||
params = msg.params | ||
identifier = params.get(f.IDENTIFIER.nm) | ||
req_id = params.get(f.REQ_ID.nm) | ||
ppg = msg.msg | ||
ppg = self.valid_requested_msg(msg.msg_type, identifier=identifier, | ||
req_id=req_id, propagate=ppg) | ||
if ppg: | ||
self.processPropagate(ppg, frm) | ||
else: | ||
self.discard(msg, | ||
'cannot process requested message response', | ||
logMethod=logger.debug) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test for
MessageReq
andMessageRep
inplenum/test/input_validation/message_validation
similar to other tests in this package. The tests are needed to check the contract of the messages.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are tests present in
plenum/test/node_request/message_request/test_valid_message_request.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but this is integration tests, and I'm talking about unit tests to support Contracts. Please have a look at the tests at the package I mentioned, it will be very fast to add similar tests there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But why write unit tests when there are working integration tests. Infact you can write a single test with expected fields for each message and they can be checked. Why copy the same damn boilerplate over and over, such a waste of time. You can see that those tests are copy pasted code as each has an un-necessary import and in each the same method is misspelled,
test_hash_expected_type
should betest_has_expected_type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In TDD the tests is one of the main (and sometimes the only) peace of requirement.
You declare in tests how the should behaves, what is the API, and what is the contract.
If someone changes the Contract (Supported message types), then the test will fail indicating that we changed API/requirement. This is additional check that the change is intentional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not saying you should not have had the test but the test could be single test file like
Something like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mind having the test as you proposed. Adding a test in a way as it's now (with a fixed import and misspelled) is also ok for me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these integration tests are fine for now