
Commit bdfc5fc

Merge pull request #275 from HathorNetwork/feat/sync-v2-mvp2
feat(sync-v2): sync-v2 implemented, sync-v1 still default
2 parents 175f40a + f63f3b8 commit bdfc5fc

22 files changed: +2930 −175 lines

hathor/indexes/deps_index.py

Lines changed: 3 additions & 6 deletions
@@ -118,10 +118,7 @@ def get_scope(self) -> Scope:
         return SCOPE
 
     def init_loop_step(self, tx: BaseTransaction) -> None:
-        tx_meta = tx.get_metadata()
-        if tx_meta.voided_by:
-            return
-        self.add_tx(tx, partial=False)
+        self.add_tx(tx)
 
     def update(self, tx: BaseTransaction) -> None:
         assert tx.hash is not None
@@ -193,6 +190,6 @@ def remove_from_needed_index(self, tx: bytes) -> None:
         raise NotImplementedError
 
     @abstractmethod
-    def get_next_needed_tx(self) -> bytes:
-        """Choose the start hash for downloading the needed txs"""
+    def iter_next_needed_txs(self) -> Iterator[bytes]:
+        """Iterate over the next needed transactions."""
         raise NotImplementedError
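
The interface change above replaces the single start-hash getter with an iterator over all needed hashes, leaving the download policy to the caller. A minimal consumer sketch under that assumption; `request_tx` is a hypothetical download callback, not part of this commit:

# Sketch only (not from this commit): draining the new iterator-based API.
# `deps_index` is any DepsIndex implementation; `request_tx` is a hypothetical
# callback that asks a peer for a transaction by hash.
from typing import Callable

from hathor.indexes.deps_index import DepsIndex


def request_needed_txs(deps_index: DepsIndex, request_tx: Callable[[bytes], None]) -> int:
    requested = 0
    for tx_hash in deps_index.iter_next_needed_txs():
        request_tx(tx_hash)
        requested += 1
    return requested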

hathor/indexes/memory_deps_index.py

Lines changed: 37 additions & 17 deletions
@@ -34,6 +34,9 @@ class MemoryDepsIndex(DepsIndex):
     _txs_with_deps_ready: set[bytes]
 
     # Next to be downloaded
+    # - Key: hash of the tx to be downloaded
+    # - Value[0]: height
+    # - Value[1]: hash of the tx waiting for the download
     _needed_txs_index: dict[bytes, tuple[int, bytes]]
 
     def __init__(self):
@@ -49,10 +52,11 @@ def force_clear(self) -> None:
         self._needed_txs_index = {}
 
     def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
-        assert tx.hash is not None
-        assert tx.storage is not None
         validation = tx.get_metadata().validation
         if validation.is_fully_connected():
+            # discover if new txs are ready because of this tx
+            self._update_new_deps_ready(tx)
+            # finally remove from rev deps
             self._del_from_deps_index(tx)
         elif not partial:
             raise ValueError('partial=False will only accept fully connected transactions')
@@ -63,6 +67,19 @@ def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
     def del_tx(self, tx: BaseTransaction) -> None:
         self._del_from_deps_index(tx)
 
+    def _update_new_deps_ready(self, tx: BaseTransaction) -> None:
+        """Go over the reverse dependencies of tx and check if any of them are now ready to be validated.
+
+        This is also idempotent.
+        """
+        assert tx.hash is not None
+        assert tx.storage is not None
+        for candidate_hash in self._rev_dep_index.get(tx.hash, []):
+            with tx.storage.allow_partially_validated_context():
+                candidate_tx = tx.storage.get_transaction(candidate_hash)
+            if candidate_tx.is_ready_for_validation():
+                self._txs_with_deps_ready.add(candidate_hash)
+
     def _add_deps(self, tx: BaseTransaction) -> None:
         """This method is idempotent, because self.update needs it to be indempotent."""
         assert tx.hash is not None
@@ -94,7 +111,9 @@ def next_ready_for_validation(self, tx_storage: 'TransactionStorage', *, dry_run
         else:
             cur_ready, self._txs_with_deps_ready = self._txs_with_deps_ready, set()
         while cur_ready:
-            yield from sorted(cur_ready, key=lambda tx_hash: tx_storage.get_transaction(tx_hash).timestamp)
+            with tx_storage.allow_partially_validated_context():
+                sorted_cur_ready = sorted(cur_ready, key=lambda tx_hash: tx_storage.get_transaction(tx_hash).timestamp)
+            yield from sorted_cur_ready
             if dry_run:
                 cur_ready = self._txs_with_deps_ready - cur_ready
             else:
@@ -113,7 +132,8 @@ def _get_rev_deps(self, tx: bytes) -> frozenset[bytes]:
     def known_children(self, tx: BaseTransaction) -> list[bytes]:
         assert tx.hash is not None
         assert tx.storage is not None
-        it_rev_deps = map(tx.storage.get_transaction, self._get_rev_deps(tx.hash))
+        with tx.storage.allow_partially_validated_context():
+            it_rev_deps = map(tx.storage.get_transaction, self._get_rev_deps(tx.hash))
         return [not_none(rev.hash) for rev in it_rev_deps if tx.hash in rev.parents]
 
     # needed-txs-index methods:
@@ -127,18 +147,13 @@ def is_tx_needed(self, tx: bytes) -> bool:
     def remove_from_needed_index(self, tx: bytes) -> None:
         self._needed_txs_index.pop(tx, None)
 
-    def get_next_needed_tx(self) -> bytes:
-        # This strategy maximizes the chance to download multiple txs on the same stream
-        # find the tx with highest "height"
-        # XXX: we could cache this onto `needed_txs` so we don't have to fetch txs every time
-        # TODO: improve this by using some sorted data structure to make this better than O(n)
-        height, start_hash, tx = max((h, s, t) for t, (h, s) in self._needed_txs_index.items())
-        self.log.debug('next needed tx start', needed=len(self._needed_txs_index), start=start_hash.hex(),
-                       height=height, needed_tx=tx.hex())
-        return start_hash
+    def iter_next_needed_txs(self) -> Iterator[bytes]:
+        for tx_hash, _ in self._needed_txs_index.items():
+            yield tx_hash
 
     def _add_needed(self, tx: BaseTransaction) -> None:
         """This method is idempotent, because self.update needs it to be indempotent."""
+        assert tx.hash is not None
         assert tx.storage is not None
         tx_storage = tx.storage
 
@@ -147,9 +162,14 @@ def _add_needed(self, tx: BaseTransaction) -> None:
         # get_all_dependencies is needed to ensure that we get the inputs that aren't reachable through parents alone,
         # this can happen for inputs that have not been confirmed as of the block the confirms the block or transaction
         # that we're adding the dependencies of
-        for tx_hash in tx.get_all_dependencies():
+        for dep_hash in tx.get_all_dependencies():
             # It may happen that we have one of the dependencies already, so just add the ones we don't
             # have. We should add at least one dependency, otherwise this tx should be full validated
-            if not tx_storage.transaction_exists(tx_hash):
-                self.log.debug('tx parent is needed', tx=tx_hash.hex())
-                self._needed_txs_index[tx_hash] = (height, not_none(tx.hash))
+            with tx_storage.allow_partially_validated_context():
+                tx_exists = tx_storage.transaction_exists(dep_hash)
+            if not tx_exists:
+                self.log.debug('tx parent is needed', tx=dep_hash.hex())
+                self._needed_txs_index[dep_hash] = (height, not_none(tx.hash))
+
+        # also, remove the given transaction from needed, because we already have it
+        self._needed_txs_index.pop(tx.hash, None)
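
The new comment block documents the shape of `_needed_txs_index`; the snippet below only illustrates that mapping with made-up hashes:

# Illustration of the documented _needed_txs_index shape (hashes are made up):
#   key   -> hash of the tx that still needs to be downloaded
#   value -> (height, hash of the tx that is waiting for that download)
needed_txs_index: dict[bytes, tuple[int, bytes]] = {}

missing_dep = bytes.fromhex('00' * 32)   # hypothetical missing dependency
waiting_tx = bytes.fromhex('ff' * 32)    # hypothetical tx that depends on it
needed_txs_index[missing_dep] = (1200, waiting_tx)

# iter_next_needed_txs() now simply yields the keys of this mapping:
for tx_hash in needed_txs_index:
    print(tx_hash.hex())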

hathor/indexes/rocksdb_deps_index.py

Lines changed: 3 additions & 8 deletions
@@ -351,11 +351,6 @@ def remove_from_needed_index(self, tx: bytes) -> None:
         key_needed = self._to_key_needed(tx)
         self._db.delete((self._cf, key_needed))
 
-    def get_next_needed_tx(self) -> bytes:
-        # This strategy maximizes the chance to download multiple txs on the same stream
-        # Find the tx with highest "height"
-        # XXX: we could cache this onto `needed_txs` so we don't have to fetch txs every time
-        # TODO: improve this by using some sorted data structure to make this better than O(n)
-        height, start_hash, tx = max((h, s, t) for t, h, s in self._iter_needed())
-        self.log.debug('next needed tx start', start=start_hash.hex(), height=height, needed_tx=tx.hex())
-        return start_hash
+    def iter_next_needed_txs(self) -> Iterator[bytes]:
+        for tx_hash, _, __ in self._iter_needed():
+            yield tx_hash
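
Both implementations now expose the needed set lazily instead of picking a single highest-height start hash; a caller that wants to bound work per round could, for example, slice the generator (sketch, not from this commit):

# Sketch: take at most `batch_size` needed hashes per sync round. This relies
# only on iter_next_needed_txs() being a generator, as in both implementations.
from itertools import islice


def next_needed_batch(deps_index, batch_size: int = 50) -> list[bytes]:
    return list(islice(deps_index.iter_next_needed_txs(), batch_size))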

hathor/manager.py

Lines changed: 19 additions & 9 deletions
@@ -403,8 +403,6 @@ def _initialize_components_full_verification(self) -> None:
 
         self.log.debug('load blocks and transactions')
         for tx in self.tx_storage._topological_sort_dfs():
-            tx.update_initial_metadata()
-
             assert tx.hash is not None
 
             tx_meta = tx.get_metadata()
@@ -433,7 +431,14 @@ def _initialize_components_full_verification(self) -> None:
 
             try:
                 # TODO: deal with invalid tx
+                tx.calculate_height()
+                tx._update_parents_children_metadata()
+
                 if tx.can_validate_full():
+                    tx.update_initial_metadata()
+                    tx.calculate_min_height()
+                    if tx.is_genesis:
+                        assert tx.validate_checkpoint(self.checkpoints)
                     assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
                     self.tx_storage.add_to_indexes(tx)
                     with self.tx_storage.allow_only_valid_context():
@@ -934,12 +939,11 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
         """
         assert self.tx_storage.is_only_valid_allowed()
         assert tx.hash is not None
+
+        already_exists = False
         if self.tx_storage.transaction_exists(tx.hash):
             self.tx_storage.compare_bytes_with_local_tx(tx)
-            if not fails_silently:
-                raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex))
-            self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex)
-            return False
+            already_exists = True
 
         if tx.timestamp - self.reactor.seconds() > settings.MAX_FUTURE_TIMESTAMP_ALLOWED:
             if not fails_silently:
@@ -956,8 +960,14 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
             metadata = tx.get_metadata()
         except TransactionDoesNotExist:
             if not fails_silently:
-                raise InvalidNewTransaction('missing parent')
-            self.log.warn('on_new_tx(): missing parent', tx=tx.hash_hex)
+                raise InvalidNewTransaction('cannot get metadata')
+            self.log.warn('on_new_tx(): cannot get metadata', tx=tx.hash_hex)
+            return False
+
+        if already_exists and metadata.validation.is_fully_connected():
+            if not fails_silently:
+                raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex))
+            self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex)
             return False
 
         if metadata.validation.is_invalid():
@@ -1044,7 +1054,7 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: boo
             try:
                 # XXX: `reject_locked_reward` might not apply, partial validation is only used on sync-v2
                 # TODO: deal with `reject_locked_reward` on sync-v2
-                assert tx.validate_full(reject_locked_reward=True)
+                assert tx.validate_full(reject_locked_reward=False)
             except (AssertionError, HathorError):
                 # TODO
                 raise
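
The `on_new_tx` hunks change when a duplicate is rejected: a known hash is no longer refused up front, only once its stored metadata shows it is already fully connected, so sync-v2 can push bytes for transactions that exist locally but are still only partially validated. A simplified sketch of that decision, using the names from the diff (not a drop-in for the real method):

# Simplified sketch of the reworked duplicate handling in on_new_tx().
def rejects_as_duplicate(tx_storage, tx) -> bool:
    already_exists = tx_storage.transaction_exists(tx.hash)
    if not already_exists:
        return False
    tx_storage.compare_bytes_with_local_tx(tx)  # the bytes must still match
    metadata = tx.get_metadata()
    # only a fully connected copy counts as "already exists" for rejection
    return metadata.validation.is_fully_connected()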

hathor/p2p/manager.py

Lines changed: 2 additions & 1 deletion
@@ -100,6 +100,7 @@ def __init__(self,
                  enable_sync_v1_1: bool) -> None:
         from hathor.p2p.sync_v1.factory_v1_0 import SyncV10Factory
         from hathor.p2p.sync_v1.factory_v1_1 import SyncV11Factory
+        from hathor.p2p.sync_v2.factory import SyncV2Factory
 
         if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2):
             raise TypeError(f'{type(self).__name__}() at least one sync version is required')
@@ -185,7 +186,7 @@ def __init__(self,
         if enable_sync_v1_1:
             self._sync_factories[SyncVersion.V1_1] = SyncV11Factory(self)
         if enable_sync_v2:
-            self._sync_factories[SyncVersion.V2] = SyncV10Factory(self)
+            self._sync_factories[SyncVersion.V2] = SyncV2Factory(self)
 
     def set_manager(self, manager: 'HathorManager') -> None:
         """Set the manager. This method must be called before start()."""

hathor/p2p/messages.py

Lines changed: 4 additions & 6 deletions
@@ -103,14 +103,12 @@ class ProtocolMessages(Enum):
     GET_BEST_BLOCK = 'GET-BEST-BLOCK'  # Request the best block of the peer
     BEST_BLOCK = 'BEST-BLOCK'  # Send the best block to your peer
 
-    GET_BLOCK_TXS = 'GET-BLOCK-TXS'  # TODO: rename, maybe GET-TX-RANGE or repurpose GET-TRANSACTIONS above
+    GET_TRANSACTIONS_BFS = 'GET-TRANSACTIONS-BFS'
     TRANSACTION = 'TRANSACTION'
+    TRANSACTIONS_END = 'TRANSACTIONS-END'
 
-    GET_MEMPOOL = 'GET-MEMPOOL'  # TODO: rename, maybe GET-TX-RANGE or repurpose GET-TRANSACTIONS above
-    MEMPOOL_END = 'MEMPOOL-END'  # End of mempool sync
-
-    GET_COMMON_CHAIN = 'GET-COMMON-CHAIN'
-    COMMON_CHAIN = 'COMMON-CHAIN'
+    GET_MEMPOOL = 'GET-MEMPOOL'
+    MEMPOOL_END = 'MEMPOOL-END'
 
     GET_PEER_BLOCK_HASHES = 'GET-PEER-BLOCK-HASHES'
     PEER_BLOCK_HASHES = 'PEER-BLOCK-HASHES'

hathor/p2p/sync_v2/__init__.py

Whitespace-only changes.

hathor/p2p/sync_v2/factory.py

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+# Copyright 2021 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Optional
+
+from hathor.p2p.manager import ConnectionsManager
+from hathor.p2p.sync_factory import SyncManagerFactory
+from hathor.p2p.sync_manager import SyncManager
+from hathor.p2p.sync_v2.manager import NodeBlockSync
+from hathor.util import Reactor
+
+if TYPE_CHECKING:
+    from hathor.p2p.protocol import HathorProtocol
+
+
+class SyncV2Factory(SyncManagerFactory):
+    def __init__(self, connections: ConnectionsManager):
+        self.connections = connections
+
+    def create_sync_manager(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncManager:
+        return NodeBlockSync(protocol, reactor=reactor)
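
A usage sketch for the new factory; in this commit the actual registration happens in ConnectionsManager.__init__ (see the hathor/p2p/manager.py diff above), so the helper below is illustrative only:

# Sketch: obtaining a per-peer sync-v2 manager from the new factory.
from typing import TYPE_CHECKING

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.sync_v2.factory import SyncV2Factory

if TYPE_CHECKING:
    from hathor.p2p.protocol import HathorProtocol


def build_sync_v2(connections: ConnectionsManager, protocol: 'HathorProtocol'):
    factory = SyncV2Factory(connections)
    # returns a NodeBlockSync bound to this peer's protocol
    return factory.create_sync_manager(protocol)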
