Skip to content

Commit 8570d55

Browse files
jansegremsbroglipedroferreira1
committed
feat(sync-v2): sync-v2 implemented, sync-v1 still default
Co-authored-by: Marcelo Salhab Brogliato <[email protected]> Co-authored-by: Pedro Ferreira <[email protected]>
1 parent 07d09a4 commit 8570d55

27 files changed

+3022
-169
lines changed

hathor/indexes/deps_index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def get_requested_from_height(tx: BaseTransaction) -> int:
5252
# I'm defaulting the height to `inf` (practically), this should make it heightest priority when
5353
# choosing which transactions to fetch next
5454
return INF_HEIGHT
55-
block = tx.storage.get_transaction(first_block)
55+
block = tx.storage.get_transaction(first_block, allow_partially_valid=True)
5656
assert isinstance(block, Block)
5757
return block.get_metadata().height
5858

hathor/indexes/memory_deps_index.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from functools import partial
1516
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterator, List, Optional, Set, Tuple
1617

1718
from structlog import get_logger
@@ -34,6 +35,9 @@ class MemoryDepsIndex(DepsIndex):
3435
_txs_with_deps_ready: Set[bytes]
3536

3637
# Next to be downloaded
38+
# - Key: hash of the tx to be downloaded
39+
# - Value[0]: height
40+
# - Value[1]: hash of the tx waiting for the download
3741
_needed_txs_index: Dict[bytes, Tuple[int, bytes]]
3842

3943
def __init__(self):
@@ -49,10 +53,11 @@ def force_clear(self) -> None:
4953
self._needed_txs_index = {}
5054

5155
def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
52-
assert tx.hash is not None
53-
assert tx.storage is not None
5456
validation = tx.get_metadata().validation
5557
if validation.is_fully_connected():
58+
# discover if new txs are ready because of this tx
59+
self._update_new_deps_ready(tx)
60+
# finally remove from rev deps
5661
self._del_from_deps_index(tx)
5762
elif not partial:
5863
raise ValueError('partial=False will only accept fully connected transactions')
@@ -63,6 +68,18 @@ def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
6368
def del_tx(self, tx: BaseTransaction) -> None:
6469
self._del_from_deps_index(tx)
6570

71+
def _update_new_deps_ready(self, tx: BaseTransaction) -> None:
72+
"""Go over the reverse dependencies of tx and check if any of them are now ready to be validated.
73+
74+
This is also idempotent.
75+
"""
76+
assert tx.hash is not None
77+
assert tx.storage is not None
78+
for candidate_hash in self._rev_dep_index.get(tx.hash, []):
79+
candidate_tx = tx.storage.get_transaction(candidate_hash, allow_partially_valid=True)
80+
if candidate_tx.is_ready_for_validation():
81+
self._txs_with_deps_ready.add(candidate_hash)
82+
6683
def _add_deps(self, tx: BaseTransaction) -> None:
6784
"""This method is idempotent, because self.update needs it to be indempotent."""
6885
assert tx.hash is not None
@@ -89,12 +106,13 @@ def remove_ready_for_validation(self, tx: bytes) -> None:
89106
self._txs_with_deps_ready.discard(tx)
90107

91108
def next_ready_for_validation(self, tx_storage: 'TransactionStorage', *, dry_run: bool = False) -> Iterator[bytes]:
109+
get_partially_validated = partial(tx_storage.get_transaction, allow_partially_valid=True)
92110
if dry_run:
93111
cur_ready = self._txs_with_deps_ready.copy()
94112
else:
95113
cur_ready, self._txs_with_deps_ready = self._txs_with_deps_ready, set()
96114
while cur_ready:
97-
yield from sorted(cur_ready, key=lambda tx_hash: tx_storage.get_transaction(tx_hash).timestamp)
115+
yield from sorted(cur_ready, key=lambda tx_hash: get_partially_validated(tx_hash).timestamp)
98116
if dry_run:
99117
cur_ready = self._txs_with_deps_ready - cur_ready
100118
else:
@@ -113,7 +131,8 @@ def _get_rev_deps(self, tx: bytes) -> FrozenSet[bytes]:
113131
def known_children(self, tx: BaseTransaction) -> List[bytes]:
114132
assert tx.hash is not None
115133
assert tx.storage is not None
116-
it_rev_deps = map(tx.storage.get_transaction, self._get_rev_deps(tx.hash))
134+
get_partially_validated = partial(tx.storage.get_transaction, allow_partially_valid=True)
135+
it_rev_deps = map(get_partially_validated, self._get_rev_deps(tx.hash))
117136
return [not_none(rev.hash) for rev in it_rev_deps if tx.hash in rev.parents]
118137

