Skip to content

Commit 735ae0d

Browse files
committed
feat(unrecoverable-error): implement halting of full node execution
1 parent bd11bc4 commit 735ae0d

File tree

11 files changed

+175
-19
lines changed

11 files changed

+175
-19
lines changed

hathor/builder/builder.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from hathor.event import EventManager
2525
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
2626
from hathor.event.websocket import EventWebsocketFactory
27+
from hathor.execution_manager import ExecutionManager
2728
from hathor.feature_activation.bit_signaling_service import BitSignalingService
2829
from hathor.feature_activation.feature import Feature
2930
from hathor.feature_activation.feature_service import FeatureService
@@ -137,6 +138,8 @@ def __init__(self) -> None:
137138

138139
self._soft_voided_tx_ids: Optional[set[bytes]] = None
139140

141+
self._execution_manager: ExecutionManager | None = None
142+
140143
def build(self) -> BuildArtifacts:
141144
if self.artifacts is not None:
142145
raise ValueError('cannot call build twice')
@@ -150,15 +153,17 @@ def build(self) -> BuildArtifacts:
150153

151154
peer_id = self._get_peer_id()
152155

156+
indexes = self._get_or_create_indexes_manager()
157+
tx_storage = self._get_or_create_tx_storage(indexes)
158+
159+
execution_manager = self._get_or_create_execution_manager(tx_storage)
153160
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
154-
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)
161+
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager)
155162

156163
p2p_manager = self._get_p2p_manager()
157164

158165
wallet = self._get_or_create_wallet()
159166
event_manager = self._get_or_create_event_manager()
160-
indexes = self._get_or_create_indexes_manager()
161-
tx_storage = self._get_or_create_tx_storage(indexes)
162167
feature_service = self._get_or_create_feature_service(tx_storage)
163168
bit_signaling_service = self._get_or_create_bit_signaling_service(tx_storage)
164169
verification_service = self._get_or_create_verification_service()
@@ -287,6 +292,16 @@ def _get_peer_id(self) -> PeerId:
287292
return self._peer_id
288293
raise ValueError('peer_id not set')
289294

295+
def _get_or_create_execution_manager(self, tx_storage: TransactionStorage) -> ExecutionManager:
296+
if self._execution_manager is None:
297+
event_manager = self._get_or_create_event_manager()
298+
self._execution_manager = ExecutionManager(
299+
tx_storage=tx_storage,
300+
event_manager=event_manager,
301+
)
302+
303+
return self._execution_manager
304+
290305
def _get_or_create_pubsub(self) -> PubSubManager:
291306
if self._pubsub is None:
292307
self._pubsub = PubSubManager(self._get_reactor())

hathor/builder/cli_builder.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from hathor.consensus import ConsensusAlgorithm
2626
from hathor.event import EventManager
2727
from hathor.exception import BuilderError
28+
from hathor.execution_manager import ExecutionManager
2829
from hathor.feature_activation.bit_signaling_service import BitSignalingService
2930
from hathor.feature_activation.feature_service import FeatureService
3031
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
@@ -183,7 +184,15 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
183184
full_verification = True
184185

185186
soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
186-
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)
187+
execution_manager = ExecutionManager(
188+
tx_storage=tx_storage,
189+
event_manager=event_manager,
190+
)
191+
consensus_algorithm = ConsensusAlgorithm(
192+
soft_voided_tx_ids,
193+
pubsub=pubsub,
194+
execution_manager=execution_manager
195+
)
187196

