@@ -75,6 +75,10 @@ def get_rules_hash(rules_path: str):
     info=True,
 )
 def get_tasks_from_scheduler(client: distributed.Client) -> Any:
+    """Get the tasks from the scheduler.
+
+    This function is executed on the scheduler pod.
+    """
     def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, Any]:
         tasks = {}
         for task_id, task in dask_scheduler.tasks.items():
@@ -88,6 +92,11 @@ def get_tasks_on_scheduler(dask_scheduler: distributed.Scheduler) -> dict[str, Any]:


 class Scheduler:
+    """A simple scheduler that stores the tasks used to update the qos_rules in the database.
+
+    It ensures that access to the scheduler is thread-safe.
+    """
+
     def __init__(self) -> None:
         self.queue: list = list()
         self.index: dict[str, set] = dict()
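For readers skimming the diff, this is roughly the shape such a thread-safe scheduler could take. Only `queue`, `index`, and the use of `threading.RLock` elsewhere in this file are visible in the diff; the `append` method, its arguments, and the index layout are assumptions for illustration:

```python
import threading
from typing import Any


class SchedulerSketch:
    """Hypothetical stand-in for the Scheduler class added in this diff."""

    def __init__(self) -> None:
        self.queue: list = list()
        self.index: dict[str, set] = dict()
        self._lock = threading.RLock()  # re-entrant, so locked methods may call each other

    def append(self, task: dict[str, Any]) -> None:
        # every mutation of queue and index happens under the lock:
        # this is what "thread-safe" means in the docstring above
        with self._lock:
            self.queue.append(task)
            self.index.setdefault(task["function"], set()).add(task["uid"])
```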
@@ -122,6 +131,12 @@ def wrapper(*args, **kwargs):


 class Queue:
+    """A simple queue that stores the requests accepted by the broker.
+
+    - It ensures that access to the queue is thread-safe.
+    - It stores the created_at datetime of the last request added to the queue.
+    """
+
     def __init__(self) -> None:
         self.queue_dict: dict = dict()
         self._lock = threading.RLock()
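Similarly, a minimal sketch of the queue described above. `queue_dict` and the RLock come from the diff; the `add` method and the `last_created_at` attribute are assumed names. Tracking the newest `created_at` lets the broker ask the database only for requests created after that timestamp:

```python
import datetime
import threading


class QueueSketch:
    """Hypothetical stand-in for the Queue class added in this diff."""

    def __init__(self) -> None:
        self.queue_dict: dict = dict()
        self._lock = threading.RLock()
        self.last_created_at: datetime.datetime | None = None

    def add(self, request_uid: str, created_at: datetime.datetime, request: object) -> None:
        with self._lock:
            self.queue_dict[request_uid] = request
            # remember the newest created_at seen, so only requests created
            # after this timestamp need to be fetched from the database
            if self.last_created_at is None or created_at > self.last_created_at:
                self.last_created_at = created_at
```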
@@ -244,6 +259,13 @@ def number_of_workers(self):
     def set_request_error_status(
         self, exception, request_uid, session
     ) -> db.SystemRequest | None:
+        """Set the request status to failed and record the error message and reason.
+
+        If the error reason is "KilledWorker":
+        - if the worker was killed by the Nanny for memory usage, an event is added for the user;
+        - if the worker was killed for unknown reasons, the request is re-queued, provided
+          the requeue limit (configurable with an environment variable) has not been reached.
+        """
         error_message = "".join(traceback.format_exception(exception))
         error_reason = exception.__class__.__name__
         request = db.get_request(request_uid, session=session)
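The KilledWorker branching can be summarised as a small decision function. This is a sketch, not the real implementation: the environment variable name, the requeue counter, and the returned action labels are all assumptions:

```python
import os

# assumed name: the diff does not show which environment variable holds the limit
REQUEUE_LIMIT = int(os.getenv("BROKER_REQUEUE_LIMIT", "3"))


def decide_on_killed_worker(killed_by_nanny: bool, requeue_count: int) -> str:
    """Return the action for a request whose dask worker was killed."""
    if killed_by_nanny:
        # memory kill: surface the problem to the user as an event
        return "add_user_event"
    if requeue_count < REQUEUE_LIMIT:
        # unknown kill: give the request another chance
        return "requeue"
    return "fail"
```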
@@ -303,7 +325,13 @@ def set_request_error_status(
     def sync_database(self, session: sa.orm.Session) -> None:
         """Sync the database with the current status of the dask tasks.

-        If the task is not in the dask scheduler, it is re-queued.
+        - If the task is in the futures list, nothing is done.
+        - If the task is not in the futures list but is in the scheduler:
+          - if the task is in memory (it succeeded but was lost by the broker),
+            it is set to successful;
+          - if the task is in error, it is set to failed.
+        - If the task is not in the dask scheduler, it is re-queued.
+          This behaviour can be changed with an environment variable.
         """
         # the retrieve API sets the status to "dismissed"; here the broker deletes the request
         # this is to better control the status of the QoS
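The docstring's decision tree maps onto dask's task states ("memory" for a completed result held by the scheduler, "erred" for a failure). A hedged sketch, with the classification labels invented for illustration:

```python
from typing import Any


def classify_task(
    request_uid: str,
    futures: dict[str, Any],
    scheduler_tasks: dict[str, dict[str, Any]],
) -> str:
    """Hypothetical decision tree mirroring the sync_database docstring."""
    if request_uid in futures:
        return "keep"  # still tracked by the broker: nothing to do
    task = scheduler_tasks.get(request_uid)
    if task is not None:
        if task["state"] == "memory":
            return "set_successful"  # finished, but the broker lost the future
        if task["state"] == "erred":
            return "set_failed"
    return "requeue"  # unknown to the dask scheduler at all
```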
@@ -407,6 +435,15 @@ def sync_database(self, session: sa.orm.Session) -> None:

     @perf_logger
     def sync_qos_rules(self, session_write) -> None:
+        """Sync the status of the qos rules with the database.
+
+        Tasks that update the qos_rules table are piled up in the internal_scheduler.
+        The internal_scheduler is used to minimize the number of database updates by reusing:
+        - the same session;
+        - the same qos_rules, read from the database once and updated at each step if needed;
+        - the requests from self.queue.
+        If a request is updated, the corresponding self.queue entry is updated too.
+        """
         qos_rules = db.get_qos_rules(session=session_write)
         if tasks_number := len(self.internal_scheduler.queue):
             logger.info("performance", tasks_number=tasks_number)
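In outline, the batching works like the sketch below: drain the internal scheduler's pile of update tasks against one session and one in-memory copy of the rules, then commit once. The field names (`rule_uid`, `queued_count`, `request_uid`, `qos_status`) are assumptions, not the real schema:

```python
def sync_qos_rules_sketch(internal_scheduler, qos_rules, queue, session_write) -> None:
    # iterate over a copy so the pile can keep growing in other threads
    for task in list(internal_scheduler.queue):
        rule = qos_rules.get(task["rule_uid"])  # rules were read from the DB once
        if rule is not None:
            rule.queued_count = task["queued_count"]
        request = queue.queue_dict.get(task["request_uid"])
        if request is not None:
            request.qos_status = task["qos_status"]  # keep the queue entry in sync
    session_write.commit()  # a single commit for the whole pile of updates
```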
@@ -430,6 +467,13 @@ def sync_qos_rules(self, session_write) -> None:

     @perf_logger
     def sync_futures(self) -> None:
+        """Check whether the futures are finished, in error or cancelled, and update the database accordingly.
+
+        A previous version of the broker used to call the client.add_done_callback method,
+        but it proved unreliable. The futures are now checked in a loop and their status updated.
+        Finished futures are removed from the futures list in a second for loop, to avoid
+        "RuntimeError: dictionary changed size during iteration."
+        """
         finished_futures = []
         for future in self.futures.values():
             if future.status in ("finished", "error", "cancelled"):
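The two-loop pattern mentioned in the docstring is worth spelling out, since mutating a dict while iterating over it raises exactly the quoted RuntimeError. A minimal sketch, with the database update elided:

```python
def sync_futures_sketch(futures: dict) -> None:
    # first loop: only inspect and collect, never mutate the dict being iterated
    finished = []
    for uid, future in futures.items():
        if future.status in ("finished", "error", "cancelled"):
            # ... update the database for this request here ...
            finished.append(uid)
    # second loop: now it is safe to shrink the dict
    for uid in finished:
        del futures[uid]
```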
@@ -489,6 +533,7 @@ def submit_requests(
         number_of_requests: int,
         candidates: Iterable[db.SystemRequest],
     ) -> None:
+        """Check the qos rules and submit the requests to the dask scheduler."""
         queue = sorted(
             candidates,
             key=lambda candidate: self.qos.priority(candidate, session_write),
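From the context lines, submission sorts the candidates by QoS priority before submitting. A sketch of that flow, where `reverse=True` (highest priority first) and the slice over the sorted queue are assumptions:

```python
def submit_requests_sketch(broker, session_write, number_of_requests, candidates) -> None:
    queue = sorted(
        candidates,
        key=lambda candidate: broker.qos.priority(candidate, session_write),
        reverse=True,  # assumption: higher priority value goes first
    )
    # submit at most number_of_requests, leaving the rest for the next loop iteration
    for request in queue[:number_of_requests]:
        broker.submit_request(request, session=session_write)
```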
@@ -506,6 +551,7 @@ def submit_requests(
     def submit_request(
         self, request: db.SystemRequest, session: sa.orm.Session
     ) -> None:
+        """Submit the request to the dask scheduler and update the qos rules accordingly."""
         request = db.set_request_status(
             request_uid=request.request_uid, status="running", session=session
         )
@@ -536,6 +582,7 @@ def submit_request(
         )

     def run(self) -> None:
+        """Run the broker loop."""
         while True:
             start_loop = time.perf_counter()
             with self.session_maker_read() as session_read:
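Putting the pieces together, the broker loop presumably cycles through the sync and submit steps shown in this diff. The ordering of the calls and the one-second pacing below are assumptions, not the real implementation:

```python
import time


def run_sketch(broker) -> None:
    while True:
        start_loop = time.perf_counter()
        broker.sync_futures()  # reconcile finished/errored/cancelled futures
        with broker.session_maker_write() as session_write:
            broker.sync_database(session=session_write)
            broker.sync_qos_rules(session_write)
        # pace the loop to roughly one iteration per second (assumed interval)
        elapsed = time.perf_counter() - start_loop
        time.sleep(max(0.0, 1.0 - elapsed))
```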