@@ -395,8 +395,8 @@ def sync_database(self, session: sa.orm.Session) -> None:
395
395
if state == "memory" :
396
396
# if the task is in memory and it is not in the futures
397
397
# it means that the task has been lost by the broker (broker has been restarted)
398
- # the task is successful. If the function returns None it means that the request
399
- # has already been set to successful
398
+ # the task is successful. If the "set_successful_request" function returns None
399
+ # it means that the request has already been set to successful
400
400
finished_request = db .set_successful_request (
401
401
request_uid = request .request_uid ,
402
402
session = session ,
@@ -418,6 +418,9 @@ def sync_database(self, session: sa.orm.Session) -> None:
418
418
dask_status = task ["state" ],
419
419
** db .logger_kwargs (request = finished_request ),
420
420
)
421
+ # if the task is in processing, it means that the task is still running
422
+ if state == "processing" :
423
+ continue
421
424
# if it doesn't find the request: re-queue it
422
425
else :
423
426
request = db .get_request (request .request_uid , session = session )
@@ -434,6 +437,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
434
437
"job has finished" ,
435
438
** db .logger_kwargs (request = successful_request ),
436
439
)
440
+ continue
437
441
# FIXME: check if request status has changed
438
442
if os .getenv (
439
443
"BROKER_REQUEUE_ON_LOST_REQUESTS" , True
@@ -604,6 +608,7 @@ def submit_request(
604
608
resources = request .request_metadata .get ("resources" , {}),
605
609
metadata = request .request_metadata ,
606
610
)
611
+ distributed .fire_and_forget (future )
607
612
self .futures [request .request_uid ] = future
608
613
logger .info (
609
614
"submitted job to scheduler" ,
0 commit comments