Skip to content

Commit 86e9beb

Browse files
jansegremsbroglipedroferreira1
committed
feat(sync-v2): sync-v2 implemented, sync-v1 still default
Co-authored-by: Marcelo Salhab Brogliato <[email protected]> Co-authored-by: Pedro Ferreira <[email protected]>
1 parent 328e50b commit 86e9beb

27 files changed

+3022
-169
lines changed

hathor/indexes/deps_index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def get_requested_from_height(tx: BaseTransaction) -> int:
5252
# I'm defaulting the height to `inf` (practically), this should make it heightest priority when
5353
# choosing which transactions to fetch next
5454
return INF_HEIGHT
55-
block = tx.storage.get_transaction(first_block)
55+
block = tx.storage.get_transaction(first_block, allow_partially_valid=True)
5656
assert isinstance(block, Block)
5757
return block.get_metadata().height
5858

hathor/indexes/memory_deps_index.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from functools import partial
1516
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterator, List, Optional, Set, Tuple
1617

1718
from structlog import get_logger
@@ -34,6 +35,9 @@ class MemoryDepsIndex(DepsIndex):
3435
_txs_with_deps_ready: Set[bytes]
3536

3637
# Next to be downloaded
38+
# - Key: hash of the tx to be downloaded
39+
# - Value[0]: height
40+
# - Value[1]: hash of the tx waiting for the download
3741
_needed_txs_index: Dict[bytes, Tuple[int, bytes]]
3842

3943
def __init__(self):
@@ -49,10 +53,11 @@ def force_clear(self) -> None:
4953
self._needed_txs_index = {}
5054

5155
def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
52-
assert tx.hash is not None
53-
assert tx.storage is not None
5456
validation = tx.get_metadata().validation
5557
if validation.is_fully_connected():
58+
# discover if new txs are ready because of this tx
59+
self._update_new_deps_ready(tx)
60+
# finally remove from rev deps
5661
self._del_from_deps_index(tx)
5762
elif not partial:
5863
raise ValueError('partial=False will only accept fully connected transactions')
@@ -63,6 +68,18 @@ def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
6368
def del_tx(self, tx: BaseTransaction) -> None:
6469
self._del_from_deps_index(tx)
6570

71+
def _update_new_deps_ready(self, tx: BaseTransaction) -> None:
72+
"""Go over the reverse dependencies of tx and check if any of them are now ready to be validated.
73+
74+
This is also idempotent.
75+
"""
76+
assert tx.hash is not None
77+
assert tx.storage is not None
78+
for candidate_hash in self._rev_dep_index.get(tx.hash, []):
79+
candidate_tx = tx.storage.get_transaction(candidate_hash, allow_partially_valid=True)
80+
if candidate_tx.is_ready_for_validation():
81+
self._txs_with_deps_ready.add(candidate_hash)
82+
6683
def _add_deps(self, tx: BaseTransaction) -> None:
6784
"""This method is idempotent, because self.update needs it to be indempotent."""
6885
assert tx.hash is not None
@@ -89,12 +106,13 @@ def remove_ready_for_validation(self, tx: bytes) -> None:
89106
self._txs_with_deps_ready.discard(tx)
90107

91108
def next_ready_for_validation(self, tx_storage: 'TransactionStorage', *, dry_run: bool = False) -> Iterator[bytes]:
109+
get_partially_validated = partial(tx_storage.get_transaction, allow_partially_valid=True)
92110
if dry_run:
93111
cur_ready = self._txs_with_deps_ready.copy()
94112
else:
95113
cur_ready, self._txs_with_deps_ready = self._txs_with_deps_ready, set()
96114
while cur_ready:
97-
yield from sorted(cur_ready, key=lambda tx_hash: tx_storage.get_transaction(tx_hash).timestamp)
115+
yield from sorted(cur_ready, key=lambda tx_hash: get_partially_validated(tx_hash).timestamp)
98116
if dry_run:
99117
cur_ready = self._txs_with_deps_ready - cur_ready
100118
else:
@@ -113,7 +131,8 @@ def _get_rev_deps(self, tx: bytes) -> FrozenSet[bytes]:
113131
def known_children(self, tx: BaseTransaction) -> List[bytes]:
114132
assert tx.hash is not None
115133
assert tx.storage is not None
116-
it_rev_deps = map(tx.storage.get_transaction, self._get_rev_deps(tx.hash))
134+
get_partially_validated = partial(tx.storage.get_transaction, allow_partially_valid=True)
135+
it_rev_deps = map(get_partially_validated, self._get_rev_deps(tx.hash))
117136
return [not_none(rev.hash) for rev in it_rev_deps if tx.hash in rev.parents]
118137

