Skip to content

Commit 9535b77

Browse files
committed
feat(multiprocess-verification): implement on_new_vertex_async
1 parent 34a9630 commit 9535b77

File tree

1 file changed

+66
-12
lines changed

1 file changed

+66
-12
lines changed

hathor/vertex_handler/vertex_handler.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
import datetime
1616

1717
from structlog import get_logger
18+
from twisted.internet.task import deferLater
1819

1920
from hathor.conf.settings import HathorSettings
2021
from hathor.consensus import ConsensusAlgorithm
2122
from hathor.exception import HathorError, InvalidNewTransaction
2223
from hathor.p2p.manager import ConnectionsManager
2324
from hathor.pubsub import HathorEvents, PubSubManager
2425
from hathor.reactor import ReactorProtocol
25-
from hathor.transaction import BaseTransaction, Block
26+
from hathor.transaction import Block, Vertex
2627
from hathor.transaction.storage import TransactionStorage
2728
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
2829
from hathor.verification.verification_service import VerificationService
@@ -68,26 +69,29 @@ def __init__(
6869

6970
def on_new_vertex(
7071
self,
71-
vertex: BaseTransaction,
72+
vertex: Vertex,
7273
*,
7374
quiet: bool = False,
7475
fails_silently: bool = True,
7576
propagate_to_peers: bool = True,
7677
reject_locked_reward: bool = True,
7778
) -> bool:
78-
""" New method for adding transactions or blocks that steps the validation state machine.
79+
"""Method for adding vertices (transactions or blocks) that steps the validation state machine, synchronously.
7980
8081
:param vertex: transaction to be added
8182
:param quiet: if True will not log when a new tx is accepted
8283
:param fails_silently: if False will raise an exception when tx cannot be added
8384
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
8485
"""
86+
is_pre_valid = self._pre_validate_vertex(vertex, fails_silently=fails_silently)
87+
if not is_pre_valid:
88+
return False
89+
8590
is_valid = self._validate_vertex(
8691
vertex,
8792
fails_silently=fails_silently,
8893
reject_locked_reward=reject_locked_reward
8994
)
90-
9195
if not is_valid:
9296
return False
9397

@@ -101,13 +105,46 @@ def on_new_vertex(
101105

102106
return True
103107

104-
def _validate_vertex(
108+
async def on_new_vertex_async(
105109
self,
106-
vertex: BaseTransaction,
110+
vertex: Vertex,
107111
*,
108-
fails_silently: bool,
109-
reject_locked_reward: bool,
112+
quiet: bool = False,
113+
fails_silently: bool = True,
114+
propagate_to_peers: bool = True,
115+
reject_locked_reward: bool = True,
110116
) -> bool:
117+
"""Method for adding vertices (transactions or blocks) that steps the validation state machine, asynchronously.
118+
This is exactly the same method as `on_new_vertex()`, except it calls async verification.
119+
120+
:param vertex: transaction to be added
121+
:param quiet: if True will not log when a new tx is accepted
122+
:param fails_silently: if False will raise an exception when tx cannot be added
123+
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
124+
"""
125+
is_pre_valid = self._pre_validate_vertex(vertex, fails_silently=fails_silently)
126+
if not is_pre_valid:
127+
return False
128+
129+
is_valid = await self._validate_vertex_async(
130+
vertex,
131+
fails_silently=fails_silently,
132+
reject_locked_reward=reject_locked_reward
133+
)
134+
if not is_valid:
135+
return False
136+
137+
self._save_and_run_consensus(vertex)
138+
self._post_consensus(
139+
vertex,
140+
quiet=quiet,
141+
propagate_to_peers=propagate_to_peers,
142+
reject_locked_reward=reject_locked_reward
143+
)
144+
145+
return True
146+
147+
def _pre_validate_vertex(self, vertex: Vertex, *, fails_silently: bool) -> bool:
111148
assert self._tx_storage.is_only_valid_allowed()
112149
already_exists = False
113150
if self._tx_storage.transaction_exists(vertex.hash):
@@ -144,6 +181,11 @@ def _validate_vertex(
144181
self._log.warn('on_new_tx(): previously marked as invalid', tx=vertex.hash_hex)
145182
return False
146183

184+
return True
185+
186+
def _validate_vertex(self, vertex: Vertex, *, fails_silently: bool, reject_locked_reward: bool) -> bool:
187+
metadata = vertex.get_metadata()
188+
147189
if not metadata.validation.is_fully_connected():
148190
try:
149191
self._verification_service.validate_full(vertex, reject_locked_reward=reject_locked_reward)
@@ -155,7 +197,19 @@ def _validate_vertex(
155197

156198
return True
157199

158-
def _save_and_run_consensus(self, vertex: BaseTransaction) -> None:
200+
async def _validate_vertex_async(
201+
self,
202+
vertex: Vertex,
203+
*,
204+
fails_silently: bool,
205+
reject_locked_reward: bool,
206+
) -> bool:
207+
# TODO: This method simply calls synchronous verification, but it also releases the reactor loop. This is
208+
# temporary, and soon this method will be changed to call verification on a separate process.
209+
await deferLater(self._reactor, 0, lambda: None)
210+
return self._validate_vertex(vertex, fails_silently=fails_silently, reject_locked_reward=reject_locked_reward)
211+
212+
def _save_and_run_consensus(self, vertex: Vertex) -> None:
159213
# The method below adds the tx as a child of the parents
160214
# This needs to be called right before the save because we were adding the children
161215
# in the tx parents even if the tx was invalid (failing the verifications above)
@@ -167,7 +221,7 @@ def _save_and_run_consensus(self, vertex: BaseTransaction) -> None:
167221

168222
def _post_consensus(
169223
self,
170-
vertex: BaseTransaction,
224+
vertex: Vertex,
171225
*,
172226
quiet: bool,
173227
propagate_to_peers: bool,
@@ -205,7 +259,7 @@ def _post_consensus(
205259
# Propagate to our peers.
206260
self._p2p_manager.send_tx_to_peers(vertex)
207261

208-
def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
262+
def _log_new_object(self, tx: Vertex, message_fmt: str, *, quiet: bool) -> None:
209263
""" A shortcut for logging additional information for block/txs.
210264
"""
211265
metadata = tx.get_metadata()
@@ -228,7 +282,7 @@ def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool)
228282
log_func = self._log.debug
229283
log_func(message, **kwargs)
230284

231-
def _log_feature_states(self, vertex: BaseTransaction) -> None:
285+
def _log_feature_states(self, vertex: Vertex) -> None:
232286
"""Log features states for a block. Used as part of the Feature Activation Phased Testing."""
233287
if not isinstance(vertex, Block):
234288
return

0 commit comments

Comments
 (0)