Skip to content

Commit f60e9c4

Browse files
[WIP] Qos on database (#103)
* create qos_rules table * various fixes * fix qos status update on can_run and notify * fix alembic configuration * fix alembic * line saving * update get_qos_status_from_request * update unit-test * fix * revert * add conclusion_value to qosrule definition * tests * add conclusion info * qa and tests --------- Co-authored-by: Francesco Nazzaro <[email protected]>
1 parent a07901f commit f60e9c4

File tree

6 files changed

+417
-93
lines changed

6 files changed

+417
-93
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""add qos_rules table.
2+
3+
Revision ID: 8deb52d20c05
4+
Revises: 6ee20703d353
5+
Create Date: 2024-04-09 17:16:01.559118
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
11+
from alembic import op
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "8deb52d20c05"
15+
down_revision = "6ee20703d353"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade() -> None:
21+
op.create_table(
22+
"qos_rules",
23+
sa.Column("uid", sa.Text, primary_key=True),
24+
sa.Column("name", sa.Text),
25+
sa.Column("info", sa.Text),
26+
sa.Column("condition", sa.Text),
27+
sa.Column("conclusion", sa.Text),
28+
sa.Column("conclusion_value", sa.Text),
29+
sa.Column("queued", sa.Integer),
30+
sa.Column("running", sa.Integer),
31+
)
32+
op.create_table(
33+
"system_request_qos_rule",
34+
sa.Column(
35+
"request_uid",
36+
sa.dialects.postgresql.UUID(False),
37+
sa.ForeignKey("system_requests.request_uid"),
38+
primary_key=True,
39+
),
40+
sa.Column(
41+
"rule_uid", sa.Text, sa.ForeignKey("qos_rules.uid"), primary_key=True
42+
),
43+
)
44+
45+
46+
def downgrade() -> None:
47+
op.drop_table("system_request_qos_rule")
48+
op.drop_table("qos_rules")

cads_broker/database.py

+147-32
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,41 @@ class InvalidRequestID(Exception):
3737
pass
3838

3939

40+
class QoSRule(BaseModel):
41+
"""QoS Rule ORM model."""
42+
43+
__tablename__ = "qos_rules"
44+
45+
uid = sa.Column(sa.Text, primary_key=True)
46+
name = sa.Column(sa.Text)
47+
info = sa.Column(sa.Text)
48+
condition = sa.Column(sa.Text)
49+
conclusion = sa.Column(sa.Text)
50+
conclusion_value = sa.Column(sa.Text)
51+
queued = sa.Column(sa.Integer)
52+
running = sa.Column(sa.Integer)
53+
54+
system_requests: sa.orm.Mapped[list["SystemRequest"]] = sa.orm.relationship(
55+
"SystemRequest",
56+
secondary="system_request_qos_rule",
57+
back_populates="qos_rules",
58+
uselist=True,
59+
)
60+
61+
62+
class SystemRequestQoSRule(BaseModel):
63+
"""Association table for SystemRequest and QoSRule."""
64+
65+
__tablename__ = "system_request_qos_rule"
66+
67+
request_uid = sa.Column(
68+
sa.dialects.postgresql.UUID(False),
69+
sa.ForeignKey("system_requests.request_uid"),
70+
primary_key=True,
71+
)
72+
rule_uid = sa.Column(sa.Text, sa.ForeignKey("qos_rules.uid"), primary_key=True)
73+
74+
4075
class Events(BaseModel):
4176
"""Events ORM model."""
4277

@@ -102,9 +137,21 @@ class SystemRequest(BaseModel):
102137
__mapper_args__ = {"eager_defaults": False}
103138

104139
# joined is temporary
105-
cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined")
106-
adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="select")
107-
events = sa.orm.relationship(Events, lazy="select", passive_deletes=True)
140+
cache_entry: sa.orm.Mapped["cacholote.database.CacheEntry"] = sa.orm.relationship(
141+
cacholote.database.CacheEntry, lazy="joined"
142+
)
143+
adaptor_properties: sa.orm.Mapped["AdaptorProperties"] = sa.orm.relationship(
144+
AdaptorProperties, lazy="select"
145+
)
146+
events: sa.orm.Mapped[list["Events"]] = sa.orm.relationship(
147+
Events, lazy="select", passive_deletes=True
148+
)
149+
qos_rules: sa.orm.Mapped[list["QoSRule"]] = sa.orm.relationship(
150+
"QoSRule",
151+
secondary="system_request_qos_rule",
152+
back_populates="system_requests",
153+
uselist=True,
154+
)
108155