119138
# needed-txs-index methods:
@@ -139,6 +158,7 @@ def get_next_needed_tx(self) -> bytes:
139158

140159
def _add_needed(self, tx: BaseTransaction) -> None:
141160
"""This method is idempotent, because self.update needs it to be indempotent."""
161+
assert tx.hash is not None
142162
assert tx.storage is not None
143163
tx_storage = tx.storage
144164

@@ -153,3 +173,6 @@ def _add_needed(self, tx: BaseTransaction) -> None:
153173
if not tx_storage.transaction_exists(tx_hash):
154174
self.log.debug('tx parent is needed', tx=tx_hash.hex())
155175
self._needed_txs_index[tx_hash] = (height, not_none(tx.hash))
176+
177+
# also, remove the given transaction from needed, because we already have it
178+
self._needed_txs_index.pop(tx.hash, None)

hathor/manager.py

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -454,9 +454,6 @@ def _initialize_components(self) -> None:
454454

455455
self.log.debug('load blocks and transactions')
456456
for tx in self.tx_storage._topological_sort_dfs():
457-
if self._full_verification:
458-
tx.update_initial_metadata()
459-
460457
assert tx.hash is not None
461458

462459
tx_meta = tx.get_metadata()
@@ -486,15 +483,21 @@ def _initialize_components(self) -> None:
486483
try:
487484
if self._full_verification:
488485
# TODO: deal with invalid tx
486+
tx.calculate_height()
487+
tx._update_parents_children_metadata()
488+
489489
if tx.can_validate_full():
490-
self.tx_storage.add_to_indexes(tx)
490+
tx.calculate_min_height()
491+
if tx.is_genesis:
492+
assert tx.validate_checkpoint(self.checkpoints)
491493
assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
494+
self.tx_storage.add_to_indexes(tx)
492495
self.consensus_algorithm.update(tx)
493496
self.tx_storage.indexes.update(tx)
494497
if self.tx_storage.indexes.mempool_tips is not None:
495498
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
496499
if self.tx_storage.indexes.deps is not None:
497-
self.sync_v2_step_validations([tx])
500+
self.sync_v2_step_validations([tx], quiet=True)
498501
else:
499502
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
500503
self.tx_storage.save_transaction(tx, only_metadata=True)
@@ -507,7 +510,9 @@ def _initialize_components(self) -> None:
507510
assert self.tx_storage.indexes is not None
508511
if self.tx_storage.indexes.mempool_tips:
509512
self.tx_storage.indexes.mempool_tips.update(tx)
510-
self.tx_storage.add_to_indexes(tx)
513+
# XXX: refactor this in the future so the index manager decies whether to update each index
514+
if tx_meta.validation.is_fully_connected():
515+
self.tx_storage.add_to_indexes(tx)
511516
if tx.is_transaction and tx_meta.voided_by:
512517
self.tx_storage.del_from_indexes(tx)
513518
except (InvalidNewTransaction, TxValidationError):
@@ -562,11 +567,11 @@ def _initialize_components(self) -> None:
562567
for tx_hash in self.tx_storage.indexes.deps.iter():
563568
if not self.tx_storage.transaction_exists(tx_hash):
564569
continue
565-
tx = self.tx_storage.get_transaction(tx_hash)
570+
tx = self.tx_storage.get_transaction(tx_hash, allow_partially_valid=True)
566571
if tx.get_metadata().validation.is_final():
567572
depended_final_txs.append(tx)
568573
if self.tx_storage.indexes.deps is not None:
569-
self.sync_v2_step_validations(depended_final_txs)
574+
self.sync_v2_step_validations(depended_final_txs, quiet=True)
570575
self.log.debug('pending validations finished')
571576

