Skip to content

Commit 6d9b085

Browse files
committed
feat(sync-v2): Add both BlockchainStreamingClient and TransactionStreamingClient to manage streamings from the client side
1 parent 4b04c9c commit 6d9b085

File tree

7 files changed

+462
-213
lines changed

7 files changed

+462
-213
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 93 additions & 210 deletions
Large diffs are not rendered by default.
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import TYPE_CHECKING, Optional
16+
17+
from structlog import get_logger
18+
from twisted.internet.defer import Deferred
19+
20+
from hathor.p2p.sync_v2.exception import (
21+
BlockNotConnectedToPreviousBlock,
22+
InvalidVertexError,
23+
StreamingError,
24+
TooManyRepeatedVerticesError,
25+
TooManyVerticesReceivedError,
26+
)
27+
from hathor.p2p.sync_v2.streamers import StreamEnd
28+
from hathor.transaction import Block
29+
from hathor.transaction.exceptions import HathorError
30+
from hathor.types import VertexId
31+
32+
if TYPE_CHECKING:
33+
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
34+
35+
logger = get_logger()
36+
37+
38+
class BlockchainStreamingClient:
39+
def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None:
40+
self.sync_agent = sync_agent
41+
self.protocol = self.sync_agent.protocol
42+
self.tx_storage = self.sync_agent.tx_storage
43+
self.manager = self.sync_agent.manager
44+
45+
self.log = logger.new(peer=self.protocol.get_short_peer_id())
46+
47+
self.start_block = start_block
48+
self.end_block = end_block
49+
50+
# When syncing blocks we start streaming with all peers
51+
# so the moment I get some repeated blocks, I stop the download
52+
# because it's probably a streaming that I've already received
53+
self.max_repeated_blocks = 10
54+
55+
self._deferred: Deferred[StreamEnd] = Deferred()
56+
57+
self._blk_received: int = 0
58+
self._blk_repeated: int = 0
59+
60+
self._blk_max_quantity = self.end_block.height - self.start_block.height + 1
61+
self._reverse: bool = False
62+
if self._blk_max_quantity < 0:
63+
self._blk_max_quantity = -self._blk_max_quantity
64+
self._reverse = True
65+
66+
self._last_received_block: Optional[Block] = None
67+
68+
self._partial_blocks: list[Block] = []
69+
70+
def run(self) -> Deferred[StreamEnd]:
71+
return self._deferred
72+
73+
def fails(self, reason: 'StreamingError') -> None:
74+
self.sync_agent.send_stop_block_streaming()
75+
self._deferred.errback(reason)
76+
77+
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
78+
""" Return true if the vertex exists no matter its validation state.
79+
"""
80+
with self.tx_storage.allow_partially_validated_context():
81+
return self.tx_storage.transaction_exists(vertex_id)
82+
83+
def handle_blocks(self, blk: Block) -> None:
84+
if self._deferred.called:
85+
return
86+
87+
self._blk_received += 1
88+
if self._blk_received > self._blk_max_quantity:
89+
self.log.warn('too many blocks received',
90+
blk_received=self._blk_received,
91+
blk_max_quantity=self._blk_max_quantity)
92+
self.fails(TooManyVerticesReceivedError())
93+
return
94+
95+
assert blk.hash is not None
96+
is_duplicated = False
97+
if self.partial_vertex_exists(blk.hash):
98+
# We reached a block we already have. Skip it.
99+
self._blk_repeated += 1
100+
is_duplicated = True
101+
if self._blk_repeated > self.max_repeated_blocks:
102+
self.log.debug('too many repeated block received', total_repeated=self._blk_repeated)
103+
self.fails(TooManyRepeatedVerticesError())
104+
105+
# basic linearity validation, crucial for correctly predicting the next block's height
106+
if self._reverse:
107+
if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash():
108+
self.fails(BlockNotConnectedToPreviousBlock())
109+
return
110+
else:
111+
if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash:
112+
self.fails(BlockNotConnectedToPreviousBlock())
113+
return
114+
115+
try:
116+
# this methods takes care of checking if the block already exists,
117+
# it will take care of doing at least a basic validation
118+
if is_duplicated:
119+
self.log.debug('block early terminate?', blk_id=blk.hash.hex())
120+
else:
121+
self.log.debug('block received', blk_id=blk.hash.hex())
122+
self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True)
123+
except HathorError:
124+
self.fails(InvalidVertexError())
125+
return
126+
else:
127+
self._last_received_block = blk
128+
self._blk_repeated = 0
129+
# XXX: debugging log, maybe add timing info
130+
if self._blk_received % 500 == 0:
131+
self.log.debug('block streaming in progress', blocks_received=self._blk_received)
132+
133+
meta = blk.get_metadata()
134+
if not meta.validation.is_fully_connected():
135+
self._partial_blocks.append(blk)
136+
137+
def handle_blocks_end(self, response_code: StreamEnd) -> None:
138+
if self._deferred.called:
139+
return
140+
self._deferred.callback(response_code)

