Skip to content

Commit 35f8222

Browse files
committed
improve validations
1 parent 2730df9 commit 35f8222

File tree

2 files changed

+53
-56
lines changed

2 files changed

+53
-56
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -622,42 +622,42 @@ def handle_get_next_blocks(self, payload: str) -> None:
622622
self.protocol.send_error_and_close_connection('GET-NEXT-BLOCKS received before previous one finished')
623623
return
624624
data = GetNextBlocksPayload.parse_raw(payload)
625+
start_block = self._validate_block(data.start_hash)
626+
if start_block is None:
627+
return
628+
end_block = self._validate_block(data.end_hash)
629+
if end_block is None:
630+
return
625631
self.send_next_blocks(
626-
start_hash=data.start_hash,
632+
start_block=start_block,
627633
end_hash=data.end_hash,
628634
quantity=data.quantity,
629635
)
630636

631-
def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None:
632-
""" Send a NEXT-BLOCKS message.
633-
"""
634-
self.log.debug('start NEXT-BLOCKS stream')
637+
def _validate_block(self, _hash: VertexId) -> Optional[Block]:
638+
"""Validate block given in the GET-NEXT-BLOCKS and GET-TRANSACTIONS-BFS messages."""
635639
try:
636-
blk = self.tx_storage.get_transaction(start_hash)
640+
blk = self.tx_storage.get_transaction(_hash)
637641
except TransactionDoesNotExist:
638-
# In case the tx does not exist we send a NOT-FOUND message
639-
self.log.debug('requested start_hash not found', start_hash=start_hash.hex())
640-
self.send_message(ProtocolMessages.NOT_FOUND, start_hash.hex())
641-
return
642+
self.log.debug('requested block not found', blk_id=_hash.hex())
643+
self.send_message(ProtocolMessages.NOT_FOUND, _hash.hex())
644+
return None
645+
642646
if not isinstance(blk, Block):
643-
self.log.debug('request start_hash is not a block', start_hash=start_hash.hex())
644-
self.send_message(ProtocolMessages.NOT_FOUND, start_hash.hex())
645-
return
646-
assert blk.hash is not None
647-
# XXX: it is not an error for the other peer to request a voided block, we'll pretend it doesn't exist, butf
648-
blk_meta = blk.get_metadata()
649-
if blk_meta.voided_by:
650-
# In case the tx does not exist we send a NOT-FOUND message
651-
self.log.debug('requested start_hash is voided, continue anyway', start_hash=start_hash.hex(),
652-
voided_by=[i.hex() for i in blk_meta.voided_by])
653-
# XXX: we want to be able to not send this, but we do because the remote node could get stuck otherwise
654-
# (tracked by issue #711)
655-
# self.send_message(ProtocolMessages.NOT_FOUND, start_hash.hex())
656-
# return
647+
self.log.debug('request block is not a block', blk_id=_hash.hex())
648+
self.send_message(ProtocolMessages.NOT_FOUND, _hash.hex())
649+
return None
650+
651+
return blk
652+
653+
def send_next_blocks(self, start_block: Block, end_hash: bytes, quantity: int) -> None:
654+
""" Send a NEXT-BLOCKS message.
655+
"""
656+
self.log.debug('start NEXT-BLOCKS stream')
657657
if self._blk_streaming_server is not None and self._blk_streaming_server.is_running:
658658
self._blk_streaming_server.stop()
659659
limit = min(quantity, self.DEFAULT_STREAMING_LIMIT)
660-
self._blk_streaming_server = BlockchainStreamingServer(self, blk, end_hash, limit=limit)
660+
self._blk_streaming_server = BlockchainStreamingServer(self, start_block, end_hash, limit=limit)
661661
self._blk_streaming_server.start()
662662

663663
def send_blocks(self, blk: Block) -> None:
@@ -867,44 +867,44 @@ def handle_get_transactions_bfs(self, payload: str) -> None:
867867
self.log.warn('ignore GET-TRANSACTIONS-BFS, already streaming')
868868
return
869869
data = GetTransactionsBFSPayload.parse_raw(payload)
870-
# XXX: todo verify this limit while parsing the payload.
870+
871871
if len(data.start_from) > MAX_GET_TRANSACTIONS_BFS_LEN:
872872
self.log.error('too many transactions in GET-TRANSACTIONS-BFS', state=self.state)
873873
self.protocol.send_error_and_close_connection('Too many transactions in GET-TRANSACTIONS-BFS')
874874
return
875-
self.send_transactions_bfs(data.start_from, data.first_block_hash, data.last_block_hash)
876875

