Skip to content

Commit eab8986

Browse files
committed
feat(sync-v2): Add both BlockchainStreamingClient and TransactionStreamingClient to manage streamings from the client side
1 parent 4b04c9c commit eab8986

File tree

8 files changed

+507
-235
lines changed

8 files changed

+507
-235
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 116 additions & 225 deletions
Large diffs are not rendered by default.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import TYPE_CHECKING, Optional
16+
17+
from structlog import get_logger
18+
from twisted.internet.defer import Deferred
19+
20+
from hathor.p2p.sync_v2.exception import (
21+
BlockNotConnectedToPreviousBlock,
22+
InvalidVertexError,
23+
StreamingError,
24+
TooManyRepeatedVerticesError,
25+
TooManyVerticesReceivedError,
26+
)
27+
from hathor.p2p.sync_v2.streamers import StreamEnd
28+
from hathor.transaction import Block
29+
from hathor.transaction.exceptions import HathorError
30+
from hathor.types import VertexId
31+
32+
if TYPE_CHECKING:
33+
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
34+
35+
logger = get_logger()
36+
37+
38+
class BlockchainStreamingClient:
    """Client-side handler of a block streaming requested from a peer.

    The sync agent forwards every BLOCKS message to `handle_blocks()` and the BLOCKS-END message to
    `handle_blocks_end()`. The outcome is published through the deferred returned by `wait()`: it fires with
    the received `StreamEnd` code on a regular end, or fails with a `StreamingError` when the stream is rejected.
    """

    def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None:
        """Prepare to receive the blocks between `start_block` and `end_block`, both inclusive.

        The stream is considered reversed when `end_block` is below `start_block`.
        """
        self.sync_agent = sync_agent
        self.protocol = self.sync_agent.protocol
        self.tx_storage = self.sync_agent.tx_storage
        self.manager = self.sync_agent.manager

        self.log = logger.new(peer=self.protocol.get_short_peer_id())

        self.start_block = start_block
        self.end_block = end_block

        # When syncing blocks we start streaming with all peers
        # so the moment I get some repeated blocks, I stop the download
        # because it's probably a streaming that I've already received
        self.max_repeated_blocks = 10

        self._deferred: Deferred[StreamEnd] = Deferred()

        self._blk_received: int = 0
        # Number of consecutive blocks received that were already in the storage.
        self._blk_repeated: int = 0

        # Both ends are part of the stream, so the number of expected blocks is the height distance plus one,
        # regardless of the direction of the stream.
        height_diff = self.end_block.height - self.start_block.height
        self._reverse: bool = height_diff < 0
        self._blk_max_quantity: int = abs(height_diff) + 1

        self._last_received_block: Optional[Block] = None

        # Blocks received that could not be fully validated yet.
        self._partial_blocks: list[Block] = []

    def wait(self) -> Deferred[StreamEnd]:
        """Return the deferred that is resolved when the streaming ends or fails."""
        return self._deferred

    def fails(self, reason: 'StreamingError') -> None:
        """Fail the execution by resolving the deferred with an error."""
        self._deferred.errback(reason)

    def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
        """Return true if the vertex exists no matter its validation state."""
        with self.tx_storage.allow_partially_validated_context():
            return self.tx_storage.transaction_exists(vertex_id)

    def handle_blocks(self, blk: Block) -> None:
        """This method is called by the sync agent when a BLOCKS message is received.

        The block is ignored when the streaming has already ended. Otherwise the streaming fails with:
        - `TooManyVerticesReceivedError` when more blocks than expected are received;
        - `TooManyRepeatedVerticesError` when too many consecutive already-known blocks are received;
        - `BlockNotConnectedToPreviousBlock` when the block is not linked to the previously received one;
        - `InvalidVertexError` when the sync agent rejects the block with a `HathorError`.
        """
        if self._deferred.called:
            return

        self._blk_received += 1
        if self._blk_received > self._blk_max_quantity:
            self.log.warn('too many blocks received',
                          blk_received=self._blk_received,
                          blk_max_quantity=self._blk_max_quantity)
            self.fails(TooManyVerticesReceivedError())
            return

        assert blk.hash is not None
        is_duplicated = self.partial_vertex_exists(blk.hash)
        if is_duplicated:
            # We reached a block we already have. Skip it.
            self._blk_repeated += 1
            if self._blk_repeated > self.max_repeated_blocks:
                self.log.debug('too many repeated block received', total_repeated=self._blk_repeated)
                self.fails(TooManyRepeatedVerticesError())
                # The deferred has been resolved, so nothing else may be done with this block. Going on would
                # risk resolving the deferred a second time in one of the checks below.
                return

        # basic linearity validation, crucial for correctly predicting the next block's height
        if self._reverse:
            if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash():
                self.fails(BlockNotConnectedToPreviousBlock())
                return
        else:
            if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash:
                self.fails(BlockNotConnectedToPreviousBlock())
                return

        if is_duplicated:
            self.log.debug('block early terminate?', blk_id=blk.hash.hex())
        else:
            self.log.debug('block received', blk_id=blk.hash.hex())
            try:
                # this method takes care of checking if the block already exists,
                # it will take care of doing at least a basic validation
                self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True)
            except HathorError:
                self.fails(InvalidVertexError())
                return
            # Only a new block breaks a sequence of repeated blocks. Resetting the counter for duplicated
            # blocks as well would make the `max_repeated_blocks` limit unreachable.
            self._blk_repeated = 0

        self._last_received_block = blk
        # XXX: debugging log, maybe add timing info
        if self._blk_received % 500 == 0:
            self.log.debug('block streaming in progress', blocks_received=self._blk_received)

        if not blk.can_validate_full():
            self._partial_blocks.append(blk)

    def handle_blocks_end(self, response_code: StreamEnd) -> None:
        """This method is called by the sync agent when a BLOCKS-END message is received.

        It resolves the deferred with `response_code` unless the streaming has already ended or failed.
        """
        if self._deferred.called:
            return
        self._deferred.callback(response_code)