188197
if self._args.x_enable_event_queue:
189198
self.log.info('--x-enable-event-queue flag provided. '

hathor/consensus/consensus.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
1919
from hathor.consensus.context import ConsensusAlgorithmContext
2020
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
21+
from hathor.execution_manager import ExecutionManager
2122
from hathor.profiler import get_cpu_profiler
2223
from hathor.pubsub import HathorEvents, PubSubManager
2324
from hathor.transaction import BaseTransaction
@@ -55,13 +56,20 @@ class ConsensusAlgorithm:
5556
b0 will not be propagated to the voided_by of b1, b2, and b3.
5657
"""
5758

58-
def __init__(self, soft_voided_tx_ids: set[bytes], pubsub: PubSubManager) -> None:
59+
def __init__(
60+
self,
61+
soft_voided_tx_ids: set[bytes],
62+
pubsub: PubSubManager,
63+
*,
64+
execution_manager: ExecutionManager
65+
) -> None:
5966
self._settings = get_settings()
6067
self.log = logger.new()
6168
self._pubsub = pubsub
6269
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
6370
self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
6471
self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
72+
self._execution_manager = execution_manager
6573

6674
def create_context(self) -> ConsensusAlgorithmContext:
6775
"""Handy method to create a context that can be used to access block and transaction algorithms."""
@@ -75,11 +83,11 @@ def update(self, base: BaseTransaction) -> None:
7583
assert meta.validation.is_valid()
7684
try:
7785
self._unsafe_update(base)
78-
except Exception:
86+
except BaseException:
7987
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
8088
assert base.storage is not None
8189
base.storage.save_transaction(base, only_metadata=True)
82-
raise
90+
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')
8391

8492
def _unsafe_update(self, base: BaseTransaction) -> None:
8593
"""Run a consensus update with its own context, indexes will be updated accordingly."""

hathor/event/event_manager.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def _subscribe_events(self) -> None:
132132
for event in _SUBSCRIBE_EVENTS:
133133
self._pubsub.subscribe(event, self._handle_hathor_event)
134134

135-
def load_started(self):
135+
def load_started(self) -> None:
136136
if not self._is_running:
137137
return
138138

@@ -142,7 +142,7 @@ def load_started(self):
142142
)
143143
self._event_storage.save_node_state(NodeState.LOAD)
144144

145-
def load_finished(self):
145+
def load_finished(self) -> None:
146146
if not self._is_running:
147147
return
148148

@@ -152,6 +152,15 @@ def load_finished(self):
152152
)
153153
self._event_storage.save_node_state(NodeState.SYNC)
154154

155+
def full_node_crashed(self) -> None:
156+
if not self._is_running:
157+
return
158+
159+
self._handle_event(
160+
event_type=EventType.FULL_NODE_CRASHED,
161+
event_args=EventArguments(),
162+
)
163+
155164
def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
156165
"""Handles a PubSub 'HathorEvents' event."""
157166
event_type = EventType.from_hathor_event(hathor_event)

hathor/event/model/event_type.py

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class EventType(Enum):
2525
REORG_STARTED = 'REORG_STARTED'
2626
REORG_FINISHED = 'REORG_FINISHED'
2727
VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED'
28+
FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'
2829

2930
@classmethod
3031
def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
@@ -53,4 +54,5 @@ def data_type(self) -> type[BaseEventData]:
5354
EventType.REORG_STARTED: ReorgData,
5455
EventType.REORG_FINISHED: EmptyData,
5556
EventType.VERTEX_METADATA_CHANGED: TxData,
57+
EventType.FULL_NODE_CRASHED: EmptyData,
5658
}

hathor/execution_manager.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from typing import NoReturn
17+
18+
from structlog import get_logger
19+
20+
from hathor.event import EventManager
21+
from hathor.transaction.storage import TransactionStorage
22+
23+
logger = get_logger()
24+
25+
26+
class ExecutionManager:
27+
"""Class to manage actions related to full node execution."""
28+
__slots__ = ('_log', '_tx_storage', '_event_manager')
29+
30+
def __init__(self, *, tx_storage: TransactionStorage, event_manager: EventManager) -> None:
31+
self._log = logger.new()
32+
self._tx_storage = tx_storage
33+
self._event_manager = event_manager
34+
35+
def crash_and_exit(self, *, reason: str) -> NoReturn:
36+
"""
37+
Calling this function is a very extreme thing to do, so be careful. It should only be called when a
38+
critical, unrecoverable failure happens. It crashes and exits the full node, rendering the database
39+
corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
40+
or a snapshot) will be required.
41+
"""
42+
self._tx_storage.full_node_crashed()
43+
self._event_manager.full_node_crashed()
44+
self._log.critical(
45+
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
46+
reason=reason,
47+
exc_info=True
48+
)
49+
# We use os._exit() instead of sys.exit() or any other approaches because this is the only one Twisted
50+
# doesn't catch.
51+
os._exit(-1)

hathor/manager.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,15 @@ def start(self) -> None:
238238
self.is_started = True
239239

240240
self.log.info('start manager', network=self.network)
241+
242+
if self.tx_storage.is_full_node_crashed():
243+
self.log.error(
244+
'Error initializing node. The last time you executed your full node it wasn\'t stopped correctly. '
245+
'The storage is not reliable anymore and, because of that, you must remove your storage and do a '
246+
'full sync (either from scratch or from a snapshot).'
247+
)
248+
sys.exit(-1)
249+
241250
# If it's a full verification, we save on the storage that we are starting it
242251
# this is required because if we stop the initilization in the middle, the metadata
243252
# saved on the storage is not reliable anymore, only if we finish it
@@ -1011,13 +1020,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10111020
tx.update_initial_metadata(save=False)
10121021
self.tx_storage.save_transaction(tx)
10131022
self.tx_storage.add_to_indexes(tx)
1014-
try:
1015-
self.consensus_algorithm.update(tx)
1016-
except HathorError as e:
1017-
if not fails_silently:
1018-
raise InvalidNewTransaction('consensus update failed') from e
1019-
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex, exc_info=True)
1020-
return False
1023+
self.consensus_algorithm.update(tx)
10211024

10221025
assert self.verification_service.validate_full(
10231026
tx,

hathor/transaction/storage/transaction_storage.py

+11
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ class TransactionStorage(ABC):
7575
# Key storage attribute to save if the manager is running
7676
_manager_running_attribute: str = 'manager_running'
7777

78+
# Key storage attribute to save if the full node crashed
79+
_full_node_crashed_attribute: str = 'full_node_crashed'
80+
7881
# Ket storage attribute to save the last time the node started
7982
_last_start_attribute: str = 'last_start'
8083

@@ -970,6 +973,14 @@ def is_running_manager(self) -> bool:
970973
"""
971974
return self.get_value(self._manager_running_attribute) == '1'
972975

