@@ -148,6 +148,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
148
148
self ._lc_run = LoopingCall (self .run_sync )
149
149
self ._lc_run .clock = self .reactor
150
150
self ._is_running = False
151
+ self ._sync_started_at : float = 0
152
+
153
+ # Maximum running time to consider a sync stale.
154
+ self .max_running_time : int = 30 * 60 # seconds
151
155
152
156
# Whether we propagate transactions or not
153
157
self ._is_relaying = False
@@ -262,6 +266,16 @@ def handle_error(self, payload: str) -> None:
262
266
def update_synced (self , synced : bool ) -> None :
263
267
self ._synced = synced
264
268
269
+ def watchdog (self ) -> None :
270
+ """Close connection if sync is stale."""
271
+ if not self ._is_running :
272
+ return
273
+
274
+ dt = self .reactor .seconds () - self ._sync_started_at
275
+ if dt > self .max_running_time :
276
+ self .log .warn ('stale syncing detected, closing connection' )
277
+ self .protocol .send_error_and_close_connection ('stale syncing' )
278
+
265
279
@inlineCallbacks
266
280
def run_sync (self ) -> Generator [Any , Any , None ]:
267
281
""" Async step of the sync algorithm.
@@ -274,8 +288,10 @@ def run_sync(self) -> Generator[Any, Any, None]:
274
288
if self ._is_running :
275
289
# Already running...
276
290
self .log .debug ('already running' )
291
+ self .watchdog ()
277
292
return
278
293
self ._is_running = True
294
+ self ._sync_started_at = self .reactor .seconds ()
279
295
try :
280
296
yield self ._run_sync ()
281
297
except Exception :
0 commit comments