hathor/p2p/sync_v2/exception.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
class StreamingError(Exception):
16+
pass
17+
18+
19+
class TooManyVerticesReceivedError(StreamingError):
20+
pass
21+
22+
23+
class TooManyRepeatedVerticesError(StreamingError):
24+
pass
25+
26+
27+
class BlockNotConnectedToPreviousBlock(StreamingError):
28+
pass
29+
30+
31+
class InvalidVertexError(StreamingError):
32+
pass

hathor/p2p/sync_v2/payloads.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from pydantic import validator
16+
17+
from hathor.utils.pydantic import BaseModel
18+
19+
20+
class PayloadBaseModel(BaseModel):
21+
22+
@classmethod
23+
def convert_hex_to_bytes(cls, value: str | bytes) -> bytes:
24+
if isinstance(value, str):
25+
return bytes.fromhex(value)
26+
elif isinstance(value, bytes):
27+
return value
28+
raise ValueError('invalid type')
29+
30+
class Config:
31+
json_encoders = {
32+
bytes: lambda x: x.hex()
33+
}
34+
35+
36+
class GetNextBlocksPayload(PayloadBaseModel):
37+
start_hash: bytes
38+
end_hash: bytes
39+
quantity: int
40+
41+
@validator('start_hash', 'end_hash', pre=True)
42+
def validate_bytes_fields(cls, value: str | bytes) -> bytes:
43+
return cls.convert_hex_to_bytes(value)
44+
45+
46+
class GetBestBlockPayload(PayloadBaseModel):
47+
block: bytes
48+
height: int
49+
50+
@validator('block', pre=True)
51+
def validate_bytes_fields(cls, value: str | bytes) -> bytes:
52+
return cls.convert_hex_to_bytes(value)
53+
54+
55+
class GetTransactionsBFSPayload(PayloadBaseModel):
56+
start_from: list[bytes]
57+
first_block_hash: bytes
58+
last_block_hash: bytes
59+
60+
@validator('first_block_hash', 'last_block_hash', pre=True)
61+
def validate_bytes_fields(cls, value: str | bytes) -> bytes:
62+
return cls.convert_hex_to_bytes(value)
63+
64+
@validator('start_from', pre=True)
65+
def validate_start_from(cls, values: list[str | bytes]) -> list[bytes]:
66+
return [cls.convert_hex_to_bytes(x) for x in values]

