@@ -92,6 +92,37 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, A
92
92
return client .run_on_scheduler (get_tasks_on_scheduler )
93
93
94
94
95
+ def cancel_jobs_on_scheduler (client : distributed .Client , job_ids : list [str ]) -> None :
96
+ """Cancel jobs on the dask scheduler.
97
+
98
+ This function is executed on the scheduler pod. This just cancel the jobs on the scheduler.
99
+ See https://stackoverflow.com/questions/49203128/how-do-i-stop-a-running-task-in-dask.
100
+ """
101
+
102
+ def cancel_jobs (dask_scheduler : distributed .Scheduler , job_ids : list [str ]) -> None :
103
+ for job_id in job_ids :
104
+ if job_id in dask_scheduler .tasks :
105
+ dask_scheduler .transitions (
106
+ {job_id : "cancelled" }, stimulus_id = "manual-cancel"
107
+ )
108
+
109
+ return client .run_on_scheduler (cancel_jobs , job_ids = job_ids )
110
+
111
+
112
+ @cachetools .cached ( # type: ignore
113
+ cache = cachetools .TTLCache (
114
+ maxsize = 1024 , ttl = CONFIG .broker_cancel_stuck_requests_cache_ttl
115
+ ),
116
+ info = True ,
117
+ )
118
+ def cancel_stuck_requests (client : distributed .Client , session : sa .orm .Session ) -> None :
119
+ """Get the stuck requests from the database and cancel them on the dask scheduler."""
120
+ stuck_requests = db .get_stuck_requests (
121
+ session = session , hours = CONFIG .broker_stuck_requests_limit_hours
122
+ )
123
+ cancel_jobs_on_scheduler (client , job_ids = stuck_requests )
124
+
125
+
95
126
class Scheduler :
96
127
"""A simple scheduler to store the tasks to update the qos_rules in the database.
97
128
@@ -379,9 +410,13 @@ def sync_database(self, session: sa.orm.Session) -> None:
379
410
dismissed_requests = db .get_dismissed_requests (
380
411
session , limit = CONFIG .broker_max_accepted_requests
381
412
)
382
- for i , request in enumerate ( dismissed_requests ) :
413
+ for request in dismissed_requests :
383
414
if future := self .futures .pop (request .request_uid , None ):
384
415
future .cancel ()
416
+ else :
417
+ # if the request is not in the futures, it means that the request has been lost by the broker
418
+ # try to cancel the job directly on the scheduler
419
+ cancel_jobs_on_scheduler (self .client , job_ids = [request .request_uid ])
385
420
session = self .manage_dismissed_request (request , session )
386
421
session .commit ()
387
422
@@ -610,12 +645,12 @@ def processing_time_priority_algorithm(
610
645
interval_stop = datetime .datetime .now ()
611
646
# temporary solution to prioritize high priority user
612
647
users_queue = {
613
- "27888ffa-0973-4794-9b3c-9efb6767f66f" : 0 , # wekeo
614
- "d67a13db-86cc-439d-823d-6517003de29f" : 0 , # CDS Apps user
615
- "365ac1da-090e-4b85-9088-30c676bc5251" : 0 , # Gionata
616
- "74c6f9a1-8efe-4a6c-b06b-9f8ddcab188d" : 0 , # User Support
617
- "4d92cc89-d586-4731-8553-07df5dae1886" : 0 , # Luke Jones
618
- "8d8ee054-6a09-4da8-a5be-d5dff52bbc5f" : 0 , # Petrut
648
+ "27888ffa-0973-4794-9b3c-9efb6767f66f" : 0 , # wekeo
649
+ "d67a13db-86cc-439d-823d-6517003de29f" : 0 , # CDS Apps user
650
+ "365ac1da-090e-4b85-9088-30c676bc5251" : 0 , # Gionata
651
+ "74c6f9a1-8efe-4a6c-b06b-9f8ddcab188d" : 0 , # User Support
652
+ "4d92cc89-d586-4731-8553-07df5dae1886" : 0 , # Luke Jones
653
+ "8d8ee054-6a09-4da8-a5be-d5dff52bbc5f" : 0 , # Petrut
619
654
} | db .get_users_queue_from_processing_time (
620
655
interval_stop = interval_stop ,
621
656
session = session_write ,
@@ -740,6 +775,7 @@ def run(self) -> None:
740
775
self .queue .values (), session_write
741
776
)
742
777
778
+ cancel_stuck_requests (client = self .client , session = session_read )
743
779
running_requests = len (db .get_running_requests (session = session_read ))
744
780
queue_length = self .queue .len ()
745
781
available_workers = self .number_of_workers - running_requests
0 commit comments