@@ -151,6 +151,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
151
151
self ._lc_run = LoopingCall (self .run_sync )
152
152
self ._lc_run .clock = self .reactor
153
153
self ._is_running = False
154
+ self ._sync_started_at : float = 0
155
+
156
+ # Maximum running time to consider a sync stale.
157
+ self .max_running_time : int = 30 * 60 # seconds
154
158
155
159
# Whether we propagate transactions or not
156
160
self ._is_relaying = False
@@ -265,6 +269,16 @@ def handle_error(self, payload: str) -> None:
265
269
def update_synced (self , synced : bool ) -> None :
266
270
self ._synced = synced
267
271
272
+ def watchdog (self ) -> None :
273
+ """Close connection if sync is stale."""
274
+ if not self ._is_running :
275
+ return
276
+
277
+ dt = self .reactor .seconds () - self ._sync_started_at
278
+ if dt > self .max_running_time :
279
+ self .log .warn ('stale syncing detected, closing connection' )
280
+ self .protocol .send_error_and_close_connection ('stale syncing' )
281
+
268
282
@inlineCallbacks
269
283
def run_sync (self ) -> Generator [Any , Any , None ]:
270
284
""" Async step of the sync algorithm.
@@ -277,8 +291,10 @@ def run_sync(self) -> Generator[Any, Any, None]:
277
291
if self ._is_running :
278
292
# Already running...
279
293
self .log .debug ('already running' )
294
+ self .watchdog ()
280
295
return
281
296
self ._is_running = True
297
+ self ._sync_started_at = self .reactor .seconds ()
282
298
try :
283
299
yield self ._run_sync ()
284
300
except Exception :
0 commit comments