Skip to content

refactor(storage): change storages to use injected settings #1081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ def _get_or_create_indexes_manager(self) -> IndexesManager:

def _get_or_create_tx_storage(self) -> TransactionStorage:
indexes = self._get_or_create_indexes_manager()
settings = self._get_or_create_settings()

if self._tx_storage is not None:
# If a tx storage is provided, set the indexes manager to it.
Expand All @@ -446,11 +447,11 @@ def _get_or_create_tx_storage(self) -> TransactionStorage:
store_indexes = None

if self._storage_type == StorageType.MEMORY:
self._tx_storage = TransactionMemoryStorage(indexes=store_indexes)
self._tx_storage = TransactionMemoryStorage(indexes=store_indexes, settings=settings)

elif self._storage_type == StorageType.ROCKSDB:
rocksdb_storage = self._get_or_create_rocksdb_storage()
self._tx_storage = TransactionRocksDBStorage(rocksdb_storage, indexes=store_indexes)
self._tx_storage = TransactionRocksDBStorage(rocksdb_storage, indexes=store_indexes, settings=settings)

else:
raise NotImplementedError
Expand All @@ -460,7 +461,9 @@ def _get_or_create_tx_storage(self) -> TransactionStorage:
kwargs: dict[str, Any] = {}
if self._tx_storage_cache_capacity is not None:
kwargs['capacity'] = self._tx_storage_cache_capacity
self._tx_storage = TransactionCacheStorage(self._tx_storage, reactor, indexes=indexes, **kwargs)
self._tx_storage = TransactionCacheStorage(
self._tx_storage, reactor, indexes=indexes, settings=settings, **kwargs
)

return self._tx_storage

Expand Down
6 changes: 3 additions & 3 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
self.check_or_raise(not self._args.data, '--data should not be used with --memory-storage')
# if using MemoryStorage, no need to have cache
indexes = MemoryIndexesManager()
tx_storage = TransactionMemoryStorage(indexes)
tx_storage = TransactionMemoryStorage(indexes, settings=settings)
event_storage = EventMemoryStorage()
self.check_or_raise(not self._args.x_rocksdb_indexes, 'RocksDB indexes require RocksDB data')
self.log.info('with storage', storage_class=type(tx_storage).__name__)
Expand All @@ -150,14 +150,14 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
# We should only pass indexes if cache is disabled. Otherwise,
# only TransactionCacheStorage should have indexes.
kwargs['indexes'] = indexes
tx_storage = TransactionRocksDBStorage(self.rocksdb_storage, **kwargs)
tx_storage = TransactionRocksDBStorage(self.rocksdb_storage, settings=settings, **kwargs)
event_storage = EventRocksDBStorage(self.rocksdb_storage)
feature_storage = FeatureActivationStorage(settings=settings, rocksdb_storage=self.rocksdb_storage)

self.log.info('with storage', storage_class=type(tx_storage).__name__, path=self._args.data)
if self._args.cache:
self.check_or_raise(not self._args.memory_storage, '--cache should not be used with --memory-storage')
tx_storage = TransactionCacheStorage(tx_storage, reactor, indexes=indexes)
tx_storage = TransactionCacheStorage(tx_storage, reactor, indexes=indexes, settings=settings)
if self._args.cache_size:
tx_storage.capacity = self._args.cache_size
if self._args.cache_interval:
Expand Down
16 changes: 13 additions & 3 deletions hathor/transaction/storage/cache_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from twisted.internet import threads

from hathor.conf.settings import HathorSettings
from hathor.indexes import IndexesManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.transaction import BaseTransaction
Expand All @@ -32,8 +33,17 @@ class TransactionCacheStorage(BaseTransactionStorage):
cache: OrderedDict[bytes, BaseTransaction]
dirty_txs: set[bytes]

