Skip to content

Commit 8efb6d4

Browse files
refactor(indexes): Optimize RocksDBAddressIndex to handle pagination in O(log n) (#978)
1 parent 2b32edc commit 8efb6d4

File tree

8 files changed

+48
-39
lines changed

8 files changed

+48
-39
lines changed

hathor/indexes/address_index.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,11 @@ def get_from_address(self, address: str) -> list[bytes]:
9292
raise NotImplementedError
9393

9494
@abstractmethod
95-
def get_sorted_from_address(self, address: str) -> list[bytes]:
95+
def get_sorted_from_address(self, address: str, tx_start: Optional[BaseTransaction] = None) -> Iterable[bytes]:
9696
""" Get a sorted list of transaction hashes of an address
97+
98+
`tx_start` serves as a pagination marker, indicating the starting position for the iteration.
99+
When tx_start is None, the iteration begins from the initial element.
97100
"""
98101
raise NotImplementedError
99102

hathor/indexes/memory_address_index.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ def add_tx(self, tx: BaseTransaction) -> None:
4949
def get_from_address(self, address: str) -> list[bytes]:
5050
return list(self._get_from_key(address))
5151

52-
def get_sorted_from_address(self, address: str) -> list[bytes]:
53-
return list(self._get_sorted_from_key(address))
52+
def get_sorted_from_address(self, address: str, tx_start: Optional[BaseTransaction] = None) -> Iterable[bytes]:
53+
return self._get_sorted_from_key(address, tx_start)
5454

5555
def is_address_empty(self, address: str) -> bool:
5656
return self._is_key_empty(address)

hathor/indexes/memory_tx_group_index.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from abc import abstractmethod
1616
from collections import defaultdict
17-
from typing import Iterable, Sized, TypeVar
17+
from typing import Iterable, Optional, Sized, TypeVar
1818

1919
from structlog import get_logger
2020

@@ -63,8 +63,15 @@ def _get_from_key(self, key: KT) -> Iterable[bytes]:
6363
for _, h in self.index[key]:
6464
yield h
6565

66-
def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
67-
return [h for _, h in sorted(self.index[key])]
66+
def _get_sorted_from_key(self, key: KT, tx_start: Optional[BaseTransaction] = None) -> Iterable[bytes]:
67+
sorted_elements = sorted(self.index[key])
68+
found = False
69+
for _, h in sorted_elements:
70+
if tx_start and h == tx_start.hash:
71+
found = True
72+
73+
if found or not tx_start:
74+
yield h
6875

6976
def _is_key_empty(self, key: KT) -> bool:
7077
return not bool(self.index[key])

hathor/indexes/rocksdb_address_index.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ def add_tx(self, tx: BaseTransaction) -> None:
6666
def get_from_address(self, address: str) -> list[bytes]:
6767
return list(self._get_from_key(address))
6868

69-
def get_sorted_from_address(self, address: str) -> list[bytes]:
70-
return list(self._get_sorted_from_key(address))
69+
def get_sorted_from_address(self, address: str, tx_start: Optional[BaseTransaction] = None) -> Iterable[bytes]:
70+
return self._get_sorted_from_key(address, tx_start)
7171

7272
def is_address_empty(self, address: str) -> bool:
7373
return self._is_key_empty(address)

hathor/indexes/rocksdb_tx_group_index.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,15 @@ def remove_tx(self, tx: BaseTransaction) -> None:
108108
self._db.delete((self._cf, self._to_rocksdb_key(key, tx)))
109109

110110
def _get_from_key(self, key: KT) -> Iterable[bytes]:
111+
return self._util_get_from_key(key)
112+
113+
def _get_sorted_from_key(self, key: KT, tx_start: Optional[BaseTransaction] = None) -> Iterable[bytes]:
114+
return self._util_get_from_key(key, tx_start)
115+
116+
def _util_get_from_key(self, key: KT, tx: Optional[BaseTransaction] = None) -> Iterable[bytes]:
111117
self.log.debug('seek to', key=key)
112118
it = self._db.iterkeys(self._cf)
113-
it.seek(self._to_rocksdb_key(key))
119+
it.seek(self._to_rocksdb_key(key, tx))
114120
for _cf, rocksdb_key in it:
115121
key2, _, tx_hash = self._from_rocksdb_key(rocksdb_key)
116122
if key2 != key:
@@ -119,9 +125,6 @@ def _get_from_key(self, key: KT) -> Iterable[bytes]:
119125
yield tx_hash
120126
self.log.debug('seek end')
121127

122-
def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
123-
return self._get_from_key(key)
124-
125128
def _is_key_empty(self, key: KT) -> bool:
126129
self.log.debug('seek to', key=key)
127130
it = self._db.iterkeys(self._cf)

hathor/indexes/tx_group_index.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from abc import abstractmethod
16-
from typing import Generic, Iterable, Sized, TypeVar
16+
from typing import Generic, Iterable, Optional, Sized, TypeVar
1717

1818
from structlog import get_logger
1919

@@ -49,8 +49,12 @@ def _get_from_key(self, key: KT) -> Iterable[bytes]:
4949
raise NotImplementedError
5050

5151
@abstractmethod
52-
def _get_sorted_from_key(self, key: KT) -> Iterable[bytes]:
53-
"""Get all transactions that have a given key, sorted by timestamp."""
52+
def _get_sorted_from_key(self, key: KT, tx_start: Optional[BaseTransaction] = None) -> Iterable[bytes]:
53+
"""Get all transactions that have a given key, sorted by timestamp.
54+
55+
`tx_start` serves as a pagination marker, indicating the starting position for the iteration.
56+
When tx_start is None, the iteration begins from the initial element.
57+
"""
5458
raise NotImplementedError
5559

5660
@abstractmethod

hathor/wallet/resources/thin_wallet/address_history.py

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from hathor.cli.openapi_files.register import register_resource
2222
from hathor.conf.get_settings import get_global_settings
2323
from hathor.crypto.util import decode_address
24+
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
2425
from hathor.util import json_dumpb, json_loadb
2526
from hathor.wallet.exceptions import InvalidAddress
2627

@@ -166,12 +167,6 @@ def get_address_history(self, addresses: list[str], ref_hash: Optional[str]) ->
166167

167168
history = []
168169
seen: set[bytes] = set()
169-
# XXX In this algorithm we need to sort all transactions of an address
170-
# and find one specific (in case of a pagination request)
171-
# so if this address has many txs, this could become slow
172-
# I've done some tests with 10k txs in one address and the request
173-
# returned in less than 50ms, so we will move forward with it for now
174-
# but this could be improved in the future
175170
for idx, address in enumerate(addresses):
176171
try:
177172
decode_address(address)
@@ -181,31 +176,28 @@ def get_address_history(self, addresses: list[str], ref_hash: Optional[str]) ->
181176
'message': 'The address {} is invalid'.format(address)
182177
})
183178

