Skip to content

Commit 2938c08

Browse files
committed
review changes
1 parent 2211199 commit 2938c08

File tree

8 files changed

+53
-36
lines changed

8 files changed

+53
-36
lines changed

hathor/builder/builder.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -312,12 +312,7 @@ def _get_peer_id(self) -> PeerId:
312312

313313
def _get_or_create_execution_manager(self) -> ExecutionManager:
314314
if self._execution_manager is None:
315-
tx_storage = self._get_or_create_tx_storage()
316-
event_manager = self._get_or_create_event_manager()
317-
self._execution_manager = ExecutionManager(
318-
tx_storage=tx_storage,
319-
event_manager=event_manager,
320-
)
315+
self._execution_manager = ExecutionManager()
321316

322317
return self._execution_manager
323318

@@ -453,7 +448,8 @@ def _get_or_create_event_manager(self) -> EventManager:
453448
reactor=reactor,
454449
pubsub=self._get_or_create_pubsub(),
455450
event_storage=storage,
456-
event_ws_factory=factory
451+
event_ws_factory=factory,
452+
execution_manager=self._get_or_create_execution_manager()
457453
)
458454

459455
return self._event_manager

hathor/builder/cli_builder.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,14 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
172172
event_storage=event_storage
173173
)
174174

175+
execution_manager = ExecutionManager()
176+
175177
event_manager = EventManager(
176178
event_storage=event_storage,
177179
event_ws_factory=self.event_ws_factory,
178180
pubsub=pubsub,
179-
reactor=reactor
181+
reactor=reactor,
182+
execution_manager=execution_manager,
180183
)
181184

182185
if self._args.wallet_index and tx_storage.indexes is not None:
@@ -196,10 +199,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
196199
full_verification = True
197200

198201
soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
199-
execution_manager = ExecutionManager(
200-
tx_storage=tx_storage,
201-
event_manager=event_manager,
202-
)
203202
consensus_algorithm = ConsensusAlgorithm(
204203
soft_voided_tx_ids,
205204
pubsub=pubsub,
@@ -276,7 +275,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
276275
feature_service=self.feature_service,
277276
bit_signaling_service=bit_signaling_service,
278277
verification_service=verification_service,
279-
cpu_mining_service=cpu_mining_service
278+
cpu_mining_service=cpu_mining_service,
279+
execution_manager=execution_manager,
280280
)
281281

282282
if self._args.x_ipython_kernel:

hathor/consensus/consensus.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ def update(self, base: BaseTransaction) -> None:
8383
assert meta.validation.is_valid()
8484
try:
8585
self._unsafe_update(base)
86-
except BaseException:
86+
except BaseException as e:
8787
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
8888
assert base.storage is not None
8989
base.storage.save_transaction(base, only_metadata=True)
90-
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')
90+
message = f'Consensus update failed for tx {base.hash_hex}'
91+
self.log.critical(f'{message}. Exception: {repr(e)}', exc_info=True)
92+
self._execution_manager.crash_and_exit(reason=message)
9193

9294
def _unsafe_update(self, base: BaseTransaction) -> None:
9395
"""Run a consensus update with its own context, indexes will be updated accordingly."""

hathor/event/event_manager.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from hathor.event.model.node_state import NodeState
2323
from hathor.event.storage import EventStorage
2424
from hathor.event.websocket import EventWebsocketFactory
25+
from hathor.execution_manager import ExecutionManager
2526
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
2627
from hathor.reactor import ReactorProtocol as Reactor
2728
from hathor.transaction import BaseTransaction
@@ -70,6 +71,7 @@ def __init__(
7071
event_storage: EventStorage,
7172
pubsub: PubSubManager,
7273
reactor: Reactor,
74+
execution_manager: ExecutionManager,
7375
event_ws_factory: Optional[EventWebsocketFactory] = None,
7476
) -> None:
7577
self.log = logger.new()
@@ -78,13 +80,15 @@ def __init__(
7880
self._event_storage = event_storage
7981
self._event_ws_factory = event_ws_factory
8082
self._pubsub = pubsub
83+
self._execution_manager = execution_manager
8184

8285
def start(self, peer_id: str) -> None:
8386
"""Starts the EventManager."""
8487
assert self._is_running is False, 'Cannot start, EventManager is already running'
8588
assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set'
8689
assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled'
8790

91+
self._execution_manager.register_on_crash_callback(self.on_full_node_crash)
8892
self._previous_node_state = self._event_storage.get_node_state()
8993

9094
if self._should_reload_events():
@@ -153,7 +157,7 @@ def load_finished(self) -> None:
153157
)
154158
self._event_storage.save_node_state(NodeState.SYNC)
155159

156-
def full_node_crashed(self) -> None:
160+
def on_full_node_crash(self) -> None:
157161
if not self._is_running:
158162
return
159163

hathor/execution_manager.py

+19-10
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,34 @@
1313
# limitations under the License.
1414

1515
import os
16-
from typing import NoReturn
16+
from typing import Callable, NoReturn
1717

1818
from structlog import get_logger
1919

20-
from hathor.event import EventManager
21-
from hathor.transaction.storage import TransactionStorage
22-
2320
logger = get_logger()
2421

2522

2623
class ExecutionManager:
2724
"""Class to manage actions related to full node execution."""
28-
__slots__ = ('_log', '_tx_storage', '_event_manager')
25+
__slots__ = ('_log', '_on_crash_callbacks')
2926