877-
def send_transactions_bfs(self,
878-
start_from: list[bytes],
879-
first_block_hash: bytes,
880-
last_block_hash: bytes) -> None:
881-
""" Start a transactions BFS stream.
882-
"""
876+
first_block = self._validate_block(data.first_block_hash)
877+
if first_block is None:
878+
return
879+
880+
last_block = self._validate_block(data.last_block_hash)
881+
if last_block is None:
882+
return
883+
883884
start_from_txs = []
884-
for start_from_hash in start_from:
885+
for start_from_hash in data.start_from:
885886
try:
886887
start_from_txs.append(self.tx_storage.get_transaction(start_from_hash))
887888
except TransactionDoesNotExist:
888889
# In case the tx does not exist we send a NOT-FOUND message
889890
self.log.debug('requested start_from_hash not found', start_from_hash=start_from_hash.hex())
890891
self.send_message(ProtocolMessages.NOT_FOUND, start_from_hash.hex())
891892
return
892-
if not self.tx_storage.transaction_exists(first_block_hash):
893-
# In case the tx does not exist we send a NOT-FOUND message
894-
self.log.debug('requested first_block_hash not found', first_block_hash=first_block_hash.hex())
895-
self.send_message(ProtocolMessages.NOT_FOUND, first_block_hash.hex())
896-
return
897-
if not self.tx_storage.transaction_exists(last_block_hash):
898-
# In case the tx does not exist we send a NOT-FOUND message
899-
self.log.debug('requested last_block_hash not found', last_block_hash=last_block_hash.hex())
900-
self.send_message(ProtocolMessages.NOT_FOUND, last_block_hash.hex())
901-
return
893+
894+
self.send_transactions_bfs(start_from_txs, first_block, last_block)
895+
896+
def send_transactions_bfs(self,
897+
start_from: list[BaseTransaction],
898+
first_block: Block,
899+
last_block: Block) -> None:
900+
""" Start a transactions BFS stream.
901+
"""
902902
if self._tx_streaming_server is not None and self._tx_streaming_server.is_running:
903903
self._tx_streaming_server.stop()
904904
self._tx_streaming_server = TransactionsStreamingServer(self,
905-
start_from_txs,
906-
first_block_hash,
907-
last_block_hash,
905+
start_from,
906+
first_block,
907+
last_block,
908908
limit=self.DEFAULT_STREAMING_LIMIT)
909909
self._tx_streaming_server.start()
910910

hathor/p2p/sync_v2/streamers.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from enum import IntFlag
16-
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union, cast
16+
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union
1717

1818
from structlog import get_logger
1919
from twisted.internet.interfaces import IConsumer, IDelayedCall, IPushProducer
@@ -201,24 +201,21 @@ class TransactionsStreamingServer(_StreamingServerBase):
201201
def __init__(self,
202202
sync_agent: 'NodeBlockSync',
203203
start_from: list[BaseTransaction],
204-
first_block_hash: bytes,
205-
last_block_hash: bytes,
204+
first_block: Block,
205+
last_block: Block,
206206
*,
207207
limit: int = DEFAULT_STREAMING_LIMIT) -> None:
208208
# XXX: is limit needed for tx streaming? Or let's always send all txs for
209209
# a block? Very unlikely we'll reach this limit
210210
super().__init__(sync_agent, limit=limit)
211211

212-
self.first_block: Block = cast(Block, self.tx_storage.get_transaction(first_block_hash))
213-
self.last_block: Block = cast(Block, self.tx_storage.get_transaction(last_block_hash))
212+
self.first_block: Block = first_block
213+
self.last_block: Block = last_block
214214
self.start_from = start_from
215215

216-
assert isinstance(self.first_block, Block)
217-
assert isinstance(self.last_block, Block)
218-
219216
# Validate that all transactions in `start_from` are confirmed by the first block.
220217
for tx in start_from:
221-
assert tx.get_metadata().first_block == first_block_hash
218+
assert tx.get_metadata().first_block == self.first_block.hash
222219

223220
self.current_block: Optional[Block] = self.first_block
224221
self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False)

0 commit comments

Comments
 (0)