hathor/p2p/sync_v2/exception.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
class StreamingError(Exception):
    """Root of the exception hierarchy used by sync-v2 streaming."""
18+
19+
20+
class TooManyVerticesReceivedError(StreamingError):
    """The other peer sent more vertices than allowed."""
23+
24+
25+
class TooManyRepeatedVerticesError(StreamingError):
    """The other peer sent more repeated vertices than allowed."""
28+
29+
30+
class BlockNotConnectedToPreviousBlock(StreamingError):
    """A received block is not connected to the block received right before it."""
33+
34+
35+
class InvalidVertexError(StreamingError):
    """A received vertex did not pass validation."""

hathor/p2p/sync_v2/payloads.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from pydantic import validator
16+
17+
from hathor.types import VertexId
18+
from hathor.utils.pydantic import BaseModel
19+
20+
21+
class PayloadBaseModel(BaseModel):
    """Base model shared by the sync-v2 message payloads.

    It provides the hex-to-bytes conversion used by the payload validators and serializes `VertexId`
    values as hex strings when the model is encoded to JSON.
    """

    @classmethod
    def convert_hex_to_bytes(cls, value: str | VertexId) -> VertexId:
        """Return `value` as bytes, decoding it from hex when a string is given.

        A `VertexId` is returned untouched. Any other type raises `ValueError('invalid type')`;
        `bytes.fromhex()` also raises `ValueError` for a string that is not valid hex.
        """
        if isinstance(value, str):
            return bytes.fromhex(value)
        if isinstance(value, VertexId):
            return value
        raise ValueError('invalid type')

    class Config:
        # Vertex ids are encoded as hex strings in JSON.
        json_encoders = {VertexId: lambda vertex_id: vertex_id.hex()}
36+
37+
38+
class GetNextBlocksPayload(PayloadBaseModel):
    """GET-NEXT-BLOCKS message is used to request a stream of blocks in the best blockchain."""

    # NOTE(review): presumably the hashes of the blocks delimiting the requested stream and the
    # maximum number of blocks to be sent; confirm against the sync agent.
    start_hash: VertexId
    end_hash: VertexId
    quantity: int

    # `pre=True` runs this validator before pydantic's own validation of the fields.
    @validator('start_hash', 'end_hash', pre=True)
    def validate_bytes_fields(cls, value: str | bytes) -> VertexId:
        """Convert `start_hash` and `end_hash` with `convert_hex_to_bytes()`."""
        return cls.convert_hex_to_bytes(value)
