Skip to content

Commit 60515a9

Browse files
authored
Node requests Propagates if needed (#269)
* removing unused config variables and imports * remove commented code * fix bug with checkpoint and gc after view change * Feature Added: Repo consolidation (#262) * [WIP] Repo merge Merged in ledger * [WIP] Repo merge Merged in state, stp * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * [WIP] Repo merge Test fixes * Update Jenkinsfile * Update setup.py * update test * Node requests PROPAGATEs If a node receives a PRE-PREPARE but does not enough finalised requests, it requests PROPAGATEs * use fixture params in test and update test with more checks * use correct type * add and update tests
1 parent fcfba66 commit 60515a9

20 files changed

+568
-303
lines changed

plenum/common/messages/fields.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(self, optional=False, nullable=False):
3535
self.optional = optional
3636
self.nullable = nullable
3737

38+
# TODO: `validate` should be renamed to `validation_error`
3839
def validate(self, val):
3940
"""
4041
Performs basic validation of field value and then passes it for
@@ -78,6 +79,9 @@ def _wrong_type_msg(self, val):
7879
"".format(types_str, type(val).__name__)
7980

8081

82+
# TODO: The fields below should be singleton.
83+
84+
8185
class AnyField(FieldBase):
8286
_base_types = (object,)
8387

plenum/common/messages/node_messages.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class Propagate(MessageBase):
113113
typename = PROPAGATE
114114
schema = (
115115
(f.REQUEST.nm, ClientMessageValidator(operation_schema_is_strict=True)),
116-
(f.SENDER_CLIENT.nm, NonEmptyStringField()),
116+
(f.SENDER_CLIENT.nm, NonEmptyStringField(nullable=True)),
117117
)
118118

119119

@@ -243,7 +243,6 @@ class CatchupReq(MessageBase):
243243

244244

245245
class CatchupRep(MessageBase):
246-
247246
typename = CATCHUP_REP
248247
schema = (
249248
(f.LEDGER_ID.nm, LedgerIdField()),
@@ -282,10 +281,11 @@ class MessageReq(MessageBase):
282281
"""
283282
Purpose: ask node for any message
284283
"""
284+
allowed_types = {LEDGER_STATUS, CONSISTENCY_PROOF, PREPREPARE,
285+
PROPAGATE}
285286
typename = MESSAGE_REQUEST
286287
schema = (
287-
(f.MSG_TYPE.nm, ChooseField(values={LEDGER_STATUS,
288-
CONSISTENCY_PROOF, PREPREPARE})),
288+
(f.MSG_TYPE.nm, ChooseField(values=allowed_types)),
289289
(f.PARAMS.nm, AnyMapField())
290290
)
291291

@@ -296,8 +296,7 @@ class MessageRep(MessageBase):
296296
"""
297297
typename = MESSAGE_RESPONSE
298298
schema = (
299-
(f.MSG_TYPE.nm, ChooseField(values={LEDGER_STATUS,
300-
CONSISTENCY_PROOF, PREPREPARE})),
299+
(f.MSG_TYPE.nm, ChooseField(values=MessageReq.allowed_types)),
301300
(f.PARAMS.nm, AnyMapField()),
302301
(f.MSG.nm, AnyField())
303302
)
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
from typing import Dict
2+
from typing import List
3+
4+
from plenum.common.constants import LEDGER_STATUS, PREPREPARE, CONSISTENCY_PROOF, \
5+
PROPAGATE
6+
from plenum.common.messages.fields import RequestIdentifierField
7+
from plenum.common.messages.node_messages import MessageReq, MessageRep, \
8+
LedgerStatus, PrePrepare, ConsistencyProof, Propagate
9+
from plenum.common.request import Request
10+
from plenum.common.types import f
11+
from plenum.server import replica
12+
from stp_core.common.log import getlogger
13+
14+
15+
logger = getlogger()
16+
17+
18+
class MessageReqProcessor:
19+
# This is a mixin, it's mixed with node.
20+
def __init__(self):
21+
self.validation_handlers = {
22+
LEDGER_STATUS: self._validate_requested_ledger_status,
23+
CONSISTENCY_PROOF: self._validate_requested_cons_proof,
24+
PREPREPARE: self._validate_requested_preprepare,
25+
PROPAGATE: self._validate_requested_propagate
26+
}
27+
28+
self.req_handlers = {
29+
LEDGER_STATUS: self._serve_ledger_status_request,
30+
CONSISTENCY_PROOF: self._serve_cons_proof_request,
31+
PREPREPARE: self._serve_preprepare_request,
32+
PROPAGATE: self._serve_propagate_request
33+
}
34+
35+
self.rep_handlers = {
36+
LEDGER_STATUS: self._process_requested_ledger_status,
37+
CONSISTENCY_PROOF: self._process_requested_cons_proof,
38+
PREPREPARE: self._process_requested_preprepare,
39+
PROPAGATE: self._process_requested_propagate
40+
}
41+
42+
def process_message_req(self, msg: MessageReq, frm):
43+
# Assumes a shared memory architecture. In case of multiprocessing,
44+
# RPC architecture, use deques to communicate the message and node will
45+
# maintain a unique internal message id to correlate responses.
46+
msg_type = msg.msg_type
47+
resp = self.req_handlers[msg_type](msg)
48+
49+
if resp is False:
50+
return
51+
52+
self.sendToNodes(MessageRep(**{
53+
f.MSG_TYPE.nm: msg_type,
54+
f.PARAMS.nm: msg.params,
55+
f.MSG.nm: resp
56+
}), names=[frm, ])
57+
58+
def process_message_rep(self, msg: MessageRep, frm):
59+
msg_type = msg.msg_type
60+
if msg.msg is None:
61+
logger.debug('{} got null response for requested {} from {}'.
62+
format(self, msg_type, frm))
63+
return
64+
return self.rep_handlers[msg_type](msg, frm)
65+
66+
def valid_requested_msg(self, msg_type, **kwargs):
67+
return self.validation_handlers[msg_type](**kwargs)
68+
69+
def request_msg(self, typ, params: Dict, frm: List[str]=None):
70+
self.sendToNodes(MessageReq(**{
71+
f.MSG_TYPE.nm: typ,
72+
f.PARAMS.nm: params
73+
}), names=frm)
74+
75+
def _validate_requested_ledger_status(self, **kwargs):
76+
if kwargs['ledger_id'] in self.ledger_ids:
77+
if 'ledger_status' in kwargs:
78+
try:
79+
return LedgerStatus(*kwargs['ledger_status'])
80+
except TypeError as ex:
81+
logger.warning(
82+
'{} could not create LEDGER_STATUS out of {}'.
83+
format(self, *kwargs['ledger_status']))
84+
else:
85+
return True
86+
87+
def _serve_ledger_status_request(self, msg):
88+
params = msg.params
89+
ledger_id = params.get(f.LEDGER_ID.nm)
90+
if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id):
91+
return self.getLedgerStatus(ledger_id)
92+
else:
93+
self.discard(msg, 'cannot serve request',
94+
logMethod=logger.debug)
95+
return False
96+
97+
def _process_requested_ledger_status(self, msg, frm):
98+
params = msg.params
99+
ledger_id = params.get(f.LEDGER_ID.nm)
100+
ledger_status = msg.msg
101+
ledger_status = self.valid_requested_msg(msg.msg_type,
102+
ledger_id=ledger_id,
103+
ledger_status=ledger_status)
104+
if ledger_status:
105+
self.ledgerManager.processLedgerStatus(ledger_status, frm=frm)
106+
return
107+
self.discard(msg,
108+
'cannot process requested message response',
109+
logMethod=logger.debug)
110+
111+
def _validate_requested_cons_proof(self, **kwargs):
112+
if kwargs['ledger_id'] in self.ledger_ids and \
113+
(isinstance(kwargs['seq_no_start'], int) and kwargs[
114+
'seq_no_start'] > 0) and \
115+
(isinstance(kwargs['seq_no_end'], int) and kwargs[
116+
'seq_no_end'] > 0):
117+
if 'cons_proof' in kwargs:
118+
try:
119+
return ConsistencyProof(*kwargs['cons_proof'])
120+
except TypeError as ex:
121+
logger.warning(
122+
'{} could not create CONSISTENCY_PROOF out of {}'.
123+
format(self, *kwargs['cons_proof']))
124+
else:
125+
return True
126+
127+
def _serve_cons_proof_request(self, msg):
128+
params = msg.params
129+
ledger_id = params.get(f.LEDGER_ID.nm)
130+
seq_no_start = params.get(f.SEQ_NO_START.nm)
131+
seq_no_end = params.get(f.SEQ_NO_END.nm)
132+
if self.valid_requested_msg(msg.msg_type, ledger_id=ledger_id,
133+
seq_no_start=seq_no_start,
134+
seq_no_end=seq_no_end):
135+
return self.ledgerManager._buildConsistencyProof(ledger_id,
136+
seq_no_start,
137+
seq_no_end)
138+
else:
139+
self.discard(msg, 'cannot serve request',
140+
logMethod=logger.debug)
141+
return False
142+
143+
def _process_requested_cons_proof(self, msg, frm):
144+
params = msg.params
145+
ledger_id = params.get(f.LEDGER_ID.nm)
146+
seq_no_start = params.get(f.SEQ_NO_START.nm)
147+
seq_no_end = params.get(f.SEQ_NO_END.nm)
148+
cons_proof = msg.msg
149+
cons_proof = self.valid_requested_msg(msg.msg_type,
150+
ledger_id=ledger_id,
151+
seq_no_start=seq_no_start,
152+
seq_no_end=seq_no_end,
153+
cons_proof=cons_proof)
154+
if cons_proof:
155+
self.ledgerManager.processConsistencyProof(cons_proof, frm=frm)
156+
return
157+
self.discard(msg,
158+
'cannot process requested message response',
159+
logMethod=logger.debug)
160+
161+
def _validate_requested_preprepare(self, **kwargs):
162+
if kwargs['inst_id'] in range(len(self.replicas)) and \
163+
kwargs['view_no'] == self.viewNo and \
164+
isinstance(kwargs['pp_seq_no'], int) and \
165+
kwargs['pp_seq_no'] > 0:
166+
if 'pp' in kwargs:
167+
try:
168+
pp = PrePrepare(*kwargs['pp'])
169+
if pp.instId != kwargs['inst_id'] or pp.viewNo != kwargs['view_no']:
170+
logger.warning('{} found PREPREPARE {} not satisfying '
171+
'query criteria'.format(self, *kwargs['pp']))
172+
return
173+
return pp
174+
except TypeError as ex:
175+
logger.warning(
176+
'{} could not create PREPREPARE out of {}'.
177+
format(self, *kwargs['pp']))
178+
else:
179+
return True
180+
181+
def _serve_preprepare_request(self, msg):
182+
params = msg.params
183+
inst_id = params.get(f.INST_ID.nm)
184+
view_no = params.get(f.VIEW_NO.nm)
185+
pp_seq_no = params.get(f.PP_SEQ_NO.nm)
186+
if self.valid_requested_msg(msg.msg_type, inst_id=inst_id,
187+
view_no=view_no, pp_seq_no=pp_seq_no):
188+
return self.replicas[inst_id].getPrePrepare(view_no, pp_seq_no)
189+
else:
190+
self.discard(msg, 'cannot serve request',
191+
logMethod=logger.debug)
192+
return False
193+
194+
def _process_requested_preprepare(self, msg, frm):
195+
params = msg.params
196+
inst_id = params.get(f.INST_ID.nm)
197+
view_no = params.get(f.VIEW_NO.nm)
198+
pp_seq_no = params.get(f.PP_SEQ_NO.nm)
199+
pp = msg.msg
200+
pp = self.valid_requested_msg(msg.msg_type, inst_id=inst_id,
201+
view_no=view_no, pp_seq_no=pp_seq_no,
202+
pp=pp)
203+
if pp:
204+
frm = replica.Replica.generateName(frm, inst_id)
205+
self.replicas[inst_id].process_requested_pre_prepare(pp,
206+
sender=frm)
207+
return
208+
self.discard(msg,
209+
'cannot process requested message response',
210+
logMethod=logger.debug)
211+
212+
def _validate_requested_propagate(self, **kwargs):
213+
if not (RequestIdentifierField().validate((kwargs['identifier'],
214+
kwargs['req_id']))):
215+
if 'propagate' in kwargs:
216+
try:
217+
ppg = Propagate(*kwargs['propagate'])
218+
if ppg.request[f.IDENTIFIER.nm] != kwargs['identifier'] or \
219+
ppg.request[f.REQ_ID.nm] != kwargs['req_id']:
220+
logger.warning('{} found PROPAGATE {} not '
221+
'satisfying query criteria'.format(self, *kwargs['ppg']))
222+
return
223+
return ppg
224+
except TypeError as ex:
225+
logger.warning(
226+
'{} could not create PROPAGATE out of {}'.
227+
format(self, *kwargs['propagate']))
228+
else:
229+
return True
230+
231+
def _serve_propagate_request(self, msg):
232+
params = msg.params
233+
identifier = params.get(f.IDENTIFIER.nm)
234+
req_id = params.get(f.REQ_ID.nm)
235+
if self.valid_requested_msg(msg.msg_type, identifier=identifier,
236+
req_id=req_id):
237+
req_key = (identifier, req_id)
238+
if req_key in self.requests and self.requests[req_key].finalised:
239+
sender_client = self.requestSender.get(req_key)
240+
req = self.requests[req_key].finalised
241+
return self.createPropagate(req, sender_client)
242+
else:
243+
self.discard(msg, 'cannot serve request',
244+
logMethod=logger.debug)
245+
return False
246+
247+
def _process_requested_propagate(self, msg, frm):
248+
params = msg.params
249+
identifier = params.get(f.IDENTIFIER.nm)
250+
req_id = params.get(f.REQ_ID.nm)
251+
ppg = msg.msg
252+
ppg = self.valid_requested_msg(msg.msg_type, identifier=identifier,
253+
req_id=req_id, propagate=ppg)
254+
if ppg:
255+
self.processPropagate(ppg, frm)
256+
else:
257+
self.discard(msg,
258+
'cannot process requested message response',
259+
logMethod=logger.debug)

0 commit comments

Comments
 (0)