119138
# needed-txs-index methods:
@@ -139,6 +158,7 @@ def get_next_needed_tx(self) -> bytes:
139158

140159
def _add_needed(self, tx: BaseTransaction) -> None:
141160
"""This method is idempotent, because self.update needs it to be indempotent."""
161+
assert tx.hash is not None
142162
assert tx.storage is not None
143163
tx_storage = tx.storage
144164

@@ -153,3 +173,6 @@ def _add_needed(self, tx: BaseTransaction) -> None:
153173
if not tx_storage.transaction_exists(tx_hash):
154174
self.log.debug('tx parent is needed', tx=tx_hash.hex())
155175
self._needed_txs_index[tx_hash] = (height, not_none(tx.hash))
176+
177+
# also, remove the given transaction from needed, because we already have it
178+
self._needed_txs_index.pop(tx.hash, None)

hathor/manager.py

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,6 @@ def _initialize_components(self) -> None:
442442

443443
self.log.debug('load blocks and transactions')
444444
for tx in self.tx_storage._topological_sort_dfs():
445-
if self._full_verification:
446-
tx.update_initial_metadata()
447-
448445
assert tx.hash is not None
449446

450447
tx_meta = tx.get_metadata()
@@ -474,15 +471,21 @@ def _initialize_components(self) -> None:
474471
try:
475472
if self._full_verification:
476473
# TODO: deal with invalid tx
474+
tx.calculate_height()
475+
tx._update_parents_children_metadata()
476+
477477
if tx.can_validate_full():
478-
self.tx_storage.add_to_indexes(tx)
478+
tx.calculate_min_height()
479+
if tx.is_genesis:
480+
assert tx.validate_checkpoint(self.checkpoints)
479481
assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
482+
self.tx_storage.add_to_indexes(tx)
480483
self.consensus_algorithm.update(tx)
481484
self.tx_storage.indexes.update(tx)
482485
if self.tx_storage.indexes.mempool_tips is not None:
483486
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
484487
if self.tx_storage.indexes.deps is not None:
485-
self.sync_v2_step_validations([tx])
488+
self.sync_v2_step_validations([tx], quiet=True)
486489
else:
487490
assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
488491
self.tx_storage.save_transaction(tx, only_metadata=True)
@@ -495,7 +498,9 @@ def _initialize_components(self) -> None:
495498
assert self.tx_storage.indexes is not None
496499
if self.tx_storage.indexes.mempool_tips:
497500
self.tx_storage.indexes.mempool_tips.update(tx)
498-
self.tx_storage.add_to_indexes(tx)
501+
# XXX: refactor this in the future so the index manager decies whether to update each index
502+
if tx_meta.validation.is_fully_connected():
503+
self.tx_storage.add_to_indexes(tx)
499504
if tx.is_transaction and tx_meta.voided_by:
500505
self.tx_storage.del_from_indexes(tx)
501506
except (InvalidNewTransaction, TxValidationError):
@@ -550,11 +555,11 @@ def _initialize_components(self) -> None:
550555
for tx_hash in self.tx_storage.indexes.deps.iter():
551556
if not self.tx_storage.transaction_exists(tx_hash):
552557
continue
553-
tx = self.tx_storage.get_transaction(tx_hash)
558+
tx = self.tx_storage.get_transaction(tx_hash, allow_partially_valid=True)
554559
if tx.get_metadata().validation.is_final():
555560
depended_final_txs.append(tx)
556561
if self.tx_storage.indexes.deps is not None:
557-
self.sync_v2_step_validations(depended_final_txs)
562+
self.sync_v2_step_validations(depended_final_txs, quiet=True)
558563
self.log.debug('pending validations finished')
559564

