diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py
index 815ed352f..6fe3c1a91 100644
--- a/hathor/builder/builder.py
+++ b/hathor/builder/builder.py
@@ -25,6 +25,7 @@
 from hathor.event import EventManager
 from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
 from hathor.event.websocket import EventWebsocketFactory
+from hathor.execution_manager import ExecutionManager
 from hathor.feature_activation.bit_signaling_service import BitSignalingService
 from hathor.feature_activation.feature import Feature
 from hathor.feature_activation.feature_service import FeatureService
@@ -150,6 +151,8 @@ def __init__(self) -> None:
 
         self._soft_voided_tx_ids: Optional[set[bytes]] = None
 
+        self._execution_manager: ExecutionManager | None = None
+
     def build(self) -> BuildArtifacts:
         if self.artifacts is not None:
             raise ValueError('cannot call build twice')
@@ -163,8 +166,9 @@ def build(self) -> BuildArtifacts:
 
         peer_id = self._get_peer_id()
 
+        execution_manager = self._get_or_create_execution_manager()
         soft_voided_tx_ids = self._get_soft_voided_tx_ids()
-        consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)
+        consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager)
 
         p2p_manager = self._get_p2p_manager()
 
@@ -215,6 +219,7 @@ def build(self) -> BuildArtifacts:
             bit_signaling_service=bit_signaling_service,
             verification_service=verification_service,
             cpu_mining_service=cpu_mining_service,
+            execution_manager=execution_manager,
             **kwargs
         )
 
@@ -306,6 +311,13 @@ def _get_peer_id(self) -> PeerId:
             return self._peer_id
         raise ValueError('peer_id not set')
 
+    def _get_or_create_execution_manager(self) -> ExecutionManager:
+        if self._execution_manager is None:
+            reactor = self._get_reactor()
+            self._execution_manager = ExecutionManager(reactor)
+
+        return self._execution_manager
+
     def _get_or_create_pubsub(self) -> PubSubManager:
         if self._pubsub is None:
             self._pubsub = PubSubManager(self._get_reactor())
@@ -438,7 +450,8 @@ def _get_or_create_event_manager(self) -> EventManager:
                 reactor=reactor,
                 pubsub=self._get_or_create_pubsub(),
                 event_storage=storage,
-                event_ws_factory=factory
+                event_ws_factory=factory,
+                execution_manager=self._get_or_create_execution_manager()
             )
 
         return self._event_manager
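Note: `_get_or_create_execution_manager` follows the same lazy get-or-create idiom as the other `_get_or_create_*` builder methods, so every component wired during `build()` shares a single `ExecutionManager` instance. A minimal sketch of the idiom, shown in isolation with hypothetical stand-in names (not part of the patch):

from typing import Optional

class Service:
    """Hypothetical stand-in for a built component such as ExecutionManager."""

class Builder:
    def __init__(self) -> None:
        self._service: Optional[Service] = None

    def _get_or_create_service(self) -> Service:
        # Memoize on first use so later calls return the same instance.
        if self._service is None:
            self._service = Service()
        return self._service

builder = Builder()
assert builder._get_or_create_service() is builder._get_or_create_service()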
diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py
index 8c1f41fff..0ab658a25 100644
--- a/hathor/builder/cli_builder.py
+++ b/hathor/builder/cli_builder.py
@@ -27,6 +27,7 @@
 from hathor.daa import DifficultyAdjustmentAlgorithm
 from hathor.event import EventManager
 from hathor.exception import BuilderError
+from hathor.execution_manager import ExecutionManager
 from hathor.feature_activation.bit_signaling_service import BitSignalingService
 from hathor.feature_activation.feature_service import FeatureService
 from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
@@ -212,11 +213,14 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
                 event_storage=event_storage
             )
 
+        execution_manager = ExecutionManager(reactor)
+
         event_manager = EventManager(
             event_storage=event_storage,
             event_ws_factory=self.event_ws_factory,
             pubsub=pubsub,
-            reactor=reactor
+            reactor=reactor,
+            execution_manager=execution_manager,
         )
 
         if self._args.wallet_index and tx_storage.indexes is not None:
@@ -236,7 +240,11 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
             full_verification = True
 
         soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
-        consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)
+        consensus_algorithm = ConsensusAlgorithm(
+            soft_voided_tx_ids,
+            pubsub=pubsub,
+            execution_manager=execution_manager
+        )
 
         if self._args.x_enable_event_queue:
             self.log.info('--x-enable-event-queue flag provided. '
@@ -308,7 +316,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
             feature_service=self.feature_service,
             bit_signaling_service=bit_signaling_service,
             verification_service=verification_service,
-            cpu_mining_service=cpu_mining_service
+            cpu_mining_service=cpu_mining_service,
+            execution_manager=execution_manager,
         )
 
         if self._args.x_ipython_kernel:
diff --git a/hathor/consensus/consensus.py b/hathor/consensus/consensus.py
index 34167d973..0317c2fab 100644
--- a/hathor/consensus/consensus.py
+++ b/hathor/consensus/consensus.py
@@ -18,6 +18,7 @@
 from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
 from hathor.consensus.context import ConsensusAlgorithmContext
 from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
+from hathor.execution_manager import ExecutionManager
 from hathor.profiler import get_cpu_profiler
 from hathor.pubsub import HathorEvents, PubSubManager
 from hathor.transaction import BaseTransaction
@@ -55,13 +56,20 @@ class ConsensusAlgorithm:
     b0 will not be propagated to the voided_by of b1, b2, and b3.
     """
 
-    def __init__(self, soft_voided_tx_ids: set[bytes], pubsub: PubSubManager) -> None:
+    def __init__(
+        self,
+        soft_voided_tx_ids: set[bytes],
+        pubsub: PubSubManager,
+        *,
+        execution_manager: ExecutionManager
+    ) -> None:
         self._settings = get_global_settings()
         self.log = logger.new()
         self._pubsub = pubsub
         self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
         self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
         self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
+        self._execution_manager = execution_manager
 
     def create_context(self) -> ConsensusAlgorithmContext:
         """Handy method to create a context that can be used to access block and transaction algorithms."""
@@ -75,11 +83,11 @@ def update(self, base: BaseTransaction) -> None:
         assert meta.validation.is_valid()
         try:
             self._unsafe_update(base)
-        except Exception:
+        except BaseException:
             meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
             assert base.storage is not None
             base.storage.save_transaction(base, only_metadata=True)
-            raise
+            self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')
 
     def _unsafe_update(self, base: BaseTransaction) -> None:
         """Run a consensus update with its own context, indexes will be updated accordingly."""
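Note: `update()` now catches `BaseException` rather than `Exception` and, instead of re-raising, marks the transaction with `CONSENSUS_FAIL_ID` and halts the node, since a partially applied consensus update can leave storage and indexes inconsistent. A reduced sketch of this mark-then-halt control flow, using hypothetical stubs (not part of the patch):

from typing import NoReturn

class StubExecutionManager:
    """Hypothetical stand-in; the real class stops the reactor and exits."""
    def crash_and_exit(self, *, reason: str) -> NoReturn:
        raise SystemExit(f'halting: {reason}')

def update(execution_manager: StubExecutionManager) -> None:
    try:
        raise RuntimeError('simulated consensus failure')
    except BaseException:
        # In the patch: persist the CONSENSUS_FAIL_ID marker first, then halt
        # instead of re-raising, because the update may be half-applied.
        execution_manager.crash_and_exit(reason='Consensus update failed')

try:
    update(StubExecutionManager())
except SystemExit as e:
    print(e)  # halting: Consensus update failed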
diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py
index 6306707c6..748abe90a 100644
--- a/hathor/event/event_manager.py
+++ b/hathor/event/event_manager.py
@@ -22,6 +22,7 @@
 from hathor.event.model.node_state import NodeState
 from hathor.event.storage import EventStorage
 from hathor.event.websocket import EventWebsocketFactory
+from hathor.execution_manager import ExecutionManager
 from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
 from hathor.reactor import ReactorProtocol as Reactor
 from hathor.transaction import BaseTransaction
@@ -70,6 +71,7 @@ def __init__(
         event_storage: EventStorage,
         pubsub: PubSubManager,
         reactor: Reactor,
+        execution_manager: ExecutionManager,
         event_ws_factory: Optional[EventWebsocketFactory] = None,
     ) -> None:
         self.log = logger.new()
@@ -78,6 +80,7 @@
         self._event_storage = event_storage
         self._event_ws_factory = event_ws_factory
         self._pubsub = pubsub
+        self._execution_manager = execution_manager
 
     def start(self, peer_id: str) -> None:
         """Starts the EventManager."""
@@ -85,6 +88,7 @@ def start(self, peer_id: str) -> None:
         assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set'
         assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled'
 
+        self._execution_manager.register_on_crash_callback(self.on_full_node_crash)
         self._previous_node_state = self._event_storage.get_node_state()
 
         if self._should_reload_events():
@@ -133,7 +137,7 @@ def _subscribe_events(self) -> None:
         for event in _SUBSCRIBE_EVENTS:
             self._pubsub.subscribe(event, self._handle_hathor_event)
 
-    def load_started(self):
+    def load_started(self) -> None:
         if not self._is_running:
             return
 
@@ -143,7 +147,7 @@
         )
         self._event_storage.save_node_state(NodeState.LOAD)
 
-    def load_finished(self):
+    def load_finished(self) -> None:
         if not self._is_running:
             return
 
@@ -153,6 +157,15 @@
         )
         self._event_storage.save_node_state(NodeState.SYNC)
 
+    def on_full_node_crash(self) -> None:
+        if not self._is_running:
+            return
+
+        self._handle_event(
+            event_type=EventType.FULL_NODE_CRASHED,
+            event_args=EventArguments(),
+        )
+
     def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
         """Handles a PubSub 'HathorEvents' event."""
         event_type = EventType.from_hathor_event(hathor_event)
diff --git a/hathor/event/model/event_type.py b/hathor/event/model/event_type.py
index 7c697fbc8..617ea74d8 100644
--- a/hathor/event/model/event_type.py
+++ b/hathor/event/model/event_type.py
@@ -25,6 +25,7 @@ class EventType(Enum):
     REORG_STARTED = 'REORG_STARTED'
     REORG_FINISHED = 'REORG_FINISHED'
     VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED'
+    FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'
 
     @classmethod
     def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
@@ -53,4 +54,5 @@ def data_type(self) -> type[BaseEventData]:
     EventType.REORG_STARTED: ReorgData,
     EventType.REORG_FINISHED: EmptyData,
     EventType.VERTEX_METADATA_CHANGED: TxData,
+    EventType.FULL_NODE_CRASHED: EmptyData,
 }
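Note: `FULL_NODE_CRASHED` carries no payload, so it maps to `EmptyData`, the same payload type used by `REORG_FINISHED`. A simplified sketch of the enum-to-payload lookup, with stand-in classes (not part of the patch):

from enum import Enum

class EmptyData:
    """Stand-in for the real EmptyData model, which has no fields."""

class EventType(Enum):
    FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'

    def data_type(self) -> type:
        # Mirrors the patch's mapping: crash events carry an empty payload.
        return {EventType.FULL_NODE_CRASHED: EmptyData}[self]

assert EventType.FULL_NODE_CRASHED.data_type() is EmptyData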
diff --git a/hathor/execution_manager.py b/hathor/execution_manager.py
new file mode 100644
index 000000000..8d788e8b0
--- /dev/null
+++ b/hathor/execution_manager.py
@@ -0,0 +1,65 @@
+# Copyright 2023 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+from typing import Callable, NoReturn
+
+from structlog import get_logger
+
+from hathor.reactor import ReactorProtocol
+
+logger = get_logger()
+
+
+class ExecutionManager:
+    """Class to manage actions related to full node execution."""
+    __slots__ = ('_log', '_reactor', '_on_crash_callbacks')
+
+    def __init__(self, reactor: ReactorProtocol) -> None:
+        self._log = logger.new()
+        self._reactor = reactor
+        self._on_crash_callbacks: list[tuple[int, Callable[[], None]]] = []
+
+    def register_on_crash_callback(self, callback: Callable[[], None], *, priority: int = 0) -> None:
+        """Register a callback to be executed before the full node exits."""
+        self._on_crash_callbacks.append((priority, callback))
+
+    def _run_on_crash_callbacks(self) -> None:
+        """Run all registered on crash callbacks."""
+        callbacks = sorted(self._on_crash_callbacks, reverse=True)
+
+        for _, callback in callbacks:
+            try:
+                callback()
+            except BaseException as e:
+                self._log.critical(f'Failed execution of on_crash callback "{callback}". Exception: {repr(e)}')
+
+    def crash_and_exit(self, *, reason: str) -> NoReturn:
+        """
+        Calling this function is a very extreme thing to do, so be careful. It should only be called when a
+        critical, unrecoverable failure happens. It crashes and exits the full node, maybe rendering the database
+        corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
+        or a snapshot) may be required.
+        """
+        self._run_on_crash_callbacks()
+        self._log.critical(
+            'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
+            reason=reason,
+            exc_info=True
+        )
+        # We sequentially call more extreme exit methods, so the full node exits as gracefully as possible, while
+        # guaranteeing that it will indeed exit.
+        self._reactor.stop()
+        self._reactor.crash()
+        sys.exit(-1)
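Note: `crash_and_exit()` escalates through `reactor.stop()`, then `reactor.crash()`, then `sys.exit(-1)`, so the process terminates even if the reactor is wedged, and all registered callbacks run first. An illustrative usage, mocking the reactor the same way the new test below does (assumes the hathor package is importable; not part of the patch):

from unittest.mock import Mock

from hathor.execution_manager import ExecutionManager
from hathor.reactor import ReactorProtocol

reactor_mock = Mock(spec_set=ReactorProtocol)
manager = ExecutionManager(reactor_mock)
manager.register_on_crash_callback(lambda: print('flushing state before exit'))

try:
    manager.crash_and_exit(reason='illustrative failure')
except SystemExit as e:
    # sys.exit(-1) raises SystemExit; the on-crash callback and both reactor
    # shutdown methods have already run by this point.
    assert e.code == -1

reactor_mock.stop.assert_called_once()
reactor_mock.crash.assert_called_once()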
diff --git a/hathor/manager.py b/hathor/manager.py
index 566a7c936..da1bdb42a 100644
--- a/hathor/manager.py
+++ b/hathor/manager.py
@@ -41,6 +41,7 @@
     RewardLockedError,
     SpendingVoidedError,
 )
+from hathor.execution_manager import ExecutionManager
 from hathor.feature_activation.bit_signaling_service import BitSignalingService
 from hathor.feature_activation.feature import Feature
 from hathor.feature_activation.feature_service import FeatureService
@@ -104,6 +105,7 @@ def __init__(self,
                 verification_service: VerificationService,
                 cpu_mining_service: CpuMiningService,
                 network: str,
+                 execution_manager: ExecutionManager,
                 hostname: Optional[str] = None,
                 wallet: Optional[BaseWallet] = None,
                 capabilities: Optional[list[str]] = None,
@@ -129,6 +131,7 @@
                 'Either enable it, or use the reset-event-queue CLI command to remove all event-related data'
             )
 
+        self._execution_manager = execution_manager
         self._settings = settings
         self.daa = daa
         self._cmd_path: Optional[str] = None
@@ -250,6 +253,15 @@ def start(self) -> None:
         self.is_started = True
 
         self.log.info('start manager', network=self.network)
+
+        if self.tx_storage.is_full_node_crashed():
+            self.log.error(
+                'Error initializing node. The last time you executed your full node it wasn\'t stopped correctly. '
+                'The storage is not reliable anymore and, because of that, you must remove your storage and do a '
+                'full sync (either from scratch or from a snapshot).'
+            )
+            sys.exit(-1)
+
         # If it's a full verification, we save on the storage that we are starting it
         # this is required because if we stop the initilization in the middle, the metadata
         # saved on the storage is not reliable anymore, only if we finish it
@@ -319,7 +331,7 @@ def start(self) -> None:
             self.stratum_factory.start()
 
         # Start running
-        self.tx_storage.start_running_manager()
+        self.tx_storage.start_running_manager(self._execution_manager)
 
     def stop(self) -> Deferred:
         if not self.is_started:
@@ -997,13 +1009,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
             tx.update_initial_metadata(save=False)
             self.tx_storage.save_transaction(tx)
             self.tx_storage.add_to_indexes(tx)
-            try:
-                self.consensus_algorithm.update(tx)
-            except HathorError as e:
-                if not fails_silently:
-                    raise InvalidNewTransaction('consensus update failed') from e
-                self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex, exc_info=True)
-                return False
+            self.consensus_algorithm.update(tx)
 
         assert self.verification_service.validate_full(
             tx,
diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py
index 5b56431cb..d97cd0741 100644
--- a/hathor/transaction/storage/transaction_storage.py
+++ b/hathor/transaction/storage/transaction_storage.py
@@ -24,6 +24,7 @@
 from structlog import get_logger
 
 from hathor.conf.get_settings import get_global_settings
+from hathor.execution_manager import ExecutionManager
 from hathor.indexes import IndexesManager
 from hathor.indexes.height_index import HeightInfo
 from hathor.profiler import get_cpu_profiler
@@ -84,6 +85,9 @@ class TransactionStorage(ABC):
     # Key storage attribute to save if the manager is running
     _manager_running_attribute: str = 'manager_running'
 
+    # Key storage attribute to save if the full node crashed
+    _full_node_crashed_attribute: str = 'full_node_crashed'
+
     # Ket storage attribute to save the last time the node started
     _last_start_attribute: str = 'last_start'
 
@@ -968,9 +972,10 @@ def is_running_full_verification(self) -> bool:
         """
         return self.get_value(self._running_full_verification_attribute) == '1'
 
-    def start_running_manager(self) -> None:
+    def start_running_manager(self, execution_manager: ExecutionManager) -> None:
         """ Save on storage that manager is running
         """
+        execution_manager.register_on_crash_callback(self.on_full_node_crash)
         self.add_value(self._manager_running_attribute, '1')
 
     def stop_running_manager(self) -> None:
@@ -983,6 +988,14 @@ def is_running_manager(self) -> bool:
         """
         return self.get_value(self._manager_running_attribute) == '1'
 
+    def on_full_node_crash(self) -> None:
+        """Save on storage that the full node crashed and cannot be recovered."""
+        self.add_value(self._full_node_crashed_attribute, '1')
+
+    def is_full_node_crashed(self) -> bool:
+        """Return whether the full node was crashed."""
+        return self.get_value(self._full_node_crashed_attribute) == '1'
+
     def get_last_started_at(self) -> int:
         """ Return the timestamp when the database was last started.
         """
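Note: the crash flag closes the loop between `ExecutionManager`, `TransactionStorage`, and `HathorManager.start()`: `start_running_manager()` registers `on_full_node_crash` as an on-crash callback, a crash persists `'full_node_crashed' = '1'`, and the next `start()` sees the flag and refuses to boot. A dict-backed sketch of that round trip (not part of the patch):

storage: dict[str, str] = {}

def on_full_node_crash() -> None:
    # Runs as an on-crash callback, just before the process exits.
    storage['full_node_crashed'] = '1'

def is_full_node_crashed() -> bool:
    # Checked at the top of HathorManager.start() on the next boot.
    return storage.get('full_node_crashed') == '1'

assert not is_full_node_crashed()
on_full_node_crash()  # simulate the crash path
assert is_full_node_crashed()  # next start() logs an error and exits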
""" diff --git a/tests/consensus/test_consensus.py b/tests/consensus/test_consensus.py index caa455a54..54573ff0e 100644 --- a/tests/consensus/test_consensus.py +++ b/tests/consensus/test_consensus.py @@ -1,5 +1,6 @@ -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock +from hathor.execution_manager import ExecutionManager from hathor.simulator.utils import add_new_block, add_new_blocks, gen_new_tx from hathor.transaction.storage import TransactionMemoryStorage from tests import unittest @@ -30,10 +31,15 @@ def test_unhandled_exception(self): class MyError(Exception): pass + execution_manager_mock = Mock(spec_set=ExecutionManager) + manager.consensus_algorithm._execution_manager = execution_manager_mock manager.consensus_algorithm._unsafe_update = MagicMock(side_effect=MyError) - with self.assertRaises(MyError): - manager.propagate_tx(tx, fails_silently=False) + manager.propagate_tx(tx, fails_silently=False) + + execution_manager_mock.crash_and_exit.assert_called_once_with( + reason=f"Consensus update failed for tx {tx.hash_hex}" + ) tx2 = manager.tx_storage.get_transaction(tx.hash) meta2 = tx2.get_metadata() diff --git a/tests/execution_manager/__init__.py b/tests/execution_manager/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/execution_manager/test_execution_manager.py b/tests/execution_manager/test_execution_manager.py new file mode 100644 index 000000000..9093c64fc --- /dev/null +++ b/tests/execution_manager/test_execution_manager.py @@ -0,0 +1,47 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +from unittest.mock import Mock, patch + +from hathor.execution_manager import ExecutionManager +from hathor.reactor import ReactorProtocol + + +def test_crash_and_exit() -> None: + def callback() -> None: + pass + + callback_wrapped = Mock(wraps=callback) + log_mock = Mock() + reactor_mock = Mock(spec_set=ReactorProtocol) + manager = ExecutionManager(reactor_mock) + manager._log = log_mock + reason = 'some critical failure' + + manager.register_on_crash_callback(callback_wrapped) + + with patch.object(sys, 'exit') as exit_mock: + manager.crash_and_exit(reason=reason) + + callback_wrapped.assert_called_once() + log_mock.critical.assert_called_once_with( + 'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.', + reason=reason, + exc_info=True + ) + + reactor_mock.stop.assert_called_once() + reactor_mock.crash.assert_called_once() + exit_mock.assert_called_once_with(-1)