Skip to content

Commit 2211199

Browse files
committed
feat(unrecoverable-error): implement halting of full node execution
1 parent 6448f56 commit 2211199

File tree

11 files changed

+173
-17
lines changed

11 files changed

+173
-17
lines changed

hathor/builder/builder.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from hathor.event import EventManager
2626
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
2727
from hathor.event.websocket import EventWebsocketFactory
28+
from hathor.execution_manager import ExecutionManager
2829
from hathor.feature_activation.bit_signaling_service import BitSignalingService
2930
from hathor.feature_activation.feature import Feature
3031
from hathor.feature_activation.feature_service import FeatureService
@@ -150,6 +151,8 @@ def __init__(self) -> None:
150151

151152
self._soft_voided_tx_ids: Optional[set[bytes]] = None
152153

154+
self._execution_manager: ExecutionManager | None = None
155+
153156
def build(self) -> BuildArtifacts:
154157
if self.artifacts is not None:
155158
raise ValueError('cannot call build twice')
@@ -163,8 +166,9 @@ def build(self) -> BuildArtifacts:
163166

164167
peer_id = self._get_peer_id()
165168

169+
execution_manager = self._get_or_create_execution_manager()
166170
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
167-
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)
171+
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager)
168172

169173
p2p_manager = self._get_p2p_manager()
170174

@@ -306,6 +310,17 @@ def _get_peer_id(self) -> PeerId:
306310
return self._peer_id
307311
raise ValueError('peer_id not set')
308312

313+
def _get_or_create_execution_manager(self) -> ExecutionManager:
314+
if self._execution_manager is None:
315+
tx_storage = self._get_or_create_tx_storage()
316+
event_manager = self._get_or_create_event_manager()
317+
self._execution_manager = ExecutionManager(
318+
tx_storage=tx_storage,
319+
event_manager=event_manager,
320+
)
321+
322+
return self._execution_manager
323+
309324
def _get_or_create_pubsub(self) -> PubSubManager:
310325
if self._pubsub is None:
311326
self._pubsub = PubSubManager(self._get_reactor())

hathor/builder/cli_builder.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from hathor.daa import DifficultyAdjustmentAlgorithm
2727
from hathor.event import EventManager
2828
from hathor.exception import BuilderError
29+
from hathor.execution_manager import ExecutionManager
2930
from hathor.feature_activation.bit_signaling_service import BitSignalingService
3031
from hathor.feature_activation.feature_service import FeatureService
3132
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
@@ -195,7 +196,15 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
195196
full_verification = True
196197

197198
soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
198-
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)
199+
execution_manager = ExecutionManager(
200+
tx_storage=tx_storage,
201+
event_manager=event_manager,
202+
)
203+
consensus_algorithm = ConsensusAlgorithm(
204+
soft_voided_tx_ids,
205+
pubsub=pubsub,
206+
execution_manager=execution_manager
207+
)
199208