184-
hashes = addresses_index.get_sorted_from_address(address)
185-
start_index = 0
186-
if ref_hash_bytes and idx == 0:
187-
# It's not the first request, so we must continue from the hash
188-
# but we do it only for the first address
179+
tx = None
180+
if ref_hash_bytes:
189181
try:
190-
# Find index where the hash is
191-
start_index = hashes.index(ref_hash_bytes)
192-
except ValueError:
193-
# ref_hash is not in the list
182+
tx = self.manager.tx_storage.get_transaction(ref_hash_bytes)
183+
except TransactionDoesNotExist:
194184
return json_dumpb({
195185
'success': False,
196-
'message': 'Hash {} is not a transaction from the address {}.'.format(ref_hash, address)
186+
'message': 'Hash {} is not a transaction hash.'.format(ref_hash)
197187
})
198188

199-
# Slice the hashes array from the start_index
200-
to_iterate = hashes[start_index:]
189+
# The address index returns an iterable that starts at `tx`.
190+
hashes = addresses_index.get_sorted_from_address(address, tx)
201191
did_break = False
202-
for index, tx_hash in enumerate(to_iterate):
192+
for tx_hash in hashes:
203193
if total_added == self._settings.MAX_TX_ADDRESSES_HISTORY:
204194
# If already added the max number of elements possible, then break
205195
# I need to add this if at the beginning of the loop to handle the case
206196
# when the first tx of the address exceeds the limit, so we must return
207197
# that the next request should start in the first tx of this address
208198
did_break = True
199+
# Saving the first tx hash for the next request
200+
first_hash = tx_hash.hex()
209201
break
210202

211203
if tx_hash not in seen:
@@ -216,6 +208,8 @@ def get_address_history(self, addresses: list[str], ref_hash: Optional[str]) ->
216208
# It's important to validate also the maximum number of inputs and outputs because some txs
217209
# are really big and the response payload becomes too big
218210
did_break = True
211+
# Saving the first tx hash for the next request
212+
first_hash = tx_hash.hex()
219213
break
220214

221215
seen.add(tx_hash)
@@ -226,10 +220,8 @@ def get_address_history(self, addresses: list[str], ref_hash: Optional[str]) ->
226220
if did_break:
227221
# We stopped in the middle of the txs of this address
228222
# So we return that we still have more data to send
229-
break_index = start_index + index
230223
has_more = True
231224
# The hash to start the search and which address this hash belongs
232-
first_hash = hashes[break_index].hex()
233225
first_address = address
234226
break
235227

tests/tx/test_indexes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ def test_addresses_index_empty(self):
631631
address = self.get_address(10)
632632
assert address is not None
633633
self.assertTrue(addresses_indexes.is_address_empty(address))
634-
self.assertEqual(addresses_indexes.get_sorted_from_address(address), [])
634+
self.assertEqual(list(addresses_indexes.get_sorted_from_address(address)), [])
635635

636636
def test_addresses_index_last(self):
637637
"""
@@ -653,7 +653,7 @@ def test_addresses_index_last(self):
653653
# XXX: this artificial address should major (be greater byte-wise) any possible "natural" address
654654
address = '\x7f' * 34
655655
self.assertTrue(addresses_indexes.is_address_empty(address))
656-
self.assertEqual(addresses_indexes.get_sorted_from_address(address), [])
656+
self.assertEqual(list(addresses_indexes.get_sorted_from_address(address)), [])
657657

658658
# XXX: since we didn't add any multisig address, this is guaranteed to be reach the tail end of the index
659659
assert self._settings.P2PKH_VERSION_BYTE[0] < self._settings.MULTISIG_VERSION_BYTE[0]
@@ -666,7 +666,7 @@ def test_addresses_index_last(self):
666666
assert address is not None
667667

668668
self.assertTrue(addresses_indexes.is_address_empty(address))
669-
self.assertEqual(addresses_indexes.get_sorted_from_address(address), [])
669+
self.assertEqual(list(addresses_indexes.get_sorted_from_address(address)), [])
670670

671671
def test_height_index(self):
672672
from hathor.indexes.height_index import HeightInfo

0 commit comments

Comments
 (0)