@@ -364,15 +364,15 @@ def on_future_done(self, future: distributed.Future) -> None:
364
364
logger .info ("worker killed: re-queueing" , job_id = future .key )
365
365
db .requeue_request (request_uid = future .key , session = session )
366
366
self .queue .add (future .key , request )
367
- elif future . status != "cancelled" :
367
+ else :
368
368
request = db .set_request_status (
369
369
future .key ,
370
370
job_status ,
371
371
error_message = error_message ,
372
372
error_reason = error_reason ,
373
373
session = session ,
374
374
)
375
- else :
375
+ elif future . status != "cancelled" :
376
376
# if the dask status is unknown, re-queue it
377
377
request = db .set_request_status (
378
378
future .key ,
@@ -386,6 +386,9 @@ def on_future_done(self, future: distributed.Future) -> None:
386
386
job_status = {future .status },
387
387
job_id = request .request_uid ,
388
388
)
389
+ else :
390
+ # if the dask status is cancelled, the qos has already been reset by sync_database
391
+ return
389
392
self .futures .pop (future .key , None )
390
393
self .qos .notify_end_of_request (
391
394
request , session , scheduler = self .internal_scheduler
0 commit comments