48+
49+
50+
class BestBlockPayload(PayloadBaseModel):
    """BEST-BLOCK message is used to send information about the current best block."""

    # NOTE(review): presumably the hash of the best block and its height; confirm against the sync agent.
    block: VertexId
    height: int

    # `pre=True` runs this validator before pydantic's own validation of the field.
    @validator('block', pre=True)
    def validate_bytes_fields(cls, value: str | VertexId) -> VertexId:
        """Convert `block` with `convert_hex_to_bytes()`."""
        return cls.convert_hex_to_bytes(value)
59+
60+
61+
class GetTransactionsBFSPayload(PayloadBaseModel):
    """GET-TRANSACTIONS-BFS message is used to request a stream of transactions confirmed by blocks."""
    # NOTE(review): presumably the vertices where the BFS starts and the hashes of the first and last
    # blocks whose confirmed transactions are requested; confirm against the sync agent.
    start_from: list[VertexId]
    first_block_hash: VertexId
    last_block_hash: VertexId

    # `pre=True` runs this validator before pydantic's own validation of the fields.
    @validator('first_block_hash', 'last_block_hash', pre=True)
    def validate_bytes_fields(cls, value: str | VertexId) -> VertexId:
        """Convert `first_block_hash` and `last_block_hash` with `convert_hex_to_bytes()`."""
        return cls.convert_hex_to_bytes(value)

    # `each_item=True` applies the validator to every element of the list instead of the list itself.
    @validator('start_from', pre=True, each_item=True)
    def validate_start_from(cls, value: str | VertexId) -> VertexId:
        """Convert each element of `start_from` with `convert_hex_to_bytes()`."""
        return cls.convert_hex_to_bytes(value)

hathor/p2p/sync_v2/streamers.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __str__(self):
5555

5656

5757
@implementer(IPushProducer)
58-
class _StreamingBase:
58+
class _StreamingServerBase:
5959
def __init__(self, node_sync: 'NodeBlockSync', *, limit: int = DEFAULT_STREAMING_LIMIT):
6060
self.node_sync = node_sync
6161
self.protocol: 'HathorProtocol' = node_sync.protocol
@@ -123,7 +123,7 @@ def stopProducing(self) -> None:
123123
self.pauseProducing()
124124

125125

126-
class BlockchainStreaming(_StreamingBase):
126+
class BlockchainStreamingServer(_StreamingServerBase):
127127
def __init__(self, node_sync: 'NodeBlockSync', start_block: Block, end_hash: bytes,
128128
*, limit: int = DEFAULT_STREAMING_LIMIT, reverse: bool = False):
129129
super().__init__(node_sync, limit=limit)
@@ -186,19 +186,25 @@ def send_next(self) -> None:
186186
self.schedule_if_needed()
187187

188188

189-
class TransactionsStreaming(_StreamingBase):
189+
class TransactionsStreamingServer(_StreamingServerBase):
190190
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
191191
"""
192192

193-
def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction], last_block_hash: bytes,
194-
*, limit: int = DEFAULT_STREAMING_LIMIT):
193+
def __init__(self,
194+
node_sync: 'NodeBlockSync',
195+
start_from: list[BaseTransaction],
196+
first_block_hash: bytes,
197+
last_block_hash: bytes,
198+
*,
199+
limit: int = DEFAULT_STREAMING_LIMIT) -> None:
195200
# XXX: is limit needed for tx streaming? Or let's always send all txs for
196201
# a block? Very unlikely we'll reach this limit
197202
super().__init__(node_sync, limit=limit)
198203

199204
assert len(start_from) > 0
200205
assert start_from[0].storage is not None
201206
self.storage = start_from[0].storage
207+
self.first_block_hash = first_block_hash
202208
self.last_block_hash = last_block_hash
203209
self.last_block_height = 0
204210

0 commit comments

Comments
 (0)