diff --git a/build-scripts/ubuntu-1604/Dockerfile b/build-scripts/ubuntu-1604/Dockerfile index c3e0742f42..fc39e59340 100644 --- a/build-scripts/ubuntu-1604/Dockerfile +++ b/build-scripts/ubuntu-1604/Dockerfile @@ -1,5 +1,13 @@ FROM ubuntu:16.04 +RUN apt-get update -y && apt-get install -y \ + apt-transport-https \ + ca-certificates + +RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 68DB5E88 && \ + echo "deb https://repo.sovrin.org/test/deb xenial rocksdb" >> /etc/apt/sources.list && \ + apt-get update + RUN apt-get update -y && apt-get install -y \ # common stuff git \ @@ -9,12 +17,20 @@ RUN apt-get update -y && apt-get install -y \ python3-pip \ python-setuptools \ python3-venv \ - # fmp + # fpm ruby \ ruby-dev \ rubygems \ gcc \ - make + make \ + # rocksdb python wrapper + libbz2-dev \ + zlib1g-dev \ + liblz4-dev \ + libsnappy-dev \ + rocksdb=5.8.8 + +RUN pip3 install -U setuptools # install fpm RUN gem install --no-ri --no-rdoc fpm diff --git a/build-scripts/ubuntu-1604/build-3rd-parties.sh b/build-scripts/ubuntu-1604/build-3rd-parties.sh index 44d2863b43..775b688de9 100755 --- a/build-scripts/ubuntu-1604/build-3rd-parties.sh +++ b/build-scripts/ubuntu-1604/build-3rd-parties.sh @@ -17,8 +17,13 @@ function build_from_pypi { PREREM_TMP=prerm-${PACKAGE_NAME} cp postinst ${POSTINST_TMP} cp prerm ${PREREM_TMP} - sed -i 's/{package_name}/python3-'${PACKAGE_NAME}'/' ${POSTINST_TMP} - sed -i 's/{package_name}/python3-'${PACKAGE_NAME}'/' ${PREREM_TMP} + if [[ ${PACKAGE_NAME} =~ ^python-* ]]; then + PACKAGE_NAME_TMP="${PACKAGE_NAME/python-/}" + else + PACKAGE_NAME_TMP=$PACKAGE_NAME + fi + sed -i 's/{package_name}/python3-'${PACKAGE_NAME_TMP}'/' ${POSTINST_TMP} + sed -i 's/{package_name}/python3-'${PACKAGE_NAME_TMP}'/' ${PREREM_TMP} fpm --input-type "python" \ --output-type "deb" \ @@ -50,3 +55,5 @@ build_from_pypi pyzmq 16.0.2 build_from_pypi intervaltree 2.1.0 build_from_pypi portalocker 0.5.7 build_from_pypi sortedcontainers 1.5.7 +build_from_pypi setuptools 38.5.2 +build_from_pypi python-rocksdb 0.6.9 diff --git a/ci/ubuntu.dockerfile b/ci/ubuntu.dockerfile index a210c40ce9..762b09ab5f 100644 --- a/ci/ubuntu.dockerfile +++ b/ci/ubuntu.dockerfile @@ -5,10 +5,19 @@ ARG uid=1000 ARG user=indy ARG venv=venv +RUN echo "deb https://repo.sovrin.org/test/deb xenial rocksdb" >> /etc/apt/sources.list && \ + apt-get update + RUN apt-get update -y && apt-get install -y \ python3-nacl \ libindy-crypto=0.2.0 \ - libindy=1.3.1~403 + libindy=1.3.1~403 \ +# rocksdb python wrapper + libbz2-dev \ + zlib1g-dev \ + liblz4-dev \ + libsnappy-dev \ + rocksdb=5.8.8 RUN indy_ci_add_user $uid $user $venv diff --git a/ledger/ledger.py b/ledger/ledger.py index e38d7256ea..501a44077d 100644 --- a/ledger/ledger.py +++ b/ledger/ledger.py @@ -10,7 +10,7 @@ from ledger.tree_hasher import TreeHasher from ledger.util import F, ConsistencyVerificationFailed from storage.kv_store import KeyValueStorage -from storage.kv_store_leveldb_int_keys import KeyValueStorageLeveldbIntKeys +from storage.kv_store_rocksdb_int_keys import KeyValueStorageRocksdbIntKeys class Ledger(ImmutableStore): @@ -19,7 +19,7 @@ def _defaultStore(dataDir, logName, ensureDurability, open=True) -> KeyValueStorage: - return KeyValueStorageLeveldbIntKeys(dataDir, logName, open) + return KeyValueStorageRocksdbIntKeys(dataDir, logName, open) def __init__(self, tree: MerkleTree, @@ -222,8 +222,11 @@ def reset(self): # TODO: rename getAllTxn to get_txn_slice with required parameters frm to # add get_txn_all without args. def getAllTxn(self, frm: int = None, to: int = None): - yield from ((int(seq_no), self.txn_serializer.deserialize(txn)) - for seq_no, txn in self._transactionLog.iterator(start=frm, end=to)) + for seq_no, txn in self._transactionLog.iterator(start=frm, end=to): + if to is None or int(seq_no) <= to: + yield (int(seq_no), self.txn_serializer.deserialize(txn)) + else: + break @staticmethod def hashToStr(h): diff --git a/plenum/bls/bls_store.py b/plenum/bls/bls_store.py index f53a1ccfca..f17ede273f 100644 --- a/plenum/bls/bls_store.py +++ b/plenum/bls/bls_store.py @@ -1,5 +1,5 @@ from common.serializers.serialization import multi_sig_store_serializer -from plenum.persistence.storage import initKeyValueStorage +from storage.helper import initKeyValueStorage from crypto.bls.bls_multi_signature import MultiSignature from typing import Optional diff --git a/plenum/common/constants.py b/plenum/common/constants.py index 0becd9d1a8..6bc921f92f 100644 --- a/plenum/common/constants.py +++ b/plenum/common/constants.py @@ -141,6 +141,7 @@ class StorageType(IntEnum): class KeyValueStorageType(IntEnum): Leveldb = 1 Memory = 2 + Rocksdb = 3 @unique @@ -165,6 +166,7 @@ class LedgerState(IntEnum): HS_FILE = "file" HS_MEMORY = "memory" HS_LEVELDB = 'leveldb' +HS_ROCKSDB = 'rocksdb' PLUGIN_BASE_DIR_PATH = "PluginBaseDirPath" POOL_LEDGER_ID = 0 diff --git a/plenum/common/stack_manager.py b/plenum/common/stack_manager.py index 31469ac829..070d30a7c9 100644 --- a/plenum/common/stack_manager.py +++ b/plenum/common/stack_manager.py @@ -1,11 +1,10 @@ from abc import abstractmethod, ABCMeta from collections import OrderedDict -import os from ledger.genesis_txn.genesis_txn_initiator_from_file import GenesisTxnInitiatorFromFile from plenum.common.keygen_utils import initRemoteKeys from plenum.common.tools import lazy_field -from plenum.persistence.leveldb_hash_store import LevelDbHashStore +from storage.helper import initHashStore from stp_core.types import HA from stp_core.network.exceptions import RemoteNotFound from stp_core.common.log import getlogger @@ -43,8 +42,7 @@ def ledgerFile(self) -> str: @lazy_field def hashStore(self): - return LevelDbHashStore(dataDir=self.ledgerLocation, - fileNamePrefix='pool') + return initHashStore(self.ledgerLocation, 'pool', self.config) # noinspection PyTypeChecker @lazy_field diff --git a/plenum/config.py b/plenum/config.py index 90017c82bb..f242070a20 100644 --- a/plenum/config.py +++ b/plenum/config.py @@ -5,7 +5,8 @@ import logging -from plenum.common.constants import ClientBootStrategy, HS_FILE, KeyValueStorageType +from plenum.common.constants import ClientBootStrategy, HS_FILE, HS_LEVELDB, \ + HS_ROCKSDB, HS_MEMORY, KeyValueStorageType from plenum.common.types import PLUGIN_TYPE_STATS_CONSUMER # Each entry in registry is (stack name, ((host, port), verkey, pubkey)) @@ -59,7 +60,7 @@ clientBootStrategy = ClientBootStrategy.PoolTxn hashStore = { - "type": HS_FILE + "type": HS_LEVELDB } primaryStorage = None diff --git a/plenum/persistence/client_txn_log.py b/plenum/persistence/client_txn_log.py index 73575d0d2c..193f569fc6 100644 --- a/plenum/persistence/client_txn_log.py +++ b/plenum/persistence/client_txn_log.py @@ -4,7 +4,7 @@ from plenum.common.has_file_storage import HasFileStorage from plenum.common.txn_util import getTxnOrderedFields from plenum.common.util import updateFieldsWithSeqNo -from storage.kv_store_leveldb import KeyValueStorageLeveldb +from storage.kv_store_rocksdb import KeyValueStorageRocksdb class ClientTxnLog(HasFileStorage): @@ -17,9 +17,7 @@ def __init__(self, dataLocation): self.clientDataLocation = self.dataLocation if not os.path.exists(self.clientDataLocation): os.makedirs(self.clientDataLocation) - # self.transactionLog = TextFileStore(self.clientDataLocation, - # "transactions") - self.transactionLog = KeyValueStorageLeveldb( + self.transactionLog = KeyValueStorageRocksdb( self.clientDataLocation, "transactions") self.serializer = ledger_txn_serializer diff --git a/plenum/persistence/leveldb_hash_store.py b/plenum/persistence/db_hash_store.py similarity index 79% rename from plenum/persistence/leveldb_hash_store.py rename to plenum/persistence/db_hash_store.py index d15654897b..8c9024da16 100644 --- a/plenum/persistence/leveldb_hash_store.py +++ b/plenum/persistence/db_hash_store.py @@ -1,13 +1,18 @@ +import storage.helper + from ledger.hash_stores.hash_store import HashStore -from storage.kv_store_leveldb import KeyValueStorageLeveldb +from plenum.common.constants import KeyValueStorageType, HS_LEVELDB, HS_ROCKSDB from stp_core.common.log import getlogger logger = getlogger() -class LevelDbHashStore(HashStore): - def __init__(self, dataDir, fileNamePrefix=""): +class DbHashStore(HashStore): + def __init__(self, dataDir, fileNamePrefix="", db_type=HS_ROCKSDB): self.dataDir = dataDir + assert db_type == HS_ROCKSDB or db_type == HS_LEVELDB + self.db_type = KeyValueStorageType.Leveldb if db_type == HS_LEVELDB \ + else KeyValueStorageType.Rocksdb self.nodesDb = None self.leavesDb = None self._leafCount = 0 @@ -76,9 +81,10 @@ def closed(self): (self.nodesDb.closed and self.leavesDb.closed) def open(self): - self.nodesDb = KeyValueStorageLeveldb(self.dataDir, self.nodes_db_name) - self.leavesDb = KeyValueStorageLeveldb( - self.dataDir, self.leaves_db_name) + self.nodesDb = storage.helper.initKeyValueStorage( + self.db_type, self.dataDir, self.nodes_db_name) + self.leavesDb = storage.helper.initKeyValueStorage( + self.db_type, self.dataDir, self.leaves_db_name) self._leafCount = self.leavesDb.size def close(self): diff --git a/plenum/persistence/storage.py b/plenum/persistence/storage.py index 5399a6e00d..8b1590f0e1 100644 --- a/plenum/persistence/storage.py +++ b/plenum/persistence/storage.py @@ -1,11 +1,8 @@ from abc import abstractmethod, ABC -from plenum.common.constants import StorageType, KeyValueStorageType -from plenum.common.exceptions import DataDirectoryNotFound, KeyValueStorageConfigNotFound +from plenum.common.constants import StorageType +from plenum.common.exceptions import DataDirectoryNotFound from plenum.common.messages.node_messages import Reply -from storage.kv_in_memory import KeyValueStorageInMemory -from storage.kv_store import KeyValueStorage -from storage.kv_store_leveldb import KeyValueStorageLeveldb from storage.text_file_store import TextFileStore @@ -27,16 +24,6 @@ async def get(self, identifier: str, reqId: int, **kwargs): pass -def initKeyValueStorage(keyValueType, dataLocation, - keyValueStorageName) -> KeyValueStorage: - if keyValueType == KeyValueStorageType.Leveldb: - return KeyValueStorageLeveldb(dataLocation, keyValueStorageName) - elif keyValueType == KeyValueStorageType.Memory: - return KeyValueStorageInMemory() - else: - raise KeyValueStorageConfigNotFound - - def initStorage(storageType, name, dataDir=None, config=None): if storageType == StorageType.File: if dataDir is None: diff --git a/plenum/server/node.py b/plenum/server/node.py index 78ad970b3e..7c539ad0c7 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -11,9 +11,7 @@ from intervaltree import IntervalTree from ledger.compact_merkle_tree import CompactMerkleTree from ledger.genesis_txn.genesis_txn_initiator_from_file import GenesisTxnInitiatorFromFile -from ledger.hash_stores.file_hash_store import FileHashStore from ledger.hash_stores.hash_store import HashStore -from ledger.hash_stores.memory_hash_store import MemoryHashStore from ledger.util import F from plenum.bls.bls_bft_factory import create_default_bls_bft_factory from plenum.bls.bls_crypto_factory import create_default_bls_crypto_factory @@ -21,7 +19,7 @@ from plenum.common.config_util import getConfig from plenum.common.constants import POOL_LEDGER_ID, DOMAIN_LEDGER_ID, \ CLIENT_BLACKLISTER_SUFFIX, CONFIG_LEDGER_ID, \ - NODE_BLACKLISTER_SUFFIX, NODE_PRIMARY_STORAGE_SUFFIX, HS_FILE, HS_LEVELDB, \ + NODE_BLACKLISTER_SUFFIX, NODE_PRIMARY_STORAGE_SUFFIX, \ TXN_TYPE, LEDGER_STATUS, \ CLIENT_STACK_SUFFIX, PRIMARY_SELECTION_PREFIX, VIEW_CHANGE_PREFIX, \ OP_FIELD_NAME, CATCH_UP_PREFIX, NYM, \ @@ -58,9 +56,8 @@ from plenum.common.util import friendlyEx, getMaxFailures, pop_keys, \ compare_3PC_keys, get_utc_epoch from plenum.common.verifier import DidVerifier -from plenum.persistence.leveldb_hash_store import LevelDbHashStore from plenum.persistence.req_id_to_txn import ReqIdrToTxn -from plenum.persistence.storage import Storage, initStorage, initKeyValueStorage +from plenum.persistence.storage import Storage, initStorage from plenum.server.blacklister import Blacklister from plenum.server.blacklister import SimpleBlacklister from plenum.server.client_authn import ClientAuthNr, SimpleAuthNr, CoreAuthNr @@ -91,6 +88,7 @@ from plenum.common.config_helper import PNodeConfigHelper from state.pruning_state import PruningState from state.state import State +from storage.helper import initKeyValueStorage, initHashStore from stp_core.common.log import getlogger from stp_core.crypto.signer import Signer from stp_core.network.exceptions import RemoteNotFound @@ -447,9 +445,7 @@ def setup_config_req_handler(self): self.register_req_handler(CONFIG_LEDGER_ID, self.configReqHandler) def getConfigLedger(self): - hashStore = LevelDbHashStore( - dataDir=self.dataLocation, fileNamePrefix='config') - return Ledger(CompactMerkleTree(hashStore=hashStore), + return Ledger(CompactMerkleTree(hashStore=self.getHashStore('config')), dataDir=self.dataLocation, fileName=self.config.configTransactionsFile, ensureDurability=self.config.EnsureLedgerDurability) @@ -717,15 +713,7 @@ def getHashStore(self, name) -> HashStore: """ Create and return a hashStore implementation based on configuration """ - hsConfig = self.config.hashStore['type'].lower() - if hsConfig == HS_FILE: - return FileHashStore(dataDir=self.dataLocation, - fileNamePrefix=name) - elif hsConfig == HS_LEVELDB: - return LevelDbHashStore(dataDir=self.dataLocation, - fileNamePrefix=name) - else: - return MemoryHashStore() + return initHashStore(self.dataLocation, name, self.config) def get_new_ledger_manager(self) -> LedgerManager: ledger_sync_order = self.ledger_ids @@ -940,7 +928,7 @@ def onStopping(self): def closeAllKVStores(self): # Clear leveldb lock files - logger.debug("{} closing level dbs".format(self), extra={"cli": False}) + logger.debug("{} closing key-value storages".format(self), extra={"cli": False}) for ledgerId in self.ledgerManager.ledgerRegistry: state = self.getState(ledgerId) if state: diff --git a/plenum/server/pool_manager.py b/plenum/server/pool_manager.py index a1260ab8f6..de4d1cf651 100644 --- a/plenum/server/pool_manager.py +++ b/plenum/server/pool_manager.py @@ -14,7 +14,7 @@ from plenum.common.exceptions import UnsupportedOperation from plenum.common.stack_manager import TxnStackManager from plenum.common.types import NodeDetail -from plenum.persistence.storage import initKeyValueStorage +from storage.helper import initKeyValueStorage from plenum.persistence.util import pop_merkle_info from plenum.server.pool_req_handler import PoolRequestHandler from state.pruning_state import PruningState diff --git a/plenum/test/plugin/demo_plugin/storage.py b/plenum/test/plugin/demo_plugin/storage.py index e74e5bb515..8eaa47b6f8 100644 --- a/plenum/test/plugin/demo_plugin/storage.py +++ b/plenum/test/plugin/demo_plugin/storage.py @@ -1,13 +1,15 @@ from ledger.compact_merkle_tree import CompactMerkleTree from plenum.common.ledger import Ledger -from plenum.persistence.leveldb_hash_store import LevelDbHashStore -from plenum.persistence.storage import initKeyValueStorage +from plenum.persistence.db_hash_store import DbHashStore from state.pruning_state import PruningState +from storage.helper import initKeyValueStorage +from plenum.common.constants import HS_LEVELDB def get_auction_hash_store(data_dir): - return LevelDbHashStore(dataDir=data_dir, - fileNamePrefix='auction') + return DbHashStore(dataDir=data_dir, + fileNamePrefix='auction', + db_type=HS_LEVELDB) def get_auction_ledger(data_dir, name, hash_store, config): diff --git a/plenum/test/storage/test_leveldb_hash_store.py b/plenum/test/storage/test_hash_stores.py similarity index 51% rename from plenum/test/storage/test_leveldb_hash_store.py rename to plenum/test/storage/test_hash_stores.py index a6f0df0549..8dde45542c 100644 --- a/plenum/test/storage/test_leveldb_hash_store.py +++ b/plenum/test/storage/test_hash_stores.py @@ -2,15 +2,14 @@ from ledger.compact_merkle_tree import CompactMerkleTree from ledger.ledger import Ledger -from ledger.test.test_file_hash_store import nodesLeaves, \ - generateHashes +from plenum.common.constants import HS_LEVELDB, HS_ROCKSDB +from ledger.test.test_file_hash_store import nodesLeaves +from plenum.persistence.db_hash_store import DbHashStore -from plenum.persistence.leveldb_hash_store import LevelDbHashStore - -@pytest.yield_fixture(scope="module") -def leveldbHashStore(tdir): - hs = LevelDbHashStore(tdir) +@pytest.yield_fixture(scope="module", params=[HS_ROCKSDB, HS_LEVELDB]) +def hashStore(request, tmpdir_factory): + hs = DbHashStore(tmpdir_factory.mktemp('').strpath, db_type=request.param) cleanup(hs) yield hs hs.close() @@ -21,33 +20,33 @@ def cleanup(hs): hs.leafCount = 0 -def testIndexFrom1(leveldbHashStore): +def testIndexFrom1(hashStore): with pytest.raises(IndexError): - leveldbHashStore.readLeaf(0) + hashStore.readLeaf(0) -def testReadWrite(leveldbHashStore, nodesLeaves): +def testReadWrite(hashStore, nodesLeaves): nodes, leaves = nodesLeaves for node in nodes: - leveldbHashStore.writeNode(node) + hashStore.writeNode(node) for leaf in leaves: - leveldbHashStore.writeLeaf(leaf) - onebyone = [leveldbHashStore.readLeaf(i + 1) for i in range(10)] - multiple = leveldbHashStore.readLeafs(1, 10) + hashStore.writeLeaf(leaf) + onebyone = [hashStore.readLeaf(i + 1) for i in range(10)] + multiple = hashStore.readLeafs(1, 10) assert onebyone == leaves assert onebyone == multiple -def testRecoverLedgerFromHashStore(leveldbHashStore, tdir): - cleanup(leveldbHashStore) - tree = CompactMerkleTree(hashStore=leveldbHashStore) +def testRecoverLedgerFromHashStore(hashStore, tdir): + cleanup(hashStore) + tree = CompactMerkleTree(hashStore=hashStore) ledger = Ledger(tree=tree, dataDir=tdir) for d in range(10): ledger.add(str(d).encode()) updatedTree = ledger.tree ledger.stop() - tree = CompactMerkleTree(hashStore=leveldbHashStore) + tree = CompactMerkleTree(hashStore=hashStore) restartedLedger = Ledger(tree=tree, dataDir=tdir) assert restartedLedger.size == ledger.size assert restartedLedger.root_hash == ledger.root_hash diff --git a/plenum/test/view_change/test_that_domain_ledger_the_same_after_restart_for_all_nodes.py b/plenum/test/view_change/test_that_domain_ledger_the_same_after_restart_for_all_nodes.py index 5eb35f3522..5035b0fc8b 100644 --- a/plenum/test/view_change/test_that_domain_ledger_the_same_after_restart_for_all_nodes.py +++ b/plenum/test/view_change/test_that_domain_ledger_the_same_after_restart_for_all_nodes.py @@ -1,5 +1,6 @@ import pytest +from plenum.common.constants import HS_FILE, HS_LEVELDB, HS_ROCKSDB from plenum.test.view_change.helper import ensure_view_change_by_primary_restart from plenum.test.pool_transactions.conftest import looper from stp_core.common.log import getlogger @@ -47,19 +48,25 @@ def prepare_for_compare(domain_ledger): dict_for_compare['root_hash'] = domain_ledger.root_hash dict_for_compare['tree_root_hash'] = domain_ledger.tree.root_hash dict_for_compare['tree_root_hash_hex'] = domain_ledger.tree.root_hash_hex - """ - save current position of the cursor in stream, move to begin, read content and - move the cursor back - """ - c_pos = domain_ledger.tree.hashStore.leavesFile.db_file.tell() - domain_ledger.tree.hashStore.leavesFile.db_file.seek(0, 0) - dict_for_compare['leaves_store'] = domain_ledger.tree.hashStore.leavesFile.db_file.read() - domain_ledger.tree.hashStore.leavesFile.db_file.seek(c_pos) - - c_pos = domain_ledger.tree.hashStore.nodesFile.db_file.tell() - domain_ledger.tree.hashStore.nodesFile.db_file.seek(0, 0) - dict_for_compare['nodes_store'] = domain_ledger.tree.hashStore.nodesFile.db_file.read() - domain_ledger.tree.hashStore.nodesFile.db_file.seek(c_pos) + if tconf.hashStore['type'] == HS_FILE: + """ + save current position of the cursor in stream, move to begin, read content and + move the cursor back + """ + c_pos = domain_ledger.tree.hashStore.leavesFile.db_file.tell() + domain_ledger.tree.hashStore.leavesFile.db_file.seek(0, 0) + dict_for_compare['leaves_store'] = domain_ledger.tree.hashStore.leavesFile.db_file.read() + domain_ledger.tree.hashStore.leavesFile.db_file.seek(c_pos) + + c_pos = domain_ledger.tree.hashStore.nodesFile.db_file.tell() + domain_ledger.tree.hashStore.nodesFile.db_file.seek(0, 0) + dict_for_compare['nodes_store'] = domain_ledger.tree.hashStore.nodesFile.db_file.read() + domain_ledger.tree.hashStore.nodesFile.db_file.seek(c_pos) + elif tconf.hashStore['type'] == HS_LEVELDB or tconf.hashStore['type'] == HS_ROCKSDB: + dict_for_compare['leaves_store'] = domain_ledger.tree.hashStore.\ + readLeafs(1, domain_ledger.tree.hashStore.leafCount) + dict_for_compare['nodes_store'] = domain_ledger.tree.hashStore. \ + readNodes(1, domain_ledger.tree.hashStore.nodeCount) dict_for_compare['txns'] = [(tno, txn) for tno, txn in domain_ledger.getAllTxn()] diff --git a/setup.py b/setup.py index 3735b4f03f..b7ac38ff38 100644 --- a/setup.py +++ b/setup.py @@ -57,7 +57,8 @@ 'sortedcontainers==1.5.7', 'psutil', 'pip', 'portalocker==0.5.7', 'pyzmq', 'libnacl==1.6.1', 'six==1.11.0', 'psutil', 'intervaltree', - 'msgpack-python==0.4.6', 'indy-crypto==0.2.0'], + 'msgpack-python==0.4.6', 'indy-crypto==0.2.0', + 'python-rocksdb==0.6.9'], setup_requires=['pytest-runner'], extras_require={ 'tests': tests_require, diff --git a/state/test/test_pruning_state.py b/state/test/test_pruning_state.py index 8d6916e347..a158982f2e 100644 --- a/state/test/test_pruning_state.py +++ b/state/test/test_pruning_state.py @@ -1,19 +1,23 @@ import copy import pytest +from storage.kv_store import KeyValueStorage from state.pruning_state import PruningState from state.state import State from state.trie.pruning_trie import BLANK_NODE, BLANK_ROOT from storage.kv_in_memory import KeyValueStorageInMemory from storage.kv_store_leveldb import KeyValueStorageLeveldb +from storage.kv_store_rocksdb import KeyValueStorageRocksdb i = 0 -@pytest.yield_fixture(scope="function", params=['leveldb', 'in_memory']) -def db(request, tempdir) -> State: - if request == 'leveldb': +@pytest.yield_fixture(scope="function", params=['rocksdb', 'leveldb', 'in_memory']) +def db(request, tempdir) -> KeyValueStorage: + if request.param == 'leveldb': return KeyValueStorageLeveldb(tempdir, 'kv{}'.format(i)) + if request.param == 'rocksdb': + return KeyValueStorageRocksdb(tempdir, 'kv{}'.format(i)) return KeyValueStorageInMemory() diff --git a/state/test/test_state_proof_verification.py b/state/test/test_state_proof_verification.py index 5516546659..0019d9f83b 100644 --- a/state/test/test_state_proof_verification.py +++ b/state/test/test_state_proof_verification.py @@ -4,15 +4,19 @@ from state.state import State from storage.kv_in_memory import KeyValueStorageInMemory from storage.kv_store_leveldb import KeyValueStorageLeveldb +from storage.kv_store_rocksdb import KeyValueStorageRocksdb -@pytest.yield_fixture(params=['memory', 'leveldb']) +@pytest.yield_fixture(params=['memory', 'leveldb', 'rocksdb']) def state(request, tmpdir_factory) -> State: - if request.param == 'memory': - db = KeyValueStorageInMemory() if request.param == 'leveldb': db = KeyValueStorageLeveldb(tmpdir_factory.mktemp('').strpath, 'some_db') + elif request.param == 'rocksdb': + db = KeyValueStorageRocksdb(tmpdir_factory.mktemp('').strpath, + 'some_db') + else: + db = KeyValueStorageInMemory() state = PruningState(db) yield state state.close() diff --git a/storage/helper.py b/storage/helper.py new file mode 100644 index 0000000000..6b3fe9a8f7 --- /dev/null +++ b/storage/helper.py @@ -0,0 +1,55 @@ +from ledger.hash_stores.file_hash_store import FileHashStore +from ledger.hash_stores.hash_store import HashStore +from ledger.hash_stores.memory_hash_store import MemoryHashStore + +from plenum.common.config_util import getConfig +from plenum.common.constants import KeyValueStorageType, HS_FILE, HS_LEVELDB, HS_ROCKSDB +from plenum.common.exceptions import KeyValueStorageConfigNotFound + +from plenum.persistence.db_hash_store import DbHashStore + +from storage.kv_in_memory import KeyValueStorageInMemory +from storage.kv_store import KeyValueStorage +from storage.kv_store_leveldb import KeyValueStorageLeveldb +from storage.kv_store_rocksdb import KeyValueStorageRocksdb +from storage.kv_store_leveldb_int_keys import KeyValueStorageLeveldbIntKeys +from storage.kv_store_rocksdb_int_keys import KeyValueStorageRocksdbIntKeys + + +def initKeyValueStorage(keyValueType, dataLocation, + keyValueStorageName) -> KeyValueStorage: + if keyValueType == KeyValueStorageType.Leveldb: + return KeyValueStorageLeveldb(dataLocation, keyValueStorageName) + if keyValueType == KeyValueStorageType.Rocksdb: + return KeyValueStorageRocksdb(dataLocation, keyValueStorageName) + elif keyValueType == KeyValueStorageType.Memory: + return KeyValueStorageInMemory() + else: + raise KeyValueStorageConfigNotFound + + +def initKeyValueStorageIntKeys(keyValueType, dataLocation, + keyValueStorageName) -> KeyValueStorage: + if keyValueType == KeyValueStorageType.Leveldb: + return KeyValueStorageLeveldbIntKeys(dataLocation, keyValueStorageName) + if keyValueType == KeyValueStorageType.Rocksdb: + return KeyValueStorageRocksdbIntKeys(dataLocation, keyValueStorageName) + else: + raise KeyValueStorageConfigNotFound + + +def initHashStore(data_dir, name, config=None) -> HashStore: + """ + Create and return a hashStore implementation based on configuration + """ + config = config or getConfig() + hsConfig = config.hashStore['type'].lower() + if hsConfig == HS_FILE: + return FileHashStore(dataDir=data_dir, + fileNamePrefix=name) + elif hsConfig == HS_LEVELDB or hsConfig == HS_ROCKSDB: + return DbHashStore(dataDir=data_dir, + fileNamePrefix=name, + db_type=hsConfig) + else: + return MemoryHashStore() diff --git a/storage/kv_store_leveldb.py b/storage/kv_store_leveldb.py index 0fbab35b56..6885e7f9de 100644 --- a/storage/kv_store_leveldb.py +++ b/storage/kv_store_leveldb.py @@ -15,20 +15,21 @@ class KeyValueStorageLeveldb(KeyValueStorage): def __init__(self, db_dir, db_name, open=True): if 'leveldb' not in globals(): raise RuntimeError('Leveldb is needed to use this class') - self.db_path = os.path.join(db_dir, db_name) + self._db_path = os.path.join(db_dir, db_name) self._db = None if open: self.open() def __repr__(self): - return self.db_path + return self._db_path @property def is_byte(self) -> bool: return True + @property def db_path(self) -> str: - return self.db_path + return self._db_path def iterator(self, start=None, end=None, include_key=True, include_value=True, prefix=None): if start and isinstance(start, int): diff --git a/storage/kv_store_rocksdb.py b/storage/kv_store_rocksdb.py index a1174e385a..83ff51a37d 100644 --- a/storage/kv_store_rocksdb.py +++ b/storage/kv_store_rocksdb.py @@ -1,23 +1,110 @@ +import os + from typing import Iterable, Tuple +import shutil from storage.kv_store import KeyValueStorage +from state.util.utils import removeLockFiles - -# TODO: WIP below +try: + import rocksdb +except ImportError: + print('Cannot import rocksdb, please install') class KeyValueStorageRocksdb(KeyValueStorage): - def set(self, key, value): - raise NotImplementedError + def __init__(self, db_dir, db_name, open=True): + if 'rocksdb' not in globals(): + raise RuntimeError('Rocksdb is needed to use this class') + self._db_path = os.path.join(db_dir, db_name) + self._db = None + if open: + self.open() + + def open(self): + opts = rocksdb.Options() + opts.create_if_missing = True + self._db = rocksdb.DB(self._db_path, opts) + + def __repr__(self): + return self._db_path + + @property + def db_path(self) -> str: + return self._db_path + + def put(self, key, value): + if isinstance(key, str): + key = key.encode() + if isinstance(value, str): + value = value.encode() + self._db.put(key, value) def get(self, key): - raise NotImplementedError + if isinstance(key, str): + key = key.encode() + vv = self._db.get(key) + if vv is None: + raise KeyError + return vv def remove(self, key): - raise NotImplementedError + if isinstance(key, str): + key = key.encode() + self._db.delete(key) def setBatch(self, batch: Iterable[Tuple]): - raise NotImplementedError + b = rocksdb.WriteBatch() + for key, value in batch: + if isinstance(key, str): + key = key.encode() + if isinstance(value, str): + value = value.encode() + b.put(key, value) + self._db.write(b, sync=False) def close(self): - raise NotImplementedError + del self._db + self._db = None + removeLockFiles(self._db_path) + + def drop(self): + self.close() + shutil.rmtree(self._db_path) + + def reset(self): + self.drop() + self.open() + + def iterator(self, start=None, end=None, include_key=True, include_value=True, prefix=None): + if not include_value: + itr = self._db.iterkeys() + else: + itr = self._db.iteritems() + + if start and isinstance(start, int): + start = str(start) + if start and isinstance(start, str): + start = start.encode() + + if start: + itr.seek(start) + else: + itr.seek_to_first() + return itr + + def do_ops_in_batch(self, batch: Iterable[Tuple], is_committed=False): + pass + + @property + def is_byte(self) -> bool: + return True + + def has_key(self, key): + if isinstance(key, str): + key = key.encode() + return self._db.key_may_exist(key)[0] + + @property + def closed(self): + return self._db is None diff --git a/storage/kv_store_rocksdb_int_keys.py b/storage/kv_store_rocksdb_int_keys.py new file mode 100644 index 0000000000..c30836a12e --- /dev/null +++ b/storage/kv_store_rocksdb_int_keys.py @@ -0,0 +1,49 @@ +from storage.kv_store_rocksdb import KeyValueStorageRocksdb + +try: + import rocksdb +except ImportError: + print('Cannot import rocksdb, please install') + + +class IntegerComparator(rocksdb.IComparator): + def compare(self, a, b): + a = int(a) + b = int(b) + if (a < b): + return -1 + if (a > b): + return 1 + return 0 + + def name(self): + return b'IntegerComparator' + + +class KeyValueStorageRocksdbIntKeys(KeyValueStorageRocksdb): + def __init__(self, db_dir, db_name, open=True): + super().__init__(db_dir, db_name, open) + + def open(self): + opts = rocksdb.Options() + opts.create_if_missing = True + opts.comparator = IntegerComparator() + self._db = rocksdb.DB(self._db_path, opts) + + def get_equal_or_prev(self, key): + # return value can be: + # None, if required key less then minimal key from DB + # Equal by key if key exist in DB + # Previous if key does not exist in Db, but there is key less than required + + if isinstance(key, int): + key = str(key) + if isinstance(key, str): + key = key.encode() + iter = self._db.itervalues() + iter.seek_for_prev(key) + try: + value = next(iter) + except StopIteration: + value = None + return value diff --git a/storage/test/test_kv_leveldb.py b/storage/test/test_kv_leveldb.py deleted file mode 100644 index 34a0ba6d39..0000000000 --- a/storage/test/test_kv_leveldb.py +++ /dev/null @@ -1,129 +0,0 @@ -import os - -import pytest -from storage.kv_store_leveldb import KeyValueStorageLeveldb - -i = 0 - - -@pytest.yield_fixture(scope="function") -def kv(tempdir) -> KeyValueStorageLeveldb: - global i - kv = KeyValueStorageLeveldb(tempdir, 'kv{}'.format(i)) - i += 1 - yield kv - kv.close() - - -def test_reopen(kv): - kv.put('k1', 'v1') - v1 = kv.get('k1') - kv.close() - - kv.open() - v2 = kv.get('k1') - - assert b'v1' == v1 - assert b'v1' == v2 - - -def test_drop(kv): - kv.put('k1', 'v1') - hasKeyBeforeDrop = 'k1' in kv - kv.close() - kv.drop() - - kv.open() - hasKeyAfterDrop = 'k1' in kv - - assert hasKeyBeforeDrop - assert not hasKeyAfterDrop - - -def test_put_string(kv): - kv.put('k1', 'v1') - v1 = kv.get('k1') - - kv.put('k2', 'v2') - v2 = kv.get('k2') - - kv.put('k1', 'v3') - v3 = kv.get('k1') - v4 = kv.get('k2') - - assert b'v1' == v1 - assert b'v2' == v2 - assert b'v3' == v3 - assert b'v2' == v4 - - -def test_put_bytes(kv): - kv.put(b'k1', b'v1') - v1 = kv.get(b'k1') - - kv.put(b'k2', b'v2') - v2 = kv.get(b'k2') - - kv.put(b'k1', b'v3') - v3 = kv.get(b'k1') - v4 = kv.get(b'k2') - - assert b'v1' == v1 - assert b'v2' == v2 - assert b'v3' == v3 - assert b'v2' == v4 - - -def test_put_string_and_bytes(kv): - kv.put(b'k1', 'v1') - v1 = kv.get('k1') - - kv.put('k2', b'v2') - v2 = kv.get(b'k2') - - kv.put('k1', b'v3') - v3 = kv.get('k1') - v4 = kv.get('k2') - - assert b'v1' == v1 - assert b'v2' == v2 - assert b'v3' == v3 - assert b'v2' == v4 - - -def test_remove_string(kv): - kv.put('k1', 'v1') - hasKeyBeforeRemove = 'k1' in kv - kv.remove('k1') - hasKeyAfterRemove = 'k1' in kv - - assert hasKeyBeforeRemove - assert not hasKeyAfterRemove - - -def test_remove_bytes(kv): - kv.put(b'k1', b'v1') - hasKeyBeforeRemove = b'k1' in kv - kv.remove(b'k1') - hasKeyAfterRemove = b'k1' in kv - - assert hasKeyBeforeRemove - assert not hasKeyAfterRemove - - -def test_batch_string(kv): - batch = [('k'.format(i), 'v'.format(i)) - for i in range(5)] - kv.setBatch(batch) - - for i in range(5): - assert 'v'.format(i).encode() == kv.get('k'.format(i)) - - -def test_batch_bytes(kv): - batch = [('k'.format(i).encode(), 'v'.format(i).encode()) - for i in range(5)] - kv.setBatch(batch) - - for i in range(5): - assert 'v'.format(i).encode() == kv.get('k'.format(i)) diff --git a/storage/test/test_kv_leveldb_get_equal_or_prev.py b/storage/test/test_kv_leveldb_get_equal_or_prev.py index 3e0136511c..7f3b7b4095 100644 --- a/storage/test/test_kv_leveldb_get_equal_or_prev.py +++ b/storage/test/test_kv_leveldb_get_equal_or_prev.py @@ -1,10 +1,16 @@ import pytest from storage.kv_store_leveldb_int_keys import KeyValueStorageLeveldbIntKeys +from storage.kv_store_rocksdb_int_keys import KeyValueStorageRocksdbIntKeys -@pytest.fixture(scope="function") -def storage_with_ts_root_hashes(tmpdir): - storage = KeyValueStorageLeveldbIntKeys(tmpdir.dirname, "test_db") +@pytest.fixture(scope="module", params=['rocksdb', 'leveldb']) +def storage_with_ts_root_hashes(request, tmpdir_factory): + if request.param == 'leveldb': + storage = KeyValueStorageLeveldbIntKeys(tmpdir_factory.mktemp('').strpath, + "test_db") + else: + storage = KeyValueStorageRocksdbIntKeys(tmpdir_factory.mktemp('').strpath, + "test_db") ts_list = { 2: "aaaa", 4: "bbbb", diff --git a/storage/test/test_kv_memory.py b/storage/test/test_kv_storages.py similarity index 78% rename from storage/test/test_kv_memory.py rename to storage/test/test_kv_storages.py index 5d8603bfe8..f08c5964f1 100644 --- a/storage/test/test_kv_memory.py +++ b/storage/test/test_kv_storages.py @@ -1,11 +1,24 @@ import pytest -from storage.kv_in_memory import KeyValueStorageInMemory from storage.kv_store_leveldb import KeyValueStorageLeveldb +from storage.kv_store_rocksdb import KeyValueStorageRocksdb +from storage.kv_in_memory import KeyValueStorageInMemory +from storage.kv_store import KeyValueStorage + +i = 0 + + +@pytest.yield_fixture(scope="function", params=['rocksdb', 'leveldb', 'in_memory']) +def kv(request, tempdir) -> KeyValueStorage: + global i + if request.param == 'leveldb': + kv = KeyValueStorageLeveldb(tempdir, 'kv{}'.format(i)) + elif request.param == 'rocksdb': + kv = KeyValueStorageRocksdb(tempdir, 'kv{}'.format(i)) + else: + kv = KeyValueStorageInMemory() -@pytest.yield_fixture(scope="function") -def kv() -> KeyValueStorageLeveldb: - kv = KeyValueStorageInMemory() + i += 1 yield kv kv.close() @@ -36,7 +49,10 @@ def test_drop(kv): def test_put_none(kv): - kv.put('k1', None) + if isinstance(kv, KeyValueStorageInMemory): + kv.put('k1', None) + else: + pass def test_put_string(kv):