Skip to content

Commit a3658a7

Browse files
committed
changes (1)
1 parent ea7fbec commit a3658a7

File tree

5 files changed

+43
-39
lines changed

5 files changed

+43
-39
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,13 @@ def find_best_common_block(self,
540540
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
541541
return lo
542542

543+
def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> None:
544+
"""This method is called when a block and its transactions are downloaded."""
545+
for tx in vertex_list:
546+
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
547+
548+
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
549+
543550
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
544551
""" Returns the peer's block hashes in the given heights.
545552
"""

hathor/p2p/sync_v2/blockchain_streaming_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ def handle_blocks(self, blk: Block) -> None:
9393
self.fails(TooManyVerticesReceivedError())
9494
return
9595

96+
# Run basic verification.
97+
# if not blk.is_genesis:
98+
# try:
99+
# self.manager.verification_service.validate_basic(blk)
100+
# except TxValidationError as e:
101+
# self.fails(InvalidVertexError(repr(e)))
102+
# return
103+
104+
# Check for repeated blocks.
96105
assert blk.hash is not None
97106
is_duplicated = False
98107
if self.partial_vertex_exists(blk.hash):

hathor/p2p/sync_v2/mempool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def run(self) -> Deferred[None]:
6363
self._is_running = True
6464
self.reactor.callLater(0, self._run)
6565

66+
# TODO Implement a stop() and call it after N minutes.
67+
6668
assert self._deferred is None
6769
self._deferred = Deferred()
6870
return self._deferred

hathor/p2p/sync_v2/streamers.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,10 @@ def __init__(self,
215215
# TODO Validated start_from and use it when it's not empty.
216216

217217
self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False)
218-
self.iter = self.get_next_tx()
218+
self.iter = self.get_iter()
219219

220-
def get_next_tx(self) -> Iterator[BaseTransaction]:
220+
def get_iter(self) -> Iterator[BaseTransaction]:
221+
"""Return an iterator that yields all transactions confirmed by each block in sequence."""
221222
while self.current_block:
222223
self.log.debug('sending transactions from block',
223224
block=not_none(self.current_block.hash).hex(),
@@ -228,10 +229,6 @@ def get_next_tx(self) -> Iterator[BaseTransaction]:
228229
break
229230
self.current_block = self.current_block.get_next_block_best_chain()
230231

231-
def start(self) -> None:
232-
super().start()
233-
234-
# TODO: make this generic too?
235232
def send_next(self) -> None:
236233
"""Push next transaction to peer."""
237234
assert self.is_running
@@ -262,7 +259,7 @@ def send_next(self) -> None:
262259
self.sync_agent.send_transactions_end(StreamEnd.TX_NOT_CONFIRMED)
263260
return
264261

265-
# Check if tx is confirmed by a block within range [first_block, last_block].
262+
# Check if tx is confirmed by `self.current_block`.
266263
assert cur_metadata.first_block is not None
267264
assert self.current_block is not None
268265
if cur_metadata.first_block != self.current_block.hash:

hathor/p2p/sync_v2/transaction_streaming_client.py

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from hathor.p2p.sync_v2.exception import (
2121
InvalidVertexError,
2222
StreamingError,
23-
TooManyRepeatedVerticesError,
2423
TooManyVerticesReceivedError,
2524
UnexpectedVertex,
2625
)
@@ -49,13 +48,9 @@ def __init__(self,
4948

5049
self.partial_blocks = partial_blocks
5150

52-
# Let's keep it at "infinity" until a known issue is fixed.
53-
self.max_repeated_transactions = 1_000_000
54-
5551
self._deferred: Deferred[StreamEnd] = Deferred()
5652

5753
self._tx_received: int = 0
58-
self._tx_repeated: int = 0
5954

6055
self._tx_max_quantity = DEFAULT_STREAMING_LIMIT
6156

@@ -96,8 +91,16 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
9691

9792
self.log.debug('tx received', tx_id=tx.hash.hex())
9893

99-
# TODO Handle repeated transactions.
94+
# Run basic verification.
95+
# if not tx.is_genesis:
96+
# try:
97+
# self.manager.verification_service.validate_basic(tx)
98+
# except TxValidationError as e:
99+
# self.fails(InvalidVertexError(repr(e)))
100+
# return
100101

102+
# Any repeated transaction will fail this check because they will
103+
# not belong to the waiting list.
101104
if tx.hash not in self._waiting_for:
102105
self.fails(UnexpectedVertex(tx.hash.hex()))
103106
return
@@ -118,21 +121,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
118121

119122
if self._tx_received % 100 == 0:
120123
self.log.debug('tx streaming in progress', txs_received=self._tx_received)
121-
return
122-
123-
assert tx.hash is not None
124-
is_duplicated = False
125-
if self.partial_vertex_exists(tx.hash):
126-
# We reached a tx we already have. Skip it.
127-
self._tx_repeated += 1
128-
is_duplicated = True
129-
if self._tx_repeated > self.max_repeated_transactions:
130-
self.log.debug('too many repeated transactions received', total_repeated=self._tx_repeated)
131-
self.fails(TooManyRepeatedVerticesError())
132-
if is_duplicated:
133-
pass
134124

135125
def get_missing_deps(self, tx: BaseTransaction) -> Iterator[bytes]:
126+
"""Return missing dependencies."""
136127
for dep in tx.get_all_dependencies():
137128
if self.tx_storage.transaction_exists(dep):
138129
continue
@@ -147,18 +138,15 @@ def handle_transactions_end(self, response_code: StreamEnd) -> None:
147138
self._deferred.callback(response_code)
148139

149140
def _execute_and_prepare_next(self) -> None:
150-
self._buffer.sort(key=lambda vertex_id: self._db[vertex_id].timestamp)
151-
for vertex_id in self._buffer:
152-
tx = self._db[vertex_id]
153-
try:
154-
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
155-
except HathorError as e:
156-
self.fails(InvalidVertexError(repr(e)))
157-
return
141+
"""Add the block and its vertices to the DAG."""
142+
assert not self._waiting_for
158143

159144
blk = self.partial_blocks[self._idx]
145+
vertex_list = [self._db[_id] for _id in self._buffer]
146+
vertex_list.sort(key=lambda v: v.timestamp)
147+
160148
try:
161-
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
149+
self.sync_agent.on_block_complete(blk, vertex_list)
162150
except HathorError as e:
163151
self.fails(InvalidVertexError(repr(e)))
164152
return
@@ -168,15 +156,16 @@ def _execute_and_prepare_next(self) -> None:
168156
self._prepare_block(self.partial_blocks[self._idx])
169157

170158
def _prepare_block(self, blk: 'Block') -> None:
171-
# Reset everything for the next block
172-
self._buffer = []
173-
self._waiting_for = set()
174-
self._db = {}
159+
"""Reset everything for the next block. It also adds blocks that have no dependencies."""
160+
self._buffer.clear()
161+
self._waiting_for.clear()
162+
self._db.clear()
175163

176164
# Add pending dependencies from block.
177165
for dep in blk.get_all_dependencies():
178166
if not self.tx_storage.transaction_exists(dep):
179167
self._waiting_for.add(dep)
180168

169+
# If block is ready to be added then do it.
181170
if not self._waiting_for:
182171
self._execute_and_prepare_next()

0 commit comments

Comments
 (0)