Skip to content

feat(unrecoverable-error): implement halting of full node execution #809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from hathor.event import EventManager
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.bit_signaling_service import BitSignalingService
from hathor.feature_activation.feature import Feature
from hathor.feature_activation.feature_service import FeatureService
Expand Down Expand Up @@ -150,6 +151,8 @@ def __init__(self) -> None:

self._soft_voided_tx_ids: Optional[set[bytes]] = None

self._execution_manager: ExecutionManager | None = None

def build(self) -> BuildArtifacts:
if self.artifacts is not None:
raise ValueError('cannot call build twice')
Expand All @@ -163,8 +166,9 @@ def build(self) -> BuildArtifacts:

peer_id = self._get_peer_id()

execution_manager = self._get_or_create_execution_manager()
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager)

p2p_manager = self._get_p2p_manager()

Expand Down Expand Up @@ -215,6 +219,7 @@ def build(self) -> BuildArtifacts:
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
cpu_mining_service=cpu_mining_service,
execution_manager=execution_manager,
**kwargs
)

Expand Down Expand Up @@ -306,6 +311,13 @@ def _get_peer_id(self) -> PeerId:
return self._peer_id
raise ValueError('peer_id not set')

def _get_or_create_execution_manager(self) -> ExecutionManager:
    """Return this builder's ExecutionManager, creating and caching it on first use."""
    if self._execution_manager is not None:
        return self._execution_manager

    # Not built yet: create it from the builder's reactor and remember it for later calls.
    self._execution_manager = ExecutionManager(self._get_reactor())
    return self._execution_manager

def _get_or_create_pubsub(self) -> PubSubManager:
if self._pubsub is None:
self._pubsub = PubSubManager(self._get_reactor())
Expand Down Expand Up @@ -438,7 +450,8 @@ def _get_or_create_event_manager(self) -> EventManager:
reactor=reactor,
pubsub=self._get_or_create_pubsub(),
event_storage=storage,
event_ws_factory=factory
event_ws_factory=factory,
execution_manager=self._get_or_create_execution_manager()
)

return self._event_manager
Expand Down
15 changes: 12 additions & 3 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from hathor.daa import DifficultyAdjustmentAlgorithm
from hathor.event import EventManager
from hathor.exception import BuilderError
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.bit_signaling_service import BitSignalingService
from hathor.feature_activation.feature_service import FeatureService
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
Expand Down Expand Up @@ -212,11 +213,14 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
event_storage=event_storage
)

execution_manager = ExecutionManager(reactor)

event_manager = EventManager(
event_storage=event_storage,
event_ws_factory=self.event_ws_factory,
pubsub=pubsub,
reactor=reactor
reactor=reactor,
execution_manager=execution_manager,
)

if self._args.wallet_index and tx_storage.indexes is not None:
Expand All @@ -236,7 +240,11 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
full_verification = True

soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)
consensus_algorithm = ConsensusAlgorithm(
soft_voided_tx_ids,
pubsub=pubsub,
execution_manager=execution_manager
)

if self._args.x_enable_event_queue:
self.log.info('--x-enable-event-queue flag provided. '
Expand Down Expand Up @@ -308,7 +316,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
feature_service=self.feature_service,
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
cpu_mining_service=cpu_mining_service
cpu_mining_service=cpu_mining_service,
execution_manager=execution_manager,
)

