Skip to content

Node requests Propagates if needed #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 11, 2017
4 changes: 4 additions & 0 deletions plenum/common/messages/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, optional=False, nullable=False):
self.optional = optional
self.nullable = nullable

# TODO: `validate` should be renamed to `validation_error`
def validate(self, val):
"""
Performs basic validation of field value and then passes it for
Expand Down Expand Up @@ -78,6 +79,9 @@ def _wrong_type_msg(self, val):
"".format(types_str, type(val).__name__)


# TODO: The fields below should be singleton.


class AnyField(FieldBase):
_base_types = (object,)

Expand Down
11 changes: 5 additions & 6 deletions plenum/common/messages/node_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class Propagate(MessageBase):
typename = PROPAGATE
schema = (
(f.REQUEST.nm, ClientMessageValidator(operation_schema_is_strict=True)),
(f.SENDER_CLIENT.nm, NonEmptyStringField()),
(f.SENDER_CLIENT.nm, NonEmptyStringField(nullable=True)),
)


Expand Down Expand Up @@ -243,7 +243,6 @@ class CatchupReq(MessageBase):


class CatchupRep(MessageBase):

typename = CATCHUP_REP
schema = (
(f.LEDGER_ID.nm, LedgerIdField()),
Expand Down Expand Up @@ -282,10 +281,11 @@ class MessageReq(MessageBase):
"""
Purpose: ask node for any message
"""
allowed_types = {LEDGER_STATUS, CONSISTENCY_PROOF, PREPREPARE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for MessageReq and MessageRep in plenum/test/input_validation/message_validation similar to other tests in this package. The tests are needed to check the contract of the messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tests present in plenum/test/node_request/message_request/test_valid_message_request.py

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is integration tests, and I'm talking about unit tests to support Contracts. Please have a look at the tests at the package I mentioned, it will be very fast to add similar tests there.

Copy link
Contributor Author

@lovesh lovesh Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why write unit tests when there are working integration tests. Infact you can write a single test with expected fields for each message and they can be checked. Why copy the same damn boilerplate over and over, such a waste of time. You can see that those tests are copy pasted code as each has an un-necessary import and in each the same method is misspelled, test_hash_expected_type should be test_has_expected_type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In TDD the tests is one of the main (and sometimes the only) peace of requirement.
You declare in tests how the should behaves, what is the API, and what is the contract.
If someone changes the Contract (Supported message types), then the test will fail indicating that we changed API/requirement. This is additional check that the change is intentional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not saying you should not have had the test but the test could be single test file like

expecteds = [
Propagate.typename:(PROPAGATE, OrderedDict([
    ("request", ClientMessageValidator),
    ("senderClient", NonEmptyStringField),
])),
ViewChangeDone.typename: (VIEW_CHANGE_DONE, OrderedDict([
    ("viewNo", NonNegativeNumberField),
    ("name", NonEmptyStringField),
    ("ledgerInfo", IterableField)
]))
]
def test_has_expected_type():
   for expected in expecteds:
       assert expected.name == ...

def test_has_expected_fields():
    for expected in expecteds:
        .....

def test_has_expected_validators():
    for expected in expecteds:
        .....

Something like this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind having the test as you proposed. Adding a test in a way as it's now (with a fixed import and misspelled) is also ok for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these integration tests are fine for now

PROPAGATE}
typename = MESSAGE_REQUEST
schema = (
(f.MSG_TYPE.nm, ChooseField(values={LEDGER_STATUS,
CONSISTENCY_PROOF, PREPREPARE})),
(f.MSG_TYPE.nm, ChooseField(values=allowed_types)),
(f.PARAMS.nm, AnyMapField())
)

Expand All @@ -296,8 +296,7 @@ class MessageRep(MessageBase):
"""
typename = MESSAGE_RESPONSE
schema = (
(f.MSG_TYPE.nm, ChooseField(values={LEDGER_STATUS,
CONSISTENCY_PROOF, PREPREPARE})),
(f.MSG_TYPE.nm, ChooseField(values=MessageReq.allowed_types)),
(f.PARAMS.nm, AnyMapField()),
(f.MSG.nm, AnyField())
)
Expand Down
259 changes: 259 additions & 0 deletions plenum/server/message_req_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
from typing import Dict
from typing import List

from plenum.common.constants import LEDGER_STATUS, PREPREPARE, CONSISTENCY_PROOF, \
PROPAGATE
from plenum.common.messages.fields import RequestIdentifierField
from plenum.common.messages.node_messages import MessageReq, MessageRep, \
LedgerStatus, PrePrepare, ConsistencyProof, Propagate
from plenum.common.request import Request
from plenum.common.types import f
from plenum.server import replica
from stp_core.common.log import getlogger


