Skip to content

Commit cb7d068

Browse files
AlexanderShekhovcov and ashcherbakov
authored and committed
Limit length of batches (#367)
* Very basic implementation * Finish implementation * Enable splitting for the Node * Add logging * Refactor tests
1 parent e690cb5 commit cb7d068

File tree

6 files changed

+208
-17
lines changed

6 files changed

+208
-17
lines changed

plenum/common/batched.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Any, Iterable
33

44
from plenum.common.constants import BATCH, OP_FIELD_NAME
5+
from plenum.common.prepare_batch import split_messages_on_batches
56
from stp_core.common.constants import CONNECTION_PREFIX
67
from stp_core.crypto.signer import Signer
78
from stp_core.common.log import getlogger
@@ -98,23 +99,24 @@ def flushOutBoxes(self) -> None:
9899
"{} batching {} msgs to {} into one transmission".
99100
format(self, len(msgs), dest))
100101
logger.trace(" messages: {}".format(msgs))
101-
batch = Batch(list(msgs), None)
102+
batches = split_messages_on_batches(list(msgs),
103+
self._make_batch,
104+
self._test_batch_len,
105+
)
102106
msgs.clear()
103-
# don't need to sign the batch, when the composed msgs are
104-
# signed
105-
payload, err_msg = self.signAndSerialize(batch)
106-
if payload is not None:
107-
logger.trace("{} sending payload to {}: {}".format(
108-
self, dest, payload))
109-
# Setting timeout to never expire
110-
self.transmit(
111-
payload,
112-
rid,
113-
timeout=self.messageTimeout,
114-
serialized=True)
107+
if batches:
108+
for batch in batches:
109+
logger.trace("{} sending payload to {}: {}".format(
110+
self, dest, batch))
111+
# Setting timeout to never expire
112+
self.transmit(
113+
batch,
114+
rid,
115+
timeout=self.messageTimeout,
116+
serialized=True)
115117
else:
116-
logger.warning("{} error {}. tried to {}: {}".format(
117-
self, err_msg, dest, payload))
118+
logger.warning("Cannot create batch(es) for {}".format(
119+
self, dest))
118120
for rid in removedRemotes:
119121
logger.warning("{}{} rid {} has been removed"
120122
.format(CONNECTION_PREFIX, self, rid),
@@ -127,6 +129,14 @@ def flushOutBoxes(self) -> None:
127129
logMethod=logger.debug)
128130
del self.outBoxes[rid]
129131

132+
def _make_batch(self, msgs):
    """Wrap *msgs* into a single Batch and return its signed, serialized form.

    Returns None when signing/serialization fails (signAndSerialize reports
    the failure through its second return value, which is discarded here).
    """
    payload, _err = self.signAndSerialize(Batch(msgs, None))
    return payload
136+
137+
def _test_batch_len(self, batch_len):
138+
return self.msg_len_val.is_len_less_than_limit(batch_len)
139+
130140
def doProcessReceived(self, msg, frm, ident):
131141
if OP_FIELD_NAME in msg and msg[OP_FIELD_NAME] == BATCH:
132142
if f.MSGS.nm in msg and isinstance(msg[f.MSGS.nm], list):

