diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index d08550984..393634406 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -49,6 +49,7 @@ from hathor.util import Random, get_environment_info, not_none from hathor.verification.verification_service import VerificationService from hathor.verification.vertex_verifiers import VertexVerifiers +from hathor.vertex_handler import VertexHandler from hathor.wallet import BaseWallet, Wallet logger = get_logger() @@ -156,6 +157,9 @@ def __init__(self) -> None: self._soft_voided_tx_ids: Optional[set[bytes]] = None self._execution_manager: ExecutionManager | None = None + self._vertex_handler: VertexHandler | None = None + self._consensus: ConsensusAlgorithm | None = None + self._p2p_manager: ConnectionsManager | None = None def build(self) -> BuildArtifacts: if self.artifacts is not None: @@ -171,10 +175,9 @@ def build(self) -> BuildArtifacts: peer_id = self._get_peer_id() execution_manager = self._get_or_create_execution_manager() - soft_voided_tx_ids = self._get_soft_voided_tx_ids() - consensus_algorithm = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager) + consensus_algorithm = self._get_or_create_consensus() - p2p_manager = self._get_p2p_manager() + p2p_manager = self._get_or_create_p2p_manager() wallet = self._get_or_create_wallet() event_manager = self._get_or_create_event_manager() @@ -185,6 +188,7 @@ def build(self) -> BuildArtifacts: verification_service = self._get_or_create_verification_service() daa = self._get_or_create_daa() cpu_mining_service = self._get_or_create_cpu_mining_service() + vertex_handler = self._get_or_create_vertex_handler() if self._enable_address_index: indexes.enable_address_index(pubsub) @@ -219,11 +223,11 @@ def build(self) -> BuildArtifacts: checkpoints=self._checkpoints, capabilities=self._capabilities, environment_info=get_environment_info(self._cmdline, peer_id.id), - feature_service=feature_service, bit_signaling_service=bit_signaling_service, verification_service=verification_service, cpu_mining_service=cpu_mining_service, execution_manager=execution_manager, + vertex_handler=vertex_handler, **kwargs ) @@ -323,6 +327,15 @@ def _get_or_create_execution_manager(self) -> ExecutionManager: return self._execution_manager + def _get_or_create_consensus(self) -> ConsensusAlgorithm: + if self._consensus is None: + soft_voided_tx_ids = self._get_soft_voided_tx_ids() + pubsub = self._get_or_create_pubsub() + execution_manager = self._get_or_create_execution_manager() + self._consensus = ConsensusAlgorithm(soft_voided_tx_ids, pubsub, execution_manager=execution_manager) + + return self._consensus + def _get_or_create_pubsub(self) -> PubSubManager: if self._pubsub is None: self._pubsub = PubSubManager(self._get_reactor()) @@ -351,7 +364,10 @@ def _get_or_create_rocksdb_storage(self) -> RocksDBStorage: return self._rocksdb_storage - def _get_p2p_manager(self) -> ConnectionsManager: + def _get_or_create_p2p_manager(self) -> ConnectionsManager: + if self._p2p_manager: + return self._p2p_manager + from hathor.p2p.sync_v1.factory import SyncV11Factory from hathor.p2p.sync_v2.factory import SyncV2Factory from hathor.p2p.sync_version import SyncVersion @@ -362,7 +378,7 @@ def _get_p2p_manager(self) -> ConnectionsManager: assert self._network is not None - p2p_manager = ConnectionsManager( + self._p2p_manager = ConnectionsManager( reactor, network=self._network, my_peer=my_peer, @@ -371,13 +387,13 @@ def _get_p2p_manager(self) -> ConnectionsManager: whitelist_only=False, rng=self._rng, ) - p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(p2p_manager)) - p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(p2p_manager)) + self._p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(self._p2p_manager)) + self._p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(self._p2p_manager)) if self._enable_sync_v1: - p2p_manager.enable_sync_version(SyncVersion.V1_1) + self._p2p_manager.enable_sync_version(SyncVersion.V1_1) if self._enable_sync_v2: - p2p_manager.enable_sync_version(SyncVersion.V2) - return p2p_manager + self._p2p_manager.enable_sync_version(SyncVersion.V2) + return self._p2p_manager def _get_or_create_indexes_manager(self) -> IndexesManager: if self._indexes_manager is not None: @@ -536,6 +552,22 @@ def _get_or_create_cpu_mining_service(self) -> CpuMiningService: return self._cpu_mining_service + def _get_or_create_vertex_handler(self) -> VertexHandler: + if self._vertex_handler is None: + self._vertex_handler = VertexHandler( + reactor=self._get_reactor(), + settings=self._get_or_create_settings(), + tx_storage=self._get_or_create_tx_storage(), + verification_service=self._get_or_create_verification_service(), + consensus=self._get_or_create_consensus(), + p2p_manager=self._get_or_create_p2p_manager(), + feature_service=self._get_or_create_feature_service(), + pubsub=self._get_or_create_pubsub(), + wallet=self._get_or_create_wallet(), + ) + + return self._vertex_handler + def use_memory(self) -> 'Builder': self.check_if_can_modify() self._storage_type = StorageType.MEMORY @@ -565,16 +597,14 @@ def force_memory_index(self) -> 'Builder': def _get_or_create_wallet(self) -> Optional[BaseWallet]: if self._wallet is not None: - assert self._wallet_directory is None - assert self._wallet_unlock is None return self._wallet if self._wallet_directory is None: return None - wallet = Wallet(directory=self._wallet_directory) + self._wallet = Wallet(directory=self._wallet_directory) if self._wallet_unlock is not None: - wallet.unlock(self._wallet_unlock) - return wallet + self._wallet.unlock(self._wallet_unlock) + return self._wallet def set_wallet(self, wallet: BaseWallet) -> 'Builder': self.check_if_can_modify() diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 8418b8dc0..2b26557bd 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -42,6 +42,7 @@ from hathor.util import Random, not_none from hathor.verification.verification_service import VerificationService from hathor.verification.vertex_verifiers import VertexVerifiers +from hathor.vertex_handler import VertexHandler from hathor.wallet import BaseWallet, HDWallet, Wallet logger = get_logger() @@ -298,6 +299,18 @@ def create_manager(self, reactor: Reactor) -> HathorManager: if enable_sync_v2: p2p_manager.enable_sync_version(SyncVersion.V2) + vertex_handler = VertexHandler( + reactor=reactor, + settings=settings, + tx_storage=tx_storage, + verification_service=verification_service, + consensus=consensus_algorithm, + p2p_manager=p2p_manager, + feature_service=self.feature_service, + pubsub=pubsub, + wallet=self.wallet, + ) + self.manager = HathorManager( reactor, settings=settings, @@ -315,11 +328,11 @@ def create_manager(self, reactor: Reactor) -> HathorManager: environment_info=get_environment_info(args=str(self._args), peer_id=peer_id.id), full_verification=full_verification, enable_event_queue=self._args.x_enable_event_queue, - feature_service=self.feature_service, bit_signaling_service=bit_signaling_service, verification_service=verification_service, cpu_mining_service=cpu_mining_service, execution_manager=execution_manager, + vertex_handler=vertex_handler, ) if self._args.x_ipython_kernel: diff --git a/hathor/cli/db_import.py b/hathor/cli/db_import.py index 6a266f169..6369890e4 100644 --- a/hathor/cli/db_import.py +++ b/hathor/cli/db_import.py @@ -87,7 +87,7 @@ def _import_txs(self) -> Iterator['BaseTransaction']: tx = tx_or_block_from_bytes(tx_bytes) assert tx is not None tx.storage = self.tx_storage - self.manager.on_new_tx(tx, quiet=True, fails_silently=False, skip_block_weight_verification=True) + self.manager.on_new_tx(tx, quiet=True, fails_silently=False) yield tx diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 15b610099..052f4282c 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -221,7 +221,7 @@ def prepare(self, *, register_resources: bool = True) -> None: wallet=self.manager.wallet, rocksdb_storage=getattr(builder, 'rocksdb_storage', None), stratum_factory=self.manager.stratum_factory, - feature_service=self.manager._feature_service, + feature_service=self.manager.vertex_handler._feature_service, bit_signaling_service=self.manager._bit_signaling_service, ) diff --git a/hathor/manager.py b/hathor/manager.py index a34a18955..410a6a438 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -34,7 +34,6 @@ from hathor.exception import ( BlockTemplateTimestampError, DoubleSpendingError, - HathorError, InitializationError, InvalidNewTransaction, NonStandardTxError, @@ -43,13 +42,10 @@ ) from hathor.execution_manager import ExecutionManager from hathor.feature_activation.bit_signaling_service import BitSignalingService -from hathor.feature_activation.feature import Feature -from hathor.feature_activation.feature_service import FeatureService from hathor.mining import BlockTemplate, BlockTemplates from hathor.mining.cpu_mining_service import CpuMiningService from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer_id import PeerId -from hathor.p2p.protocol import HathorProtocol from hathor.profiler import get_cpu_profiler from hathor.pubsub import HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol as Reactor @@ -63,6 +59,7 @@ from hathor.types import Address, VertexId from hathor.util import EnvironmentInfo, LogDuration, Random, calculate_min_significant_weight, not_none from hathor.verification.verification_service import VerificationService +from hathor.vertex_handler import VertexHandler from hathor.wallet import BaseWallet logger = get_logger() @@ -101,12 +98,12 @@ def __init__( tx_storage: TransactionStorage, p2p_manager: ConnectionsManager, event_manager: EventManager, - feature_service: FeatureService, bit_signaling_service: BitSignalingService, verification_service: VerificationService, cpu_mining_service: CpuMiningService, network: str, execution_manager: ExecutionManager, + vertex_handler: VertexHandler, hostname: Optional[str] = None, wallet: Optional[BaseWallet] = None, capabilities: Optional[list[str]] = None, @@ -187,7 +184,6 @@ def __init__( self._event_manager.save_event_queue_state(enable_event_queue) self._enable_event_queue = enable_event_queue - self._feature_service = feature_service self._bit_signaling_service = bit_signaling_service self.verification_service = verification_service self.cpu_mining_service = cpu_mining_service @@ -195,6 +191,7 @@ def __init__( self.consensus_algorithm = consensus_algorithm self.connections = p2p_manager + self.vertex_handler = vertex_handler self.metrics = Metrics( pubsub=self.pubsub, @@ -939,166 +936,30 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True) @cpu.profiler('on_new_tx') - def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = None, - quiet: bool = False, fails_silently: bool = True, propagate_to_peers: bool = True, - skip_block_weight_verification: bool = False, reject_locked_reward: bool = True) -> bool: + def on_new_tx( + self, + tx: BaseTransaction, + *, + quiet: bool = False, + fails_silently: bool = True, + propagate_to_peers: bool = True, + reject_locked_reward: bool = True + ) -> bool: """ New method for adding transactions or blocks that steps the validation state machine. :param tx: transaction to be added - :param conn: optionally specify the protocol instance where this tx was received from :param quiet: if True will not log when a new tx is accepted :param fails_silently: if False will raise an exception when tx cannot be added :param propagate_to_peers: if True will relay the tx to other peers if it is accepted - :param skip_block_weight_verification: if True will not check the tx PoW """ - assert self.tx_storage.is_only_valid_allowed() - - already_exists = False - if self.tx_storage.transaction_exists(tx.hash): - self.tx_storage.compare_bytes_with_local_tx(tx) - already_exists = True - - if tx.timestamp - self.reactor.seconds() > self._settings.MAX_FUTURE_TIMESTAMP_ALLOWED: - if not fails_silently: - raise InvalidNewTransaction('Ignoring transaction in the future {} (timestamp={})'.format( - tx.hash_hex, tx.timestamp)) - self.log.warn('on_new_tx(): Ignoring transaction in the future', tx=tx.hash_hex, - future_timestamp=tx.timestamp) - return False - - assert self.tx_storage.indexes is not None - tx.storage = self.tx_storage - - try: - metadata = tx.get_metadata() - except TransactionDoesNotExist: - if not fails_silently: - raise InvalidNewTransaction('cannot get metadata') - self.log.warn('on_new_tx(): cannot get metadata', tx=tx.hash_hex) - return False - - if already_exists and metadata.validation.is_fully_connected(): - if not fails_silently: - raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex)) - self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex) - return False - - if metadata.validation.is_invalid(): - if not fails_silently: - raise InvalidNewTransaction('previously marked as invalid') - self.log.warn('on_new_tx(): previously marked as invalid', tx=tx.hash_hex) - return False - - if not metadata.validation.is_fully_connected(): - try: - self.verification_service.validate_full(tx, reject_locked_reward=reject_locked_reward) - except HathorError as e: - if not fails_silently: - raise InvalidNewTransaction('full validation failed') from e - self.log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True) - return False - - # The method below adds the tx as a child of the parents - # This needs to be called right before the save because we were adding the children - # in the tx parents even if the tx was invalid (failing the verifications above) - # then I would have a children that was not in the storage - tx.update_initial_metadata(save=False) - self.tx_storage.save_transaction(tx) - self.tx_storage.add_to_indexes(tx) - self.consensus_algorithm.update(tx) - - assert self.verification_service.validate_full( + return self.vertex_handler.on_new_tx( tx, - skip_block_weight_verification=True, - reject_locked_reward=reject_locked_reward - ) - self.tx_storage.indexes.update(tx) - if self.tx_storage.indexes.mempool_tips: - self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update - self.tx_fully_validated(tx, quiet=quiet) - - if propagate_to_peers: - # Propagate to our peers. - self.connections.send_tx_to_peers(tx) - - return True - - def log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None: - """ A shortcut for logging additional information for block/txs. - """ - metadata = tx.get_metadata() - now = datetime.datetime.fromtimestamp(self.reactor.seconds()) - kwargs = { - 'tx': tx, - 'ts_date': datetime.datetime.fromtimestamp(tx.timestamp), - 'time_from_now': tx.get_time_from_now(now), - 'validation': metadata.validation.name, - } - if tx.is_block: - message = message_fmt.format('block') - if isinstance(tx, Block): - kwargs['height'] = tx.get_height() - else: - message = message_fmt.format('tx') - if not quiet: - log_func = self.log.info - else: - log_func = self.log.debug - log_func(message, **kwargs) - - def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None: - """ Handle operations that need to happen once the tx becomes fully validated. - - This might happen immediately after we receive the tx, if we have all dependencies - already. Or it might happen later. - """ - assert self.tx_storage.indexes is not None - - # Publish to pubsub manager the new tx accepted, now that it's full validated - self.pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=tx) - - if self.tx_storage.indexes.mempool_tips: - self.tx_storage.indexes.mempool_tips.update(tx) - - if self.wallet: - # TODO Remove it and use pubsub instead. - self.wallet.on_new_tx(tx) - - self.log_new_object(tx, 'new {}', quiet=quiet) - self._log_feature_states(tx) - - def _log_feature_states(self, vertex: BaseTransaction) -> None: - """Log features states for a block. Used as part of the Feature Activation Phased Testing.""" - if not isinstance(vertex, Block): - return - - feature_descriptions = self._feature_service.get_bits_description(block=vertex) - state_by_feature = { - feature.value: description.state.value - for feature, description in feature_descriptions.items() - } - - self.log.info( - 'New block accepted with feature activation states', - block_hash=vertex.hash_hex, - block_height=vertex.get_height(), - features_states=state_by_feature + quiet=quiet, + fails_silently=fails_silently, + propagate_to_peers=propagate_to_peers, + reject_locked_reward=reject_locked_reward, ) - features = [Feature.NOP_FEATURE_1, Feature.NOP_FEATURE_2] - for feature in features: - self._log_if_feature_is_active(vertex, feature) - - def _log_if_feature_is_active(self, block: Block, feature: Feature) -> None: - """Log if a feature is ACTIVE for a block. Used as part of the Feature Activation Phased Testing.""" - if self._feature_service.is_feature_active(block=block, feature=feature): - self.log.info( - 'Feature is ACTIVE for block', - feature=feature.value, - block_hash=block.hash_hex, - block_height=block.get_height() - ) - def has_sync_version_capability(self) -> bool: return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index 2060c755f..cf395e4e3 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -628,7 +628,7 @@ def handle_data(self, payload: str) -> None: self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. - result = self.manager.on_new_tx(tx, conn=self.protocol, propagate_to_peers=True) + result = self.manager.on_new_tx(tx, propagate_to_peers=True) self.update_received_stats(tx, result) def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None: diff --git a/hathor/vertex_handler/__init__.py b/hathor/vertex_handler/__init__.py new file mode 100644 index 000000000..8ac87643a --- /dev/null +++ b/hathor/vertex_handler/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.vertex_handler.vertex_handler import VertexHandler + +__all__ = [ + 'VertexHandler' +] diff --git a/hathor/vertex_handler/vertex_handler.py b/hathor/vertex_handler/vertex_handler.py new file mode 100644 index 000000000..a18cc1da9 --- /dev/null +++ b/hathor/vertex_handler/vertex_handler.py @@ -0,0 +1,238 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +from structlog import get_logger + +from hathor.conf.settings import HathorSettings +from hathor.consensus import ConsensusAlgorithm +from hathor.exception import HathorError, InvalidNewTransaction +from hathor.feature_activation.feature import Feature +from hathor.feature_activation.feature_service import FeatureService +from hathor.p2p.manager import ConnectionsManager +from hathor.pubsub import HathorEvents, PubSubManager +from hathor.reactor import ReactorProtocol +from hathor.transaction import BaseTransaction, Block +from hathor.transaction.storage import TransactionStorage +from hathor.transaction.storage.exceptions import TransactionDoesNotExist +from hathor.verification.verification_service import VerificationService +from hathor.wallet import BaseWallet + +logger = get_logger() + + +class VertexHandler: + __slots__ = ( + '_log', + '_reactor', + '_settings', + '_tx_storage', + '_verification_service', + '_consensus', + '_p2p_manager', + '_feature_service', + '_pubsub', + '_wallet', + ) + + def __init__( + self, + *, + reactor: ReactorProtocol, + settings: HathorSettings, + tx_storage: TransactionStorage, + verification_service: VerificationService, + consensus: ConsensusAlgorithm, + p2p_manager: ConnectionsManager, + feature_service: FeatureService, + pubsub: PubSubManager, + wallet: BaseWallet | None, + ) -> None: + self._log = logger.new() + self._reactor = reactor + self._settings = settings + self._tx_storage = tx_storage + self._verification_service = verification_service + self._consensus = consensus + self._p2p_manager = p2p_manager + self._feature_service = feature_service + self._pubsub = pubsub + self._wallet = wallet + + def on_new_tx( + self, + tx: BaseTransaction, + *, + quiet: bool = False, + fails_silently: bool = True, + propagate_to_peers: bool = True, + reject_locked_reward: bool = True + ) -> bool: + """ New method for adding transactions or blocks that steps the validation state machine. + + :param tx: transaction to be added + :param quiet: if True will not log when a new tx is accepted + :param fails_silently: if False will raise an exception when tx cannot be added + :param propagate_to_peers: if True will relay the tx to other peers if it is accepted + """ + assert self._tx_storage.is_only_valid_allowed() + assert tx.hash is not None + + already_exists = False + if self._tx_storage.transaction_exists(tx.hash): + self._tx_storage.compare_bytes_with_local_tx(tx) + already_exists = True + + if tx.timestamp - self._reactor.seconds() > self._settings.MAX_FUTURE_TIMESTAMP_ALLOWED: + if not fails_silently: + raise InvalidNewTransaction('Ignoring transaction in the future {} (timestamp={})'.format( + tx.hash_hex, tx.timestamp)) + self._log.warn('on_new_tx(): Ignoring transaction in the future', tx=tx.hash_hex, + future_timestamp=tx.timestamp) + return False + + assert self._tx_storage.indexes is not None + tx.storage = self._tx_storage + + try: + metadata = tx.get_metadata() + except TransactionDoesNotExist: + if not fails_silently: + raise InvalidNewTransaction('cannot get metadata') + self._log.warn('on_new_tx(): cannot get metadata', tx=tx.hash_hex) + return False + + if already_exists and metadata.validation.is_fully_connected(): + if not fails_silently: + raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex)) + self._log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex) + return False + + if metadata.validation.is_invalid(): + if not fails_silently: + raise InvalidNewTransaction('previously marked as invalid') + self._log.warn('on_new_tx(): previously marked as invalid', tx=tx.hash_hex) + return False + + if not metadata.validation.is_fully_connected(): + try: + self._verification_service.validate_full(tx, reject_locked_reward=reject_locked_reward) + except HathorError as e: + if not fails_silently: + raise InvalidNewTransaction('full validation failed') from e + self._log.warn('on_new_tx(): full validation failed', tx=tx.hash_hex, exc_info=True) + return False + + # The method below adds the tx as a child of the parents + # This needs to be called right before the save because we were adding the children + # in the tx parents even if the tx was invalid (failing the verifications above) + # then I would have a children that was not in the storage + tx.update_initial_metadata(save=False) + self._tx_storage.save_transaction(tx) + self._tx_storage.add_to_indexes(tx) + self._consensus.update(tx) + + assert self._verification_service.validate_full( + tx, + skip_block_weight_verification=True, + reject_locked_reward=reject_locked_reward + ) + self._tx_storage.indexes.update(tx) + if self._tx_storage.indexes.mempool_tips: + self._tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update + self.tx_fully_validated(tx, quiet=quiet) + + if propagate_to_peers: + # Propagate to our peers. + self._p2p_manager.send_tx_to_peers(tx) + + return True + + def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None: + """ Handle operations that need to happen once the tx becomes fully validated. + + This might happen immediately after we receive the tx, if we have all dependencies + already. Or it might happen later. + """ + assert tx.hash is not None + assert self._tx_storage.indexes is not None + + # Publish to pubsub manager the new tx accepted, now that it's full validated + self._pubsub.publish(HathorEvents.NETWORK_NEW_TX_ACCEPTED, tx=tx) + + if self._tx_storage.indexes.mempool_tips: + self._tx_storage.indexes.mempool_tips.update(tx) + + if self._wallet: + # TODO Remove it and use pubsub instead. + self._wallet.on_new_tx(tx) + + self._log_new_object(tx, 'new {}', quiet=quiet) + self._log_feature_states(tx) + + def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None: + """ A shortcut for logging additional information for block/txs. + """ + metadata = tx.get_metadata() + now = datetime.datetime.fromtimestamp(self._reactor.seconds()) + kwargs = { + 'tx': tx, + 'ts_date': datetime.datetime.fromtimestamp(tx.timestamp), + 'time_from_now': tx.get_time_from_now(now), + 'validation': metadata.validation.name, + } + if tx.is_block: + message = message_fmt.format('block') + if isinstance(tx, Block): + kwargs['height'] = tx.get_height() + else: + message = message_fmt.format('tx') + if not quiet: + log_func = self._log.info + else: + log_func = self._log.debug + log_func(message, **kwargs) + + def _log_feature_states(self, vertex: BaseTransaction) -> None: + """Log features states for a block. Used as part of the Feature Activation Phased Testing.""" + if not isinstance(vertex, Block): + return + + feature_descriptions = self._feature_service.get_bits_description(block=vertex) + state_by_feature = { + feature.value: description.state.value + for feature, description in feature_descriptions.items() + } + + self._log.info( + 'New block accepted with feature activation states', + block_hash=vertex.hash_hex, + block_height=vertex.get_height(), + features_states=state_by_feature + ) + + features = [Feature.NOP_FEATURE_1, Feature.NOP_FEATURE_2] + for feature in features: + self._log_if_feature_is_active(vertex, feature) + + def _log_if_feature_is_active(self, block: Block, feature: Feature) -> None: + """Log if a feature is ACTIVE for a block. Used as part of the Feature Activation Phased Testing.""" + if self._feature_service.is_feature_active(block=block, feature=feature): + self._log.info( + 'Feature is ACTIVE for block', + feature=feature.value, + block_hash=block.hash_hex, + block_height=block.get_height() + ) diff --git a/tests/unittest.py b/tests/unittest.py index d99b94925..c9abb0d9c 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -101,9 +101,9 @@ def _get_peer_id(self) -> PeerId: return PeerId() def _get_reactor(self) -> Reactor: - if self._reactor: - return self._reactor - return MemoryReactorHeapClock() + if self._reactor is None: + self._reactor = MemoryReactorHeapClock() + return self._reactor class TestCase(unittest.TestCase):