logger = getlogger()


class MessageReqProcessor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add tests (possibly unit ones) to check the validation of MsgReq messages. Consider negative cases as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tests present in plenum/test/node_request/message_request/test_valid_message_request.py

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

# This is a mixin, it's mixed with node.
def __init__(self):
self.validation_handlers = {
LEDGER_STATUS: self._validate_requested_ledger_status,
CONSISTENCY_PROOF: self._validate_requested_cons_proof,
PREPREPARE: self._validate_requested_preprepare,
PROPAGATE: self._validate_requested_propagate
}

self.req_handlers = {
LEDGER_STATUS: self._serve_ledger_status_request,
CONSISTENCY_PROOF: self._serve_cons_proof_request,
PREPREPARE: self._serve_preprepare_request,
PROPAGATE: self._serve_propagate_request
}

self.rep_handlers = {
LEDGER_STATUS: self._process_requested_ledger_status,
CONSISTENCY_PROOF: self._process_requested_cons_proof,
PREPREPARE: self._process_requested_preprepare,
PROPAGATE: self._process_requested_propagate
}

def process_message_req(self, msg: MessageReq, frm):
# Assumes a shared memory architecture. In case of multiprocessing,
# RPC architecture, use deques to communicate the message and node will
# maintain a unique internal message id to correlate responses.
msg_type = msg.msg_type
resp = self.req_handlers[msg_type](msg)

if resp is False:
return

self.sendToNodes(MessageRep(**{
f.MSG_TYPE.nm: msg_type,
f.PARAMS.nm: msg.params,
f.MSG.nm: resp
}), names=[frm, ])

def process_message_rep(self, msg: MessageRep, frm):
msg_type = msg.msg_type
if msg.msg is None:
logger.debug('{} got null response for requested {} from {}'.
format(self, msg_type, frm))
return
return self.rep_handlers[msg_type](msg, frm)

def valid_requested_msg(self, msg_type, **kwargs):
return self.validation_handlers[msg_type](**kwargs)

def request_msg(self, typ, params: Dict, frm: List[str]=None):
self.sendToNodes(MessageReq(**{
f.MSG_TYPE.nm: typ,
f.PARAMS.nm: params
}), names=frm)

def _validate_requested_ledger_status(self, **kwargs):
if kwargs['ledger_id'] in self.ledger_ids:
if 'ledger_status' in kwargs:
try:
return LedgerStatus(*kwargs['ledger_status'])
except TypeError as ex:
logger.warning(
'{} could not create LEDGER_STATUS out of {}'.
format(self, *kwargs['ledger_status']))
else:
return True

def _serve_ledger_status_request(self, msg):
params = msg.params
ledger_id = params.get(f.LEDGER_ID.nm)
if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id):
return self.getLedgerStatus(ledger_id)
else:
self.discard(msg, 'cannot serve request',
logMethod=logger.debug)
return False

def _process_requested_ledger_status(self, msg, frm):
params = msg.params
ledger_id = params.get(f.LEDGER_ID.nm)
ledger_status = msg.msg
ledger_status = self.valid_requested_msg(msg.msg_type,
ledger_id=ledger_id,
ledger_status=ledger_status)
if ledger_status:
self.ledgerManager.processLedgerStatus(ledger_status, frm=frm)
return
self.discard(msg,
'cannot process requested message response',
logMethod=logger.debug)

def _validate_requested_cons_proof(self, **kwargs):
if kwargs['ledger_id'] in self.ledger_ids and \
(isinstance(kwargs['seq_no_start'], int) and kwargs[
'seq_no_start'] > 0) and \
(isinstance(kwargs['seq_no_end'], int) and kwargs[
'seq_no_end'] > 0):
if 'cons_proof' in kwargs:
try:
return ConsistencyProof(*kwargs['cons_proof'])
except TypeError as ex:
logger.warning(
'{} could not create CONSISTENCY_PROOF out of {}'.
format(self, *kwargs['cons_proof']))
else:
return True

def _serve_cons_proof_request(self, msg):
params = msg.params
ledger_id = params.get(f.LEDGER_ID.nm)
seq_no_start = params.get(f.SEQ_NO_START.nm)
seq_no_end = params.get(f.SEQ_NO_END.nm)
if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id,
seq_no_start=seq_no_start,
seq_no_end=seq_no_end):
return self.ledgerManager._buildConsistencyProof(ledger_id,
seq_no_start,
seq_no_end)
else:
self.discard(msg, 'cannot serve request',
logMethod=logger.debug)
return False

