Skip to content

refactor(misc): minor maintenance changes #1206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions docs/event-queue-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@ When the Event Queue feature is enabled, the full node will generate specific ev

## Enabling the Event Queue

To enable the Event Queue feature, you must add two CLI options when running the full node:

1. Add `--unsafe-mode [network_name]`
2. Add `--x-enable-event-queue`
To enable the Event Queue feature, you must add this CLI option when running the full node: `--enable-event-queue`.

For example:

```bash
poetry run hathor-cli run_node --memory-storage --status 8080 --testnet --unsafe-mode testnet-golf --x-enable-event-queue
poetry run hathor-cli run_node --memory-storage --status 8080 --testnet --enable-event-queue
```

**ATTENTION**: While the Event Queue is in beta, it's considered unsafe. You must not use it in production environments.

### First run

If this is the first time your full node is running with the event queue enabled, there are 3 possibilities:
Expand All @@ -45,7 +40,7 @@ For case 2.2, an extra loading step will be performed during full node initializ

After running the full node with the Event Queue enabled, if you restart your full node (that is, stop it and then run it again), there are 2 possibilities:

1. You run the full node with the `--x-enable-event-queue` CLI option, that is, you keep the Event Queue enabled, or
1. You run the full node with the `--enable-event-queue` CLI option, that is, you keep the Event Queue enabled, or
2. You run the full node without the CLI option, that is, you don't enable it, but you **have to clear the event data in the database**.

For case 1, the full node will start normally, and continue to generate new events for synced vertices from where it stopped in the previous run.
Expand Down
4 changes: 2 additions & 2 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ def _get_or_create_consensus(self) -> ConsensusAlgorithm:
if self._consensus is None:
soft_voided_tx_ids = self._get_soft_voided_tx_ids()
pubsub = self._get_or_create_pubsub()
execution_manager = self._get_or_create_execution_manager()
self._consensus = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager)
self._consensus = ConsensusAlgorithm(soft_voided_tx_ids, pubsub)

return self._consensus

Expand Down Expand Up @@ -611,6 +610,7 @@ def _get_or_create_vertex_handler(self) -> VertexHandler:
verification_service=self._get_or_create_verification_service(),
consensus=self._get_or_create_consensus(),
feature_service=self._get_or_create_feature_service(),
execution_manager=self._get_or_create_execution_manager(),
pubsub=self._get_or_create_pubsub(),
wallet=self._get_or_create_wallet(),
)
Expand Down
12 changes: 3 additions & 9 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,19 +236,12 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
self.log.debug('enable utxo index')
tx_storage.indexes.enable_utxo_index()

full_verification = False
if self._args.x_full_verification:
self.check_or_raise(
not self._args.x_enable_event_queue and not self._args.enable_event_queue,
'--x-full-verification cannot be used with --enable-event-queue'
)
full_verification = True
self.check_or_raise(not self._args.x_full_verification, '--x-full-verification is deprecated')

soft_voided_tx_ids = set(settings.SOFT_VOIDED_TX_IDS)
consensus_algorithm = ConsensusAlgorithm(
soft_voided_tx_ids,
pubsub=pubsub,
execution_manager=execution_manager
)

