Skip to content

Commit 62c0ec3

Browse files
revert wrong simplification
1 parent 9fa5224 commit 62c0ec3

File tree

2 files changed

+37
-35
lines changed

2 files changed

+37
-35
lines changed

cads_broker/dispatcher.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ def submit_requests(
368368
queue = sorted(
369369
candidates,
370370
key=lambda candidate: self.qos.priority(
371-
candidate, session_write, self.internal_scheduler
371+
candidate, session_write
372372
),
373373
reverse=True,
374374
)
@@ -422,7 +422,7 @@ def run(self) -> None:
422422
if (rules_hash := get_rules_hash(self.qos.path)) != self.qos.rules_hash:
423423
logger.info("reloading qos rules")
424424
self.qos.reload_rules(
425-
session=session_read, scheduler=self.internal_scheduler
425+
session=session_read
426426
)
427427
self.qos.rules_hash = rules_hash
428428
self.qos.environment.set_session(session_read)

cads_broker/qos/QoS.py

+35-33
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,17 @@ def read_rules(self):
6565
self.rules.dump()
6666

6767
@locked
68-
def reload_rules(self, session, scheduler):
68+
def reload_rules(self, session):
6969
"""Allow a 'hot' reloading of the rules.
7070
7171
For example, a thread could be monitoring the time stamp of the rules
7272
file and call this method.
7373
"""
7474
self.read_rules()
75-
self.reconfigure(session=session, scheduler=scheduler)
75+
self.reconfigure(session=session)
7676

7777
@locked
78-
def reconfigure(self, session, scheduler):
78+
def reconfigure(self, session):
7979
"""Reset the status of the QoS.
8080
8181
This method must be called if the rule_set is changed.
@@ -89,27 +89,40 @@ def reconfigure(self, session, scheduler):
8989
# Re-register the active tasks
9090
for request in database.get_running_requests(session=session):
9191
# Recompute the limits
92-
for limit in self.limits_for(request, session, scheduler):
92+
for limit in self.limits_for(request, session):
9393
limit.increment()
9494

9595
@locked
9696
def can_run(self, request, session, scheduler):
9797
"""Check if a request can run."""
9898
properties = self._properties(
99-
request=request, session=session, scheduler=scheduler
99+
request=request, session=session
100100
)
101101
limits = []
102+
new_limits = []
102103
for limit in properties.limits:
103104
if limit.full(request):
104105
limits.append(limit)
106+
if str(limit.__hash__()) not in [r.uid for r in request.qos_rules]:
107+
new_limits.append(limit)
108+
if len(new_limits):
109+
scheduler.append(
110+
{
111+
"function": database.add_request_qos_status,
112+
"kwargs": {
113+
"request_uid": request.request_uid,
114+
"rules": limits,
115+
},
116+
}
117+
)
105118
permissions = []
106119
for permission in properties.permissions:
107120
if not permission.evaluate(request):
108121
permissions.append(permission)
109122
return not len(limits) and not len(permissions)
110123

111124
@locked
112-
def _properties(self, request, session, scheduler):
125+
def _properties(self, request, session):
113126
"""Return the Properties object associated with a request.
114127
115128
If it does not exists it is created.
@@ -148,17 +161,6 @@ def _properties(self, request, session, scheduler):
148161
if limit is not None:
149162
properties.limits.append(limit)
150163

151-
if len(properties.limits) > 0:
152-
scheduler.append(
153-
{
154-
"function": database.add_request_qos_status,
155-
"kwargs": {
156-
"request_uid": request.request_uid,
157-
"rules": properties.limits,
158-
},
159-
}
160-
)
161-
162164
# Add priorities and compute starting priority
163165
priority = 0
164166
for rule in self.rules.priorities:
@@ -175,38 +177,38 @@ def _properties(self, request, session, scheduler):
175177
return properties
176178

177179
@locked
178-
def priority(self, request, session, scheduler):
180+
def priority(self, request, session):
179181
"""Compute the priority of a request."""
180182
# The priority of a request increases with time
181183
return (
182-
self._properties(request, session, scheduler).starting_priority
184+
self._properties(request, session).starting_priority
183185
+ request.age
184186
)
185187

186188
def dump(self, out=print):
187189
self.rules.dump(out)
188190

189191
@locked
190-
def status(self, requests, session, scheduler, out=print):
192+
def status(self, requests, session, out=print):
191193
out()
192194
out("===================================================================")
193195
out("REQUESTS")
194196
out("===================================================================")
195197

196198
for request in requests:
197-
self._status(request, session, scheduler, out)
199+
self._status(request, session, out)
198200

199201
out()
200202
out("===================================================================")
201203

202-
def _status(self, request, session, scheduler, out):
204+
def _status(self, request, session, out):
203205
out()
204206
out("===================================================================")
205207
out("QoS info for:")
206208
out(request, request.status)
207209
out("Priority: {}".format(self.priority(request, session)))
208210
out("Limits rules:")
209-
for limit in self.limits_for(request, session, scheduler):
211+
for limit in self.limits_for(request, session):
210212
out(
211213
" {} ({}/{}) {}".format(
212214
limit,
@@ -217,36 +219,36 @@ def _status(self, request, session, scheduler, out):
217219
)
218220

219221
out("Priorities rules:")
220-
for priority in self.priorities_for(request, session, scheduler):
222+
for priority in self.priorities_for(request, session):
221223
out(" {}".format(priority))
222224

223225
out("Permissions rules:")
224-
for permission in self.permissions_for(request, session, scheduler):
226+
for permission in self.permissions_for(request, session):
225227
out(" {}".format(permission))
226228

227229
@locked
228-
def limits_for(self, request, session, scheduler):
230+
def limits_for(self, request, session):
229231
"""Return the limit rules that applies to a request.
230232
231233
Ensure that the properties cache is created if needed.
232234
"""
233-
return self._properties(request, session, scheduler).limits
235+
return self._properties(request, session).limits
234236

235237
@locked
236-
def permissions_for(self, request, session, scheduler):
238+
def permissions_for(self, request, session):
237239
"""Return the permission rules that applies to a request.
238240
239241
Ensure that the properties cache is created if needed.
240242
"""
241-
return self._properties(request, session, scheduler).permissions
243+
return self._properties(request, session).permissions
242244

243245
@locked
244-
def priorities_for(self, request, session, scheduler):
246+
def priorities_for(self, request, session):
245247
"""Return the priority rules that applies to a request.
246248
247249
Ensure that the properties cache is created if needed.
248250
"""
249-
return self._properties(request, session, scheduler).priorities
251+
return self._properties(request, session).priorities
250252

251253
@locked
252254
def user_limit(self, request):
@@ -304,7 +306,7 @@ def notify_start_of_request(self, request, session, scheduler):
304306
its capacity.
305307
"""
306308
limits_list = []
307-
for limit in self.limits_for(request, session, scheduler):
309+
for limit in self.limits_for(request, session):
308310
limit.increment()
309311
limits_list.append(limit)
310312
scheduler.append(
@@ -326,7 +328,7 @@ def notify_end_of_request(self, request, session, scheduler):
326328
sharing the same limits can run.
327329
"""
328330
limits_list = []
329-
for limit in self.limits_for(request, session, scheduler):
331+
for limit in self.limits_for(request, session):
330332
limit.decrement()
331333
limits_list.append(limit)
332334

0 commit comments

Comments
 (0)