Skip to content

Commit e85ad70

Browse files
committed
refactor(sync-v2): Modify TransactionStreamingClient to process vertices asynchronously
1 parent 1171e10 commit e85ad70

File tree

2 files changed

+86
-21
lines changed

2 files changed

+86
-21
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from structlog import get_logger
2424
from twisted.internet.defer import Deferred, inlineCallbacks
25-
from twisted.internet.task import LoopingCall
25+
from twisted.internet.task import LoopingCall, deferLater
2626

2727
from hathor.conf.get_settings import get_settings
2828
from hathor.p2p.messages import ProtocolMessages
@@ -569,10 +569,12 @@ def find_best_common_block(self,
569569
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
570570
return lo
571571

572-
def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> None:
572+
@inlineCallbacks
573+
def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> Generator[Any, Any, None]:
573574
"""This method is called when a block and its transactions are downloaded."""
574575
for tx in vertex_list:
575576
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
577+
yield deferLater(self.reactor, 0, lambda: None)
576578

577579
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
578580

hathor/p2p/sync_v2/transaction_streaming_client.py

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import TYPE_CHECKING, Iterator
15+
from collections import deque
16+
from typing import TYPE_CHECKING, Any, Generator, Iterator, Optional
1617

1718
from structlog import get_logger
18-
from twisted.internet.defer import Deferred
19+
from twisted.internet.defer import Deferred, inlineCallbacks
1920

2021
from hathor.p2p.sync_v2.exception import (
2122
InvalidVertexError,
@@ -45,23 +46,43 @@ def __init__(self,
4546
self.protocol = self.sync_agent.protocol
4647
self.tx_storage = self.sync_agent.tx_storage
4748
self.manager = self.sync_agent.manager
49+
self.reactor = self.manager.reactor
4850

4951
self.log = logger.new(peer=self.protocol.get_short_peer_id())
5052

53+
# List of blocks from which we will receive transactions.
5154
self.partial_blocks = partial_blocks
5255

56+
# True if we are processing a transaction.
57+
self._is_processing: bool = False
58+
59+
# Deferred return to the sync agent.
5360
self._deferred: Deferred[StreamEnd] = Deferred()
5461

62+
# Number of transactions received.
5563
self._tx_received: int = 0
5664

65+
# Maximum number of transactions to be received.
5766
self._tx_max_quantity = limit
5867

68+
# Queue of transactions waiting to be processed.
69+
self._queue: deque[BaseTransaction] = deque()
70+
71+
# Keeps the response code if the streaming has ended.
72+
self._response_code: Optional[StreamEnd] = None
73+
74+
# Index to the current block.
5975
self._idx: int = 0
60-
self._buffer: list[VertexId] = []
76+
77+
# Set of hashes we are waiting to receive.
6178
self._waiting_for: set[VertexId] = set()
79+
80+
# In-memory database of transactions already received but still
81+
# waiting for dependencies.
6282
self._db: dict[VertexId, BaseTransaction] = {}
6383

6484
self._prepare_block(self.partial_blocks[0])
85+
assert self._waiting_for
6586

6687
def wait(self) -> Deferred[StreamEnd]:
6788
"""Return the deferred."""
@@ -71,6 +92,7 @@ def resume(self) -> Deferred[StreamEnd]:
7192
"""Resume receiving vertices."""
7293
assert self._deferred.called
7394
self._tx_received = 0
95+
self._response_code = None
7496
self._deferred = Deferred()
7597
return self._deferred
7698

@@ -92,9 +114,37 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
92114
return
93115

94116
assert tx.hash is not None
95-
96117
self.log.debug('tx received', tx_id=tx.hash.hex())
97118

119+
self._queue.append(tx)
120+
assert len(self._queue) <= self._tx_max_quantity
121+
if not self._is_processing:
122+
self.reactor.callLater(0, self.process_queue)
123+
124+
@inlineCallbacks
125+
def process_queue(self) -> Generator[Any, Any, None]:
126+
"""Process next transaction in the queue."""
127+
if self._is_processing:
128+
return
129+
130+
if not self._queue:
131+
self.check_end()
132+
return
133+
134+
self._is_processing = True
135+
try:
136+
tx = self._queue.popleft()
137+
yield self._process_transaction(tx)
138+
finally:
139+
self._is_processing = False
140+
141+
self.reactor.callLater(0, self.process_queue)
142+
143+
@inlineCallbacks
144+
def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None]:
145+
"""Process transaction."""
146+
assert tx.hash is not None
147+
98148
# Run basic verification.
99149
if not tx.is_genesis:
100150
try:
@@ -120,11 +170,13 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
120170
self._waiting_for.add(dep)
121171

122172
self._db[tx.hash] = tx
123-
self._buffer.append(tx.hash)
124173

125174
if not self._waiting_for:
126175
self.log.debug('no pending dependencies, processing buffer')
127-
self._execute_and_prepare_next()
176+
while not self._waiting_for:
177+
result = yield self._execute_and_prepare_next()
178+
if not result:
179+
break
128180
else:
129181
self.log.debug('pending dependencies', counter=len(self._waiting_for))
130182

@@ -144,38 +196,49 @@ def handle_transactions_end(self, response_code: StreamEnd) -> None:
144196
"""This method is called by the sync agent when a TRANSACTIONS-END message is received."""
145197
if self._deferred.called:
146198
return
147-
self.log.info('transactions streaming ended', reason=response_code, waiting_for=len(self._waiting_for))
148-
self._deferred.callback(response_code)
199+
assert self._response_code is None
200+
self._response_code = response_code
201+
self.check_end()
202+
203+
def check_end(self) -> None:
204+
"""Check if the streaming has ended."""
205+
if self._response_code is None:
206+
return
207+
208+
if self._queue:
209+
return
210+
211+
self.log.info('transactions streaming ended', reason=self._response_code, waiting_for=len(self._waiting_for))
212+
self._deferred.callback(self._response_code)
149213

150-
def _execute_and_prepare_next(self) -> None:
214+
@inlineCallbacks
215+
def _execute_and_prepare_next(self) -> Generator[Any, Any, bool]:
151216
"""Add the block and its vertices to the DAG."""
152217
assert not self._waiting_for
153218

154219
blk = self.partial_blocks[self._idx]
155-
vertex_list = [self._db[_id] for _id in self._buffer]
220+
vertex_list = list(self._db.values())
156221
vertex_list.sort(key=lambda v: v.timestamp)
157222

158223
try:
159-
self.sync_agent.on_block_complete(blk, vertex_list)
224+
yield self.sync_agent.on_block_complete(blk, vertex_list)
160225
except HathorError as e:
161226
self.fails(InvalidVertexError(repr(e)))
162-
return
227+
return False
163228

164229
self._idx += 1
165-
if self._idx < len(self.partial_blocks):
166-
self._prepare_block(self.partial_blocks[self._idx])
230+
if self._idx >= len(self.partial_blocks):
231+
return False
232+
233+
self._prepare_block(self.partial_blocks[self._idx])
234+
return True
167235

168236
def _prepare_block(self, blk: 'Block') -> None:
169237
"""Reset everything for the next block. It also adds blocks that have no dependencies."""
170-
self._buffer.clear()
171238
self._waiting_for.clear()
172239
self._db.clear()
173240

174241
# Add pending dependencies from block.
175242
for dep in blk.get_all_dependencies():
176243
if not self.tx_storage.transaction_exists(dep):
177244
self._waiting_for.add(dep)
178-
179-
# If block is ready to be added then do it.
180-
if not self._waiting_for:
181-
self._execute_and_prepare_next()

0 commit comments

Comments
 (0)