Skip to content

Commit 755cba3

Browse files
Fix QoS status in case of workers refresh (#151)
* add debug message * fix * debug * try to fix qos status bug * fix dask scheduler race condition * try to fix released race condition * remove debug messages * remove debug print statement from QoS class
1 parent df21eb0 commit 755cba3

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

cads_broker/dispatcher.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,10 @@ def sync_database(self, session: sa.orm.Session) -> None:
477477
for request in requests:
478478
# if request is in futures, go on
479479
if request.request_uid in self.futures:
480+
# notify start of request if it is not already notified
481+
self.qos.notify_start_of_request(
482+
request, scheduler=self.internal_scheduler
483+
)
480484
continue
481485
elif task := scheduler_tasks.get(request.request_uid, None):
482486
if (state := task["state"]) in ("memory", "erred"):
@@ -507,11 +511,27 @@ def sync_database(self, session: sa.orm.Session) -> None:
507511
**db.logger_kwargs(request=finished_request),
508512
)
509513
# if the task is in processing, it means that the task is still running
510-
if state == "processing":
514+
elif state == "processing":
515+
# notify start of request if it is not already notified
516+
self.qos.notify_start_of_request(
517+
request, scheduler=self.internal_scheduler
518+
)
519+
continue
520+
elif state == "released":
521+
# notify start of request if it is not already notified
522+
queued_request = db.requeue_request(
523+
request=request, session=session
524+
)
525+
if queued_request:
526+
self.queue.add(queued_request.request_uid, request)
527+
self.qos.notify_end_of_request(
528+
request, scheduler=self.internal_scheduler
529+
)
511530
continue
512531
# if it doesn't find the request: re-queue it
513532
else:
514533
request = db.get_request(request.request_uid, session=session)
534+
# if the broker finds the cache_id it means that the job has finished
515535
if request.cache_id:
516536
successful_request = db.set_successful_request(
517537
request_uid=request.request_uid,

cads_broker/qos/QoS.py

+15-13
Original file line numberDiff line numberDiff line change
@@ -339,18 +339,20 @@ def notify_start_of_request(self, request, scheduler):
339339
"""
340340
limits_list = []
341341
for limit in self.limits_for(request):
342-
limit.increment(request.request_uid)
343-
limits_list.append(limit)
344-
scheduler.append(
345-
{
346-
"function": database.delete_request_qos_status,
347-
"kwargs": {
348-
"rules": limits_list,
349-
"request_uid": request.request_uid,
350-
},
351-
}
352-
)
353-
# Keep track of the running request. This is needed by reconfigure(self)
342+
if request.request_uid not in limit.running:
343+
limit.increment(request.request_uid)
344+
limits_list.append(limit)
345+
if limits_list:
346+
scheduler.append(
347+
{
348+
"function": database.delete_request_qos_status,
349+
"kwargs": {
350+
"rules": limits_list,
351+
"request_uid": request.request_uid,
352+
},
353+
}
354+
)
355+
# Keep track of the running request. This is needed by reconfigure(self)
354356

355357
@locked
356358
def notify_end_of_request(self, request, scheduler):
@@ -361,7 +363,7 @@ def notify_end_of_request(self, request, scheduler):
361363
"""
362364
limits_list = []
363365
for limit in self.limits_for(request):
364-
limit.decrement()
366+
limit.decrement(request.request_uid)
365367
limits_list.append(limit)
366368

367369
scheduler.append(

cads_broker/qos/Rule.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -134,28 +134,31 @@ class Limit(QoSRule):
134134

135135
def __init__(self, environment, info, condition, conclusion):
136136
super().__init__(environment, info, condition, conclusion)
137-
# running requests
138-
self.value = 0
139137
self.queued = set()
138+
self.running = set()
140139

141140
def increment(self, request_uid):
142141
self.remove_from_queue(request_uid)
143-
self.value += 1
142+
self.running.add(request_uid)
144143

145144
def remove_from_queue(self, request_uid):
146145
if request_uid in self.queued:
147146
self.queued.remove(request_uid)
148147

149-
def decrement(self):
150-
if self.value > 0:
151-
self.value -= 1
148+
def decrement(self, request_uid):
149+
if request_uid in self.running:
150+
self.running.remove(request_uid)
152151

153152
def queue(self, request_uid):
154153
self.queued.add(request_uid)
155154

156155
def capacity(self, request):
157156
return self.evaluate(request)
158157

158+
@property
159+
def value(self):
160+
return len(self.running)
161+
159162
def full(self, request):
160163
# NOTE: the self.value can be greater than the limit capacity after a
161164
# reconfiguration of the QoS

0 commit comments

Comments
 (0)