Skip to content

Commit 282e402

Browse files
try to fix qos status bug
1 parent 7e89576 commit 282e402

File tree

3 files changed

+33
-20
lines changed

3 files changed

+33
-20
lines changed

cads_broker/dispatcher.py

+8
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,10 @@ def sync_database(self, session: sa.orm.Session) -> None:
477477
for request in requests:
478478
# if request is in futures, go on
479479
if request.request_uid in self.futures:
480+
# notify start of request if it is not already notified
481+
self.qos.notify_start_of_request(
482+
request, scheduler=self.internal_scheduler
483+
)
480484
continue
481485
elif task := scheduler_tasks.get(request.request_uid, None):
482486
if (state := task["state"]) in ("memory", "erred"):
@@ -508,6 +512,10 @@ def sync_database(self, session: sa.orm.Session) -> None:
508512
)
509513
# if the task is in processing, it means that the task is still running
510514
if state == "processing":
515+
# notify start of request if it is not already notified
516+
self.qos.notify_start_of_request(
517+
request, scheduler=self.internal_scheduler
518+
)
511519
continue
512520
# if it doesn't find the request: re-queue it
513521
else:

cads_broker/qos/QoS.py

+16-14
Original file line numberDiff line numberDiff line change
@@ -340,19 +340,21 @@ def notify_start_of_request(self, request, scheduler):
340340
"""
341341
limits_list = []
342342
for limit in self.limits_for(request):
343-
limit.increment(request.request_uid)
344-
print(f"---------START--------- {limit.info}: queued {len(limit.queued)}, running {limit.value}")
345-
limits_list.append(limit)
346-
scheduler.append(
347-
{
348-
"function": database.delete_request_qos_status,
349-
"kwargs": {
350-
"rules": limits_list,
351-
"request_uid": request.request_uid,
352-
},
353-
}
354-
)
355-
# Keep track of the running request. This is needed by reconfigure(self)
343+
if request.request_uid not in limit.running:
344+
limit.increment(request.request_uid)
345+
print(f"---------START--------- {limit.info}: queued {len(limit.queued)}, running {limit.value}")
346+
limits_list.append(limit)
347+
if limits_list:
348+
scheduler.append(
349+
{
350+
"function": database.delete_request_qos_status,
351+
"kwargs": {
352+
"rules": limits_list,
353+
"request_uid": request.request_uid,
354+
},
355+
}
356+
)
357+
# Keep track of the running request. This is needed by reconfigure(self)
356358

357359
@locked
358360
def notify_end_of_request(self, request, scheduler):
@@ -363,7 +365,7 @@ def notify_end_of_request(self, request, scheduler):
363365
"""
364366
limits_list = []
365367
for limit in self.limits_for(request):
366-
limit.decrement()
368+
limit.decrement(request.request_uid)
367369
print(f"---------END--------- {limit.info}: queued {len(limit.queued)}, running {limit.value}")
368370
limits_list.append(limit)
369371

cads_broker/qos/Rule.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -134,28 +134,31 @@ class Limit(QoSRule):
134134

135135
def __init__(self, environment, info, condition, conclusion):
136136
super().__init__(environment, info, condition, conclusion)
137-
# running requests
138-
self.value = 0
139137
self.queued = set()
138+
self.running = set()
140139

141140
def increment(self, request_uid):
142141
self.remove_from_queue(request_uid)
143-
self.value += 1
142+
self.running.add(request_uid)
144143

145144
def remove_from_queue(self, request_uid):
146145
if request_uid in self.queued:
147146
self.queued.remove(request_uid)
148147

149-
def decrement(self):
150-
if self.value > 0:
151-
self.value -= 1
148+
def decrement(self, request_uid):
149+
if request_uid in self.running:
150+
self.running.remove(request_uid)
152151

153152
def queue(self, request_uid):
154153
self.queued.add(request_uid)
155154

156155
def capacity(self, request):
157156
return self.evaluate(request)
158157

158+
@property
159+
def value(self):
160+
return len(self.running)
161+
159162
def full(self, request):
160163
# NOTE: the self.value can be greater than the limit capacity after a
161164
# reconfiguration of the QoS

0 commit comments

Comments
 (0)