572577
best_height = self.tx_storage.get_height_best_block()
@@ -722,10 +727,10 @@ def _sync_v2_resume_validations(self) -> None:
722727
for tx_hash in self.tx_storage.indexes.deps.iter():
723728
if not self.tx_storage.transaction_exists(tx_hash):
724729
continue
725-
tx = self.tx_storage.get_transaction(tx_hash)
730+
tx = self.tx_storage.get_transaction(tx_hash, allow_partially_valid=True)
726731
if tx.get_metadata().validation.is_final():
727732
depended_final_txs.append(tx)
728-
self.sync_v2_step_validations(depended_final_txs)
733+
self.sync_v2_step_validations(depended_final_txs, quiet=True)
729734
self.log.debug('pending validations finished')
730735

731736
def add_listen_address(self, addr: str) -> None:
@@ -1018,7 +1023,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10181023
tx.storage = self.tx_storage
10191024

10201025
try:
1021-
metadata = tx.get_metadata()
1026+
metadata = tx.get_metadata(allow_partial=partial)
10221027
except TransactionDoesNotExist:
10231028
if not fails_silently:
10241029
raise InvalidNewTransaction('missing parent')
@@ -1061,7 +1066,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10611066
self.tx_storage.indexes.update(tx)
10621067
if self.tx_storage.indexes.mempool_tips:
10631068
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
1064-
self.tx_fully_validated(tx)
1069+
self.tx_fully_validated(tx, quiet=quiet)
10651070
elif sync_checkpoints:
10661071
assert self.tx_storage.indexes.deps is not None
10671072
metadata.children = self.tx_storage.indexes.deps.known_children(tx)
@@ -1073,7 +1078,10 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10731078
self.log.warn('on_new_tx(): checkpoint validation failed', tx=tx.hash_hex, exc_info=True)
10741079
return False
10751080
self.tx_storage.save_transaction(tx)
1081+
self.tx_storage.indexes.deps.add_tx(tx)
1082+
self.log_new_object(tx, 'new {} partially accepted while syncing checkpoints', quiet=quiet)
10761083
else:
1084+
assert self.tx_storage.indexes.deps is not None
10771085
if isinstance(tx, Block) and not tx.has_basic_block_parent():
10781086
if not fails_silently:
10791087
raise InvalidNewTransaction('block parent needs to be at least basic-valid')
@@ -1089,38 +1097,33 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10891097
# This needs to be called right before the save because we were adding the children
10901098
# in the tx parents even if the tx was invalid (failing the verifications above)
10911099
# then I would have a children that was not in the storage
1092-
tx.update_initial_metadata(save=False)
10931100
self.tx_storage.save_transaction(tx)
1101+
self.tx_storage.indexes.deps.add_tx(tx)
1102+
self.log_new_object(tx, 'new {} partially accepted', quiet=quiet)
10941103

1095-
if tx.is_transaction and self.tx_storage.indexes.deps is not None:
1104+
if self.tx_storage.indexes.deps is not None:
10961105
self.tx_storage.indexes.deps.remove_from_needed_index(tx.hash)
10971106

10981107
if self.tx_storage.indexes.deps is not None:
10991108
try:
1100-
self.sync_v2_step_validations([tx])
1109+
self.sync_v2_step_validations([tx], quiet=quiet)
11011110
except (AssertionError, HathorError) as e:
11021111
if not fails_silently:
11031112
raise InvalidNewTransaction('step validations failed') from e
11041113
self.log.warn('on_new_tx(): step validations failed', tx=tx.hash_hex, exc_info=True)
11051114
return False
11061115

1107-
if not quiet:
1108-
ts_date = datetime.datetime.fromtimestamp(tx.timestamp)
1109-
now = datetime.datetime.fromtimestamp(self.reactor.seconds())
1110-
if tx.is_block:
1111-
self.log.info('new block', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now))
1112-
else:
1113-
self.log.info('new tx', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now))
1114-
11151116
if propagate_to_peers:
11161117
# Propagate to our peers.
11171118
self.connections.send_tx_to_peers(tx)
11181119

11191120
return True
11201121