200209
if self._args.x_enable_event_queue:
201210
self.log.info('--x-enable-event-queue flag provided. '

hathor/consensus/consensus.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
1919
from hathor.consensus.context import ConsensusAlgorithmContext
2020
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
21+
from hathor.execution_manager import ExecutionManager
2122
from hathor.profiler import get_cpu_profiler
2223
from hathor.pubsub import HathorEvents, PubSubManager
2324
from hathor.transaction import BaseTransaction
@@ -55,13 +56,20 @@ class ConsensusAlgorithm:
5556
b0 will not be propagated to the voided_by of b1, b2, and b3.
5657
"""
5758

58-
def __init__(self, soft_voided_tx_ids: set[bytes], pubsub: PubSubManager) -> None:
59+
def __init__(
60+
self,
61+
soft_voided_tx_ids: set[bytes],
62+
pubsub: PubSubManager,
63+
*,
64+
execution_manager: ExecutionManager
65+
) -> None:
5966
self._settings = get_settings()
6067
self.log = logger.new()
6168
self._pubsub = pubsub
6269
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
6370
self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
6471
self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
72+
self._execution_manager = execution_manager
6573

6674
def create_context(self) -> ConsensusAlgorithmContext:
6775
"""Handy method to create a context that can be used to access block and transaction algorithms."""
@@ -75,11 +83,11 @@ def update(self, base: BaseTransaction) -> None:
7583
assert meta.validation.is_valid()
7684
try:
7785
self._unsafe_update(base)
78-
except Exception:
86+
except BaseException:
7987
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
8088
assert base.storage is not None
8189
base.storage.save_transaction(base, only_metadata=True)
82-
raise
90+
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')
8391

8492
def _unsafe_update(self, base: BaseTransaction) -> None:
8593
"""Run a consensus update with its own context, indexes will be updated accordingly."""

hathor/event/event_manager.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def _subscribe_events(self) -> None:
133133
for event in _SUBSCRIBE_EVENTS:
134134
self._pubsub.subscribe(event, self._handle_hathor_event)
135135

136-
def load_started(self):
136+
def load_started(self) -> None:
137137
if not self._is_running:
138138
return
139139

@@ -143,7 +143,7 @@ def load_started(self):
143143
)
144144
self._event_storage.save_node_state(NodeState.LOAD)
145145

146-
def load_finished(self):
146+
def load_finished(self) -> None:
147147
if not self._is_running:
148148
return
149149

@@ -153,6 +153,15 @@ def load_finished(self):
153153
)
154154
self._event_storage.save_node_state(NodeState.SYNC)
155155

156+
def full_node_crashed(self) -> None:
157+
if not self._is_running:
158+
return
159+
160+
self._handle_event(
161+
event_type=EventType.FULL_NODE_CRASHED,
162+
event_args=EventArguments(),
163+
)
164+
156165
def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
157166
"""Handles a PubSub 'HathorEvents' event."""
158167
event_type = EventType.from_hathor_event(hathor_event)

hathor/event/model/event_type.py

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class EventType(Enum):
2525
REORG_STARTED = 'REORG_STARTED'
2626
REORG_FINISHED = 'REORG_FINISHED'
2727
VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED'
28+
FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'
2829

2930
@classmethod
3031
def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
@@ -53,4 +54,5 @@ def data_type(self) -> type[BaseEventData]:
5354
EventType.REORG_STARTED: ReorgData,
5455
EventType.REORG_FINISHED: EmptyData,
5556
EventType.VERTEX_METADATA_CHANGED: TxData,
57+
EventType.FULL_NODE_CRASHED: EmptyData,
5658
}

hathor/execution_manager.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from typing import NoReturn
17+
18+
from structlog import get_logger
19+
20+
from hathor.event import EventManager
21+
from hathor.transaction.storage import TransactionStorage
22+
23+
logger = get_logger()
24+
25+
26+
class ExecutionManager:
27+
"""Class to manage actions related to full node execution."""
28+
__slots__ = ('_log', '_tx_storage', '_event_manager')
29+
30+
def __init__(self, *, tx_storage: TransactionStorage, event_manager: EventManager) -> None:
31+
self._log = logger.new()
32+
self._tx_storage = tx_storage
33+
self._event_manager = event_manager
34+
35+
def crash_and_exit(self, *, reason: str) -> NoReturn:
36+
"""
37+
Calling this function is a very extreme thing to do, so be careful. It should only be called when a
38+
critical, unrecoverable failure happens. It crashes and exits the full node, rendering the database
39+
corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
40+
or a snapshot) will be required.
41+
"""
42+
self._tx_storage.full_node_crashed()
43+
self._event_manager.full_node_crashed()
44+
self._log.critical(
45+
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
46+
reason=reason,
47+
exc_info=True
48+
)
49+
# We use os._exit() instead of sys.exit() or any other approaches because this is the only one Twisted
50+
# doesn't catch.
51+
os._exit(-1)

hathor/manager.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,15 @@ def start(self) -> None:
244244
self.is_started = True
245245

246246
self.log.info('start manager', network=self.network)
247+
248+
if self.tx_storage.is_full_node_crashed():
249+
self.log.error(
250+
'Error initializing node. The last time you executed your full node it wasn\'t stopped correctly. '
251+
'The storage is not reliable anymore and, because of that, you must remove your storage and do a '
252+
'full sync (either from scratch or from a snapshot).'
253+
)
254+
sys.exit(-1)
255+
247256
# If it's a full verification, we save on the storage that we are starting it
248257
# this is required because if we stop the initilization in the middle, the metadata
249258
# saved on the storage is not reliable anymore, only if we finish it
@@ -990,13 +999,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
990999
tx.update_initial_metadata(save=False)
9911000
self.tx_storage.save_transaction(tx)
9921001
self.tx_storage.add_to_indexes(tx)
993-
try:
994-
self.consensus_algorithm.update(tx)
995-
except HathorError as e:
996-
if not fails_silently:
997-
raise InvalidNewTransaction('consensus update failed') from e
998-
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex, exc_info=True)
999-
return False
1002+
self.consensus_algorithm.update(tx)
10001003

10011004
assert self.verification_service.validate_full(
10021005
tx,

hathor/transaction/storage/transaction_storage.py

+11
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ class TransactionStorage(ABC):
8181
# Key storage attribute to save if the manager is running
8282
_manager_running_attribute: str = 'manager_running'
8383

84+
# Key storage attribute to save if the full node crashed
85+
_full_node_crashed_attribute: str = 'full_node_crashed'
86+
8487
# Ket storage attribute to save the last time the node started
8588
_last_start_attribute: str = 'last_start'
8689

@@ -978,6 +981,14 @@ def is_running_manager(self) -> bool:
978981
"""
979982
return self.get_value(self._manager_running_attribute) == '1'
980983