if self._args.x_enable_event_queue or self._args.enable_event_queue:
Expand Down Expand Up @@ -307,6 +300,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
consensus=consensus_algorithm,
feature_service=self.feature_service,
pubsub=pubsub,
execution_manager=execution_manager,
wallet=self.wallet,
log_vertex_bytes=self._args.log_vertex_bytes,
)
Expand Down Expand Up @@ -339,7 +333,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
wallet=self.wallet,
checkpoints=settings.CHECKPOINTS,
environment_info=get_environment_info(args=str(self._args), peer_id=str(peer.id)),
full_verification=full_verification,
full_verification=False,
enable_event_queue=self._args.x_enable_event_queue or self._args.enable_event_queue,
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
Expand Down
2 changes: 1 addition & 1 deletion hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def create_parser(cls) -> ArgumentParser:
parser.add_argument('--recursion-limit', type=int, help='Set python recursion limit')
parser.add_argument('--allow-mining-without-peers', action='store_true', help='Allow mining without peers')
fvargs = parser.add_mutually_exclusive_group()
fvargs.add_argument('--x-full-verification', action='store_true', help='Fully validate the local database')
fvargs.add_argument('--x-full-verification', action='store_true', help=SUPPRESS) # deprecated
parser.add_argument('--procname-prefix', help='Add a prefix to the process name', default='')
parser.add_argument('--allow-non-standard-script', action='store_true', help='Accept non-standard scripts on '
'/push-tx API')
Expand Down
24 changes: 8 additions & 16 deletions hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from hathor.consensus.block_consensus import BlockConsensusAlgorithmFactory
from hathor.consensus.context import ConsensusAlgorithmContext
from hathor.consensus.transaction_consensus import TransactionConsensusAlgorithmFactory
from hathor.execution_manager import ExecutionManager
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
Expand Down Expand Up @@ -68,38 +67,31 @@ def __init__(
self,
soft_voided_tx_ids: set[bytes],
pubsub: PubSubManager,
*,
execution_manager: ExecutionManager
) -> None:
self._settings = get_global_settings()
self.log = logger.new()
self._pubsub = pubsub
self.soft_voided_tx_ids = frozenset(soft_voided_tx_ids)
self.block_algorithm_factory = BlockConsensusAlgorithmFactory()
self.transaction_algorithm_factory = TransactionConsensusAlgorithmFactory()
self._execution_manager = execution_manager

def create_context(self) -> ConsensusAlgorithmContext:
"""Handy method to create a context that can be used to access block and transaction algorithms."""
return ConsensusAlgorithmContext(self, self._pubsub)

@cpu.profiler(key=lambda self, base: 'consensus!{}'.format(base.hash.hex()))
def update(self, base: BaseTransaction) -> None:
def unsafe_update(self, base: BaseTransaction) -> None:
"""
Run a consensus update with its own context, indexes will be updated accordingly.

It is considered unsafe because the caller is responsible for crashing the full node
if this method throws any exception.
"""
from hathor.transaction import Block, Transaction
assert base.storage is not None
assert base.storage.is_only_valid_allowed()
meta = base.get_metadata()
assert meta.validation.is_valid()
try:
self._unsafe_update(base)
except BaseException:
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
assert base.storage is not None
base.storage.save_transaction(base, only_metadata=True)
self._execution_manager.crash_and_exit(reason=f'Consensus update failed for tx {base.hash_hex}')

def _unsafe_update(self, base: BaseTransaction) -> None:
"""Run a consensus update with its own context, indexes will be updated accordingly."""
from hathor.transaction import Block, Transaction

# XXX: first make sure we can run the consensus update on this tx:
meta = base.get_metadata()
Expand Down
2 changes: 1 addition & 1 deletion hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False, relax_assert:
self.sorted_txs.del_tx(tx)

if self.tokens:
self.tokens.del_tx(tx)
self.tokens.del_tx(tx, remove_all=remove_all)


class MemoryIndexesManager(IndexesManager):
Expand Down
2 changes: 1 addition & 1 deletion hathor/indexes/memory_tokens_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def add_tx(self, tx: BaseTransaction) -> None:
return
transactions.add(element)

def del_tx(self, tx: BaseTransaction) -> None:
def remove_tx(self, tx: BaseTransaction) -> None:
for tx_input in tx.inputs:
spent_tx = tx.get_spent_tx(tx_input)
self._add_to_index(spent_tx, tx_input.index)
Expand Down
7 changes: 5 additions & 2 deletions hathor/indexes/rocksdb_tokens_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ def add_tx(self, tx: BaseTransaction) -> None:
from hathor.transaction.token_creation_tx import TokenCreationTransaction
tx = cast(TokenCreationTransaction, tx)
self.log.debug('create_token_info', tx=tx.hash_hex, name=tx.token_name, symb=tx.token_symbol)
self._create_token_info(tx.hash, tx.token_name, tx.token_symbol)
key_info = self._to_key_info(tx.hash)
token_info = self._db.get((self._cf, key_info))
if token_info is None:
self._create_token_info(tx.hash, tx.token_name, tx.token_symbol)

