@@ -150,11 +150,7 @@ def cancel_stuck_requests(client: distributed.Client, session: sa.orm.Session) -
150
150
f"canceling stuck requests for more than { CONFIG .broker_stuck_requests_limit_minutes } minutes" ,
151
151
stuck_requests = stuck_requests ,
152
152
)
153
- try :
154
- cancel_jobs_on_scheduler (client , job_ids = stuck_requests )
155
- except RuntimeError as e :
156
- # avoid race condition when previous status is "released"
157
- logger .error ("error canceling stuck requests" , error = e )
153
+ cancel_jobs_on_scheduler (client , job_ids = stuck_requests )
158
154
159
155
160
156
class Scheduler :
@@ -515,12 +511,23 @@ def sync_database(self, session: sa.orm.Session) -> None:
515
511
** db .logger_kwargs (request = finished_request ),
516
512
)
517
513
# if the task is in processing, it means that the task is still running
518
- if state == "processing" :
514
+ elif state == "processing" :
519
515
# notify start of request if it is not already notified
520
516
self .qos .notify_start_of_request (
521
517
request , scheduler = self .internal_scheduler
522
518
)
523
519
continue
520
+ elif state == "released" :
521
+ # notify start of request if it is not already notified
522
+ queued_request = db .requeue_request (
523
+ request = request , session = session
524
+ )
525
+ if queued_request :
526
+ self .queue .add (queued_request .request_uid , request )
527
+ self .qos .notify_end_of_request (
528
+ request , scheduler = self .internal_scheduler
529
+ )
530
+ continue
524
531
# if it doesn't find the request: re-queue it
525
532
else :
526
533
request = db .get_request (request .request_uid , session = session )
0 commit comments