|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
15 |
| -from itertools import chain |
16 | 15 | from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional
|
17 | 16 |
|
18 | 17 | from structlog import get_logger
|
@@ -45,11 +44,8 @@ class SyncCheckpoint:
|
45 | 44 | LC_INTERVAL: int = 5
|
46 | 45 |
|
47 | 46 | def __init__(self, manager: 'HathorManager'):
|
48 |
| - # All peers that have all the checkpoints to download |
49 |
| - self.peers_to_request: List['NodeBlockSync'] = [] |
50 |
| - |
51 |
| - # All peers that we are connected but don't have all the checkpoints |
52 |
| - self.incomplete_peers: List['NodeBlockSync'] = [] |
| 47 | + # List of peers and latest known height |
| 48 | + self.available_peers: dict['NodeBlockSync', int] = {} |
53 | 49 |
|
54 | 50 | # Indicate whether the checkpoint sync has been started.
|
55 | 51 | self._started: bool = False
|
@@ -122,8 +118,6 @@ def stop(self) -> bool:
|
122 | 118 | self.log.warn('already stopped')
|
123 | 119 | return False
|
124 | 120 | if self.peer_syncing is not None:
|
125 |
| - if self.peer_syncing in self.peers_to_request: |
126 |
| - self.peers_to_request.remove(self.peer_syncing) |
127 | 121 | self.peer_syncing = None
|
128 | 122 | self._started = False
|
129 | 123 | self._lc_run.stop()
|
@@ -192,15 +186,25 @@ def run_sync(self):
|
192 | 186 | finally:
|
193 | 187 | self._is_running = False
|
194 | 188 |
|
195 |
| - def get_peer_to_request(self) -> Optional['NodeBlockSync']: |
| 189 | + def get_heighest_checkpoint_height(self) -> int: |
| 190 | + """ Return the height of the heighest checkpoint. |
196 | 191 | """
|
| 192 | + return self.manager.checkpoints[-1].height |
| 193 | + |
| 194 | + def get_peer_to_request(self) -> Optional['NodeBlockSync']: |
| 195 | + """ Search through the available peers and choose one to sync checkpoints from, if any |
| 196 | +
|
| 197 | + This method will return None if there are no suitable peers to download checkpoints from, which can happen if |
| 198 | + none of the peers have the checkpoint height that we want or if their connection is not ready yet (or anymore). |
197 | 199 | """
|
198 | 200 | # XXX: we could use a better peer selecting strategy here
|
199 |
| - for peer in self.peers_to_request[:]: |
| 201 | + cutoff_height = self.get_heighest_checkpoint_height() |
| 202 | + for peer, height in self.available_peers.items(): |
200 | 203 | if peer.protocol.state is None:
|
201 |
| - self.peers_to_request.remove(peer) |
202 |
| - else: |
203 |
| - return peer |
| 204 | + continue |
| 205 | + if height < cutoff_height: |
| 206 | + continue |
| 207 | + return peer |
204 | 208 | return None
|
205 | 209 |
|
206 | 210 | def _run_sync(self):
|
@@ -242,7 +246,7 @@ def on_sync_error(self):
|
242 | 246 | """Called by NodeBlockSync when an error occurs (e.g. receive invalid blocks or receive too many blocks)."""
|
243 | 247 | self.log.debug('sync error')
|
244 | 248 | # TODO: emit an event and let the networking handle it
|
245 |
| - self.peers_to_request.remove(self.peer_syncing) |
| 249 | + self.available_peers.remove(self.peer_syncing) |
246 | 250 | self.peer_syncing = None
|
247 | 251 |
|
248 | 252 | def on_stream_ends(self):
|
@@ -283,20 +287,10 @@ def on_stream_ends(self):
|
283 | 287 | # All blocks are downloaded until the last checkpoint.
|
284 | 288 | # So, we stop the checkpoint sync and mark all connections as checkpoint finished.
|
285 | 289 | self.log.debug('stop all sync-checkpoints')
|
286 |
| - for peer_sync in chain(self.peers_to_request, self.incomplete_peers): |
| 290 | + for peer_sync in self.available_peers.keys(): |
287 | 291 | peer_sync.sync_checkpoints_finished()
|
288 | 292 | self.stop()
|
289 | 293 |
|
290 | 294 | def update_peer_height(self, peer: 'NodeBlockSync', height: int) -> None:
|
291 | 295 | """Called by NodeBlockSync when we have updated information about a peers height."""
|
292 |
| - if height >= self.manager.checkpoints[-1].height: |
293 |
| - if peer in self.incomplete_peers: |
294 |
| - self.incomplete_peers.remove(peer) |
295 |
| - # This peer has all checkpoints |
296 |
| - self.peers_to_request.append(peer) |
297 |
| - else: |
298 |
| - # XXX: Maybe this isn't possible, but just in case |
299 |
| - if peer in self.peers_to_request: |
300 |
| - self.peers_to_request.remove(peer) |
301 |
| - # This peer does not have all checkpoints |
302 |
| - self.incomplete_peers.append(peer) |
| 296 | + self.available_peers[peer] = height |
0 commit comments