if tx.is_transaction:
# Adding this tx to the transactions key list
Expand All @@ -305,7 +308,7 @@ def add_tx(self, tx: BaseTransaction) -> None:
self.log.debug('add utxo', tx=tx.hash_hex, index=index)
self._add_utxo(tx, index)

def del_tx(self, tx: BaseTransaction) -> None:
def remove_tx(self, tx: BaseTransaction) -> None:
for tx_input in tx.inputs:
spent_tx = tx.get_spent_tx(tx_input)
self._add_utxo(spent_tx, tx_input.index)
Expand Down
11 changes: 9 additions & 2 deletions hathor/indexes/tokens_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,18 @@ def add_tx(self, tx: BaseTransaction) -> None:
raise NotImplementedError

@abstractmethod
def del_tx(self, tx: BaseTransaction) -> None:
""" Tx has been voided, so remove from tokens index (if applicable)
def remove_tx(self, tx: BaseTransaction) -> None:
""" Implementation of removal from index called by del_tx.
"""
raise NotImplementedError

def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False) -> None:
""" Tx has been voided, so remove from tokens index (if applicable)
"""
from hathor.transaction.base_transaction import TxVersion
if remove_all or tx.version != TxVersion.TOKEN_CREATION_TRANSACTION:
self.remove_tx(tx)

@abstractmethod
def iter_all_tokens(self) -> Iterator[tuple[bytes, TokenIndexInfo]]:
""" Iterate over all tokens, yields tuples of (token_uid, token_index_info)
Expand Down
2 changes: 1 addition & 1 deletion hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def _initialize_components_full_verification(self) -> None:
)
self.tx_storage.add_to_indexes(tx)
with self.tx_storage.allow_only_valid_context():
self.consensus_algorithm.update(tx)
self.consensus_algorithm.unsafe_update(tx)
self.tx_storage.indexes.update(tx)
if self.tx_storage.indexes.mempool_tips is not None:
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
Expand Down
7 changes: 6 additions & 1 deletion hathor/p2p/states/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ async def handle_peer_id(self, payload: str) -> None:

data = json_loads(payload)

peer = PublicPeer.create_from_json(data)
try:
peer = PublicPeer.create_from_json(data)
except ValueError as e:
protocol.send_error_and_close_connection(f'Unable to parse peer id. Reason: {str(e)}')
return

assert peer.id is not None

