Skip to content

Commit 8967098

Browse files
committed
feat(unrecoverable-error): implement halting of full node execution
1 parent cffe172 commit 8967098

File tree

11 files changed

+173
-17
lines changed

11 files changed

+173
-17
lines changed

hathor/builder/builder.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from hathor.event import EventManager
2626
from hathor.event.storage import EventMemoryStorage, EventRocksDBStorage, EventStorage
2727
from hathor.event.websocket import EventWebsocketFactory
28+
from hathor.execution_manager import ExecutionManager
2829
from hathor.feature_activation.bit_signaling_service import BitSignalingService
2930
from hathor.feature_activation.feature import Feature
3031
from hathor.feature_activation.feature_service import FeatureService
@@ -149,6 +150,8 @@ def __init__(self) -> None:
149150

150151
self._soft_voided_tx_ids: Optional[set[bytes]] = None
151152

153+
self._execution_manager: ExecutionManager | None = None
154+
152155
def build(self) -> BuildArtifacts:
153156
if self.artifacts is not None:
154157
raise ValueError('cannot call build twice')
@@ -162,8 +165,9 @@ def build(self) -> BuildArtifacts:
162165

163166
peer_id = self._get_peer_id()
164167

168+
execution_manager = self._get_or_create_execution_manager()
165169
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
166-
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)
170+
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager)
167171

168172
p2p_manager = self._get_p2p_manager()
169173

@@ -305,6 +309,17 @@ def _get_peer_id(self) -> PeerId:
305309
return self._peer_id
306310
raise ValueError('peer_id not set')
307311

312+
def _get_or_create_execution_manager(self) -> ExecutionManager:
313+
if self._execution_manager is None:
314+
tx_storage = self._get_or_create_tx_storage()
315+
event_manager = self._get_or_create_event_manager()
316+
self._execution_manager = ExecutionManager(
317+
tx_storage=tx_storage,
318+
event_manager=event_manager,
319+
)
320+
321+
return self._execution_manager
322+
308323
def _get_or_create_pubsub(self) -> PubSubManager:
309324
if self._pubsub is None:
310325
self._pubsub = PubSubManager(self._get_reactor())

hathor/builder/cli_builder.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from hathor.daa import DifficultyAdjustmentAlgorithm
2727
from hathor.event import EventManager
2828
from hathor.exception import BuilderError
29+
from hathor.execution_manager import ExecutionManager
2930
from hathor.feature_activation.bit_signaling_service import BitSignalingService
3031
from hathor.feature_activation.feature_service import FeatureService
3132
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
@@ -192,7 +193,15 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
192193
full_verification = True
193194

194195
soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
195-
consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub=pubsub)
196+
execution_manager = ExecutionManager(
197+
tx_storage=tx_storage,
198+
event_manager=event_manager,
199+
)
200+
consensus_algorithm = ConsensusAlgorithm(
201+
soft_voided_tx_ids,
202+
pubsub=pubsub,
203+
execution_manager=execution_manager
204+
)
196205