984+
def full_node_crashed(self) -> None:
985+
"""Save on storage that the full node crashed and cannot be recovered."""
986+
self.add_value(self._full_node_crashed_attribute, '1')
987+
988+
def is_full_node_crashed(self) -> bool:
989+
"""Return whether the full node was crashed."""
990+
return self.get_value(self._full_node_crashed_attribute) == '1'
991+
981992
def get_last_started_at(self) -> int:
982993
""" Return the timestamp when the database was last started.
983994
"""

tests/consensus/test_consensus.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from unittest.mock import MagicMock
1+
from unittest.mock import MagicMock, Mock
22

33
from hathor.conf import HathorSettings
4+
from hathor.execution_manager import ExecutionManager
45
from hathor.simulator.utils import add_new_block, add_new_blocks, gen_new_tx
56
from hathor.transaction.storage import TransactionMemoryStorage
67
from tests import unittest
@@ -33,10 +34,15 @@ def test_unhandled_exception(self):
3334
class MyError(Exception):
3435
pass
3536

37+
execution_manager_mock = Mock(spec_set=ExecutionManager)
38+
manager.consensus_algorithm._execution_manager = execution_manager_mock
3639
manager.consensus_algorithm._unsafe_update = MagicMock(side_effect=MyError)
3740

38-
with self.assertRaises(MyError):
39-
manager.propagate_tx(tx, fails_silently=False)
41+
manager.propagate_tx(tx, fails_silently=False)
42+
43+
execution_manager_mock.crash_and_exit.assert_called_once_with(
44+
reason=f"Consensus update failed for tx {tx.hash_hex}"
45+
)
4046

4147
tx2 = manager.tx_storage.get_transaction(tx.hash)
4248
meta2 = tx2.get_metadata()

tests/execution_manager/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from unittest.mock import Mock, patch
17+
18+
from hathor.event import EventManager
19+
from hathor.execution_manager import ExecutionManager
20+
from hathor.transaction.storage import TransactionStorage
21+
22+
23+
def test_crash_and_exit() -> None:
24+
tx_storage_mock = Mock(spec_set=TransactionStorage)
25+
event_manager_mock = Mock(spec_set=EventManager)
26+
log_mock = Mock()
27+
manager = ExecutionManager(tx_storage=tx_storage_mock, event_manager=event_manager_mock)
28+
manager._log = log_mock
29+
reason = 'some critical failure'
30+
31+
with patch.object(os, '_exit') as exit_mock:
32+
manager.crash_and_exit(reason=reason)
33+
34+
tx_storage_mock.full_node_crashed.assert_called_once()
35+
event_manager_mock.full_node_crashed.assert_called_once()
36+
log_mock.critical.assert_called_once_with(
37+
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
38+
reason=reason,
39+
exc_info=True
40+
)
41+
42+
exit_mock.assert_called_once_with(-1)

0 commit comments

Comments
 (0)