Skip to content

Commit ca1d829

Browse files
authored
Merge pull request #892 from HathorNetwork/feat/remove-data-push
feat(sync-v1): remove SendDataPush queue
2 parents 628f3ad + 61327ba commit ca1d829

File tree

1 file changed

+2
-134
lines changed

1 file changed

+2
-134
lines changed

hathor/p2p/sync_v1/agent.py

Lines changed: 2 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414

1515
import base64
1616
import struct
17-
from collections import OrderedDict
1817
from math import inf
1918
from typing import TYPE_CHECKING, Any, Callable, Generator, Iterator, Optional
2019
from weakref import WeakSet
2120

2221
from structlog import get_logger
2322
from twisted.internet.defer import Deferred, inlineCallbacks
24-
from twisted.internet.interfaces import IConsumer, IDelayedCall, IPushProducer
25-
from zope.interface import implementer
23+
from twisted.internet.interfaces import IDelayedCall
2624

2725
from hathor.conf.get_settings import get_settings
2826
from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload
@@ -32,7 +30,6 @@
3230
from hathor.transaction.base_transaction import tx_or_block_from_bytes
3331
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
3432
from hathor.util import Reactor, json_dumps, json_loads
35-
from hathor.utils.zope import asserted_cast
3633

3734
logger = get_logger()
3835

@@ -52,126 +49,6 @@ def _get_deps(tx: BaseTransaction) -> Iterator[bytes]:
5249
yield txin.tx_id
5350

5451

55-
@implementer(IPushProducer)
56-
class SendDataPush:
57-
""" Prioritize blocks over transactions when pushing data to peers.
58-
"""
59-
def __init__(self, node_sync: 'NodeSyncTimestamp'):
60-
self.node_sync = node_sync
61-
self.protocol: 'HathorProtocol' = node_sync.protocol
62-
assert self.protocol.transport is not None
63-
consumer = asserted_cast(IConsumer, self.protocol.transport)
64-
self.consumer = consumer
65-
self.is_running: bool = False
66-
self.is_producing: bool = False
67-
68-
self.queue: OrderedDict[bytes, tuple[BaseTransaction, list[bytes]]] = OrderedDict()
69-
self.priority_queue: OrderedDict[bytes, tuple[BaseTransaction, list[bytes]]] = OrderedDict()
70-
71-
self.delayed_call: Optional[IDelayedCall] = None
72-
73-
def start(self) -> None:
74-
""" Start pushing data.
75-
"""
76-
if self.is_running:
77-
raise Exception('SendDataPush is already started.')
78-
self.is_running = True
79-
self.consumer.registerProducer(self, True)
80-
self.resumeProducing()
81-
82-
def stop(self) -> None:
83-
""" Stop pushing data.
84-
"""
85-
if not self.is_running:
86-
raise Exception('SendDataPush is already stopped.')
87-
self.is_running = False
88-
self.pauseProducing()
89-
self.consumer.unregisterProducer()
90-
91-
def schedule_if_needed(self) -> None:
92-
""" Schedule `send_next` if needed.
93-
"""
94-
if not self.is_running:
95-
return
96-
97-
if not self.is_producing:
98-
return
99-
100-
if self.delayed_call and self.delayed_call.active():
101-
return
102-
103-
if len(self.queue) > 0 or len(self.priority_queue) > 0:
104-
self.delayed_call = self.node_sync.reactor.callLater(0, self.send_next)
105-
106-
def add(self, tx: BaseTransaction) -> None:
107-
""" Add a new block/transaction to be pushed.
108-
"""
109-
assert tx.hash is not None
110-
if tx.is_block:
111-
self.add_to_priority(tx)
112-
else:
113-
deps = list(_get_deps(tx))
114-
self.queue[tx.hash] = (tx, deps)
115-
self.schedule_if_needed()
116-
117-
def add_to_priority(self, tx: BaseTransaction) -> None:
118-
""" Add a new block/transaction to be pushed with priority.
119-
"""
120-
assert tx.hash is not None
121-
assert tx.hash not in self.queue
122-
if tx.hash in self.priority_queue:
123-
return
124-
deps = list(_get_deps(tx))
125-
for h in deps:
126-
if h in self.queue:
127-
tx2, _ = self.queue.pop(h)
128-
self.add_to_priority(tx2)
129-
self.priority_queue[tx.hash] = (tx, deps)
130-
self.schedule_if_needed()
131-
132-
def send_next(self) -> None:
133-
""" Push next block/transaction to peer.
134-
"""
135-
assert self.is_running
136-
assert self.is_producing
137-
138-
if len(self.priority_queue) > 0:
139-
# Send blocks first.
140-
_, (tx, _) = self.priority_queue.popitem(last=False)
141-
142-
elif len(self.queue) > 0:
143-
# Otherwise, send in order.
144-
_, (tx, _) = self.queue.popitem(last=False)
145-
146-
else:
147-
# Nothing to send.
148-
self.delayed_call = None
149-
return
150-
151-
self.node_sync.send_data(tx)
152-
self.schedule_if_needed()
153-
154-
def resumeProducing(self) -> None:
155-
""" This method is automatically called to resume pushing data.
156-
"""
157-
self.is_producing = True
158-
self.schedule_if_needed()
159-
160-
def pauseProducing(self) -> None:
161-
""" This method is automatically called to pause pushing data.
162-
"""
163-
self.is_producing = False
164-
if self.delayed_call and self.delayed_call.active():
165-
self.delayed_call.cancel()
166-
167-
def stopProducing(self) -> None:
168-
""" This method is automatically called to stop pushing data.
169-
"""
170-
self.pauseProducing()
171-
self.queue.clear()
172-
self.priority_queue.clear()
173-
174-
17552
class NodeSyncTimestamp(SyncAgent):
17653
""" An algorithm to sync the DAG between two peers using the timestamp of the transactions.
17754
@@ -218,8 +95,6 @@ def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor:
21895
# This number may decrease if a new transaction/block arrives in a timestamp smaller than it.
21996
self.synced_timestamp: int = 0
22097

221-
self.send_data_queue: SendDataPush = SendDataPush(self)
222-
22398
# Latest data timestamp of the peer.
22499
self.previous_timestamp: int = 0
225100

@@ -274,8 +149,6 @@ def start(self) -> None:
274149
if self._started:
275150
raise Exception('NodeSyncTimestamp is already running')
276151
self._started = True
277-
if self.send_data_queue:
278-
self.send_data_queue.start()
279152
self.next_step()
280153

281154
def stop(self) -> None:
@@ -284,8 +157,6 @@ def stop(self) -> None:
284157
if not self._started:
285158
raise Exception('NodeSyncTimestamp is already stopped')
286159
self._started = False
287-
if self.send_data_queue and self.send_data_queue.is_running:
288-
self.send_data_queue.stop()
289160
if self.call_later_id and self.call_later_id.active():
290161
self.call_later_id.cancel()
291162
for call_later in self._send_tips_call_later:
@@ -330,10 +201,7 @@ def send_tx_to_peer_if_possible(self, tx: BaseTransaction) -> None:
330201
if parent.timestamp > self.synced_timestamp:
331202
return
332203

333-
if self.send_data_queue:
334-
self.send_data_queue.add(tx)
335-
else:
336-
self.send_data(tx)
204+
self.send_data(tx)
337205

338206
def get_peer_next(self, timestamp: Optional[int] = None, offset: int = 0) -> Deferred[NextPayload]:
339207
""" A helper that returns a deferred that is called when the peer replies.

0 commit comments

Comments
 (0)