if self._args.x_ipython_kernel:
Expand Down
14 changes: 11 additions & 3 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
from hathor.consensus.context import ConsensusAlgorithmContext
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
from hathor.execution_manager import ExecutionManager
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
Expand Down Expand Up @@ -55,13 +56,20 @@ class ConsensusAlgorithm:
b0 will not be propagated to the voided_by of b1, b2, and b3.
"""

def __init__(self, soft_voided_tx_ids: set[bytes], pubsub: PubSubManager) -> None:
def __init__(
self,
soft_voided_tx_ids: set[bytes],
pubsub: PubSubManager,
*,
execution_manager: ExecutionManager
) -> None:
self._settings = get_global_settings()
self.log = logger.new()
self._pubsub = pubsub
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
self._execution_manager = execution_manager

def create_context(self) -> ConsensusAlgorithmContext:
"""Handy method to create a context that can be used to access block and transaction algorithms."""
Expand All @@ -75,11 +83,11 @@ def update(self, base: BaseTransaction) -> None:
assert meta.validation.is_valid()
try:
self._unsafe_update(base)
except Exception:
except BaseException:
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
assert base.storage is not None
base.storage.save_transaction(base, only_metadata=True)
raise
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')

def _unsafe_update(self, base: BaseTransaction) -> None:
"""Run a consensus update with its own context, indexes will be updated accordingly."""
Expand Down
17 changes: 15 additions & 2 deletions hathor/event/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from hathor.event.model.node_state import NodeState
from hathor.event.storage import EventStorage
from hathor.event.websocket import EventWebsocketFactory
from hathor.execution_manager import ExecutionManager
from hathor.pubsub import EventArguments, HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.transaction import BaseTransaction
Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(
event_storage: EventStorage,
pubsub: PubSubManager,
reactor: Reactor,
execution_manager: ExecutionManager,
event_ws_factory: Optional[EventWebsocketFactory] = None,
) -> None:
self.log = logger.new()
Expand All @@ -78,13 +80,15 @@ def __init__(
self._event_storage = event_storage
self._event_ws_factory = event_ws_factory
self._pubsub = pubsub
self._execution_manager = execution_manager

def start(self, peer_id: str) -> None:
"""Starts the EventManager."""
assert self._is_running is False, 'Cannot start, EventManager is already running'
assert self._event_ws_factory is not None, 'Cannot start, EventWebsocketFactory is not set'
assert self.get_event_queue_state() is True, 'Cannot start, event queue feature is disabled'

self._execution_manager.register_on_crash_callback(self.on_full_node_crash)
self._previous_node_state = self._event_storage.get_node_state()

if self._should_reload_events():
Expand Down Expand Up @@ -133,7 +137,7 @@ def _subscribe_events(self) -> None:
for event in _SUBSCRIBE_EVENTS:
self._pubsub.subscribe(event, self._handle_hathor_event)

def load_started(self):
def load_started(self) -> None:
if not self._is_running:
return

Expand All @@ -143,7 +147,7 @@ def load_started(self):
)
self._event_storage.save_node_state(NodeState.LOAD)

def load_finished(self):
def load_finished(self) -> None:
if not self._is_running:
return

Expand All @@ -153,6 +157,15 @@ def load_finished(self):
)
self._event_storage.save_node_state(NodeState.SYNC)

def on_full_node_crash(self) -> None:
    """Handle a FULL_NODE_CRASHED event, but only while the EventManager is running."""
    if self._is_running:
        self._handle_event(event_type=EventType.FULL_NODE_CRASHED, event_args=EventArguments())

def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
"""Handles a PubSub 'HathorEvents' event."""
event_type = EventType.from_hathor_event(hathor_event)
Expand Down
2 changes: 2 additions & 0 deletions hathor/event/model/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class EventType(Enum):
REORG_STARTED = 'REORG_STARTED'
REORG_FINISHED = 'REORG_FINISHED'
VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED'
FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'

@classmethod
def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
Expand Down Expand Up @@ -53,4 +54,5 @@ def data_type(self) -> type[BaseEventData]:
EventType.REORG_STARTED: ReorgData,
EventType.REORG_FINISHED: EmptyData,
EventType.VERTEX_METADATA_CHANGED: TxData,
EventType.FULL_NODE_CRASHED: EmptyData,
}
65 changes: 65 additions & 0 deletions hathor/execution_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from typing import Callable, NoReturn

from structlog import get_logger

from hathor.reactor import ReactorProtocol

logger = get_logger()


class ExecutionManager:
    """Class to manage actions related to full node execution.

    It stores callbacks that must run when the full node crashes and exposes `crash_and_exit()`, which runs those
    callbacks, logs the failure, stops the reactor and exits the process.
    """
    __slots__ = ('_log', '_reactor', '_on_crash_callbacks')

    def __init__(self, reactor: ReactorProtocol) -> None:
        self._log = logger.new()
        self._reactor = reactor
        # Pairs of (priority, callback), kept in registration order.
        self._on_crash_callbacks: list[tuple[int, Callable[[], None]]] = []

    def register_on_crash_callback(self, callback: Callable[[], None], *, priority: int = 0) -> None:
        """Register a callback to be executed before the full node exits.

        Callbacks with a higher `priority` run first. Callbacks with the same priority run in the order in which
        they were registered.
        """
        self._on_crash_callbacks.append((priority, callback))

    def _run_on_crash_callbacks(self) -> None:
        """Run all registered on crash callbacks, highest priority first.

        A failing callback is logged and does not prevent the remaining callbacks from running.
        """
        # Sort on the priority only. Sorting the raw tuples would fall back to comparing the callables whenever two
        # priorities are equal, which raises TypeError. `sorted` is stable even with `reverse=True`, so callbacks
        # with equal priority keep their registration order.
        callbacks = sorted(self._on_crash_callbacks, key=lambda item: item[0], reverse=True)

        for _, callback in callbacks:
            try:
                callback()
            except BaseException as e:
                self._log.critical(f'Failed execution of on_crash callback "{callback}". Exception: {repr(e)}')

    def crash_and_exit(self, *, reason: str) -> NoReturn:
        """
        Calling this function is a very extreme thing to do, so be careful. It should only be called when a
        critical, unrecoverable failure happens. It crashes and exits the full node, maybe rendering the database
        corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
        or a snapshot) may be required.

        The registered on crash callbacks are run first, then the failure is logged together with `reason`.
        """
        self._run_on_crash_callbacks()
        self._log.critical(
            'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
            reason=reason,
            exc_info=True
        )
        # We sequentially call more extreme exit methods, so the full node exits as gracefully as possible, while
        # guaranteeing that it will indeed exit.
        self._reactor.stop()
        self._reactor.crash()
        sys.exit(-1)
22 changes: 14 additions & 8 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
RewardLockedError,
SpendingVoidedError,
)
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.bit_signaling_service import BitSignalingService
from hathor.feature_activation.feature import Feature
from hathor.feature_activation.feature_service import FeatureService
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self,
verification_service: VerificationService,
cpu_mining_service: CpuMiningService,
network: str,
execution_manager: ExecutionManager,
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
capabilities: Optional[list[str]] = None,
Expand All @@ -129,6 +131,7 @@ def __init__(self,
'Either enable it, or use the reset-event-queue CLI command to remove all event-related data'
)

self._execution_manager = execution_manager
self._settings = settings
self.daa = daa
self._cmd_path: Optional[str] = None
Expand Down Expand Up @@ -250,6 +253,15 @@ def start(self) -> None:
self.is_started = True

self.log.info('start manager', network=self.network)

if self.tx_storage.is_full_node_crashed():
self.log.error(
'Error initializing node. The last time you executed your full node it wasn\'t stopped correctly. '
'The storage is not reliable anymore and, because of that, you must remove your storage and do a '
'full sync (either from scratch or from a snapshot).'
)
sys.exit(-1)

# If it's a full verification, we save on the storage that we are starting it
# this is required because if we stop the initialization in the middle, the metadata
# saved on the storage is not reliable anymore, only if we finish it
Expand Down Expand Up @@ -319,7 +331,7 @@ def start(self) -> None:
self.stratum_factory.start()

# Start running
self.tx_storage.start_running_manager()
self.tx_storage.start_running_manager(self._execution_manager)

def stop(self) -> Deferred:
if not self.is_started:
Expand Down Expand Up @@ -997,13 +1009,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
tx.update_initial_metadata(save=False)
self.tx_storage.save_transaction(tx)
self.tx_storage.add_to_indexes(tx)
try:
self.consensus_algorithm.update(tx)
except HathorError as e:
if not fails_silently:
raise InvalidNewTransaction('consensus update failed') from e
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex, exc_info=True)
return False
self.consensus_algorithm.update(tx)

assert self.verification_service.validate_full(
tx,
Expand Down
Loading