560565
best_height = self.tx_storage.get_height_best_block()
@@ -705,10 +710,10 @@ def _sync_v2_resume_validations(self) -> None:
705710
for tx_hash in self.tx_storage.indexes.deps.iter():
706711
if not self.tx_storage.transaction_exists(tx_hash):
707712
continue
708-
tx = self.tx_storage.get_transaction(tx_hash)
713+
tx = self.tx_storage.get_transaction(tx_hash, allow_partially_valid=True)
709714
if tx.get_metadata().validation.is_final():
710715
depended_final_txs.append(tx)
711-
self.sync_v2_step_validations(depended_final_txs)
716+
self.sync_v2_step_validations(depended_final_txs, quiet=True)
712717
self.log.debug('pending validations finished')
713718

714719
def add_listen_address(self, addr: str) -> None:
@@ -1001,7 +1006,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10011006
tx.storage = self.tx_storage
10021007

10031008
try:
1004-
metadata = tx.get_metadata()
1009+
metadata = tx.get_metadata(allow_partial=partial)
10051010
except TransactionDoesNotExist:
10061011
if not fails_silently:
10071012
raise InvalidNewTransaction('missing parent')
@@ -1044,7 +1049,7 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10441049
self.tx_storage.indexes.update(tx)
10451050
if self.tx_storage.indexes.mempool_tips:
10461051
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
1047-
self.tx_fully_validated(tx)
1052+
self.tx_fully_validated(tx, quiet=quiet)
10481053
elif sync_checkpoints:
10491054
assert self.tx_storage.indexes.deps is not None
10501055
metadata.children = self.tx_storage.indexes.deps.known_children(tx)
@@ -1056,7 +1061,10 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10561061
self.log.warn('on_new_tx(): checkpoint validation failed', tx=tx.hash_hex, exc_info=True)
10571062
return False
10581063
self.tx_storage.save_transaction(tx)
1064+
self.tx_storage.indexes.deps.add_tx(tx)
1065+
self.log_new_object(tx, 'new {} partially accepted while syncing checkpoints', quiet=quiet)
10591066
else:
1067+
assert self.tx_storage.indexes.deps is not None
10601068
if isinstance(tx, Block) and not tx.has_basic_block_parent():
10611069
if not fails_silently:
10621070
raise InvalidNewTransaction('block parent needs to be at least basic-valid')
@@ -1072,38 +1080,33 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
10721080
# This needs to be called right before the save because we were adding the children
10731081
# in the tx parents even if the tx was invalid (failing the verifications above)
10741082
# then I would have a children that was not in the storage
1075-
tx.update_initial_metadata(save=False)
10761083
self.tx_storage.save_transaction(tx)
1084+
self.tx_storage.indexes.deps.add_tx(tx)
1085+
self.log_new_object(tx, 'new {} partially accepted', quiet=quiet)
10771086

1078-
if tx.is_transaction and self.tx_storage.indexes.deps is not None:
1087+
if self.tx_storage.indexes.deps is not None:
10791088
self.tx_storage.indexes.deps.remove_from_needed_index(tx.hash)
10801089

10811090
if self.tx_storage.indexes.deps is not None:
10821091
try:
1083-
self.sync_v2_step_validations([tx])
1092+
self.sync_v2_step_validations([tx], quiet=quiet)
10841093
except (AssertionError, HathorError) as e:
10851094
if not fails_silently:
10861095
raise InvalidNewTransaction('step validations failed') from e
10871096
self.log.warn('on_new_tx(): step validations failed', tx=tx.hash_hex, exc_info=True)
10881097
return False
10891098

1090-
if not quiet:
1091-
ts_date = datetime.datetime.fromtimestamp(tx.timestamp)
1092-
now = datetime.datetime.fromtimestamp(self.reactor.seconds())
1093-
if tx.is_block:
1094-
self.log.info('new block', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now))
1095-
else:
1096-
self.log.info('new tx', tx=tx, ts_date=ts_date, time_from_now=tx.get_time_from_now(now))
1097-
10981099
if propagate_to_peers:
10991100
# Propagate to our peers.
11001101
self.connections.send_tx_to_peers(tx)
11011102

11021103
return True
11031104