def _process_requested_cons_proof(self, msg, frm):
params = msg.params
ledger_id = params.get(f.LEDGER_ID.nm)
seq_no_start = params.get(f.SEQ_NO_START.nm)
seq_no_end = params.get(f.SEQ_NO_END.nm)
cons_proof = msg.msg
cons_proof = self.valid_requested_msg(msg.msg_type,
ledger_id=ledger_id,
seq_no_start=seq_no_start,
seq_no_end=seq_no_end,
cons_proof=cons_proof)
if cons_proof:
self.ledgerManager.processConsistencyProof(cons_proof, frm=frm)
return
self.discard(msg,
'cannot process requested message response',
logMethod=logger.debug)

def _validate_requested_preprepare(self, **kwargs):
if kwargs['inst_id'] in range(len(self.replicas)) and \
kwargs['view_no'] == self.viewNo and \
isinstance(kwargs['pp_seq_no'], int) and \
kwargs['pp_seq_no'] > 0:
if 'pp' in kwargs:
try:
pp = PrePrepare(*kwargs['pp'])
if pp.instId != kwargs['inst_id'] or pp.viewNo != kwargs['view_no']:
logger.warning('{} found PREPREPARE {} not satisfying '
'query criteria'.format(self, *kwargs['pp']))
return
return pp
except TypeError as ex:
logger.warning(
'{} could not create PREPREPARE out of {}'.
format(self, *kwargs['pp']))
else:
return True

def _serve_preprepare_request(self, msg):
params = msg.params
inst_id = params.get(f.INST_ID.nm)
view_no = params.get(f.VIEW_NO.nm)
pp_seq_no = params.get(f.PP_SEQ_NO.nm)
if self.valid_requested_msg(msg.msg_type, inst_id=inst_id,
view_no=view_no, pp_seq_no=pp_seq_no):
return self.replicas[inst_id].getPrePrepare(view_no, pp_seq_no)
else:
self.discard(msg, 'cannot serve request',
logMethod=logger.debug)
return False

def _process_requested_preprepare(self, msg, frm):
params = msg.params
inst_id = params.get(f.INST_ID.nm)
view_no = params.get(f.VIEW_NO.nm)
pp_seq_no = params.get(f.PP_SEQ_NO.nm)
pp = msg.msg
pp = self.valid_requested_msg(msg.msg_type, inst_id=inst_id,
view_no=view_no, pp_seq_no=pp_seq_no,
pp=pp)
if pp:
frm = replica.Replica.generateName(frm, inst_id)
self.replicas[inst_id].process_requested_pre_prepare(pp,
sender=frm)
return
self.discard(msg,
'cannot process requested message response',
logMethod=logger.debug)

def _validate_requested_propagate(self, **kwargs):
if not (RequestIdentifierField().validate((kwargs['identifier'],
kwargs['req_id']))):
if 'propagate' in kwargs:
try:
ppg = Propagate(*kwargs['propagate'])
if ppg.request[f.IDENTIFIER.nm] != kwargs['identifier'] or \
ppg.request[f.REQ_ID.nm] != kwargs['req_id']:
logger.warning('{} found PROPAGATE {} not '
'satisfying query criteria'.format(self, *kwargs['ppg']))
return
return ppg
except TypeError as ex:
logger.warning(
'{} could not create PROPAGATE out of {}'.
format(self, *kwargs['propagate']))
else:
return True

def _serve_propagate_request(self, msg):
params = msg.params
identifier = params.get(f.IDENTIFIER.nm)
req_id = params.get(f.REQ_ID.nm)
if self.valid_requested_msg(msg.msg_type, identifier=identifier,
req_id=req_id):
req_key = (identifier, req_id)
if req_key in self.requests and self.requests[req_key].finalised:
sender_client = self.requestSender.get(req_key)
req = self.requests[req_key].finalised
return self.createPropagate(req, sender_client)
else:
self.discard(msg, 'cannot serve request',
logMethod=logger.debug)
return False

def _process_requested_propagate(self, msg, frm):
params = msg.params
identifier = params.get(f.IDENTIFIER.nm)
req_id = params.get(f.REQ_ID.nm)
ppg = msg.msg
ppg = self.valid_requested_msg(msg.msg_type, identifier=identifier,
req_id=req_id, propagate=ppg)
if ppg:
self.processPropagate(ppg, frm)
else:
self.discard(msg,
'cannot process requested message response',
logMethod=logger.debug)
Loading