def __init__(self, store: 'BaseTransactionStorage', reactor: Reactor, interval: int = 5,
capacity: int = 10000, *, indexes: Optional[IndexesManager], _clone_if_needed: bool = False):
def __init__(
self,
store: 'BaseTransactionStorage',
reactor: Reactor,
interval: int = 5,
capacity: int = 10000,
*,
settings: HathorSettings,
indexes: Optional[IndexesManager],
_clone_if_needed: bool = False,
) -> None:
"""
:param store: a subclass of BaseTransactionStorage
:type store: :py:class:`hathor.transaction.storage.BaseTransactionStorage`
Expand Down Expand Up @@ -68,7 +78,7 @@ def __init__(self, store: 'BaseTransactionStorage', reactor: Reactor, interval:

# we need to use only one weakref dict, so we must first initialize super, and then
# attribute the same weakref for both.
super().__init__(indexes=indexes)
super().__init__(indexes=indexes, settings=settings)
self._tx_weakref = store._tx_weakref
# XXX: just to make sure this isn't being used anywhere, setters/getters should be used instead
del self._allow_scope
Expand Down
11 changes: 9 additions & 2 deletions hathor/transaction/storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import Any, Iterator, Optional, TypeVar

from hathor.conf.settings import HathorSettings
from hathor.indexes import IndexesManager
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.storage.migrations import MigrationState
Expand All @@ -25,7 +26,13 @@


class TransactionMemoryStorage(BaseTransactionStorage):
def __init__(self, indexes: Optional[IndexesManager] = None, *, _clone_if_needed: bool = False) -> None:
def __init__(
self,
indexes: Optional[IndexesManager] = None,
*,
settings: HathorSettings,
_clone_if_needed: bool = False,
) -> None:
"""
:param _clone_if_needed: *private parameter*, defaults to True, controls whether to clone
transaction/blocks/metadata when returning those objects.
Expand All @@ -36,7 +43,7 @@ def __init__(self, indexes: Optional[IndexesManager] = None, *, _clone_if_needed
# Store custom key/value attributes
self.attributes: dict[str, Any] = {}
self._clone_if_needed = _clone_if_needed
super().__init__(indexes=indexes)
super().__init__(indexes=indexes, settings=settings)

def _check_and_set_network(self) -> None:
# XXX: does not apply to memory storage, can safely be ignored
Expand Down
11 changes: 9 additions & 2 deletions hathor/transaction/storage/rocksdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from structlog import get_logger

from hathor.conf.settings import HathorSettings
from hathor.indexes import IndexesManager
from hathor.storage import RocksDBStorage
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
Expand Down Expand Up @@ -43,15 +44,21 @@ class TransactionRocksDBStorage(BaseTransactionStorage):
It uses Protobuf serialization internally.
"""

def __init__(self, rocksdb_storage: RocksDBStorage, indexes: Optional[IndexesManager] = None):
def __init__(
self,
rocksdb_storage: RocksDBStorage,
indexes: Optional[IndexesManager] = None,
*,
settings: HathorSettings,
) -> None:
self._cf_tx = rocksdb_storage.get_or_create_column_family(_CF_NAME_TX)
self._cf_meta = rocksdb_storage.get_or_create_column_family(_CF_NAME_META)
self._cf_attr = rocksdb_storage.get_or_create_column_family(_CF_NAME_ATTR)
self._cf_migrations = rocksdb_storage.get_or_create_column_family(_CF_NAME_MIGRATIONS)

self._rocksdb_storage = rocksdb_storage
self._db = rocksdb_storage.get_db()
super().__init__(indexes=indexes)
super().__init__(indexes=indexes, settings=settings)

def _load_from_bytes(self, tx_data: bytes, meta_data: bytes) -> 'BaseTransaction':
from hathor.transaction.base_transaction import tx_or_block_from_bytes
Expand Down
16 changes: 11 additions & 5 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from intervaltree.interval import Interval
from structlog import get_logger

from hathor.conf.get_settings import get_global_settings
from hathor.conf.settings import HathorSettings
from hathor.execution_manager import ExecutionManager
from hathor.indexes import IndexesManager
from hathor.indexes.height_index import HeightInfo
Expand Down Expand Up @@ -104,8 +104,8 @@ class TransactionStorage(ABC):

_migrations: list[BaseMigration]

def __init__(self) -> None:
self._settings = get_global_settings()
def __init__(self, *, settings: HathorSettings) -> None:
self._settings = settings
# Weakref is used to guarantee that there is only one instance of each transaction in memory.
self._tx_weakref: WeakValueDictionary[bytes, BaseTransaction] = WeakValueDictionary()
self._tx_weakref_disabled: bool = False
Expand Down Expand Up @@ -1165,8 +1165,14 @@ def get_block(self, block_id: VertexId) -> Block:
class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]

def __init__(self, indexes: Optional[IndexesManager] = None, pubsub: Optional[Any] = None) -> None:
super().__init__()
def __init__(
self,
indexes: Optional[IndexesManager] = None,
pubsub: Optional[Any] = None,
*,
settings: HathorSettings,
) -> None:
super().__init__(settings=settings)

# Pubsub is used to publish tx voided and winner but it's optional
self.pubsub = pubsub
Expand Down
2 changes: 1 addition & 1 deletion tests/consensus/test_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class BaseConsensusTestCase(unittest.TestCase):

def setUp(self) -> None:
super().setUp()
self.tx_storage = TransactionMemoryStorage()
self.tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down
4 changes: 2 additions & 2 deletions tests/others/test_init_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _get_all_transactions(self) -> Iterator[BaseTransaction]:
class SimpleManagerInitializationTestCase(unittest.TestCase):
def setUp(self):
super().setUp()
self.tx_storage = ModifiedTransactionMemoryStorage()
self.tx_storage = ModifiedTransactionMemoryStorage(settings=self._settings)
self.pubsub = PubSubManager(self.clock)

def test_invalid_arguments(self):
Expand Down Expand Up @@ -87,7 +87,7 @@ class BaseManagerInitializationTestCase(unittest.TestCase):

def setUp(self):
super().setUp()
self.tx_storage = ModifiedTransactionMemoryStorage()
self.tx_storage = ModifiedTransactionMemoryStorage(settings=self._settings)
self.network = 'testnet'
self.manager = self.create_peer(self.network, tx_storage=self.tx_storage)

Expand Down
8 changes: 4 additions & 4 deletions tests/others/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_connections_manager_integration(self):
to update the Metrics class with info from ConnectionsManager class
"""
# Preparation
tx_storage = TransactionMemoryStorage()
tx_storage = TransactionMemoryStorage(settings=self._settings)
tmpdir = tempfile.mkdtemp()
self.tmpdirs.append(tmpdir)
wallet = Wallet(directory=tmpdir)
Expand Down Expand Up @@ -191,7 +191,7 @@ def test_tx_storage_data_collection_with_memory_storage(self):
The expected result is that nothing is done, because we currently only collect
data for RocksDB storage
"""
tx_storage = TransactionMemoryStorage()
tx_storage = TransactionMemoryStorage(settings=self._settings)

# All
manager = self.create_peer('testnet', tx_storage=tx_storage)
Expand Down Expand Up @@ -260,8 +260,8 @@ def test_cache_data_collection(self):
TransactionCacheStorage
"""
# Preparation
base_storage = TransactionMemoryStorage()
tx_storage = TransactionCacheStorage(base_storage, self.clock, indexes=None)
base_storage = TransactionMemoryStorage(settings=self._settings)
tx_storage = TransactionCacheStorage(base_storage, self.clock, indexes=None, settings=self._settings)

manager = self.create_peer('testnet', tx_storage=tx_storage)

Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_accumulated_weight.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class BaseAccumulatedWeightTestCase(unittest.TestCase):

def setUp(self):
super().setUp()
self.tx_storage = TransactionMemoryStorage()
self.tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

def test_calculate_feature_activation_bit_counts_genesis():
settings = get_global_settings()
storage = TransactionMemoryStorage()
storage = TransactionMemoryStorage(settings=settings)
genesis_block = storage.get_transaction(settings.GENESIS_BLOCK_HASH)
assert isinstance(genesis_block, Block)
result = genesis_block.get_feature_activation_bit_counts()
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BaseBlockchainTestCase(unittest.TestCase):
"""
def setUp(self):
super().setUp()
self.tx_storage = TransactionMemoryStorage()
self.tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_genesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def setUp(self) -> None:
self._daa = DifficultyAdjustmentAlgorithm(settings=self._settings)
verifiers = VertexVerifiers.create_defaults(settings=self._settings, daa=self._daa, feature_service=Mock())
self._verification_service = VerificationService(settings=self._settings, verifiers=verifiers)
self.storage = TransactionMemoryStorage()
self.storage = TransactionMemoryStorage(settings=settings)

def test_pow(self):
verifier = VertexVerifier(settings=self._settings)
Expand Down
4 changes: 2 additions & 2 deletions tests/tx/test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ def setUp(self):

super().setUp()
self.wallet = Wallet()
self.tx_storage = TransactionMemoryStorage()
self.tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down Expand Up @@ -724,7 +724,7 @@ def setUp(self):
directory = tempfile.mkdtemp()
self.tmpdirs.append(directory)
rocksdb_storage = RocksDBStorage(path=directory)
self.tx_storage = TransactionRocksDBStorage(rocksdb_storage)
self.tx_storage = TransactionRocksDBStorage(rocksdb_storage, settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_indexes4.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class BaseSimulatorIndexesTestCase(unittest.TestCase):
__test__ = False

def _build_randomized_blockchain(self, *, utxo_index=False):
tx_storage = TransactionMemoryStorage()
tx_storage = TransactionMemoryStorage(settings=self._settings)
manager = self.create_peer('testnet', tx_storage=tx_storage, unlock_wallet=True, wallet_index=True,
use_memory_index=True, utxo_index=utxo_index)

Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_mining.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BaseMiningTest(unittest.TestCase):

def setUp(self):
super().setUp()
self.tx_storage = TransactionMemoryStorage()
self.tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_reward_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def setUp(self):
self.genesis_public_key = self.genesis_private_key.public_key()

# this makes sure we can spend the genesis outputs
self.tx_storage = TransactionMemoryStorage()
self.tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis = self.tx_storage.get_all_genesis()
self.genesis_blocks = [tx for tx in self.genesis if tx.is_block]
self.genesis_txs = [tx for tx in self.genesis if not tx.is_block]
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
class TestScripts(unittest.TestCase):
def setUp(self):
super().setUp()
tx_storage = TransactionMemoryStorage()
tx_storage = TransactionMemoryStorage(settings=self._settings)
self.genesis_blocks = [tx for tx in tx_storage.get_all_genesis() if tx.is_block]
self.genesis_txs = [tx for tx in tx_storage.get_all_genesis() if not tx.is_block]

Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_stratum.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class BaseStratumClientTest(unittest.TestCase):

def setUp(self):
super().setUp()
storage = TransactionMemoryStorage()
storage = TransactionMemoryStorage(settings=self._settings)
self.block = storage.get_transaction(self._settings.GENESIS_BLOCK_HASH)
self.transport = StringTransportWithDisconnection()
self.protocol = StratumClient(reactor=self.clock)
Expand Down
Loading