30-
def __init__(self, *, tx_storage: TransactionStorage, event_manager: EventManager) -> None:
27+
def __init__(self) -> None:
3128
self._log = logger.new()
32-
self._tx_storage = tx_storage
33-
self._event_manager = event_manager
29+
self._on_crash_callbacks: list[tuple[int, Callable[[], None]]] = []
30+
31+
def register_on_crash_callback(self, callback: Callable[[], None], *, priority: int = 0) -> None:
32+
"""Register a callback to be executed before the full node exits."""
33+
self._on_crash_callbacks.append((priority, callback))
34+
35+
def _run_on_crash_callbacks(self) -> None:
36+
"""Run all registered on crash callbacks."""
37+
callbacks = sorted(self._on_crash_callbacks, reverse=True)
38+
39+
for _, callback in callbacks:
40+
try:
41+
callback()
42+
except BaseException as e:
43+
self._log.critical(f'Failed execution of on_crash callback "{callback}". Exception: {repr(e)}')
3444

3545
def crash_and_exit(self, *, reason: str) -> NoReturn:
3646
"""
@@ -39,8 +49,7 @@ def crash_and_exit(self, *, reason: str) -> NoReturn:
3949
corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
4050
or a snapshot) will be required.
4151
"""
42-
self._tx_storage.full_node_crashed()
43-
self._event_manager.full_node_crashed()
52+
self._run_on_crash_callbacks()
4453
self._log.critical(
4554
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
4655
reason=reason,

hathor/manager.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
RewardLockedError,
4141
SpendingVoidedError,
4242
)
43+
from hathor.execution_manager import ExecutionManager
4344
from hathor.feature_activation.bit_signaling_service import BitSignalingService
4445
from hathor.feature_activation.feature import Feature
4546
from hathor.feature_activation.feature_service import FeatureService
@@ -102,6 +103,7 @@ def __init__(self,
102103
verification_service: VerificationService,
103104
cpu_mining_service: CpuMiningService,
104105
network: str,
106+
execution_manager: ExecutionManager,
105107
hostname: Optional[str] = None,
106108
wallet: Optional[BaseWallet] = None,
107109
capabilities: Optional[list[str]] = None,
@@ -127,6 +129,7 @@ def __init__(self,
127129
'Either enable it, or use the reset-event-queue CLI command to remove all event-related data'
128130
)
129131

132+
self._execution_manager = execution_manager
130133
self._settings = settings
131134
self.daa = daa
132135
self._cmd_path: Optional[str] = None
@@ -322,7 +325,7 @@ def start(self) -> None:
322325
self.stratum_factory.start()
323326

324327
# Start running
325-
self.tx_storage.start_running_manager()
328+
self.tx_storage.start_running_manager(self._execution_manager)
326329

327330
def stop(self) -> Deferred:
328331
if not self.is_started:

hathor/transaction/storage/transaction_storage.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from structlog import get_logger
2525

2626
from hathor.conf.get_settings import get_settings
27+
from hathor.execution_manager import ExecutionManager
2728
from hathor.indexes import IndexesManager
2829
from hathor.indexes.height_index import HeightInfo
2930
from hathor.profiler import get_cpu_profiler
@@ -966,9 +967,10 @@ def is_running_full_verification(self) -> bool:
966967
"""
967968
return self.get_value(self._running_full_verification_attribute) == '1'
968969

969-
def start_running_manager(self) -> None:
970+
def start_running_manager(self, execution_manager: ExecutionManager) -> None:
970971
""" Save on storage that manager is running
971972
"""
973+
execution_manager.register_on_crash_callback(self.on_full_node_crash)
972974
self.add_value(self._manager_running_attribute, '1')
973975

974976
def stop_running_manager(self) -> None:
@@ -981,7 +983,7 @@ def is_running_manager(self) -> bool:
981983
"""
982984
return self.get_value(self._manager_running_attribute) == '1'
983985

984-
def full_node_crashed(self) -> None:
986+
def on_full_node_crash(self) -> None:
985987
"""Save on storage that the full node crashed and cannot be recovered."""
986988
self.add_value(self._full_node_crashed_attribute, '1')
987989

tests/execution_manager/test_execution_manager.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,25 @@
1515
import os
1616
from unittest.mock import Mock, patch
1717

18-
from hathor.event import EventManager
1918
from hathor.execution_manager import ExecutionManager
20-
from hathor.transaction.storage import TransactionStorage
2119

2220

2321
def test_crash_and_exit() -> None:
24-
tx_storage_mock = Mock(spec_set=TransactionStorage)
25-
event_manager_mock = Mock(spec_set=EventManager)
22+
def callback() -> None:
23+
pass
24+
25+
callback_wrapped = Mock(wraps=callback)
2626
log_mock = Mock()
27-
manager = ExecutionManager(tx_storage=tx_storage_mock, event_manager=event_manager_mock)
27+
manager = ExecutionManager()
2828
manager._log = log_mock
2929
reason = 'some critical failure'
3030

31+
manager.register_on_crash_callback(callback_wrapped)
32+
3133
with patch.object(os, '_exit') as exit_mock:
3234
manager.crash_and_exit(reason=reason)
3335

34-
tx_storage_mock.full_node_crashed.assert_called_once()
35-
event_manager_mock.full_node_crashed.assert_called_once()
36+
callback_wrapped.assert_called_once()
3637
log_mock.critical.assert_called_once_with(
3738
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
3839
reason=reason,

0 commit comments

Comments
 (0)