plenum/common/prepare_batch.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from stp_core.common.log import getlogger
2+
3+
SPLIT_STEPS_LIMIT = 8
4+
5+
logger = getlogger()
6+
7+
8+
def split_messages_on_batches(msgs, make_batch_func, is_batch_len_under_limit, step_num=0):
    """Pack *msgs* into one or more serialized batches, each under the length limit.

    Recursively halves the message list until every serialized batch passes
    ``is_batch_len_under_limit``, giving up after ``SPLIT_STEPS_LIMIT``
    halving steps.

    :param msgs: list of serialized messages (objects supporting ``len()``)
    :param make_batch_func: callable(list) -> serialized batch (or None on failure)
    :param is_batch_len_under_limit: callable(int) -> bool, True if the length fits
    :param step_num: current recursion depth (internal; callers use the default)
    :return: list of serialized batches, or None if splitting is impossible
    """

    def split(rec_depth):
        # Batch each half independently; the split fails if either half fails.
        half = len(msgs) // 2
        left_batch = split_messages_on_batches(
            msgs[:half], make_batch_func, is_batch_len_under_limit, rec_depth)
        right_batch = split_messages_on_batches(
            msgs[half:], make_batch_func, is_batch_len_under_limit, rec_depth)
        return left_batch + right_batch if left_batch and right_batch else None

    if step_num > SPLIT_STEPS_LIMIT:
        logger.warning('Too many split steps '
                       'were done {}. Batches were not created'.format(step_num))
        return None

    # Precondition: if the raw payloads alone already exceed the limit, the
    # serialized batch cannot fit either — split right away and skip the
    # (comparatively expensive) serialization step.
    tt_len = sum(len(m) for m in msgs)
    if not is_batch_len_under_limit(tt_len):
        for m in msgs:
            if not is_batch_len_under_limit(len(m)):
                # A single message over the limit can never be batched.
                logger.warning('The message {} is too long ({}). '
                               'Batches were not created'.format(m, len(m)))
                return None
        return split(step_num + 1)

    # Serialize the whole set as one batch and check the real (post-overhead) length.
    batch = make_batch_func(msgs)
    if batch is None:
        # make_batch_func (e.g. signAndSerialize) failed; previously this
        # crashed on len(None) below.
        logger.warning('Could not serialize a batch of {} messages. '
                       'Batches were not created'.format(len(msgs)))
        return None
    if is_batch_len_under_limit(len(batch)):
        return [batch]  # success: everything fits in one batch
    if len(msgs) == 1:
        # The message itself fits, but serialization overhead pushes the
        # batch over the limit — nothing left to split.
        logger.warning('The message {} is less than limit '
                       'but the batch which contains only this '
                       'message is greater than limit'.format(msgs))
        return None
    return split(step_num + 1)

plenum/test/batching_3pc/test_basic_batching.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
import pytest
44

5-
from stp_core.loop.eventually import eventually
65
from plenum.common.exceptions import UnauthorizedClientRequest
76
from plenum.test.batching_3pc.helper import checkNodesHaveSameRoots
87
from plenum.test.helper import checkReqNackWithReason, sendRandomRequests, \
98
checkRejectWithReason, waitForSufficientRepliesForRequests
9+
from stp_core.loop.eventually import eventually
1010

1111

