Commit d593364

improvements 2

1 parent d425afe

7 files changed: +72 / -28 lines


hathor/consensus/consensus.py

Lines changed: 2 additions & 2 deletions
@@ -72,13 +72,13 @@ def create_context(self) -> ConsensusAlgorithmContext:
     def update(self, base: BaseTransaction) -> None:
         assert base.storage is not None
         assert base.storage.is_only_valid_allowed()
+        meta = base.get_metadata()
         try:
-            if settings.CONSENSUS_FAIL_ID in meta.voided_by:
+            if meta.voided_by and settings.CONSENSUS_FAIL_ID in meta.voided_by:
                 self.log.warn('voided_by has previously failed, removing so we can revalidate', vertex=base.hash_hex)
                 meta.del_voided_by(settings.CONSENSUS_FAIL_ID)
             self._unsafe_update(base)
         except Exception:
-            meta = base.get_metadata()
             meta.add_voided_by(settings.CONSENSUS_FAIL_ID)
             assert base.storage is not None
             base.storage.save_transaction(base, only_metadata=True)
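
Note on the change above: voided_by is None for vertices that were never voided, so the old bare membership test could raise TypeError during revalidation; it is now short-circuited behind a truthiness check, and meta is fetched once before the try block. A minimal sketch of the guard, with a stand-in metadata object and a placeholder id (neither is Hathor's real code):

from typing import Optional, Set


class MetaStub:
    """Hypothetical stand-in for transaction metadata; only voided_by matters here."""
    def __init__(self, voided_by: Optional[Set[bytes]] = None) -> None:
        self.voided_by = voided_by


CONSENSUS_FAIL_ID = b'\x00' * 32  # placeholder, not the real settings.CONSENSUS_FAIL_ID

for meta in (MetaStub(None), MetaStub({CONSENSUS_FAIL_ID})):
    # a bare `CONSENSUS_FAIL_ID in meta.voided_by` raises TypeError when voided_by is None,
    # so the membership test is guarded by a truthiness check first
    if meta.voided_by and CONSENSUS_FAIL_ID in meta.voided_by:
        print('previously failed; clear the marker and revalidate')
    else:
        print('nothing to clear')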

hathor/indexes/deps_index.py

Lines changed: 4 additions & 0 deletions
@@ -193,3 +193,7 @@ def remove_from_needed_index(self, tx: bytes) -> None:
     def get_next_needed_tx(self) -> bytes:
         """Choose the start hash for downloading the needed txs"""
         raise NotImplementedError
+
+    # TODO: make this abstract
+    def checked_next_needed_txs(self, tx_storage: 'TransactionStorage') -> Iterator[bytes]:
+        raise NotImplementedError
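
The new method is only stubbed here, with a TODO to make it abstract later. A sketch of what that promotion might look like, assuming the index base class can use abc (simplified, not the real DepsIndex interface):

from abc import ABC, abstractmethod
from typing import Iterator


class DepsIndexSketch(ABC):
    """Simplified stand-in for the deps index interface."""

    @abstractmethod
    def checked_next_needed_txs(self, tx_storage: 'TransactionStorage') -> Iterator[bytes]:
        """Yield hashes of needed txs that are absent from storage, even as partially validated."""
        raise NotImplementedError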

hathor/indexes/memory_deps_index.py

Lines changed: 8 additions & 0 deletions
@@ -157,6 +157,14 @@ def get_next_needed_tx(self) -> bytes:
                       height=height, needed_tx=tx.hex())
         return start_hash
 
+    def checked_next_needed_txs(self, tx_storage: 'TransactionStorage') -> Iterator[bytes]:
+        for _, (__, tx_hash) in self._needed_txs_index.items():
+            basic_tx_exists = False
+            with tx_storage.allow_partially_validated_context():
+                basic_tx_exists = tx_storage.transaction_exists(tx_hash)
+            if not basic_tx_exists:
+                yield tx_hash
+
     def _add_needed(self, tx: BaseTransaction) -> None:
         """This method is idempotent, because self.update needs it to be indempotent."""
         assert tx.hash is not None
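
Because checked_next_needed_txs is a generator, a caller can take a bounded prefix without scanning the whole needed-txs index. A rough standard-library equivalent of that bounded consumption (collect_n from hathor.util plays a similar role further down; the stub generator here is only illustrative):

from itertools import islice
from typing import Iterator


def fake_checked_next_needed_txs() -> Iterator[bytes]:
    """Hypothetical stand-in that yields a large stream of 32-byte hashes."""
    for i in range(10_000):
        yield i.to_bytes(32, 'big')


# take at most 256 hashes; the generator is consumed lazily, not exhausted
first_batch = list(islice(fake_checked_next_needed_txs(), 256))
print(len(first_batch))  # 256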

hathor/manager.py

Lines changed: 11 additions & 6 deletions
@@ -939,12 +939,11 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
         """
         assert self.tx_storage.is_only_valid_allowed()
         assert tx.hash is not None
+
+        already_exists = False
         if self.tx_storage.transaction_exists(tx.hash):
             self.tx_storage.compare_bytes_with_local_tx(tx)
-            if not fails_silently:
-                raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex))
-            self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex)
-            return False
+            already_exists = True
 
         if tx.timestamp - self.reactor.seconds() > settings.MAX_FUTURE_TIMESTAMP_ALLOWED:
             if not fails_silently:
@@ -961,8 +960,14 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
                 metadata = tx.get_metadata()
             except TransactionDoesNotExist:
                 if not fails_silently:
-                    raise InvalidNewTransaction('missing parent')
-                self.log.warn('on_new_tx(): missing parent', tx=tx.hash_hex)
+                    raise InvalidNewTransaction('cannot get metadata')
+                self.log.warn('on_new_tx(): cannot get metadata', tx=tx.hash_hex)
+                return False
+
+            if already_exists and metadata.validation.is_fully_connected():
+                if not fails_silently:
+                    raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex))
+                self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex)
                 return False
 
             if metadata.validation.is_invalid():
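
The net effect of the two hunks above is a reordering: a transaction that already exists is no longer rejected up front; the duplicate is only rejected once its metadata shows it is fully connected, so a partially validated copy can fall through to (re)validation. A toy model of that decision, separate from the real on_new_tx:

def reject_as_duplicate(already_exists: bool, is_fully_connected: bool) -> bool:
    """Toy model of the reordered check: only a fully connected duplicate is rejected outright."""
    return already_exists and is_fully_connected


assert reject_as_duplicate(True, True) is True     # fully connected duplicate -> reject
assert reject_as_duplicate(True, False) is False   # partially validated duplicate -> fall through
assert reject_as_duplicate(False, False) is False  # new transaction -> fall through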

hathor/p2p/sync_v2/manager.py

Lines changed: 15 additions & 5 deletions
@@ -34,14 +34,16 @@
 from hathor.transaction.exceptions import HathorError
 from hathor.transaction.storage.exceptions import TransactionDoesNotExist
 from hathor.types import VertexId
-from hathor.util import Reactor
+from hathor.util import Reactor, collect_n
 
 if TYPE_CHECKING:
     from hathor.p2p.protocol import HathorProtocol
 
 settings = HathorSettings()
 logger = get_logger()
 
+MAX_GET_TRANSACTIONS_BFS_LEN: int = 256
+
 
 class PeerState(Enum):
     ERROR = 'error'
@@ -309,7 +311,9 @@ def run_sync_transactions(self) -> None:
         assert self.tx_storage.indexes is not None
         assert self.tx_storage.indexes.deps is not None
 
-        start_hash = self.tx_storage.indexes.deps.get_next_needed_tx()
+        # start_hash = self.tx_storage.indexes.deps.get_next_needed_tx()
+        needed_txs, _ = collect_n(self.tx_storage.indexes.deps.checked_next_needed_txs(self.tx_storage),
+                                  MAX_GET_TRANSACTIONS_BFS_LEN)
 
         # Start with the last received block and find the best block full validated in its chain
         block = self._last_received_block
@@ -323,9 +327,9 @@ def run_sync_transactions(self) -> None:
         assert block.hash is not None
         block_height = block.get_height()
 
-        self.log.info('run sync transactions', start=start_hash.hex(), end_block_hash=block.hash.hex(),
+        self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(),
                       end_block_height=block_height)
-        self.send_get_transactions_bfs([start_hash], block.hash)
+        self.send_get_transactions_bfs(needed_txs, block.hash)
 
     @inlineCallbacks
     def run_sync_blocks(self) -> Generator[Any, Any, None]:
@@ -890,8 +894,14 @@ def handle_get_transactions_bfs(self, payload: str) -> None:
             self.log.warn('ignore GET-TRANSACTIONS-BFS, already streaming')
             return
         data = json.loads(payload)
+        # XXX: todo verify this limit while parsing the payload.
+        start_from = data['start_from']
+        if len(start_from) > MAX_GET_TRANSACTIONS_BFS_LEN:
+            self.log.error('too many transactions in GET-TRANSACTIONS-BFS', state=self.state)
+            self.protocol.send_error_and_close_connection('Too many transactions in GET-TRANSACTIONS-BFS')
+            return
         self.log.debug('handle_get_transactions_bfs', **data)
-        start_from = [bytes.fromhex(tx_hash_hex) for tx_hash_hex in data['start_from']]
+        start_from = [bytes.fromhex(tx_hash_hex) for tx_hash_hex in start_from]
         until_first_block = bytes.fromhex(data['until_first_block'])
         self.send_transactions_bfs(start_from, until_first_block)
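
run_sync_transactions now asks the deps index for up to MAX_GET_TRANSACTIONS_BFS_LEN needed hashes instead of a single start hash, and handle_get_transactions_bfs enforces the same limit on the receiving side. Judging from the call site, collect_n takes an iterator and a limit and returns the collected items plus a flag; a sketch of such a helper under that assumption (hathor.util's actual implementation may differ):

from typing import Iterator, List, Tuple, TypeVar

T = TypeVar('T')


def collect_n_sketch(it: Iterator[T], n: int) -> Tuple[List[T], bool]:
    """Collect up to n items; the bool reports whether the iterator had more left."""
    items: List[T] = []
    has_more = False
    for item in it:
        if len(items) >= n:
            has_more = True
            break
        items.append(item)
    return items, has_more


batch, has_more = collect_n_sketch(iter(range(1000)), 256)
print(len(batch), has_more)  # 256 True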

hathor/simulator/fake_connection.py

Lines changed: 22 additions & 10 deletions
@@ -41,7 +41,8 @@ def getPeerCertificate(self) -> X509:
 
 
 class FakeConnection:
-    def __init__(self, manager1: 'HathorManager', manager2: 'HathorManager', *, latency: float = 0):
+    def __init__(self, manager1: 'HathorManager', manager2: 'HathorManager', *, latency: float = 0,
+                 autoreconnect: bool = False):
         """
         :param: latency: Latency between nodes in seconds
         """
@@ -51,20 +52,14 @@ def __init__(self, manager1: 'HathorManager', manager2: 'HathorManager', *, late
         self.manager2 = manager2
 
         self.latency = latency
-        self.is_connected = True
-
-        self._proto1 = manager1.connections.server_factory.buildProtocol(HostnameAddress(b'fake', 0))
-        self._proto2 = manager2.connections.client_factory.buildProtocol(HostnameAddress(b'fake', 0))
-
-        self.tr1 = HathorStringTransport(self._proto2.my_peer)
-        self.tr2 = HathorStringTransport(self._proto1.my_peer)
+        self.autoreconnect = autoreconnect
+        self.is_connected = False
 
         self._do_buffering = True
         self._buf1: deque[str] = deque()
         self._buf2: deque[str] = deque()
 
-        self._proto1.makeConnection(self.tr1)
-        self._proto2.makeConnection(self.tr2)
+        self.reconnect()
 
     @property
     def proto1(self):
@@ -184,6 +179,9 @@ def run_one_step(self, debug=False, force=False):
             if debug:
                 self.log.debug('[2->1] delivered', line=line2)
 
+        if self.autoreconnect and self._proto1.aborting and self._proto2.aborting:
+            self.reconnect()
+
         return True
 
     def run_until_empty(self, max_steps: Optional[int] = None, debug: bool = False, force: bool = False) -> None:
@@ -207,6 +205,20 @@ def disconnect(self, reason):
         self._proto2.connectionLost(reason)
         self.is_connected = False
 
+    def reconnect(self) -> None:
+        from twisted.python.failure import Failure
+        if self.is_connected:
+            self.disconnect(Failure(Exception('forced reconnection')))
+        self._buf1.clear()
+        self._buf2.clear()
+        self._proto1 = self.manager1.connections.server_factory.buildProtocol(HostnameAddress(b'fake', 0))
+        self._proto2 = self.manager2.connections.client_factory.buildProtocol(HostnameAddress(b'fake', 0))
+        self.tr1 = HathorStringTransport(self._proto2.my_peer)
+        self.tr2 = HathorStringTransport(self._proto1.my_peer)
+        self._proto1.makeConnection(self.tr1)
+        self._proto2.makeConnection(self.tr2)
+        self.is_connected = True
+
     def is_empty(self):
         if self._do_buffering and (self._buf1 or self._buf2):
             return False
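
FakeConnection now starts disconnected, delegates its setup to reconnect(), and run_one_step() can rebuild the link automatically once both protocols are aborting. A toy model of that autoreconnect loop with stub protocols (the real class wires Twisted protocols and string transports, which are omitted here):

class StubProtocol:
    """Hypothetical stand-in for the Twisted protocol objects FakeConnection builds."""
    def __init__(self) -> None:
        self.aborting = False


class ToyConnection:
    def __init__(self, *, autoreconnect: bool = False) -> None:
        self.autoreconnect = autoreconnect
        self.is_connected = False
        self.reconnect()

    def reconnect(self) -> None:
        # fresh protocol objects on every (re)connection, as in FakeConnection.reconnect()
        self.proto1 = StubProtocol()
        self.proto2 = StubProtocol()
        self.is_connected = True

    def run_one_step(self) -> None:
        if self.autoreconnect and self.proto1.aborting and self.proto2.aborting:
            self.reconnect()


conn = ToyConnection(autoreconnect=True)
conn.proto1.aborting = conn.proto2.aborting = True
conn.run_one_step()
print(conn.proto1.aborting)  # False: both sides were rebuilt by the automatic reconnect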

tests/simulation/test_simulator.py

Lines changed: 10 additions & 5 deletions
@@ -57,7 +57,7 @@ def test_two_nodes(self):
         gen_tx1.stop()
         gen_tx2.stop()
 
-        self.simulator.run(5 * 60)
+        self.assertTrue(self.simulator.run(600, trigger=StopWhenSynced(conn12)))
 
         self.assertTrue(conn12.is_connected)
         self.assertTipsEqual(manager1, manager2)
@@ -70,7 +70,9 @@ def test_many_miners_since_beginning(self):
         for hashpower in [10e6, 5e6, 1e6, 1e6, 1e6]:
             manager = self.create_peer()
             for node in nodes:
-                conn = FakeConnection(manager, node, latency=0.085)
+                # XXX: using autoreconnect is more realistic, but ideally it shouldn't be needed, but the test is
+                # failing without it for some reason
+                conn = FakeConnection(manager, node, latency=0.085, autoreconnect=True)
                 self.simulator.add_connection(conn)
                 stop_triggers.append(StopWhenSynced(conn))
 
@@ -95,6 +97,7 @@ def test_new_syncing_peer(self):
         nodes = []
         miners = []
         tx_generators = []
+        stop_triggers = []
 
         manager = self.create_peer()
         nodes.append(manager)
@@ -125,8 +128,9 @@ def test_new_syncing_peer(self):
         self.log.debug('adding late node')
         late_manager = self.create_peer()
         for node in nodes:
-            conn = FakeConnection(late_manager, node, latency=0.300)
+            conn = FakeConnection(late_manager, node, latency=0.300, autoreconnect=True)
            self.simulator.add_connection(conn)
+            stop_triggers.append(StopWhenSynced(conn))
 
         self.simulator.run(600)
 
@@ -135,8 +139,9 @@ def test_new_syncing_peer(self):
         for miner in miners:
             miner.stop()
 
-        self.simulator.run(1200)
-        self.simulator.run_until_complete(1200)
+        # self.simulator.run(1200)
+        # self.simulator.run_until_complete(1200)
+        self.assertTrue(self.simulator.run(3600, trigger=AllTriggers(stop_triggers)))
 
         for idx, node in enumerate(nodes):
             self.log.debug(f'checking node {idx}')
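
test_new_syncing_peer now stops on a composite trigger, AllTriggers(stop_triggers), which is not defined in this diff; presumably it fires only once every wrapped StopWhenSynced trigger has fired. A minimal sketch of that idea (the trigger interface shown is an assumption, not the simulator's actual API):

from typing import Callable, List

# assumed interface: a trigger is a callable returning True when its stop condition holds
Trigger = Callable[[], bool]


class AllTriggersSketch:
    """Fires only when every wrapped trigger fires (sketch of the AllTriggers idea)."""

    def __init__(self, triggers: List[Trigger]) -> None:
        self.triggers = triggers

    def __call__(self) -> bool:
        return all(trigger() for trigger in self.triggers)


composite = AllTriggersSketch([lambda: True, lambda: False])
print(composite())  # False: one connection is not synced yet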