109156
@property
110157
def age(self):
@@ -376,40 +423,108 @@ def get_events_from_request(
376423
return events
377424

378425

379-
def get_qos_status_from_request(
380-
request: SystemRequest,
381-
) -> dict[str, list[tuple[str, str]]]:
382-
ret_value: dict[str, list[str]] = {}
383-
for rule_name, rules in request.qos_status.items():
384-
ret_value[rule_name] = []
385-
for rule in rules.values():
386-
ret_value[rule_name].append(
387-
(rule.get("info", ""), rule.get("conclusion", ""))
388-
)
389-
return ret_value
426+
def reset_qos_rules(session: sa.orm.Session):
427+
"""Delete all QoS rules."""
428+
for rule in session.scalars(sa.select(QoSRule)):
429+
rule.system_requests = []
430+
session.delete(rule)
431+
session.commit()
390432

391433

392-
def set_request_qos_rule(
393-
request: SystemRequest,
434+
def get_qos_rule(uid: str, session: sa.orm.Session):
435+
"""Get a QoS rule."""
436+
statement = sa.select(QoSRule).where(QoSRule.uid == uid)
437+
return session.scalars(statement).one()
438+
439+
440+
def add_qos_rule(
394441
rule,
395442
session: sa.orm.Session,
443+
queued: int = 0,
444+
running: int = 0,
396445
):
397-
qos_status = request.qos_status
398-
old_rules = qos_status.get(rule.name, {})
399-
rule_uid = rule.get_uid(request)
400-
if rule_uid in old_rules:
401-
return
402-
old_rules[rule_uid] = {
403-
"conclusion": str(rule.evaluate(request)),
404-
"info": str(rule.info).replace('"', ""),
405-
"condition": str(rule.condition),
406-
}
407-
qos_status[rule.name] = old_rules
408-
session.execute(
409-
sa.update(SystemRequest)
410-
.filter_by(request_uid=request.request_uid)
411-
.values(qos_status=qos_status)
446+
"""Add a QoS rule."""
447+
qos_rule = QoSRule(
448+
uid=str(rule.__hash__()),
449+
name=str(rule.name),
450+
info=str(rule.info),
451+
condition=str(rule.condition),
452+
conclusion=str(rule.conclusion),
453+
# conclusion_value may change over time, this case is not handled
454+
conclusion_value=str(rule.evaluate(request=None)),
455+
queued=queued,
456+
running=running,
412457
)
458+
session.add(qos_rule)
459+
session.commit()
460+
return qos_rule
461+
462+
463+
def increment_qos_rule_running(rules: list, session: sa.orm.Session):
464+
"""Increment the running counter of a QoS rule."""
465+
for rule in rules:
466+
try:
467+
qos_rule = get_qos_rule(str(rule.__hash__()), session)
468+
except sqlalchemy.orm.exc.NoResultFound:
469+
qos_rule = add_qos_rule(rule=rule, session=session)
470+
qos_rule.running += 1
471+
session.commit()
472+
473+
474+
def decrement_qos_rule_running(rules: list, session: sa.orm.Session):
475+
"""Increment the running counter of a QoS rule."""
476+
for rule in rules:
477+
qos_rule = get_qos_rule(str(rule.__hash__()), session)
478+
qos_rule.running -= 1
479+
session.commit()
480+
481+
482+
def delete_request_qos_status(request_uid: str, rules: list, session: sa.orm.Session):
483+
"""Delete all QoS rules from a request."""
484+
request = get_request(request_uid, session)
485+
for rule in rules:
486+
try:
487+
qos_rule = get_qos_rule(str(rule.__hash__()), session)
488+
except sqlalchemy.orm.exc.NoResultFound:
489+
qos_rule = add_qos_rule(rule=rule, session=session)
490+
if qos_rule in request.qos_rules:
491+
request.qos_rules.remove(qos_rule)
492+
qos_rule.queued -= 1
493+
qos_rule.running += 1
494+
session.commit()
495+
496+
497+
def add_request_qos_status(request_uid: str, rules: list, session: sa.orm.Session):
498+
request = get_request(request_uid, session)
499+
for rule in rules:
500+
try:
501+
qos_rule = get_qos_rule(str(rule.__hash__()), session)
502+
except sqlalchemy.orm.exc.NoResultFound:
503+
qos_rule = add_qos_rule(rule=rule, session=session)
504+
if qos_rule not in request.qos_rules:
505+
qos_rule.queued += 1
506+
request.qos_rules.append(qos_rule)
507+
session.commit()
508+
509+
510+
def get_qos_status_from_request(
511+
request: SystemRequest,
512+
) -> dict[str, list[dict[str, str]]]:
513+
ret_value: dict[str, list[dict[str, str]]] = {}
514+
rules = request.qos_rules
515+
for rule in rules:
516+
rule_name = rule.name
517+
rule_summary = {
518+
"info": rule.info,
519+
"queued": rule.queued,
520+
"running": rule.running,
521+
"conclusion": rule.conclusion_value,
522+
}
523+
if rule_name not in ret_value:
524+
ret_value[rule_name] = [rule_summary]
525+
else:
526+
ret_value[rule_name].append(rule_summary)
527+
return ret_value
413528

414529

415530
def requeue_request(
@@ -428,7 +543,7 @@ def requeue_request(
428543
request.request_metadata = metadata
429544
request.status = "accepted"
430545
session.commit()
431-
logger.info("--------- requeueing request", **logger_kwargs(request=request))
546+
logger.info("requeueing request", **logger_kwargs(request=request))
432547
return request
433548
else:
434549
return

cads_broker/dispatcher.py

+21-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import io
33
import operator
44
import os
5+
import sched
56
import time
67
import traceback
78
from typing import Any
@@ -118,6 +119,7 @@ class Broker:
118119
repr=lambda futures: " ".join(futures.keys()),
119120
)
120121
running_requests: int = 0
122+
internal_scheduler: sched.scheduler = sched.scheduler(time.time, time.sleep)
121123

122124
@classmethod
123125
def from_address(
@@ -131,6 +133,8 @@ def from_address(
131133
factory.register_functions()
132134
session_maker_read = db.ensure_session_obj(session_maker_read, mode="r")
133135
session_maker_write = db.ensure_session_obj(session_maker_write, mode="w")
136+
with session_maker_write() as session:
137+
db.reset_qos_rules(session)
134138
rules_hash = get_rules_hash(qos_config.rules_path)
135139
self = cls(
136140
client=client,
@@ -171,7 +175,9 @@ def sync_database(self, session: sa.orm.Session) -> None:
171175
# this is to better control the status of the QoS
172176
if request.status == "dismissed":
173177
db.delete_request(request=request, session=session)
174-
self.qos.notify_end_of_request(request, session)
178+
self.qos.notify_end_of_request(
179+
request, session, scheduler=self.internal_scheduler
180+
)
175181
continue
176182
# if request is in futures, go on
177183
if request.request_uid in self.futures:
@@ -239,7 +245,9 @@ def on_future_done(self, future: distributed.Future) -> None:
239245
)
240246
return
241247
self.futures.pop(future.key)
242-
self.qos.notify_end_of_request(request, session)
248+
self.qos.notify_end_of_request(
249+
request, session, scheduler=self.internal_scheduler
250+
)
243251
logger.info(
244252
"job has finished",
245253
dask_status=future.status,
@@ -259,7 +267,9 @@ def submit_requests(
259267
requests_counter = 0
260268
for request in queue:
261269
with self.session_maker_write() as session_write:
262-
if self.qos.can_run(request, session=session_write):
270+
if self.qos.can_run(
271+
request, session=session_write, scheduler=self.internal_scheduler
272+
):
263273
self.submit_request(request, session=session_write)
264274
requests_counter += 1
265275
if requests_counter == int(number_of_requests * WORKERS_MULTIPLIER):
@@ -271,7 +281,9 @@ def submit_request(
271281
request = db.set_request_status(
272282
request_uid=request.request_uid, status="running", session=session
273283
)
274-
self.qos.notify_start_of_request(request, session)
284+
self.qos.notify_start_of_request(
285+
request, session, scheduler=self.internal_scheduler
286+
)
275287
future = self.client.submit(
276288
worker.submit_workflow,
277289
key=request.request_uid,
@@ -305,6 +317,11 @@ def run(self) -> None:
305317
self.qos.environment.set_session(session_read)
306318
with self.session_maker_write() as session_write:
307319
self.sync_database(session=session_write)
320+
321+
if len(self.internal_scheduler.queue) > int(
322+
os.getenv("MAX_SCHEDULER_QUEUE", 0)
323+
):
324+
self.internal_scheduler.run()
308325
self.running_requests = len(
309326
[
310327
future

cads_broker/entry_points.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ def delete_running_requests(
104104
database.logger.info(f"Deleting {number_of_requests} requests.")
105105
for request in requests:
106106
database.logger.info(f"deleting {request.request_uid}...")
107-
database.set_request_status(request.request_uid, "dismissed", session=session)
107+
database.set_request_status(
108+
request.request_uid, "dismissed", session=session
109+
)
108110

109111
session.commit()
110112

0 commit comments

Comments
 (0)