Skip to content

Commit fdf7323

Browse files
authored
Merge pull request #819 from spivachuk/forced-upgrade-fix
INDY-1447: Fixed handling of forced requests
2 parents 46fb17e + 0dce6b2 commit fdf7323

File tree

6 files changed

+95
-57
lines changed

6 files changed

+95
-57
lines changed

plenum/persistence/req_id_to_txn.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import string
2-
from typing import Optional
32

43
from storage.kv_store import KeyValueStorage
54

@@ -14,13 +13,13 @@ class ReqIdrToTxn:
1413
def __init__(self, keyValueStorage: KeyValueStorage):
1514
self._keyValueStorage = keyValueStorage
1615

17-
def add(self, digest, ledge_id, seq_no):
18-
self._keyValueStorage.put(digest, self._create_value(ledge_id, seq_no))
16+
def add(self, digest, ledger_id, seq_no):
17+
self._keyValueStorage.put(digest, self._create_value(ledger_id, seq_no))
1918

2019
def addBatch(self, batch):
21-
self._keyValueStorage.setBatch([(digest, self._create_value(ledge_id,
20+
self._keyValueStorage.setBatch([(digest, self._create_value(ledger_id,
2221
seq_no))
23-
for digest, ledge_id, seq_no in batch])
22+
for digest, ledger_id, seq_no in batch])
2423

2524
def get(self, digest):
2625
"""
@@ -37,7 +36,7 @@ def get(self, digest):
3736

3837
def _parse_value(self, val: string):
3938
parse_data = val.split(self.delimiter)
40-
return str(parse_data[0]), int(parse_data[1])
39+
return int(parse_data[0]), int(parse_data[1])
4140

4241
def _create_value(self, ledger_id, seq_no):
4342
return str(ledger_id) + self.delimiter + str(seq_no)

plenum/server/ledger_req_handler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from abc import ABCMeta, abstractmethod
22
from typing import List
33

4-
from common.exceptions import PlenumValueError
4+
from common.exceptions import PlenumValueError, LogicError
55
from common.serializers.serialization import state_roots_serializer
66
from stp_core.common.log import getlogger
77

@@ -66,6 +66,11 @@ def commit(self, txnCount, stateRoot, txnRoot, ppTime) -> List:
6666
return self._commit(self.ledger, self.state, txnCount, stateRoot,
6767
txnRoot, ppTime, ts_store=self.ts_store)
6868

69+
def applyForced(self, req: Request):
70+
if not req.isForced():
71+
raise LogicError('requestHandler.applyForce method is called '
72+
'for not forced request: {}'.format(req))
73+
6974
def onBatchCreated(self, state_root):
7075
pass
7176

plenum/server/node.py

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,32 +2130,46 @@ def processRequest(self, request: Request, frm: str):
21302130
# TODO: What if the reply was a REQNACK? Its not gonna be found in the
21312131
# replies.
21322132

2133-
if request.operation[TXN_TYPE] == GET_TXN:
2133+
txn_type = request.operation[TXN_TYPE]
2134+
2135+
if self.is_action(txn_type):
2136+
self.process_action(request, frm)
2137+
2138+
elif txn_type == GET_TXN:
21342139
self.handle_get_txn_req(request, frm)
21352140
self.total_read_request_number += 1
2136-
return
2137-
2138-
ledger_id = self.ledger_id_for_request(request)
2139-
ledger = self.getLedger(ledger_id)
21402141

2141-
if self.is_query(request.operation[TXN_TYPE]):
2142+
elif self.is_query(txn_type):
21422143
self.process_query(request, frm)
2143-
return
2144+
self.total_read_request_number += 1
21442145

2145-
reply = self.getReplyFromLedger(ledger, request)
2146-
if reply:
2147-
logger.debug("{} returning REPLY from already processed "
2148-
"REQUEST: {}".format(self, request))
2149-
self.transmitToClient(reply, frm)
2150-
return
2146+
elif self.is_txn_writable(txn_type):
2147+
reply = self.getReplyFromLedgerForRequest(request)
2148+
if reply:
2149+
logger.debug("{} returning REPLY from already processed "
2150+
"REQUEST: {}".format(self, request))
2151+
self.transmitToClient(reply, frm)
2152+
return
21512153

2152-
# If the node is not already processing the request
2153-
if not self.isProcessingReq(request.key):
2154-
self.startedProcessingReq(request.key, frm)
2155-
# If not already got the propagate request(PROPAGATE) for the
2156-
# corresponding client request(REQUEST)
2157-
self.recordAndPropagate(request, frm)
2158-
self.send_ack_to_client((request.identifier, request.reqId), frm)
2154+
# If the node is not already processing the request
2155+
if not self.isProcessingReq(request.key):
2156+
self.startedProcessingReq(request.key, frm)
2157+
if request.isForced():
2158+
# forced request should be processed before consensus
2159+
req_handler = self.get_req_handler(txn_type=txn_type)
2160+
req_handler.validate(request)
2161+
req_handler.applyForced(request)
2162+
2163+
# If not already got the propagate request(PROPAGATE) for the
2164+
# corresponding client request(REQUEST)
2165+
self.recordAndPropagate(request, frm)
2166+
self.send_ack_to_client((request.identifier, request.reqId), frm)
2167+
2168+
else:
2169+
raise InvalidClientRequest(
2170+
request.identifier,
2171+
request.reqId,
2172+
'Pool is in readonly mode, try again in 60 seconds')
21592173

21602174
def is_query(self, txn_type) -> bool:
21612175
# Does the transaction type correspond to a read?
@@ -2167,6 +2181,9 @@ def is_query(self, txn_type) -> bool:
21672181
def is_action(self, txn_type) -> bool:
21682182
return txn_type in self.actionReqHandler.operation_types
21692183

2184+
def is_txn_writable(self, txn_type):
2185+
return True
2186+
21702187
def process_query(self, request: Request, frm: str):
21712188
# Process a read request from client
21722189
handler = self.get_req_handler(txn_type=request.operation[TXN_TYPE])
@@ -2213,13 +2230,19 @@ def processPropagate(self, msg: Propagate, frm):
22132230

22142231
if not self.isProcessingReq(request.key):
22152232
ledger_id, seq_no = self.seqNoDB.get(request.key)
2216-
if seq_no is not None:
2233+
if ledger_id is not None and seq_no is not None:
22172234
logger.debug("{} ignoring propagated request {} "
22182235
"since it has been already ordered"
22192236
.format(self.name, msg))
22202237
return
22212238

22222239
self.startedProcessingReq(request.key, clientName)
2240+
if request.isForced():
2241+
# forced request should be processed before consensus
2242+
req_handler = self.get_req_handler(
2243+
txn_type=request.operation[TXN_TYPE])
2244+
req_handler.validate(request)
2245+
req_handler.applyForced(request)
22232246

22242247
else:
22252248
if clientName is not None and \
@@ -2264,17 +2287,20 @@ def handle_get_txn_req(self, request: Request, frm: str):
22642287
'Invalid ledger id {}'.format(ledger_id),
22652288
frm)
22662289
return
2290+
22672291
seq_no = request.operation.get(DATA)
22682292
self.send_ack_to_client((request.identifier, request.reqId), frm)
22692293
ledger = self.getLedger(ledger_id)
2294+
22702295
try:
2271-
txn = self.getReplyFromLedger(ledger=ledger,
2272-
seq_no=seq_no)
2296+
txn = self.getReplyFromLedger(ledger, seq_no)
22732297
except KeyError:
2298+
txn = None
2299+
2300+
if txn is None:
22742301
logger.debug(
22752302
"{} can not handle GET_TXN request: ledger doesn't "
22762303
"have txn with seqNo={}". format(self, str(seq_no)))
2277-
txn = None
22782304

22792305
result = {
22802306
f.IDENTIFIER.nm: request.identifier,
@@ -3029,17 +3055,24 @@ def sendToNodes(self, msg: Any, names: Iterable[str] = None, message_splitter=No
30293055
) if r.name in names] if names else []
30303056
self.send(msg, *rids, message_splitter=message_splitter)
30313057

3032-
def getReplyFromLedger(self, ledger, request=None, seq_no=None):
3058+
def getReplyFromLedgerForRequest(self, request):
3059+
ledger_id, seq_no = self.seqNoDB.get(request.digest)
3060+
if ledger_id is not None and seq_no is not None:
3061+
ledger = self.getLedger(ledger_id)
3062+
return self.getReplyFromLedger(ledger, seq_no)
3063+
else:
3064+
return None
3065+
3066+
def getReplyFromLedger(self, ledger, seq_no):
30333067
# DoS attack vector, client requesting already processed request id
30343068
# results in iterating over ledger (or its subset)
3035-
if seq_no is None:
3036-
ledger_id, seq_no = self.seqNoDB.get(request.digest)
3037-
if seq_no:
3038-
txn = ledger.getBySeqNo(int(seq_no))
3039-
if txn:
3040-
txn.update(ledger.merkleInfo(seq_no))
3041-
txn = self.update_txn_with_extra_data(txn)
3042-
return Reply(txn)
3069+
txn = ledger.getBySeqNo(int(seq_no))
3070+
if txn:
3071+
txn.update(ledger.merkleInfo(seq_no))
3072+
txn = self.update_txn_with_extra_data(txn)
3073+
return Reply(txn)
3074+
else:
3075+
return None
30433076

30443077
def update_txn_with_extra_data(self, txn):
30453078
"""

plenum/test/node_request/test_already_processed_request.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ def get_method_call_count(method):
1919
assert len(counts) == 1
2020
return counts.pop()
2121

22-
def get_getReplyFromLedger_call_count():
22+
def get_getReplyFromLedgerForRequest_call_count():
2323
return get_method_call_count(
24-
next(iter(txnPoolNodeSet)).getReplyFromLedger)
24+
next(iter(txnPoolNodeSet)).getReplyFromLedgerForRequest)
2525

2626
def get_recordAndPropagate_call_count():
2727
return get_method_call_count(
@@ -30,13 +30,13 @@ def get_recordAndPropagate_call_count():
3030
def get_last_returned_val():
3131
rvs = []
3232
for node in txnPoolNodeSet:
33-
rv = getAllReturnVals(node, node.getReplyFromLedger)
33+
rv = getAllReturnVals(node, node.getReplyFromLedgerForRequest)
3434
rvs.append(rv[0])
3535
# All items are same in the list
3636
assert rvs.count(rvs[0]) == len(txnPoolNodeSet)
3737
return rvs[0]
3838

39-
rlc1 = get_getReplyFromLedger_call_count()
39+
rlc1 = get_getReplyFromLedgerForRequest_call_count()
4040
rpc1 = get_recordAndPropagate_call_count()
4141

4242
# Request which will be send twice
@@ -50,24 +50,24 @@ def get_last_returned_val():
5050
sdk_check_reply(req_res)
5151
first_req_id = request1[0][0]['reqId']
5252

53-
rlc2 = get_getReplyFromLedger_call_count()
53+
rlc2 = get_getReplyFromLedgerForRequest_call_count()
5454
rpc2 = get_recordAndPropagate_call_count()
55-
assert rlc2 - rlc1 == 1 # getReplyFromLedger was called
55+
assert rlc2 - rlc1 == 1 # getReplyFromLedgerForRequest was called
5656
assert rpc2 - rpc1 == 1 # recordAndPropagate was called
5757
r1 = get_last_returned_val()
58-
assert r1 is None # getReplyFromLedger returned None since had not seen request
58+
assert r1 is None # getReplyFromLedgerForRequest returned None since had not seen request
5959

6060
# Request which we will send only once
6161
request2 = sdk_send_random_and_check(looper, txnPoolNodeSet, sdk_pool_handle, sdk_wallet_client, 1)
6262
second_req_id = request2[0][0]['reqId']
6363

6464
assert second_req_id != first_req_id
65-
rlc3 = get_getReplyFromLedger_call_count()
65+
rlc3 = get_getReplyFromLedgerForRequest_call_count()
6666
rpc3 = get_recordAndPropagate_call_count()
67-
assert rlc3 - rlc2 == 1 # getReplyFromLedger was called again
67+
assert rlc3 - rlc2 == 1 # getReplyFromLedgerForRequest was called again
6868
assert rpc3 - rpc2 == 1 # recordAndPropagate was called again
6969
r2 = get_last_returned_val()
70-
assert r2 is None # getReplyFromLedger returned None since had not seen request
70+
assert r2 is None # getReplyFromLedgerForRequest returned None since had not seen request
7171

7272
# Reply for the first request, which is going to be sent again
7373
rep1 = request1[0][1]['result']
@@ -79,12 +79,12 @@ def get_last_returned_val():
7979
third_req_id = request3[0][0]['reqId']
8080

8181
assert third_req_id == first_req_id
82-
rlc4 = get_getReplyFromLedger_call_count()
82+
rlc4 = get_getReplyFromLedgerForRequest_call_count()
8383
rpc4 = get_recordAndPropagate_call_count()
84-
assert rlc4 - rlc3 == 1 # getReplyFromLedger was called again
84+
assert rlc4 - rlc3 == 1 # getReplyFromLedgerForRequest was called again
8585
assert rpc4 - rpc3 == 0 # recordAndPropagate was not called
8686
r3 = get_last_returned_val()
87-
# getReplyFromLedger did not return None this time since had seen request
87+
# getReplyFromLedgerForRequest did not return None this time since had seen request
8888
assert r3 is not None
8989
rep3 = request3[0][1]['result']
9090

plenum/test/node_request/test_req_idr_to_txn.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ def req_ids_to_txn(tconf):
2121

2222
def test_req_id_to_txn_add(req_ids_to_txn):
2323
digest = "random_req_digest"
24-
ledge_id = "1"
24+
ledger_id = 1
2525
seq_no = 123
26-
req_ids_to_txn.add(digest, ledge_id, seq_no)
26+
req_ids_to_txn.add(digest, ledger_id, seq_no)
2727
new_ledger_id, new_seq_no = req_ids_to_txn.get(digest)
28-
assert new_ledger_id == ledge_id
28+
assert new_ledger_id == ledger_id
2929
assert seq_no == seq_no
3030

3131

3232
def test_req_id_to_txn_add_batch(req_ids_to_txn):
3333
batch = [("random_req_digest" + str(index),
34-
"1",
34+
1,
3535
123 + index)
3636
for index in range(3)]
3737
req_ids_to_txn.addBatch(batch)

plenum/test/test_node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ def processRequest(self, request, frm):
335335
Node.lost_master_primary,
336336
Node.propose_view_change,
337337
Node.getReplyFromLedger,
338+
Node.getReplyFromLedgerForRequest,
338339
Node.recordAndPropagate,
339340
Node.allLedgersCaughtUp,
340341
Node.start_catchup,

0 commit comments

Comments
 (0)