Skip to content

Commit 95830d5

Browse files
committed
fix(sync-v2): Fix stream end messages for both blockchains and transactions
1 parent 0b32a31 commit 95830d5

File tree

2 files changed

+42
-25
lines changed

2 files changed

+42
-25
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,13 @@ def start_blockchain_streaming(self,
502502
self.send_get_next_blocks(start_block.id, end_block.id, quantity)
503503
return self._blk_streaming_client.wait()
504504

505+
def stop_blk_streaming_server(self, response_code: StreamEnd) -> None:
506+
"""Stop blockchain streaming server."""
507+
assert self._blk_streaming_server is not None
508+
self._blk_streaming_server.stop()
509+
self._blk_streaming_server = None
510+
self.send_blocks_end(response_code)
511+
505512
def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
506513
""" Helper to send a message.
507514
"""
@@ -678,7 +685,7 @@ def send_next_blocks(self, start_block: Block, end_hash: bytes, quantity: int) -
678685
"""
679686
self.log.debug('start NEXT-BLOCKS stream')
680687
if self._blk_streaming_server is not None and self._blk_streaming_server.is_running:
681-
self._blk_streaming_server.stop()
688+
self.stop_blk_streaming_server(StreamEnd.PER_REQUEST)
682689
limit = min(quantity, self.DEFAULT_STREAMING_LIMIT)
683690
self._blk_streaming_server = BlockchainStreamingServer(self, start_block, end_hash, limit=limit)
684691
self._blk_streaming_server.start()
@@ -760,8 +767,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
760767
return
761768

762769
self.log.debug('got stop streaming message')
763-
self._blk_streaming_server.stop()
764-
self._blk_streaming_server = None
770+
self.stop_blk_streaming_server(StreamEnd.PER_REQUEST)
765771

766772
def send_stop_transactions_streaming(self) -> None:
767773
""" Send a STOP-TRANSACTIONS-STREAMING message.
@@ -780,8 +786,7 @@ def handle_stop_transactions_streaming(self, payload: str) -> None:
780786
return
781787

782788
self.log.debug('got stop streaming message')
783-
self._tx_streaming_server.stop()
784-
self._tx_streaming_server = None
789+
self.stop_tx_streaming_server(StreamEnd.PER_REQUEST)
785790

786791
def get_peer_best_block(self) -> Deferred[_HeightInfo]:
787792
""" Async call to get the remote peer's best block.
@@ -853,6 +858,13 @@ def resume_transactions_streaming(self) -> Deferred[StreamEnd]:
853858
self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash)
854859
return self._tx_streaming_client.resume()
855860

861+
def stop_tx_streaming_server(self, response_code: StreamEnd) -> None:
862+
"""Stop transaction streaming server."""
863+
assert self._tx_streaming_server is not None
864+
self._tx_streaming_server.stop()
865+
self._tx_streaming_server = None
866+
self.send_transactions_end(response_code)
867+
856868
def send_get_transactions_bfs(self,
857869
start_from: list[bytes],
858870
first_block_hash: bytes,
@@ -921,7 +933,7 @@ def handle_get_transactions_bfs(self, payload: str) -> None:
921933
vertex_id=tx.hash.hex(),
922934
first_block=first_block.hash.hex(),
923935
vertex_first_block=meta.first_block)
924-
self.send_blocks_end(StreamEnd.INVALID_PARAMS)
936+
self.send_transactions_end(StreamEnd.INVALID_PARAMS)
925937
return
926938
start_from_txs.append(tx)
927939

@@ -934,7 +946,7 @@ def send_transactions_bfs(self,
934946
""" Start a transactions BFS stream.
935947
"""
936948
if self._tx_streaming_server is not None and self._tx_streaming_server.is_running:
937-
self._tx_streaming_server.stop()
949+
self.stop_tx_streaming_server(StreamEnd.PER_REQUEST)
938950
self._tx_streaming_server = TransactionsStreamingServer(self,
939951
start_from,
940952
first_block,

hathor/p2p/sync_v2/streamers.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class StreamEnd(IntFlag):
4141
TX_NOT_CONFIRMED = 4
4242
INVALID_PARAMS = 5
4343
INTERNAL_ERROR = 6
44+
PER_REQUEST = 7
4445

4546
def __str__(self):
4647
if self is StreamEnd.END_HASH_REACHED:
@@ -57,6 +58,8 @@ def __str__(self):
5758
return 'streamed with invalid parameters'
5859
elif self is StreamEnd.INTERNAL_ERROR:
5960
return 'internal error'
61+
elif self is StreamEnd.PER_REQUEST:
62+
return 'stopped per request'
6063
else:
6164
raise ValueError(f'invalid StreamEnd value: {self.value}')
6265

@@ -99,12 +102,15 @@ def safe_send_next(self) -> None:
99102
try:
100103
self.send_next()
101104
except Exception:
102-
self.stop()
103-
self.sync_agent.send_blocks_end(StreamEnd.INTERNAL_ERROR)
105+
self._stop_streaming_server(StreamEnd.INTERNAL_ERROR)
104106
raise
105107
else:
106108
self.schedule_if_needed()
107109

110+
def _stop_streaming_server(self, response_code: StreamEnd) -> None:
111+
"""Stop streaming server."""
112+
raise NotImplementedError
113+
108114
def start(self) -> None:
109115
"""Start pushing."""
110116
self.log.debug('start streaming')
@@ -153,6 +159,9 @@ def __init__(self, sync_agent: 'NodeBlockSync', start_block: Block, end_hash: by
153159
self.end_hash = end_hash
154160
self.reverse = reverse
155161

162+
def _stop_streaming_server(self, response_code: StreamEnd) -> None:
163+
self.sync_agent.stop_blk_streaming_server(response_code)
164+
156165
def send_next(self) -> None:
157166
"""Push next block to peer."""
158167
assert self.is_running
@@ -165,26 +174,23 @@ def send_next(self) -> None:
165174

166175
meta = cur.get_metadata()
167176
if meta.voided_by:
168-
self.stop()
169-
self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED)
177+
self.sync_agent.stop_blk_streaming_server(StreamEnd.STREAM_BECAME_VOIDED)
170178
return
171179

172180
if cur.hash == self.end_hash:
173181
# only send the last when not reverse
174182
if not self.reverse:
175183
self.log.debug('send next block', blk_id=cur.hash.hex())
176184
self.sync_agent.send_blocks(cur)
177-
self.stop()
178-
self.sync_agent.send_blocks_end(StreamEnd.END_HASH_REACHED)
185+
self.sync_agent.stop_blk_streaming_server(StreamEnd.END_HASH_REACHED)
179186
return
180187

181188
if self.counter >= self.limit:
182189
# only send the last when not reverse
183190
if not self.reverse:
184191
self.log.debug('send next block', blk_id=cur.hash.hex())
185192
self.sync_agent.send_blocks(cur)
186-
self.stop()
187-
self.sync_agent.send_blocks_end(StreamEnd.LIMIT_EXCEEDED)
193+
self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED)
188194
return
189195

190196
self.counter += 1
@@ -199,8 +205,7 @@ def send_next(self) -> None:
199205

200206
# XXX: don't send the genesis or the current block
201207
if self.current_block is None or self.current_block.is_genesis:
202-
self.stop()
203-
self.sync_agent.send_blocks_end(StreamEnd.NO_MORE_BLOCKS)
208+
self.sync_agent.stop_blk_streaming_server(StreamEnd.NO_MORE_BLOCKS)
204209
return
205210

206211

@@ -235,6 +240,9 @@ def __init__(self,
235240
self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False)
236241
self.iter = self.get_iter()
237242

243+
def _stop_streaming_server(self, response_code: StreamEnd) -> None:
244+
self.sync_agent.stop_tx_streaming_server(response_code)
245+
238246
def get_iter(self) -> Iterator[BaseTransaction]:
239247
"""Return an iterator that yields all transactions confirmed by each block in sequence."""
240248
root: Union[BaseTransaction, Iterable[BaseTransaction]]
@@ -258,8 +266,7 @@ def get_iter(self) -> Iterator[BaseTransaction]:
258266

259267
# Check if this block is still in the best blockchain.
260268
if self.current_block.get_metadata().voided_by:
261-
self.stop()
262-
self.sync_agent.send_blocks_end(StreamEnd.STREAM_BECAME_VOIDED)
269+
self.sync_agent.stop_tx_streaming_server(StreamEnd.STREAM_BECAME_VOIDED)
263270
return
264271

265272
self.current_block = self.current_block.get_next_block_best_chain()
@@ -275,8 +282,7 @@ def send_next(self) -> None:
275282
except StopIteration:
276283
# nothing more to send
277284
self.log.debug('no more transactions, stopping streaming')
278-
self.stop()
279-
self.sync_agent.send_transactions_end(StreamEnd.END_HASH_REACHED)
285+
self.sync_agent.stop_tx_streaming_server(StreamEnd.END_HASH_REACHED)
280286
return
281287

282288
# Skip blocks.
@@ -290,8 +296,7 @@ def send_next(self) -> None:
290296
cur_metadata = cur.get_metadata()
291297
if cur_metadata.first_block is None:
292298
self.log.debug('reached a tx that is not confirmed, stopping streaming')
293-
self.stop()
294-
self.sync_agent.send_transactions_end(StreamEnd.TX_NOT_CONFIRMED)
299+
self.sync_agent.stop_tx_streaming_server(StreamEnd.TX_NOT_CONFIRMED)
295300
return
296301

297302
# Check if tx is confirmed by the `self.current_block` or any next block.
@@ -308,6 +313,6 @@ def send_next(self) -> None:
308313

309314
self.counter += 1
310315
if self.counter >= self.limit:
311-
self.stop()
312-
self.sync_agent.send_transactions_end(StreamEnd.LIMIT_EXCEEDED)
316+
self.log.debug('limit exceeded, stopping streaming')
317+
self.sync_agent.stop_tx_streaming_server(StreamEnd.LIMIT_EXCEEDED)
313318
return

0 commit comments

Comments
 (0)