976+
def full_node_crashed(self) -> None:
977+
"""Save on storage that the full node crashed and cannot be recovered."""
978+
self.add_value(self._full_node_crashed_attribute, '1')
979+
980+
def is_full_node_crashed(self) -> bool:
981+
"""Return whether the full node was crashed."""
982+
return self.get_value(self._full_node_crashed_attribute) == '1'
983+
973984
def get_last_started_at(self) -> int:
974985
""" Return the timestamp when the database was last started.
975986
"""

tests/consensus/test_consensus.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from unittest.mock import MagicMock
1+
from unittest.mock import MagicMock, Mock
22

33
from hathor.conf import HathorSettings
4+
from hathor.execution_manager import ExecutionManager
45
from hathor.transaction.storage import TransactionMemoryStorage
56
from tests import unittest
67
from tests.utils import (
@@ -39,10 +40,15 @@ def test_unhandled_exception(self):
3940
class MyError(Exception):
4041
pass
4142

43+
execution_manager_mock = Mock(spec_set=ExecutionManager)
44+
manager.consensus_algorithm._execution_manager = execution_manager_mock
4245
manager.consensus_algorithm._unsafe_update = MagicMock(side_effect=MyError)
4346

44-
with self.assertRaises(MyError):
45-
manager.propagate_tx(tx, fails_silently=False)
47+
manager.propagate_tx(tx, fails_silently=False)
48+
49+
execution_manager_mock.crash_and_exit.assert_called_once_with(
50+
reason=f"Consensus update failed for tx {tx.hash_hex}"
51+
)
4652

4753
tx2 = manager.tx_storage.get_transaction(tx.hash)
4854
meta2 = tx2.get_metadata()

tests/execution_manager/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from unittest.mock import Mock, patch
17+
18+
from hathor.event import EventManager
19+
from hathor.execution_manager import ExecutionManager
20+
from hathor.transaction.storage import TransactionStorage
21+
22+
23+
def test_crash_and_exit() -> None:
24+
tx_storage_mock = Mock(spec_set=TransactionStorage)
25+
event_manager_mock = Mock(spec_set=EventManager)
26+
log_mock = Mock()
27+
manager = ExecutionManager(tx_storage=tx_storage_mock, event_manager=event_manager_mock)
28+
manager._log = log_mock
29+
reason = 'some critical failure'
30+
31+
with patch.object(os, '_exit') as exit_mock:
32+
manager.crash_and_exit(reason=reason)
33+
34+
tx_storage_mock.full_node_crashed.assert_called_once()
35+
event_manager_mock.full_node_crashed.assert_called_once()
36+
log_mock.critical.assert_called_once_with(
37+
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
38+
reason=reason,
39+
exc_info=True
40+
)
41+
42+
exit_mock.assert_called_once_with(-1)

0 commit comments

Comments
 (0)