1121-
def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
1122+
def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: bool) -> None:
11221123
""" Step all validations until none can be stepped anymore.
11231124
"""
1125+
from functools import partial
1126+
11241127
assert self.tx_storage.indexes is not None
11251128
assert self.tx_storage.indexes.deps is not None
11261129
# cur_txs will be empty when there are no more new txs that reached full
@@ -1129,13 +1132,14 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
11291132
assert ready_tx.hash is not None
11301133
self.tx_storage.indexes.deps.remove_ready_for_validation(ready_tx.hash)
11311134
it_next_ready = self.tx_storage.indexes.deps.next_ready_for_validation(self.tx_storage)
1132-
for tx in map(self.tx_storage.get_transaction, it_next_ready):
1135+
get_partially_validated = partial(self.tx_storage.get_transaction, allow_partially_valid=True)
1136+
for tx in map(get_partially_validated, it_next_ready):
11331137
assert tx.hash is not None
11341138
tx.update_initial_metadata()
11351139
try:
11361140
# XXX: `reject_locked_reward` might not apply, partial validation is only used on sync-v2
11371141
# TODO: deal with `reject_locked_reward` on sync-v2
1138-
assert tx.validate_full(reject_locked_reward=True)
1142+
assert tx.validate_full(reject_locked_reward=False)
11391143
except (AssertionError, HathorError):
11401144
# TODO
11411145
raise
@@ -1145,9 +1149,9 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
11451149
self.tx_storage.indexes.update(tx)
11461150
if self.tx_storage.indexes.mempool_tips:
11471151
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
1148-
self.tx_fully_validated(tx)
1152+
self.tx_fully_validated(tx, quiet=quiet)
11491153

1150-
def tx_fully_validated(self, tx: BaseTransaction) -> None:
1154+
def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None:
11511155
""" Handle operations that need to happen once the tx becomes fully validated.
11521156
11531157
This might happen immediately after we receive the tx, if we have all dependencies
@@ -1166,6 +1170,30 @@ def tx_fully_validated(self, tx: BaseTransaction) -> None:
11661170
# TODO Remove it and use pubsub instead.
11671171
self.wallet.on_new_tx(tx)
11681172

1173+
self.log_new_object(tx, 'new {}', quiet=quiet)
1174+
1175+
def log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
1176+
""" A shortcut for logging additional information for block/txs.
1177+
"""
1178+
metadata = tx.get_metadata()
1179+
now = datetime.datetime.fromtimestamp(self.reactor.seconds())
1180+
kwargs = {
1181+
'tx': tx,
1182+
'ts_date': datetime.datetime.fromtimestamp(tx.timestamp),
1183+
'time_from_now': tx.get_time_from_now(now),
1184+
'validation': metadata.validation.name,
1185+
}
1186+
if tx.is_block:
1187+
message = message_fmt.format('block')
1188+
kwargs['height'] = metadata.get_soft_height()
1189+
else:
1190+
message = message_fmt.format('tx')
1191+
if not quiet:
1192+
log_func = self.log.info
1193+
else:
1194+
log_func = self.log.debug
1195+
log_func(message, **kwargs)
1196+
11691197
def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
11701198
endpoint = self.connections.listen(description, use_ssl)
11711199
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases

hathor/p2p/manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def __init__(self, reactor: Reactor, my_peer: PeerId, server_factory: 'HathorSer
9393
enable_sync_v1_1: bool) -> None:
9494
from hathor.p2p.sync_v1_1_factory import SyncV11Factory
9595
from hathor.p2p.sync_v1_factory import SyncV1Factory
96+
from hathor.p2p.sync_v2_factory import SyncV2Factory
9697

9798
if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2):
9899
raise TypeError(f'{type(self).__name__}() at least one sync version is required')
@@ -174,7 +175,7 @@ def __init__(self, reactor: Reactor, my_peer: PeerId, server_factory: 'HathorSer
174175
if enable_sync_v1_1:
175176
self._sync_factories[SyncVersion.V1_1] = SyncV11Factory(self)
176177
if enable_sync_v2:
177-
self._sync_factories[SyncVersion.V2] = SyncV1Factory(self)
178+
self._sync_factories[SyncVersion.V2] = SyncV2Factory(self)
178179

179180
def disable_rate_limiter(self) -> None:
180181
"""Disable global rate limiter."""

0 commit comments

Comments
 (0)