197206
if self._args.x_enable_event_queue:
198207
self.log.info('--x-enable-event-queue flag provided. '

hathor/consensus/consensus.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
1919
from hathor.consensus.context import ConsensusAlgorithmContext
2020
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
21+
from hathor.execution_manager import ExecutionManager
2122
from hathor.profiler import get_cpu_profiler
2223
from hathor.pubsub import HathorEvents, PubSubManager
2324
from hathor.transaction import BaseTransaction
@@ -55,13 +56,20 @@ class ConsensusAlgorithm:
5556
b0 will not be propagated to the voided_by of b1, b2, and b3.
5657
"""
5758

58-
def __init__(self, soft_voided_tx_ids: set[bytes], pubsub: PubSubManager) -> None:
59+
def __init__(
60+
self,
61+
soft_voided_tx_ids: set[bytes],
62+
pubsub: PubSubManager,
63+
*,
64+
execution_manager: ExecutionManager
65+
) -> None:
5966
self._settings = get_settings()
6067
self.log = logger.new()
6168
self._pubsub = pubsub
6269
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
6370
self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
6471
self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
72+
self._execution_manager = execution_manager
6573

6674
def create_context(self) -> ConsensusAlgorithmContext:
6775
"""Handy method to create a context that can be used to access block and transaction algorithms."""
@@ -75,11 +83,11 @@ def update(self, base: BaseTransaction) -> None:
7583
assert meta.validation.is_valid()
7684
try:
7785
self._unsafe_update(base)
78-
except Exception:
86+
except BaseException:
7987
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
8088
assert base.storage is not None
8189
base.storage.save_transaction(base, only_metadata=True)
82-
raise
90+
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')
8391

8492
def _unsafe_update(self, base: BaseTransaction) -> None:
8593
"""Run a consensus update with its own context, indexes will be updated accordingly."""

hathor/event/event_manager.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def _subscribe_events(self) -> None:
132132
for event in _SUBSCRIBE_EVENTS:
133133
self._pubsub.subscribe(event, self._handle_hathor_event)
134134

135-
def load_started(self):
135+
def load_started(self) -> None:
136136
if not self._is_running:
137137
return
138138

@@ -142,7 +142,7 @@ def load_started(self):
142142
)
143143
self._event_storage.save_node_state(NodeState.LOAD)
144144

145-
def load_finished(self):
145+
def load_finished(self) -> None:
146146
if not self._is_running:
147147
return
148148

@@ -152,6 +152,15 @@ def load_finished(self):
152152
)
153153
self._event_storage.save_node_state(NodeState.SYNC)
154154

155+
def full_node_crashed(self) -> None:
156+
if not self._is_running:
157+
return
158+
159+
self._handle_event(
160+
event_type=EventType.FULL_NODE_CRASHED,
161+
event_args=EventArguments(),
162+
)
163+
155164
def _handle_hathor_event(self, hathor_event: HathorEvents, event_args: EventArguments) -> None:
156165
"""Handles a PubSub 'HathorEvents' event."""
157166
event_type = EventType.from_hathor_event(hathor_event)

hathor/event/model/event_type.py

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class EventType(Enum):
2525
REORG_STARTED = 'REORG_STARTED'
2626
REORG_FINISHED = 'REORG_FINISHED'
2727
VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED'
28+
FULL_NODE_CRASHED = 'FULL_NODE_CRASHED'
2829

2930
@classmethod
3031
def from_hathor_event(cls, hathor_event: HathorEvents) -> 'EventType':
@@ -53,4 +54,5 @@ def data_type(self) -> type[BaseEventData]:
5354
EventType.REORG_STARTED: ReorgData,
5455
EventType.REORG_FINISHED: EmptyData,
5556
EventType.VERTEX_METADATA_CHANGED: TxData,
57+
EventType.FULL_NODE_CRASHED: EmptyData,
5658
}

hathor/execution_manager.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from typing import NoReturn
17+
18+
from structlog import get_logger
19+
20+
from hathor.event import EventManager
21+
from hathor.transaction.storage import TransactionStorage
22+
23+
logger = get_logger()
24+
25+
26+
class ExecutionManager:
27+
"""Class to manage actions related to full node execution."""
28+
__slots__ = ('_log', '_tx_storage', '_event_manager')
29+
30+
def __init__(self, *, tx_storage: TransactionStorage, event_manager: EventManager) -> None:
31+
self._log = logger.new()
32+
self._tx_storage = tx_storage
33+
self._event_manager = event_manager
34+
35+
def crash_and_exit(self, *, reason: str) -> NoReturn:
36+
"""
37+
Calling this function is a very extreme thing to do, so be careful. It should only be called when a
38+
critical, unrecoverable failure happens. It crashes and exits the full node, rendering the database
39+
corrupted, and requiring manual intervention. In other words, a restart with a clean database (from scratch
40+
or a snapshot) will be required.
41+
"""
42+
self._tx_storage.full_node_crashed()
43+
self._event_manager.full_node_crashed()
44+
self._log.critical(
45+
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
46+
reason=reason,
47+
exc_info=True
48+
)
49+
# We use os._exit() instead of sys.exit() or any other approaches because this is the only one Twisted
50+
# doesn't catch.
51+
os._exit(-1)

hathor/manager.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,15 @@ def start(self) -> None:
243243
self.is_started = True
244244

245245
self.log.info('start manager', network=self.network)
246+
247+
if self.tx_storage.is_full_node_crashed():
248+
self.log.error(
249+
'Error initializing node. The last time you executed your full node it wasn\'t stopped correctly. '
250+
'The storage is not reliable anymore and, because of that, you must remove your storage and do a '
251+
'full sync (either from scratch or from a snapshot).'
252+
)
253+
sys.exit(-1)
254+
246255
# If it's a full verification, we save on the storage that we are starting it
247256
# this is required because if we stop the initilization in the middle, the metadata
248257
# saved on the storage is not reliable anymore, only if we finish it
@@ -989,13 +998,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
989998
tx.update_initial_metadata(save=False)
990999
self.tx_storage.save_transaction(tx)
9911000
self.tx_storage.add_to_indexes(tx)
992-
try:
993-
self.consensus_algorithm.update(tx)
994-
except HathorError as e:
995-
if not fails_silently:
996-
raise InvalidNewTransaction('consensus update failed') from e
997-
self.log.warn('on_new_tx(): consensus update failed', tx=tx.hash_hex, exc_info=True)
998-
return False
1001+
self.consensus_algorithm.update(tx)
9991002

10001003
assert self.verification_service.validate_full(
10011004
tx,

hathor/transaction/storage/transaction_storage.py

+11
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ class TransactionStorage(ABC):
8080
# Key storage attribute to save if the manager is running
8181
_manager_running_attribute: str = 'manager_running'
8282

83+
# Key storage attribute to save if the full node crashed
84+
_full_node_crashed_attribute: str = 'full_node_crashed'
85+
8386
# Ket storage attribute to save the last time the node started
8487
_last_start_attribute: str = 'last_start'
8588

@@ -976,6 +979,14 @@ def is_running_manager(self) -> bool:
976979
"""
977980
return self.get_value(self._manager_running_attribute) == '1'
978981

982+
def full_node_crashed(self) -> None:
983+
"""Save on storage that the full node crashed and cannot be recovered."""
984+
self.add_value(self._full_node_crashed_attribute, '1')
985+
986+
def is_full_node_crashed(self) -> bool:
987+
"""Return whether the full node was crashed."""
988+
return self.get_value(self._full_node_crashed_attribute) == '1'
989+
979990
def get_last_started_at(self) -> int:
980991
""" Return the timestamp when the database was last started.
981992
"""

tests/consensus/test_consensus.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from unittest.mock import MagicMock
1+
from unittest.mock import MagicMock, Mock
22

33
from hathor.conf import HathorSettings
4+
from hathor.execution_manager import ExecutionManager
45
from hathor.simulator.utils import add_new_block, add_new_blocks, gen_new_tx
56
from hathor.transaction.storage import TransactionMemoryStorage
67
from tests import unittest
@@ -33,10 +34,15 @@ def test_unhandled_exception(self):
3334
class MyError(Exception):
3435
pass
3536

37+
execution_manager_mock = Mock(spec_set=ExecutionManager)
38+
manager.consensus_algorithm._execution_manager = execution_manager_mock
3639
manager.consensus_algorithm._unsafe_update = MagicMock(side_effect=MyError)
3740

38-
with self.assertRaises(MyError):
39-
manager.propagate_tx(tx, fails_silently=False)
41+
manager.propagate_tx(tx, fails_silently=False)
42+
43+
execution_manager_mock.crash_and_exit.assert_called_once_with(
44+
reason=f"Consensus update failed for tx {tx.hash_hex}"
45+
)
4046

4147
tx2 = manager.tx_storage.get_transaction(tx.hash)
4248
meta2 = tx2.get_metadata()

tests/execution_manager/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright 2023 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
from unittest.mock import Mock, patch
17+
18+
from hathor.event import EventManager
19+
from hathor.execution_manager import ExecutionManager
20+
from hathor.transaction.storage import TransactionStorage
21+
22+
23+
def test_crash_and_exit() -> None:
24+
tx_storage_mock = Mock(spec_set=TransactionStorage)
25+
event_manager_mock = Mock(spec_set=EventManager)
26+
log_mock = Mock()
27+
manager = ExecutionManager(tx_storage=tx_storage_mock, event_manager=event_manager_mock)
28+
manager._log = log_mock
29+
reason = 'some critical failure'
30+
31+
with patch.object(os, '_exit') as exit_mock:
32+
manager.crash_and_exit(reason=reason)
33+
34+
tx_storage_mock.full_node_crashed.assert_called_once()
35+
event_manager_mock.full_node_crashed.assert_called_once()
36+
log_mock.critical.assert_called_once_with(
37+
'Critical failure occurred, causing the full node to halt execution. Manual intervention is required.',
38+
reason=reason,
39+
exc_info=True
40+
)
41+
42+
exit_mock.assert_called_once_with(-1)

0 commit comments

Comments
 (0)