# If the connection URL had a peer-id parameter we need to check it's the same
Expand Down
31 changes: 23 additions & 8 deletions hathor/vertex_handler/vertex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from hathor.conf.settings import HathorSettings
from hathor.consensus import ConsensusAlgorithm
from hathor.exception import HathorError, InvalidNewTransaction
from hathor.execution_manager import ExecutionManager
from hathor.feature_activation.feature_service import FeatureService
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
Expand All @@ -43,6 +44,7 @@ class VertexHandler:
'_consensus',
'_feature_service',
'_pubsub',
'_execution_manager',
'_wallet',
'_log_vertex_bytes',
)
Expand All @@ -57,6 +59,7 @@ def __init__(
consensus: ConsensusAlgorithm,
feature_service: FeatureService,
pubsub: PubSubManager,
execution_manager: ExecutionManager,
wallet: BaseWallet | None,
log_vertex_bytes: bool = False,
) -> None:
Expand All @@ -68,6 +71,7 @@ def __init__(
self._consensus = consensus
self._feature_service = feature_service
self._pubsub = pubsub
self._execution_manager = execution_manager
self._wallet = wallet
self._log_vertex_bytes = log_vertex_bytes

Expand Down Expand Up @@ -95,12 +99,19 @@ def on_new_vertex(
if not is_valid:
return False

self._save_and_run_consensus(vertex)
self._post_consensus(
vertex,
quiet=quiet,
reject_locked_reward=reject_locked_reward
)
try:
self._unsafe_save_and_run_consensus(vertex)
self._post_consensus(
vertex,
quiet=quiet,
reject_locked_reward=reject_locked_reward
)
except BaseException:
self._log.error('unexpected exception in on_new_vertex()', vertex=vertex)
meta = vertex.get_metadata()
meta.add_voided_by(self._settings.CONSENSUS_FAIL_ID)
self._tx_storage.save_transaction(vertex, only_metadata=True)
self._execution_manager.crash_and_exit(reason=f'on_new_vertex() failed for tx {vertex.hash_hex}')

return True

Expand Down Expand Up @@ -158,15 +169,19 @@ def _validate_vertex(

return True

def _save_and_run_consensus(self, vertex: BaseTransaction) -> None:
def _unsafe_save_and_run_consensus(self, vertex: BaseTransaction) -> None:
"""
This method is considered unsafe because the caller is responsible for crashing the full node
if this method throws any exception.
"""
# The method below adds the tx as a child of the parents
# This needs to be called right before the save because we were adding the children
# in the tx parents even if the tx was invalid (failing the verifications above)
# then I would have a children that was not in the storage
vertex.update_initial_metadata(save=False)
self._tx_storage.save_transaction(vertex)
self._tx_storage.add_to_indexes(vertex)
self._consensus.update(vertex)
self._consensus.unsafe_update(vertex)

def _post_consensus(
self,
Expand Down
4 changes: 0 additions & 4 deletions tests/cli/test_shell.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import tempfile

import pytest

from hathor.cli.shell import Shell
from tests import unittest
from tests.utils import HAS_ROCKSDB


class ShellTest(unittest.TestCase):
Expand All @@ -14,7 +11,6 @@ def test_shell_execution_memory_storage(self):
shell = Shell(argv=['--memory-storage', '--', '--extra-arg'])
self.assertTrue(shell is not None)

@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb')
def test_shell_execution_default_storage(self):
temp_data = tempfile.TemporaryDirectory()
shell = Shell(argv=['--data', temp_data.name])
Expand Down
6 changes: 3 additions & 3 deletions tests/consensus/test_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ class MyError(Exception):
pass

execution_manager_mock = Mock(spec_set=ExecutionManager)
manager.consensus_algorithm._execution_manager = execution_manager_mock
manager.consensus_algorithm._unsafe_update = MagicMock(side_effect=MyError)
manager.vertex_handler._execution_manager = execution_manager_mock
manager.consensus_algorithm.unsafe_update = MagicMock(side_effect=MyError)

manager.propagate_tx(tx, fails_silently=False)

execution_manager_mock.crash_and_exit.assert_called_once_with(
reason=f"Consensus update failed for tx {tx.hash_hex}"
reason=f"on_new_vertex() failed for tx {tx.hash_hex}"
)

tx2 = manager.tx_storage.get_transaction(tx.hash)
Expand Down
3 changes: 0 additions & 3 deletions tests/event/event_simulation_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from typing import Any, Iterable
from unittest.mock import Mock

import pytest
from twisted.internet.testing import StringTransport

from hathor.builder import Builder
Expand All @@ -27,7 +26,6 @@
from hathor.transaction.util import unpack, unpack_len
from hathor.util import json_loadb
from tests.simulation.base import SimulatorTestCase
from tests.utils import HAS_ROCKSDB


class BaseEventSimulationTester(SimulatorTestCase):
Expand Down Expand Up @@ -101,7 +99,6 @@ def setUp(self) -> None:
self._create_artifacts()


@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb')
class RocksDBEventSimulationTester(BaseEventSimulationTester):
def setUp(self) -> None:
super().setUp()
Expand Down
5 changes: 1 addition & 4 deletions tests/event/test_event_storage.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import tempfile

import pytest

from hathor.event.model.base_event import BaseEvent
from hathor.event.model.node_state import NodeState
from hathor.event.storage import EventStorage
from hathor.event.storage.memory_storage import EventMemoryStorage
from hathor.event.storage.rocksdb_storage import EventRocksDBStorage
from hathor.storage.rocksdb_storage import RocksDBStorage
from tests import unittest
from tests.utils import HAS_ROCKSDB, EventMocker
from tests.utils import EventMocker


class EventStorageBaseTest(unittest.TestCase):
Expand Down Expand Up @@ -237,7 +235,6 @@ def test_reset_all_full_database(self) -> None:
assert event_queue_state is False


@pytest.mark.skipif(not HAS_ROCKSDB, reason='requires python-rocksdb')
class EventStorageRocksDBTest(EventStorageBaseTest):
__test__ = True

Expand Down
Loading
Loading