1212
def testRequestStaticValidation(tconf, looper, txnPoolNodeSet, client,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from plenum.common.prepare_batch import split_messages_on_batches, SPLIT_STEPS_LIMIT
2+
3+
4+
# Fake limits for the unit tests: a batch may be at most LEN_LIMIT_BYTES long,
# and the fake serializer adds a fixed overhead to every batch.
LEN_LIMIT_BYTES = 100
SERIALIZATION_OTHER_HEAD_BYTES = 10
MAX_ONE_MSG_LEN = LEN_LIMIT_BYTES - SERIALIZATION_OTHER_HEAD_BYTES


def make_batch_func(msgs):
    """Fake serializer: concatenate the messages and append fixed overhead."""
    return b''.join(msgs) + b'1' * SERIALIZATION_OTHER_HEAD_BYTES


def check_batch_len_func(length):
    """Fake length check mirroring MessageLenValidator.is_len_less_than_limit."""
    return length <= LEN_LIMIT_BYTES


def split_ut(msgs):
    """Shorthand: split *msgs* with the fake serializer and length check."""
    return split_messages_on_batches(msgs, make_batch_func, check_batch_len_func)
20+
21+
22+
def test_empty_msgs_returns_one_batch():
    # Even an empty message list yields a single (overhead-only) batch.
    assert len(split_ut([])) == 1


def test_less_than_limit_returns_one_batch():
    small = 10 * [b'1']
    assert len(split_ut(small)) == 1


def test_total_len_excesses_limit_two_batches():
    just_over = (LEN_LIMIT_BYTES + 1) * [b'1']
    assert len(split_ut(just_over)) == 2


def test_each_msg_almost_excesses_limit_one_msg_per_batch():
    count = 100
    near_limit = count * [b'1' * MAX_ONE_MSG_LEN]
    assert len(split_ut(near_limit)) == count


def test_small_msgs_with_one_huge_more_than_one_batch():
    mixed = [b'1', b'1', b'1', b'1' * MAX_ONE_MSG_LEN, b'1']
    assert len(split_ut(mixed)) == 4


def test_one_msg_excesses_limit_split_fails():
    too_long = [b'1' * (LEN_LIMIT_BYTES + 1)]
    assert split_ut(too_long) is None


def test_one_msg_almost_excesses_limit_split_fails():
    # The message itself fits, but the serialization overhead pushes the
    # single-message batch over the limit.
    overhead_victim = [b'1' * (MAX_ONE_MSG_LEN + 1)]
    assert split_ut(overhead_victim) is None


def test_excesses_limit_of_split_steps_split_fails():
    huge = 2 ** (SPLIT_STEPS_LIMIT + 1) * [b'1' * MAX_ONE_MSG_LEN]
    assert split_ut(huge) is None
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from functools import partial
2+
3+
import pytest
4+
5+
from plenum.test import waits
6+
7+
from plenum.test.helper import sendRandomRequests, waitForSufficientRepliesForRequests, checkReqAck
8+
from plenum.test.pool_transactions.helper import buildPoolClientAndWallet
9+
from stp_core.loop.eventually import eventuallyAll
10+
from stp_core.validators.message_length_validator import MessageLenValidator
11+
12+
from plenum.test.pool_transactions.conftest import looper, client1Connected # noqa
13+
from plenum.test.pool_transactions.conftest import clientAndWallet1, client1, wallet1 # noqa
14+
15+
16+
def test_msg_max_length_check_node_to_node(tconf,
                                           tdir,
                                           looper,
                                           txnPoolNodeSet,
                                           client1,
                                           wallet1,
                                           client1Connected,
                                           clientAndWallet2):
    """
    Two clients send 2*N requests each at the same time.
    N < MSG_LEN_LIMIT but 2*N > MSG_LEN_LIMIT so the requests pass the max
    length check for client-node requests but do not pass the check
    for node-node requests.
    """
    N = 10
    # it is an empirical value for N random requests
    # it has to be adjusted if the world changed (see pydoc)
    max_len_limit = 3000

    patch_msg_len_validators(max_len_limit, txnPoolNodeSet)

    client2, wallet2 = clientAndWallet2

    # Fire both clients' requests first, then verify acks, then wait for
    # replies — same ordering as sending/checking each client in turn.
    sent = []
    for client, wallet in ((client1, wallet1), (client2, wallet2)):
        sent.append((client, sendRandomRequests(wallet, client, N)))

    for client, reqs in sent:
        check_reqacks(client, looper, reqs, txnPoolNodeSet)

    for client, reqs in sent:
        waitForSufficientRepliesForRequests(looper, client, requests=reqs)
47+
48+
49+
def patch_msg_len_validators(max_len_limit, txnPoolNodeSet):
    """Replace both nodestack length validators on every node with ones
    enforcing *max_len_limit* bytes."""
    for node in txnPoolNodeSet:
        stack = node.nodestack
        # Both validator attributes must already exist before we swap them.
        assert hasattr(stack, 'msgLenVal')
        assert hasattr(stack, 'msg_len_val')
        stack.msgLenVal = MessageLenValidator(max_len_limit)
        stack.msg_len_val = MessageLenValidator(max_len_limit)
55+
56+
57+
def check_reqacks(client, looper, reqs, txnPoolNodeSet):
    """Wait until every node has acknowledged every request sent by *client*."""
    reqack_coros = [
        partial(checkReqAck, client, node, req.identifier, req.reqId, None)
        for req in reqs
        for node in txnPoolNodeSet
    ]
    timeout = waits.expectedReqAckQuorumTime()
    looper.run(eventuallyAll(*reqack_coros, totalTimeout=timeout))
64+
65+
66+
@pytest.fixture(scope="module")
67+
def clientAndWallet2(looper, poolTxnClientData, tdirWithPoolTxns):
68+
client, wallet = buildPoolClientAndWallet(poolTxnClientData,
69+
tdirWithPoolTxns)
70+
71+
looper.add(client)
72+
looper.run(client.ensureConnectedToNodes())
73+
yield client, wallet
74+
client.stop()

stp_core/validators/message_length_validator.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ def __init__(self, max_allowed: int):
77

88
def validate(self, msg: bytes):
    """Raise InvalidMessageExceedingSizeException when *msg* is longer
    than the allowed maximum; return None otherwise."""
    has_len = len(msg)
    if self.is_len_less_than_limit(has_len):
        return
    raise InvalidMessageExceedingSizeException(self.max_allowed, has_len)
13+
14+
def is_len_less_than_limit(self, l):
    # NOTE(review): despite the "less_than" name, the check is inclusive —
    # a length exactly equal to max_allowed is accepted.
    return not l > self.max_allowed

0 commit comments

Comments
 (0)