1104-
def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
1105+
def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: bool) -> None:
11051106
""" Step all validations until none can be stepped anymore.
11061107
"""
1108+
from functools import partial
1109+
11071110
assert self.tx_storage.indexes is not None
11081111
assert self.tx_storage.indexes.deps is not None
11091112
# cur_txs will be empty when there are no more new txs that reached full
@@ -1112,13 +1115,14 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
11121115
assert ready_tx.hash is not None
11131116
self.tx_storage.indexes.deps.remove_ready_for_validation(ready_tx.hash)
11141117
it_next_ready = self.tx_storage.indexes.deps.next_ready_for_validation(self.tx_storage)
1115-
for tx in map(self.tx_storage.get_transaction, it_next_ready):
1118+
get_partially_validated = partial(self.tx_storage.get_transaction, allow_partially_valid=True)
1119+
for tx in map(get_partially_validated, it_next_ready):
11161120
assert tx.hash is not None
11171121
tx.update_initial_metadata()
11181122
try:
11191123
# XXX: `reject_locked_reward` might not apply, partial validation is only used on sync-v2
11201124
# TODO: deal with `reject_locked_reward` on sync-v2
1121-
assert tx.validate_full(reject_locked_reward=True)
1125+
assert tx.validate_full(reject_locked_reward=False)
11221126
except (AssertionError, HathorError):
11231127
# TODO
11241128
raise
@@ -1128,9 +1132,9 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction]) -> None:
11281132
self.tx_storage.indexes.update(tx)
11291133
if self.tx_storage.indexes.mempool_tips:
11301134
self.tx_storage.indexes.mempool_tips.update(tx) # XXX: move to indexes.update
1131-
self.tx_fully_validated(tx)
1135+
self.tx_fully_validated(tx, quiet=quiet)
11321136

1133-
def tx_fully_validated(self, tx: BaseTransaction) -> None:
1137+
def tx_fully_validated(self, tx: BaseTransaction, *, quiet: bool) -> None:
11341138
""" Handle operations that need to happen once the tx becomes fully validated.
11351139
11361140
This might happen immediately after we receive the tx, if we have all dependencies
@@ -1149,6 +1153,30 @@ def tx_fully_validated(self, tx: BaseTransaction) -> None:
11491153
# TODO Remove it and use pubsub instead.
11501154
self.wallet.on_new_tx(tx)
11511155

1156+
self.log_new_object(tx, 'new {}', quiet=quiet)
1157+
1158+
def log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
1159+
""" A shortcut for logging additional information for block/txs.
1160+
"""
1161+
metadata = tx.get_metadata()
1162+
now = datetime.datetime.fromtimestamp(self.reactor.seconds())
1163+
kwargs = {
1164+
'tx': tx,
1165+
'ts_date': datetime.datetime.fromtimestamp(tx.timestamp),
1166+
'time_from_now': tx.get_time_from_now(now),
1167+
'validation': metadata.validation.name,
1168+
}
1169+
if tx.is_block:
1170+
message = message_fmt.format('block')
1171+
kwargs['height'] = metadata.get_soft_height()
1172+
else:
1173+
message = message_fmt.format('tx')
1174+
if not quiet:
1175+
log_func = self.log.info
1176+
else:
1177+
log_func = self.log.debug
1178+
log_func(message, **kwargs)
1179+
11521180
def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
11531181
endpoint = self.connections.listen(description, use_ssl)
11541182
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases

hathor/p2p/manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def __init__(self, reactor: Reactor, my_peer: PeerId, server_factory: 'HathorSer
7575
client_factory: 'HathorClientFactory', pubsub: PubSubManager, manager: 'HathorManager',
7676
ssl: bool, rng: Random, whitelist_only: bool, enable_sync_v1: bool, enable_sync_v2: bool) -> None:
7777
from hathor.p2p.sync_v1_factory import SyncV1Factory
78+
from hathor.p2p.sync_v2_factory import SyncV2Factory
7879

7980
if not (enable_sync_v1 or enable_sync_v2):
8081
raise TypeError(f'{type(self).__name__}() at least one sync version is required')
@@ -139,7 +140,7 @@ def __init__(self, reactor: Reactor, my_peer: PeerId, server_factory: 'HathorSer
139140
if enable_sync_v1:
140141
self._sync_factories[SyncVersion.V1] = SyncV1Factory(self)
141142
if enable_sync_v2:
142-
self._sync_factories[SyncVersion.V2] = SyncV1Factory(self)
143+
self._sync_factories[SyncVersion.V2] = SyncV2Factory(self)
143144

144145
def start(self) -> None:
145146
self.lc_reconnect.start(5, now=False)

0 commit comments

Comments
 (0)