hathor/p2p/sync_v2/streamers.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,15 +190,21 @@ class TransactionsStreaming(_StreamingBase):
190190
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
191191
"""
192192

193-
def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction], last_block_hash: bytes,
194-
*, limit: int = DEFAULT_STREAMING_LIMIT):
193+
def __init__(self,
194+
node_sync: 'NodeBlockSync',
195+
start_from: list[BaseTransaction],
196+
first_block_hash: bytes,
197+
last_block_hash: bytes,
198+
*,
199+
limit: int = DEFAULT_STREAMING_LIMIT) -> None:
195200
# XXX: is limit needed for tx streaming? Or let's always send all txs for
196201
# a block? Very unlikely we'll reach this limit
197202
super().__init__(node_sync, limit=limit)
198203

199204
assert len(start_from) > 0
200205
assert start_from[0].storage is not None
201206
self.storage = start_from[0].storage
207+
self.first_block_hash = first_block_hash
202208
self.last_block_hash = last_block_hash
203209
self.last_block_height = 0
204210

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import TYPE_CHECKING
16+
17+
from structlog import get_logger
18+
from twisted.internet.defer import Deferred
19+
20+
from hathor.p2p.sync_v2.exception import (
21+
InvalidVertexError,
22+
StreamingError,
23+
TooManyRepeatedVerticesError,
24+
TooManyVerticesReceivedError,
25+
)
26+
from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, StreamEnd
27+
from hathor.transaction import BaseTransaction
28+
from hathor.transaction.exceptions import HathorError
29+
from hathor.types import VertexId
30+
31+
if TYPE_CHECKING:
32+
from hathor.p2p.sync_v2.agent import NodeBlockSync
33+
34+
logger = get_logger()
35+
36+
37+
class TransactionStreamingClient:
38+
def __init__(self,
39+
sync_agent: 'NodeBlockSync',
40+
start_from: list[bytes],
41+
start_block: bytes,
42+
end_block: bytes) -> None:
43+
self.sync_agent = sync_agent
44+
self.protocol = self.sync_agent.protocol
45+
self.tx_storage = self.sync_agent.tx_storage
46+
self.manager = self.sync_agent.manager
47+
48+
self.log = logger.new(peer=self.protocol.get_short_peer_id())
49+
50+
self.start_from = start_from
51+
self.start_block = start_block
52+
self.end_block = end_block
53+
54+
# Let's keep it at "infinity" until a known issue is fixed.
55+
self.max_repeated_transactions = 1_000_000
56+
57+
self._deferred: Deferred[StreamEnd] = Deferred()
58+
59+
self._tx_received: int = 0
60+
self._tx_repeated: int = 0
61+
62+
self._tx_max_quantity = DEFAULT_STREAMING_LIMIT
63+
64+
def run(self) -> Deferred[StreamEnd]:
65+
return self._deferred
66+
67+
def fails(self, reason: 'StreamingError') -> None:
68+
self.sync_agent.send_stop_block_streaming()
69+
self._deferred.errback(reason)
70+
71+
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
72+
""" Return true if the vertex exists no matter its validation state.
73+
"""
74+
with self.tx_storage.allow_partially_validated_context():
75+
return self.tx_storage.transaction_exists(vertex_id)
76+
77+
def handle_transaction(self, tx: BaseTransaction) -> None:
78+
if self._deferred.called:
79+
return
80+
81+
self._tx_received += 1
82+
if self._tx_received > self._tx_max_quantity:
83+
self.log.warn('too many transactions received',
84+
tx_received=self._tx_received,
85+
tx_max_quantity=self._tx_max_quantity)
86+
self.fails(TooManyVerticesReceivedError())
87+
return
88+
89+
assert tx.hash is not None
90+
is_duplicated = False
91+
if self.partial_vertex_exists(tx.hash):
92+
# We reached a block we already have. Skip it.
93+
self._tx_repeated += 1
94+
is_duplicated = True
95+
if self._tx_repeated > self.max_repeated_transactions:
96+
self.log.debug('too many repeated transactions received', total_repeated=self._tx_repeated)
97+
self.fails(TooManyRepeatedVerticesError())
98+
99+
try:
100+
# this methods takes care of checking if the block already exists,
101+
# it will take care of doing at least a basic validation
102+
if is_duplicated:
103+
self.log.debug('tx early terminate?', tx_id=tx.hash.hex())
104+
else:
105+
self.log.debug('tx received', tx_id=tx.hash.hex())
106+
self.sync_agent.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True)
107+
except HathorError:
108+
self.fails(InvalidVertexError())
109+
return
110+
else:
111+
# XXX: debugging log, maybe add timing info
112+
if self._tx_received % 100 == 0:
113+
self.log.debug('tx streaming in progress', txs_received=self._tx_received)
114+
115+
def handle_transactions_end(self, response_code: StreamEnd) -> None:
116+
if self._deferred.called:
117+
return
118+
self._deferred.callback(response_code)

tests/p2p/test_protocol.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,11 @@ def test_get_data(self):
425425
self.assertAndStepConn(self.conn, b'^RELAY')
426426
self.assertIsConnected()
427427
missing_tx = '00000000228dfcd5dec1c9c6263f6430a5b4316bb9e3decb9441a6414bfd8697'
428-
payload = {'until_first_block': missing_tx, 'start_from': [settings.GENESIS_BLOCK_HASH.hex()]}
428+
payload = {
429+
'first_block_hash': missing_tx,
430+
'last_block_hash': missing_tx,
431+
'start_from': [settings.GENESIS_BLOCK_HASH.hex()]
432+
}
429433
yield self._send_cmd(self.conn.proto1, 'GET-TRANSACTIONS-BFS', json_dumps(payload))
430434
self._check_result_only_cmd(self.conn.peek_tr1_value(), b'NOT-FOUND')
431435
self.conn.run_one_step()

0 commit comments

Comments
 (0)