diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py
index 6c106c8e4..8418b8dc0 100644
--- a/hathor/builder/cli_builder.py
+++ b/hathor/builder/cli_builder.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import getpass
-import json
 import os
 import platform
 import sys
@@ -96,8 +95,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
         self.log = logger.new()
         self.reactor = reactor
 
-        peer_id = self.create_peer_id()
-
+        peer_id = PeerId.create_from_json_path(self._args.peer) if self._args.peer else PeerId()
         python = f'{platform.python_version()}-{platform.python_implementation()}'
 
         self.log.info(
@@ -367,7 +365,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
             self.log.warn('--memory-indexes is implied for memory storage or JSON storage')
 
         for description in self._args.listen:
-            p2p_manager.add_listen_address(description)
+            p2p_manager.add_listen_address_description(description)
 
         if self._args.peer_id_blacklist:
             self.log.info('with peer id blacklist', blacklist=self._args.peer_id_blacklist)
@@ -397,14 +395,6 @@ def get_hostname(self) -> Optional[str]:
             print('Hostname discovered and set to {}'.format(hostname))
         return hostname
 
-    def create_peer_id(self) -> PeerId:
-        if not self._args.peer:
-            peer_id = PeerId()
-        else:
-            data = json.load(open(self._args.peer, 'r'))
-            peer_id = PeerId.create_from_json(data)
-        return peer_id
-
     def create_wallet(self) -> BaseWallet:
         if self._args.wallet == 'hd':
             kwargs: dict[str, Any] = {
diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py
index 6070de45f..15b610099 100644
--- a/hathor/cli/run_node.py
+++ b/hathor/cli/run_node.py
@@ -265,9 +265,8 @@ def register_signal_handlers(self) -> None:
     def signal_usr1_handler(self, sig: int, frame: Any) -> None:
         """Called when USR1 signal is received."""
         try:
-            self.log.warn('USR1 received. Killing all connections...')
-            if self.manager and self.manager.connections:
-                self.manager.connections.disconnect_all_peers(force=True)
+            self.log.warn('USR1 received.')
+            self.manager.connections.reload_entrypoints_and_connections()
         except Exception:
             # see: https://docs.python.org/3/library/signal.html#note-on-signal-handlers-and-exceptions
             self.log.error('prevented exception from escaping the signal handler', exc_info=True)
diff --git a/hathor/conf/settings.py b/hathor/conf/settings.py
index 62718bf2a..682279f6c 100644
--- a/hathor/conf/settings.py
+++ b/hathor/conf/settings.py
@@ -423,6 +423,10 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
     OLD_MAX_MERKLE_PATH_LENGTH: int = 12
     NEW_MAX_MERKLE_PATH_LENGTH: int = 20
 
+    # Maximum number of tx tips to accept in the initial phase of the mempool sync.
+    # 1000 is arbitrary, but it should be more than enough for the foreseeable future.
+    MAX_MEMPOOL_RECEIVING_TIPS: int = 1000
+
     # Used to enable nano contracts.
     #
     # This should NEVER be enabled for mainnet and testnet, since both networks will
diff --git a/hathor/manager.py b/hathor/manager.py
index 40183114f..928c383ad 100644
--- a/hathor/manager.py
+++ b/hathor/manager.py
@@ -89,31 +89,33 @@ class UnhealthinessReason(str, Enum):
     # This is the interval to be used by the task to check if the node is synced
     CHECK_SYNC_STATE_INTERVAL = 30  # seconds
 
-    def __init__(self,
-                 reactor: Reactor,
-                 *,
-                 settings: HathorSettings,
-                 pubsub: PubSubManager,
-                 consensus_algorithm: ConsensusAlgorithm,
-                 daa: DifficultyAdjustmentAlgorithm,
-                 peer_id: PeerId,
-                 tx_storage: TransactionStorage,
-                 p2p_manager: ConnectionsManager,
-                 event_manager: EventManager,
-                 feature_service: FeatureService,
-                 bit_signaling_service: BitSignalingService,
-                 verification_service: VerificationService,
-                 cpu_mining_service: CpuMiningService,
-                 network: str,
-                 execution_manager: ExecutionManager,
-                 hostname: Optional[str] = None,
-                 wallet: Optional[BaseWallet] = None,
-                 capabilities: Optional[list[str]] = None,
-                 checkpoints: Optional[list[Checkpoint]] = None,
-                 rng: Optional[Random] = None,
-                 environment_info: Optional[EnvironmentInfo] = None,
-                 full_verification: bool = False,
-                 enable_event_queue: bool = False):
+    def __init__(
+        self,
+        reactor: Reactor,
+        *,
+        settings: HathorSettings,
+        pubsub: PubSubManager,
+        consensus_algorithm: ConsensusAlgorithm,
+        daa: DifficultyAdjustmentAlgorithm,
+        peer_id: PeerId,
+        tx_storage: TransactionStorage,
+        p2p_manager: ConnectionsManager,
+        event_manager: EventManager,
+        feature_service: FeatureService,
+        bit_signaling_service: BitSignalingService,
+        verification_service: VerificationService,
+        cpu_mining_service: CpuMiningService,
+        network: str,
+        execution_manager: ExecutionManager,
+        hostname: Optional[str] = None,
+        wallet: Optional[BaseWallet] = None,
+        capabilities: Optional[list[str]] = None,
+        checkpoints: Optional[list[Checkpoint]] = None,
+        rng: Optional[Random] = None,
+        environment_info: Optional[EnvironmentInfo] = None,
+        full_verification: bool = False,
+        enable_event_queue: bool = False,
+    ) -> None:
         """
         :param reactor: Twisted reactor which handles the mainloop and the events.
         :param peer_id: Id of this node.
@@ -1173,6 +1175,13 @@ def get_cmd_path(self) -> Optional[str]:
         """Return the cmd path.
         If no cmd path is set, returns None."""
         return self._cmd_path
 
+    def set_hostname_and_reset_connections(self, new_hostname: str) -> None:
+        """Set the hostname and reset all connections."""
+        old_hostname = self.hostname
+        self.hostname = new_hostname
+        self.connections.update_hostname_entrypoints(old_hostname=old_hostname, new_hostname=self.hostname)
+        self.connections.disconnect_all_peers(force=True)
+
 
 class ParentTxs(NamedTuple):
     """ Tuple where the `must_include` hash, when present (at most 1), must be included in a pair, and a list of hashes
diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py
index d180af7c8..d7e7f422b 100644
--- a/hathor/p2p/manager.py
+++ b/hathor/p2p/manager.py
@@ -16,8 +16,9 @@
 
 from structlog import get_logger
 from twisted.internet import endpoints
+from twisted.internet.address import IPv4Address, IPv6Address
 from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IProtocolFactory, IStreamClientEndpoint, IStreamServerEndpoint
+from twisted.internet.interfaces import IListeningPort, IProtocolFactory, IStreamClientEndpoint
 from twisted.internet.task import LoopingCall
 from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
 from twisted.python.failure import Failure
@@ -108,8 +109,11 @@ def __init__(self,
 
         self.network = network
 
-        # List of addresses to listen for new connections (eg: [tcp:8000])
-        self.listen_addresses: list[str] = []
+        # List of address descriptions to listen for new connections (eg: [tcp:8000])
+        self.listen_address_descriptions: list[str] = []
+
+        # List of actual IP address instances to listen for new connections
+        self._listen_addresses: list[IPv4Address | IPv6Address] = []
 
         # List of peer discovery methods.
         self.peer_discoveries: list[PeerDiscovery] = []
@@ -239,9 +243,9 @@ def set_manager(self, manager: 'HathorManager') -> None:
             self.log.debug('enable sync-v2 indexes')
             indexes.enable_mempool_index()
 
-    def add_listen_address(self, addr: str) -> None:
+    def add_listen_address_description(self, addr: str) -> None:
         """Add address to listen for incoming connections."""
-        self.listen_addresses.append(addr)
+        self.listen_address_descriptions.append(addr)
 
     def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None:
         """Add a peer discovery method."""
@@ -279,7 +283,7 @@ def start(self) -> None:
         if self._settings.ENABLE_PEER_WHITELIST:
             self._start_whitelist_reconnect()
 
-        for description in self.listen_addresses:
+        for description in self.listen_address_descriptions:
             self.listen(description)
 
         self.do_discovery()
@@ -635,7 +639,7 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O
                 peers_count=self._get_peers_count()
             )
 
-    def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamServerEndpoint:
+    def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
         """ Start to listen for new connection according to the description.
 
         If `ssl` is True, then the connection will be wraped by a TLS.
@@ -661,20 +665,43 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamSer
             factory = NetfilterFactory(self, factory)
 
-        self.log.info('listen on', endpoint=description)
-        endpoint.listen(factory)
+        self.log.info('trying to listen on', endpoint=description)
+        deferred: Deferred[IListeningPort] = endpoint.listen(factory)
+        deferred.addCallback(self._on_listen_success, description)
+
+    def _on_listen_success(self, listening_port: IListeningPort, description: str) -> None:
+        """Callback to be called when listening to an endpoint succeeds."""
+        self.log.info('success listening on', endpoint=description)
+        address = listening_port.getHost()
+
+        if not isinstance(address, (IPv4Address, IPv6Address)):
+            self.log.error(f'unhandled address type for endpoint "{description}": {str(type(address))}')
+            return
+
+        self._listen_addresses.append(address)
 
-        # XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
-        # that we have will have a _port attribute
-        port = getattr(endpoint, '_port', None)
         assert self.manager is not None
-        if self.manager.hostname and port is not None:
-            proto, _, _ = description.partition(':')
-            address = '{}://{}:{}'.format(proto, self.manager.hostname, port)
-            assert self.manager.my_peer is not None
-            self.manager.my_peer.entrypoints.append(address)
+        if self.manager.hostname:
+            self._add_hostname_entrypoint(self.manager.hostname, address)
 
-        return endpoint
+    def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname: str) -> None:
+        """Add new hostname entrypoints according to the listen addresses, and remove any old entrypoint."""
+        assert self.manager is not None
+        for address in self._listen_addresses:
+            if old_hostname is not None:
+                old_address_str = self._get_hostname_address_str(old_hostname, address)
+                if old_address_str in self.my_peer.entrypoints:
+                    self.my_peer.entrypoints.remove(old_address_str)
+
+            self._add_hostname_entrypoint(new_hostname, address)
+
+    def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None:
+        hostname_address_str = self._get_hostname_address_str(hostname, address)
+        self.my_peer.entrypoints.append(hostname_address_str)
+
+    @staticmethod
+    def _get_hostname_address_str(hostname: str, address: IPv4Address | IPv6Address) -> str:
+        return '{}://{}:{}'.format(address.type, hostname, address.port).lower()
 
     def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
         """ When there are duplicate connections, determine which one should be dropped.
@@ -796,3 +823,9 @@ def _sync_rotate_if_needed(self, *, force: bool = False) -> None:
 
         for peer_id in info.to_enable:
             self.connected_peers[peer_id].enable_sync()
+
+    def reload_entrypoints_and_connections(self) -> None:
+        """Kill all connections and reload entrypoints from the original peer config file."""
+        self.log.warn('Killing all connections and resetting entrypoints...')
+        self.disconnect_all_peers(force=True)
+        self.my_peer.reload_entrypoints_from_source_file()
diff --git a/hathor/p2p/peer_id.py b/hathor/p2p/peer_id.py
index 678111f1c..785af6a4f 100644
--- a/hathor/p2p/peer_id.py
+++ b/hathor/p2p/peer_id.py
@@ -14,6 +14,7 @@
 
 import base64
 import hashlib
+import json
 from enum import Enum
 from math import inf
 from typing import TYPE_CHECKING, Any, Optional, cast
@@ -24,6 +25,7 @@
 from cryptography.hazmat.primitives import hashes, serialization
 from cryptography.hazmat.primitives.asymmetric import padding, rsa
 from OpenSSL.crypto import X509, PKey
+from structlog import get_logger
 from twisted.internet.interfaces import ISSLTransport
 from twisted.internet.ssl import Certificate, CertificateOptions, TLSVersion, trustRootFromCertificates
 
@@ -35,6 +37,8 @@
 if TYPE_CHECKING:
     from hathor.p2p.protocol import HathorProtocol  # noqa: F401
 
+logger = get_logger()
+
 
 class InvalidPeerIdException(Exception):
     pass
@@ -64,8 +68,10 @@ class PeerId:
     retry_attempts: int    # how many retries were made
     last_seen: float       # last time this peer was seen
     flags: set[str]
+    source_file: str | None
 
     def __init__(self, auto_generate_keys: bool = True) -> None:
+        self._log = logger.new()
         self._settings = get_global_settings()
         self.id = None
         self.private_key = None
@@ -159,9 +165,15 @@ def verify_signature(self, signature: bytes, data: bytes) -> bool:
         else:
             return True
 
+    @classmethod
+    def create_from_json_path(cls, path: str) -> 'PeerId':
+        """Create a new PeerId from a JSON file."""
+        data = json.load(open(path, 'r'))
+        return PeerId.create_from_json(data)
+
     @classmethod
     def create_from_json(cls, data: dict[str, Any]) -> 'PeerId':
-        """ Create a new PeerId from a JSON.
+        """ Create a new PeerId from JSON data.
 
         It is used both to load a PeerId from disk and to create a PeerId from a peer connection.
@@ -408,3 +420,20 @@ def validate_certificate(self, protocol: 'HathorProtocol') -> bool:
             return False
 
         return True
+
+    def reload_entrypoints_from_source_file(self) -> None:
+        """Update this PeerId's entrypoints from the JSON file."""
+        if not self.source_file:
+            raise Exception('Trying to reload entrypoints but no peer config file was provided.')
+
+        new_peer_id = PeerId.create_from_json_path(self.source_file)
+
+        if new_peer_id.id != self.id:
+            self._log.error(
+                'Ignoring peer id file update because the peer_id does not match.',
+                current_peer_id=self.id,
+                new_peer_id=new_peer_id.id,
+            )
+            return
+
+        self.entrypoints = new_peer_id.entrypoints
diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py
index 8382cdefc..b2ee4543b 100644
--- a/hathor/p2p/sync_v2/agent.py
+++ b/hathor/p2p/sync_v2/agent.py
@@ -142,7 +142,8 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None:
         # Saves if I am in the middle of a mempool sync
         # we don't execute any sync while in the middle of it
         self.mempool_manager = SyncMempoolManager(self)
-        self._receiving_tips: Optional[list[bytes]] = None
+        self._receiving_tips: Optional[list[VertexId]] = None
+        self.max_receiving_tips: int = self._settings.MAX_MEMPOOL_RECEIVING_TIPS
 
         # Cache for get_tx calls
         self._get_tx_cache: OrderedDict[bytes, BaseTransaction] = OrderedDict()
@@ -476,7 +477,13 @@ def handle_tips(self, payload: str) -> None:
         data = json.loads(payload)
         data = [bytes.fromhex(x) for x in data]
         # filter-out txs we already have
-        self._receiving_tips.extend(tx_id for tx_id in data if not self.partial_vertex_exists(tx_id))
+        try:
+            self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
+        except ValueError:
+            self.protocol.send_error_and_close_connection('Invalid transaction ID received')
+        # XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
+        if len(self._receiving_tips) > self.max_receiving_tips:
+            self.protocol.send_error_and_close_connection(f'Too many tips: {len(self._receiving_tips)}')
 
     def handle_tips_end(self, _payload: str) -> None:
         """ Handle a TIPS-END message.
diff --git a/hathor/p2p/utils.py b/hathor/p2p/utils.py
index 66f1bda37..4e2935a2e 100644
--- a/hathor/p2p/utils.py
+++ b/hathor/p2p/utils.py
@@ -33,18 +33,18 @@
 from hathor.transaction.genesis import get_representation_for_all_genesis
 
 
-def discover_hostname() -> Optional[str]:
-    """ Try to discover your hostname. It is a synchonous operation and
+def discover_hostname(timeout: float | None = None) -> Optional[str]:
+    """ Try to discover your hostname. It is a synchronous operation and
     should not be called from twisted main loop.
     """
-    return discover_ip_ipify()
+    return discover_ip_ipify(timeout)
 
 
-def discover_ip_ipify() -> Optional[str]:
+def discover_ip_ipify(timeout: float | None = None) -> Optional[str]:
     """ Try to discover your IP address using ipify's api.
-    It is a synchonous operation and should not be called from twisted main loop.
+    It is a synchronous operation and should not be called from twisted main loop.
     """
-    response = requests.get('https://api.ipify.org')
+    response = requests.get('https://api.ipify.org', timeout=timeout)
     if response.ok:
         # It may be either an ipv4 or ipv6 in string format.
         ip = response.text
diff --git a/hathor/simulator/tx_generator.py b/hathor/simulator/tx_generator.py
index 81c8345d8..ead648da5 100644
--- a/hathor/simulator/tx_generator.py
+++ b/hathor/simulator/tx_generator.py
@@ -13,12 +13,13 @@
 # limitations under the License.
 
 from collections import deque
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Callable, TypeAlias
 
 from structlog import get_logger
 
 from hathor.conf.get_settings import get_global_settings
 from hathor.simulator.utils import NoCandidatesError, gen_new_double_spending, gen_new_tx
+from hathor.transaction import Transaction
 from hathor.transaction.exceptions import RewardLocked
 from hathor.types import VertexId
 from hathor.util import Random
@@ -29,6 +30,8 @@
 
 logger = get_logger()
 
+GenTxFunction: TypeAlias = Callable[['HathorManager', str, int], Transaction]
+
 
 class RandomTransactionGenerator:
     """ Generates random transactions without mining. It is supposed to be used
@@ -38,7 +41,8 @@ class RandomTransactionGenerator:
     MAX_LATEST_TRANSACTIONS_LEN = 10
 
     def __init__(self, manager: 'HathorManager', rng: Random, *,
-                 rate: float, hashpower: float, ignore_no_funds: bool = False):
+                 rate: float, hashpower: float, ignore_no_funds: bool = False,
+                 custom_gen_new_tx: GenTxFunction | None = None):
         """
         :param: rate: Number of transactions per second
         :param: hashpower: Number of hashes per second
@@ -58,6 +62,11 @@ def __init__(self, manager: 'HathorManager', rng: Random, *,
         self.delayedcall = None
         self.log = logger.new()
         self.rng = rng
+        self.gen_new_tx: GenTxFunction
+        if custom_gen_new_tx is not None:
+            self.gen_new_tx = custom_gen_new_tx
+        else:
+            self.gen_new_tx = gen_new_tx
 
         # Most recent transactions generated here.
         # The lowest index has the most recent transaction.
@@ -115,7 +124,7 @@ def new_tx_step1(self):
 
         if not self.double_spending_only:
             try:
-                tx = gen_new_tx(self.manager, address, value)
+                tx = self.gen_new_tx(self.manager, address, value)
             except (InsufficientFunds, RewardLocked):
                 self.delayedcall = self.clock.callLater(0, self.schedule_next_transaction)
                 return
diff --git a/hathor/sysctl/p2p/manager.py b/hathor/sysctl/p2p/manager.py
index ab6ef5902..e821039bd 100644
--- a/hathor/sysctl/p2p/manager.py
+++ b/hathor/sysctl/p2p/manager.py
@@ -16,9 +16,12 @@
 
 from hathor.p2p.manager import ConnectionsManager
 from hathor.p2p.sync_version import SyncVersion
+from hathor.p2p.utils import discover_hostname
 from hathor.sysctl.exception import SysctlException
 from hathor.sysctl.sysctl import Sysctl, signal_handler_safe
 
+AUTO_HOSTNAME_TIMEOUT_SECONDS: float = 5
+
 
 def parse_text(text: str) -> list[str]:
     """Parse text per line skipping empty lines and comments."""
@@ -103,6 +106,21 @@ def __init__(self, connections: ConnectionsManager) -> None:
             None,
             self.set_kill_connection,
         )
+        self.register(
+            'hostname',
+            self.get_hostname,
+            self.set_hostname,
+        )
+        self.register(
+            'refresh_auto_hostname',
+            None,
+            self.refresh_auto_hostname,
+        )
+        self.register(
+            'reload_entrypoints_and_connections',
+            None,
+            self.reload_entrypoints_and_connections,
+        )
 
     def set_force_sync_rotate(self) -> None:
         """Force a sync rotate."""
@@ -217,3 +235,32 @@ def set_kill_connection(self, peer_id: str, force: bool = False) -> None:
             self.log.warn('Killing connection', peer_id=peer_id)
             raise SysctlException('peer-id is not connected')
         conn.disconnect(force=force)
+
+    def get_hostname(self) -> str | None:
+        """Return the configured hostname."""
+        assert self.connections.manager is not None
+        return self.connections.manager.hostname
+
+    def set_hostname(self, hostname: str) -> None:
+        """Set the hostname and reset all connections."""
+        assert self.connections.manager is not None
+        self.connections.manager.set_hostname_and_reset_connections(hostname)
+
+    def refresh_auto_hostname(self) -> None:
+        """
+        Automatically discover the hostname and set it, if it's found. This operation blocks the event loop.
+        Then, reset all connections.
+        """
+        assert self.connections.manager is not None
+        try:
+            hostname = discover_hostname(timeout=AUTO_HOSTNAME_TIMEOUT_SECONDS)
+        except Exception as e:
+            self.log.error(f'Could not refresh hostname. Error: {str(e)}')
+            return
+
+        if hostname:
+            self.connections.manager.set_hostname_and_reset_connections(hostname)
+
+    def reload_entrypoints_and_connections(self) -> None:
+        """Kill all connections and reload entrypoints from the peer config file."""
+        self.connections.reload_entrypoints_and_connections()
diff --git a/hathor/transaction/aux_pow.py b/hathor/transaction/aux_pow.py
index c6772ac88..103f6997c 100644
--- a/hathor/transaction/aux_pow.py
+++ b/hathor/transaction/aux_pow.py
@@ -18,6 +18,8 @@
 
 logger = get_logger()
 
+MAX_MERKLE_PATH_COUNT = 100
+
 
 class BitcoinAuxPow(NamedTuple):
     header_head: bytes  # 36 bytes
@@ -96,8 +98,11 @@ def from_bytes(cls, b: bytes) -> 'BitcoinAuxPow':
         coinbase_head = read_bytes(a)
         coinbase_tail = read_bytes(a)
         c = read_varint(a)
+        if c > MAX_MERKLE_PATH_COUNT:
+            raise ValueError(f'invalid merkle path count: {c} > {MAX_MERKLE_PATH_COUNT}')
         merkle_path = []
         for _ in range(c):
+            assert len(a) >= 32
             merkle_path.append(bytes(a[:32]))
             del a[:32]
         header_tail = read_nbytes(a, 12)
diff --git a/hathor/util.py b/hathor/util.py
index cf5547573..b66acee2a 100644
--- a/hathor/util.py
+++ b/hathor/util.py
@@ -30,7 +30,7 @@
 
 import hathor
 from hathor.conf.get_settings import get_global_settings
-from hathor.types import TokenUid
+from hathor.types import TokenUid, VertexId
 
 if TYPE_CHECKING:
     import structlog
@@ -810,3 +810,23 @@ def calculate_min_significant_weight(score: float, tol: float) -> float:
     """ This function will return the min significant weight to increase score by tol.
     """
     return score + math.log2(2 ** tol - 1)
+
+
+def bytes_to_vertexid(data: bytes) -> VertexId:
+    # XXX: using raw string for the docstring so we can more easily write byte literals
+    r""" Function to validate bytes and return a VertexId, raises ValueError if not valid.
+
+    >>> bytes_to_vertexid(b'\0' * 32)
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+    >>> bytes_to_vertexid(b'\0' * 31)
+    Traceback (most recent call last):
+    ...
+    ValueError: length must be exactly 32 bytes
+    >>> bytes_to_vertexid(b'\0' * 33)
+    Traceback (most recent call last):
+    ...
+    ValueError: length must be exactly 32 bytes
+    """
+    if len(data) != 32:
+        raise ValueError('length must be exactly 32 bytes')
+    return VertexId(data)
diff --git a/tests/p2p/test_sync_v2.py b/tests/p2p/test_sync_v2.py
index c3fb8db2e..f072134c0 100644
--- a/tests/p2p/test_sync_v2.py
+++ b/tests/p2p/test_sync_v2.py
@@ -257,6 +257,94 @@ def test_exceeds_streaming_and_mempool_limits(self) -> None:
         self.assertEqual(manager1.tx_storage.get_vertices_count(), manager2.tx_storage.get_vertices_count())
         self.assertConsensusEqualSyncV2(manager1, manager2)
 
+    def test_receiving_tips_limit(self) -> None:
+        from hathor.manager import HathorManager
+        from hathor.transaction import Transaction
+        from hathor.wallet.base_wallet import WalletOutputInfo
+        from tests.utils import BURN_ADDRESS
+
+        manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
+        manager1.allow_mining_without_peers()
+
+        # Find 100 blocks.
+        miner1 = self.simulator.create_miner(manager1, hashpower=10e6)
+        miner1.start()
+        trigger: Trigger = StopAfterNMinedBlocks(miner1, quantity=100)
+        self.assertTrue(self.simulator.run(3 * 3600, trigger=trigger))
+        miner1.stop()
+
+        # Custom tx generator that generates tips.
+        parents = manager1.get_new_tx_parents(manager1.tx_storage.latest_timestamp)
+
+        def custom_gen_new_tx(manager: HathorManager, _address: str, value: int, verify: bool = True) -> Transaction:
+            outputs = []
+            # XXX: burn address guarantees that this output will not be used as input for any following transactions
+            # XXX: reduce value to make sure we can generate more transactions, otherwise it will spend a linear
+            # random percent from 1 to 100 of the available balance, this way it spends from 0.1% to 10%
+            outputs.append(WalletOutputInfo(address=BURN_ADDRESS, value=max(1, int(value / 10)), timelock=None))
+
+            assert manager.wallet is not None
+            tx = manager.wallet.prepare_transaction_compute_inputs(Transaction, outputs, manager.tx_storage)
+            tx.storage = manager.tx_storage
+
+            max_ts_spent_tx = max(tx.get_spent_tx(txin).timestamp for txin in tx.inputs)
+            tx.timestamp = max(max_ts_spent_tx + 1, int(manager.reactor.seconds()))
+
+            tx.weight = 1
+            # XXX: fixed parents is the final requirement to make all the generated new transactions tips.
+            tx.parents = parents
+            manager.cpu_mining_service.resolve(tx)
+            if verify:
+                manager.verification_service.verify(tx)
+            return tx
+
+        # Generate 100 tx-tips in the mempool.
+        gen_tx1 = self.simulator.create_tx_generator(manager1, rate=3., hashpower=10e9, ignore_no_funds=True)
+        gen_tx1.gen_new_tx = custom_gen_new_tx
+        gen_tx1.start()
+        trigger = StopAfterNTransactions(gen_tx1, quantity=100)
+        self.simulator.run(3600, trigger=trigger)
+        self.assertGreater(manager1.tx_storage.get_vertices_count(), 100)
+        gen_tx1.stop()
+        assert manager1.tx_storage.indexes is not None
+        assert manager1.tx_storage.indexes.mempool_tips is not None
+        mempool_tips_count = len(manager1.tx_storage.indexes.mempool_tips.get())
+        # We should expect at the very least 30 tips.
+        self.assertGreater(mempool_tips_count, 30)
+
+        # Create a new peer and run sync for a while (but stop before getting synced).
+        peer_id = PeerId()
+        builder2 = self.simulator.get_default_builder() \
+            .set_peer_id(peer_id) \
+            .disable_sync_v1() \
+            .enable_sync_v2()
+
+        manager2 = self.simulator.create_peer(builder2)
+        conn12 = FakeConnection(manager1, manager2, latency=0.05)
+        self.simulator.add_connection(conn12)
+
+        # Let the connection start to sync.
+        self.simulator.run(1)
+
+        # Run until blocks are synced.
+        sync2 = conn12.proto2.state.sync_agent
+        trigger = StopWhenTrue(sync2.is_synced)
+        self.assertTrue(self.simulator.run(300, trigger=trigger))
+
+        # Change manager2's max_receiving_tips to check that it correctly closes the connection.
+        # 10 < 30, so this should be strict enough that it will fail.
+        sync2.max_receiving_tips = 10
+        self.assertIsNone(sync2._blk_streaming_server)
+        self.assertIsNone(sync2._tx_streaming_server)
+
+        # This should fail because the TIPS message exceeds the limit and will be rejected.
+        self.simulator.run(300)
+        # We should expect only the tips to be missing from the second node.
+        self.assertEqual(manager1.tx_storage.get_vertices_count(),
+                         manager2.tx_storage.get_vertices_count() + mempool_tips_count)
+        # And also the second node should have aborted the connection.
+        self.assertTrue(conn12.proto2.aborting)
+
     def _prepare_sync_v2_find_best_common_block_reorg(self) -> FakeConnection:
         manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
         manager1.allow_mining_without_peers()
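
Note: a minimal standalone sketch of the entrypoint-string handling introduced in ConnectionsManager above, useful for seeing the resulting format. The hostnames and port below are made-up examples, not taken from the diff; only Twisted is required.

    from twisted.internet.address import IPv4Address

    def hostname_address_str(hostname: str, address: IPv4Address) -> str:
        # Mirrors ConnectionsManager._get_hostname_address_str: the scheme comes from
        # the listening address type ('TCP' -> 'tcp'), the port from the bound port.
        return '{}://{}:{}'.format(address.type, hostname, address.port).lower()

    # Suppose the node is listening on tcp:40403 and its hostname changes.
    listen_address = IPv4Address('TCP', '0.0.0.0', 40403)
    entrypoints = [hostname_address_str('old-node.example', listen_address)]

    # update_hostname_entrypoints(old_hostname=..., new_hostname=...) removes the old
    # hostname's entrypoint for each listen address and appends the new hostname's one.
    old_entry = hostname_address_str('old-node.example', listen_address)
    if old_entry in entrypoints:
        entrypoints.remove(old_entry)
    entrypoints.append(hostname_address_str('new-node.example', listen_address))

    assert entrypoints == ['tcp://new-node.example:40403']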